2015-05-25 15 views
6

Jestem całkiem nowy, iskra i język Scala i chciałby unia wszystkie RDD w postaci listy, jak poniżej (List<RDD> to RDD):Spark: Jak Unia listy <RDD> do RDD

val data = for (item <- paths) yield { 
     val ad_data_path = item._1 
     val ad_data = SparkCommon.sc.textFile(ad_data_path).map { 
      line => { 
       val ad_data = new AdData(line) 
       (ad_data.ad_id, ad_data) 
      } 
     }.distinct() 
    } 
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _) 

uruchomić kod w IntelliJ natomiast zawsze pojawia się błąd jak:

ava.lang.NullPointerException 
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59) 
at org.apache.spark.rdd.RDD.union(RDD.scala:438) 
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Ktoś ma jakiś pomysł na temat tego błędu? Z góry dzięki :)

Odpowiedz

17

Może to być przyczyną,

val listA = 1 to 10 
for(i <- listA; if i%2 == 0)yield {i} 

powróci Vector (2,4,6,8,10), natomiast

for(i <- listA; if i%2 == 0)yield {val c = i} 

powróci wektor ((),(),(),(),())

Tak właśnie dzieje się w twoim przypadku. Inicjujesz ad_data, ale nie zwracasz go z powrotem, by uzyskać zwrot.

Jeśli chodzi o Twoje pytanie to dotyczy, tj Lista [RDD] do RDD

oto rozwiązanie:

val listA = sc.parallelize(1 to 10) 
val listB = sc.parallelize(10 to 1 by -1) 

tworzenia listę 2 ZOPS

val listC = List(listA,listB) 

przelicz Lista [RDD] do RDD

val listD = listC.reduce(_ union _) 

Hope, to odpowiedzi na swoje pytanie.

+0

Thanks a partia, problem rozwiązany za pomocą twojego rozwiązania. – juffun

+0

@juffun, możesz zaakceptować odpowiedź, jeśli rozwiązanie zadziałało dla ciebie :) – Akash

+0

pewnie, już zaakceptowane. – juffun

0

Kolejna prosta metoda konwersji z listy RDD na RDD. SparkContext ma dwie przeciążone metody Union, jeden przyjmuje dwa RDD i inne akceptuje listę RDD

Union (pierwszy, rest) unia (RDD: seq [RDD [T]]))

Powiązane problemy