2015-12-25 11 views
13

Ho migliaia di piccoli file in HDFS. È necessario elaborare un sottoinsieme leggermente più piccolo di file (che è di nuovo in migliaia), fileList contiene un elenco di percorsi file che devono essere elaborati.Stackoverflow dovuto al lungo RDD Lineage

// fileList == list of filepaths in HDFS 

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD 

for (i <- 0 to fileList.size() - 1) { 

val filePath = fileStatus.get(i) 
val fileRDD = sparkContext.textFile(filePath) 
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD) 

} 

masterRDD.first() 

// Una volta fuori dal ciclo, eseguire qualsiasi azione termina con errore StackOverflow causa di un lungo lignaggio di RDD

Exception in thread "main" java.lang.StackOverflowError 
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    ===================================================================== 
    ===================================================================== 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

risposta

27

In generale è possibile utilizzare i punti di controllo per rompere lunghe linee. Alcuni più o meno simile a questo dovrebbe funzionare:

import org.apache.spark.rdd.RDD 
import scala.reflect.ClassTag 

val checkpointInterval: Int = ??? 

def loadAndFilter(path: String) = sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _)) 

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) 
    (acc: RDD[T], xi: (RDD[T], Int)) = { 
    if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint 
    else xi._1.union(acc) 
    } 

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] 
fileList.map(loadAndFilter).zipWithIndex 
    .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

In questa particolare situazione di una soluzione molto più semplice dovrebbe essere quello di utilizzare SparkContext.union metodo:

val masterRDD = sc.union(
    fileList.map(path => sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _))) 
) 

A differenza tra questi metodi dovrebbe essere ovvio quando si prende uno sguardo al DAG generato dal ciclo/reduce:

enter image description here

e singolo union:

enter image description here

Naturalmente se i file sono di piccole dimensioni è possibile combinare con wholeTextFilesflatMap e leggere tutti i file in una volta:

sc.wholeTextFiles(fileList.mkString(",")) 
    .flatMap{case (path, text) => 
    text.split("\n").filter(_.startsWith("#####")).map((path, _))} 
+4

migliore mai usare di sc.union() –