2015-04-27 13 views
9

Il messaggio di eccezione come seguejoda formato DateTime causa di un errore di puntatore nullo a scintilla RDD funzioni

classe User ha gettato un'eccezione: lavoro interrotto a causa di mancanza fase: Task 0 in fase 1.0 fallito 4 volte, il fallimento più recente : Attività persa 0.3 nella fase 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException a org.joda.time.tz.CachedDateTimeZone.getInfo (CachedDateTimeZone.java:143) a org.joda.time. tz.CachedDateTimeZone.getOffset (CachedDateTimeZone.java:103) allo org.joda.time.format.DateTimeFormatter.printTo (DateTimeFormatter.java:676).515.053.691,36321 milioni a org.joda.time.format.DateTimeFormatter.printTo (DateTimeFormatter.java:521) a org.joda.time.format.DateTimeFormatter.print (DateTimeFormatter.java:625) a org.joda.time .base.AbstractDateTime.toString (AbstractDateTime.java:328) a com.xxx.ieg.face.demo.DateTimeNullReferenceReappear $$ anonfun $ 3 $$ anonfun $ applicare $ 1.Apply (DateTimeNullReferenceReappear.scala: 41) a com .xxx.ieg.face.demo.DateTimeNullReferenceReappear $$ anonfun $ 3 $$ anonfun $ apply $ 1.apply (DateTimeNullReferenceReappear.scala: 41) a scala.collection.TraversableLike $$ anonfun $ groupBy $ 1.apply (TraversableLike.scala: 328) allo scala.collection.TraversableLike $$ anonfun $ groupBy $ 1.apply (TraversableLike.scala: 327) a scala.collection.Iterator $ class.foreach (Iterator.scala: 727) a org.apache.spark.util.collection .CompactBuffer $$ anon $ 1.foreach (CompactBuffer.scala: 113) a scala.collection.IterableLike $ class.foreach (IterableLike.scala: 72) a org.apache.spark.util.collection.CompactBuffer.foreach (CompactBuffer.scala: 28) a scala.collection.TraversableLike $ class.groupBy (TraversableLike.scala: 327) a org.apache.spark.util.collection.CompactBuffer.groupBy (CompactBuffer.scala: 28) a com.xxx.ieg.face.demo.DateTimeNullReferenceReappear $$ anonfun $ 3.apply (DateTimeNullR eferenceReappear.scala: 41) a com.xxx.ieg.face.demo.DateTimeNullReferenceReappear $$ anonfun $ 3.apply (DateTimeNullReferenceReappear.scala: 40) a scala.collection.Iterator $$ anon $ 11.next (Iterator.scala : 328) a scala.collection.Iterator $$ anon $ 10.next (Iterator.scala: 312) a scala.collection.Iterator $ class.foreach (Iterator.scala: 727) a scala.collection.AbstractIterator.foreach (Iterator.scala: 1157) a scala.collection.generic.Growable $ class. $ Plus $ plus $ eq (Growable.scala: 48) a scala.collection.mutable.ArrayBuffer. $ Plus $ plus $ eq (ArrayBuffer.scala: 103) allo scala.collection.mutable.ArrayBuffer. $ Plus $ plus $ eq (ArrayBuffer.scala: 47) a scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 273) a scala.collection.AbstractIterator.to (Iterator.scala: 1157) a scala.collection.TraversableOnce $ class.toBuffer (TraversableOnce.scala : 265) a scala.collection.AbstractIterator.toBuffer (Iterator.scala: 1157) a scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 252) a scala.collection.AbstractIterator.toArray (Iterator.scala : 1157) a org.apache.spark.rdd.RDD $$ anonfun $ 26.apply (RDD.scala: 1081) a org.apache.spark.rdd.RDD $$ anonfun $ 26.apply (RDD.scala: 1081) a org.apache.spark.SparkContext $$ anonfun $ runJob $ 4.apply (SparkContext.scala: 1314) a org.apache.spark.SparkContext $$ anonfun $ runJob $ 4.apply (SparkContext.scala: 1314) su org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 61) su org.apache.spark.scheduler.Task. run (Task.scala: 56) a org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 196) a java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) a java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:615) a java.lang.Thread.run (Thread.java:744)

Il mio codice come segue:

import org.apache.hadoop.conf.Configuration 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.joda.time.DateTime 
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter } 




object DateTimeNullReferenceReappear extends App { 

    case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

    val cfg = new Configuration 
    val sparkConf = new SparkConf() 
    sparkConf.setAppName("bourne_exception_reappear") 
    val sc = new SparkContext(sparkConf) 

val data = TDWSparkContext.tdwTable( // this function just read data from an data warehouse 
    sc, 
    tdwuser = FaceConf.TDW_USER, 
    tdwpasswd = FaceConf.TDW_PASSWORD, 
    dbName = "my_db", 
    tblName = "my_table", 
    parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")) 
    .map(row => { 
    Record(uin = row(2), 
     date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)), 
     value = row(4).toDouble) 
    }).map(x => (x.uin, (x.date, x.value))) 
    .groupByKey 
    .map(x => { 
    x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throw exception here 
    }) 

//  val data = TDWSparkContext.tdwTable( // It works, as I don't user datetime toString in the groupBy 
//  sc, 
//  tdwuser = FaceConf.TDW_USER, 
//  tdwpasswd = FaceConf.TDW_PASSWORD, 
//  dbName = "hy", 
//  tblName = "t_dw_cf_oss_tblogin", 
//  parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")) 
//  .map(row => { 
//  Record(uin = row(2), 
//   date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)), 
//   value = row(4).toDouble) 
//  }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value))) 
//  .groupByKey 
//  .map(x => { 
//  x._2.groupBy(_._1).mapValues(_.map(_._2).sum) 
//  }) 

    data.take(10).map(println) 

} 

Così, sembra che la chiamata toString nel groupBy causare l'eccezione, così chiunque può spiegare?

Grazie

+0

Bene ... 'NullPointerException' si verifica quando si tenta di chiamare qualsiasi funzione su una variabile che contiene valori' null' di qualsiasi tipo. Quindi ... questo significa che nel tuo 'x._2' ci sono alcune tuple in cui il primo membro (' _._ 1') è 'null'. –

+0

Potresti aggiungere il risultato di questo? 'TDWSparkContext.tdwTable (// questa funzione è sufficiente leggere i dati da un data warehouse sc, tdwuser = FaceConf.TDW_USER, tdwpasswd = FaceConf.TDW_PASSWORD, dbName = "my_db", tblName = "my_table", parts = Matrice ("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")) .map (riga => { Record (uin = riga (2) , date = DateTimeFormat.forPattern ("yyyyMMdd"). ParseDateTime (row (0)), value = row (4) .toDouble) }). Map (x => (x.uin, (x.date, x.value))) ' –

+3

Basta sostituire questo' groupBy (_._ 1.toString ("yyyyMMdd")) 'di' groupBy (d => {if (d. _1! = Null) {d._1.toString ("yyyyMMdd")} else {"I am a placeholder"}}) '. Puoi scegliere di fare quello che vuoi con i tuoi segnaposto. –

risposta

9

È necessario disabilitare Kryo, utilizzare Kryo JodaTime Serializers o evitare la serializzazione dell'oggetto DateTime, ovvero passare Longs.

1

Non sappiamo molto del "problema". Quindi possiamo provare a seguire experimat che ci permetterà di vedere di più sul problema.

Sostituire la seguente parte,

map(x => { 
    x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throw exception here 
}) 

Con questo,

map(x => { 
    x._2.groupBy(t => { 
    val dateStringTry = Try(t._2.toString("yyyyMMdd")) 
    dateStringTry match { 
     case Success(dateString) => Right(dateString) 
     case Failure(e) => { 
     println("=========== Null Tuple Description ==========") 
     println("Problem Tuple :: [" + t + "]") 
     println("Error Info :: [" + e.getMessage + "]") 
     // finally the stack trace, if needed 
     // e.printStackTrace() 
     prinln("=============================================") 
     Left(e) 
     } 
    } 
    }) 
}) 

Controlliamo il risultato dell'esecuzione di questo esperimento.

1

Il problema sembra essere che DateTime perde qualcosa durante la serializzazione in Spark (il che accade molto lì credo). Nel mio caso lo Chronology è stato incasinato che ha causato la stessa eccezione.

Una soluzione davvero molto hacky che ha lavorato per me è quello di ricreare l'DateTime poco prima di utilizzarlo, ad es .:

date.toMutableDateTime.toDateTime

Questo sembra per ripristinare qualsiasi bit mancanti e tutto funziona dopo.

La soluzione inviata da Marius Soutier per disabilitare Kryo ha funzionato anche per me. Questo è un approccio meno hacky.

0

Si prega di fare riferimento a questo - https://issues.apache.org/jira/browse/SPARK-4170

In sostanza, non si dovrebbe essere estendendo scala.App per la classe principale. Potrebbe non funzionare correttamente in alcuni casi. Utilizzare invece un metodo esplicito main().

Ecco l'avvertimento documentato nel codice Spark 1.6.1 (In SparkSubmit classe)

// SPARK-4170 
if (classOf[scala.App].isAssignableFrom(mainClass)) { 
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") 
} 
0

Il problema qui è cattivo serializzazione di Joda di CachedDateTimeZone - che include un campo transitoria che non ottiene serializzato, rimanendo null nell'oggetto deserializzato.

È possibile creare e registrare il proprio Serializer che gestisce questo oggetto correttamente:

import com.esotericsoftware.kryo.Kryo; 
import com.esotericsoftware.kryo.Serializer; 
import com.esotericsoftware.kryo.io.Input; 
import com.esotericsoftware.kryo.io.Output; 
import org.joda.time.DateTimeZone; 
import org.joda.time.tz.CachedDateTimeZone; 

public class JodaCachedDateTimeZoneSerializer extends Serializer<CachedDateTimeZone> { 

    public JodaCachedDateTimeZoneSerializer() { 
     setImmutable(true); 
    } 

    @Override 
    public CachedDateTimeZone read(final Kryo kryo, final Input input, final Class<CachedDateTimeZone> type) { 
     // reconstruct from serialized ID: 
     final String id = input.readString(); 
     return CachedDateTimeZone.forZone(DateTimeZone.forID(id)); 
    } 

    @Override 
    public void write(final Kryo kryo, final Output output, final CachedDateTimeZone cached) { 
     // serialize ID only: 
     output.writeString(cached.getID()); 
    } 
} 

Poi, nella tua classe che estende KryoRegistrator, aggiungere:

kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer()) 

In questo modo non c'è bisogno di disabilitare Kryo o astenersi dall'usare Joda.

0
sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer"); 
Problemi correlati