2015-12-25 15 views
13

Mam tysiące małych plików w HDFS. Musisz przetworzyć nieco mniejszy podzbiór plików (który jest ponownie w tysiącach), lista plików zawiera listę ścieżek do plików, które należy przetworzyć.Stackoverflow z powodu długiej linii RDD

// fileList == list of filepaths in HDFS 

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD 

for (i <- 0 to fileList.size() - 1) { 

val filePath = fileStatus.get(i) 
val fileRDD = sparkContext.textFile(filePath) 
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD) 

} 

masterRDD.first() 

// Po wyjściu z pętli, wykonując żadnych wyników działań w stackoverflow błędu ze względu na długi rodu RDD

Exception in thread "main" java.lang.StackOverflowError 
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    ===================================================================== 
    ===================================================================== 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

Odpowiedz

27

Ogólnie można wykorzystać punkty kontrolne złamać długi rodowód. Mniej lub bardziej podobny do tego powinno działać:

import org.apache.spark.rdd.RDD 
import scala.reflect.ClassTag 

val checkpointInterval: Int = ??? 

def loadAndFilter(path: String) = sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _)) 

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) 
    (acc: RDD[T], xi: (RDD[T], Int)) = { 
    if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint 
    else xi._1.union(acc) 
    } 

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] 
fileList.map(loadAndFilter).zipWithIndex 
    .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

W tej konkretnej sytuacji znacznie prostsze rozwiązanie powinno być wykorzystanie SparkContext.union metody:

val masterRDD = sc.union(
    fileList.map(path => sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _))) 
) 

Różnica pomiędzy tymi metodami powinno być oczywiste, jeśli wziąć spojrzenie na DAG generowane przez pętlę/reduce:

enter image description here

i Pojedynczy union:

enter image description here

Oczywiście jeśli pliki są małe, można połączyć wholeTextFiles z flatMap i czytać wszystkie naraz:

sc.wholeTextFiles(fileList.mkString(",")) 
    .flatMap{case (path, text) => 
    text.split("\n").filter(_.startsWith("#####")).map((path, _))} 
+4

Najlepszy kiedykolwiek korzystać z sc.union() –

Powiązane problemy