2016-03-15 9 views
5

Wywołuję funkcję w scala, która daje RDD[(Long,Long,Double)] jako wyjście.Scal wiele RDD generowanych w pętli

def helperfunction(): RDD[(Long, Long, Double)]

nazywam tę funkcję w pętli w innej części kodu i chcę, aby scalić wszystkie wygenerowane RDD. Pętla wywołując funkcję wygląda tak

for (i <- 1 to n){ 
    val tOp = helperfunction() 
    // merge the generated tOp 
} 

Co chcę zrobić coś podobnego do tego, co StringBuilder zrobiłby dla ciebie w Javie, kiedy chciał się połączyć za sznurki. Mam spojrzał na technikach łączących RDD, które w większości wskazują na użyciu funkcji związkowej jak ten

RDD1.union(RDD2) 

Wymaga to jednak oba RDD mają zostać wygenerowane przed podjęciem unii. Chciałem zainicjować var ​​RDD1, aby zebrać wyniki poza pętlą for, ale nie jestem pewien jak zainicjować puste RDD typu [(Long,Long,Double)]. Również zaczynam od iskry, więc nie jestem nawet pewien, czy jest to najbardziej elegancka metoda rozwiązania tego problemu.

Odpowiedz

4

Zamiast Vars, można użyć funkcjonalnych paradygmat programowania, aby osiągnąć to, co chcesz:

val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _) 

Ponadto, jeśli nadal trzeba utworzyć pusty RDD, można zrobić to za pomocą:

val empty = sc.emptyRDD[(long, long, String)] 
+0

IIRC nie można związać RDD z pustymRDD, dopóki Spark 2.0. – MrChristine

+0

jak to zrobić, jeśli musisz przekazać indeks pętli do funkcji pomocnika? – G3M

+0

Jeśli chcesz przekazać indeks pętli do funkcji pomocnika, możesz zrobić coś takiego: 'val rdd = (1 do n) .zipWithIndex.map {case (x, index) => helperFunction (i)} .reduce (_ union _) ' Oczywiście w tym przypadku nie jest to konieczne, ponieważ mamy kolekcję zwiększającą liczbę całkowitą, ale można zastąpić' (1 do n) 'z dowolnej kolekcji –

2

Masz rację, że nie jest to optymalny sposób, aby to zrobić, ale potrzebowalibyśmy więcej informacji na temat tego, co próbujesz osiągnąć, generując nowe RDD z każdym wywołaniem funkcji pomocnika.

Możesz zdefiniować 1 RDD przed pętlą i przypisać mu var, a następnie uruchomić go w pętli. Oto przykład:

val rdd = sc.parallelize(1 to 100) 
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble)) 
var new_rdd = rdd_tuple 
println("Initial RDD count: " + new_rdd.count()) 
for (i <- 2 to 4) { 
    new_rdd = new_rdd.union(rdd_tuple) 
} 
println("New count after loop: " + new_rdd.count()) 
+0

Każdy organ ma kod Java dla tego samego scenariusza? – Neethu