2015-09-10 14 views
6

Mam RDD[String], wordRDD. Mam również funkcję, która tworzy RDD [String] z ciągu/słowa. Chciałbym stworzyć nowy RDD dla każdego ciągu w wordRDD. Oto moje próby:Jak utworzyć kolekcję RDD z RDD?

1) nie powiodło się, ponieważ Spark nie obsługuje zagnieżdżone RDD:

var newRDD = wordRDD.map(word => { 
    // execute myFunction() 
    (new MyClass(word)).myFunction() 
}) 

2) nie powiodło się (prawdopodobnie z powodu kwestii zakresu):

var newRDD = sc.parallelize(new Array[String](0)) 
val wordArray = wordRDD.collect 
for (w <- wordArray){ 
    newRDD = sc.union(newRDD,(new MyClass(w)).myFunction()) 
} 

Mój idealny wynik byłby wyglądać następująco:

// input RDD (wordRDD) 
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...) 

// myFunction behavior 
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl') 

// after executing myFunction() on each word in wordRDD: 
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...) 

znalazłem odpowiednie pytanie tutaj: Spark when union a lot of RDD throws stack overflow error, ale to nie mój adres kwestia.

Odpowiedz

3

Zastosowanie flatMap dostać RDD[String] jak chcesz.

var allWords = wordRDD.flatMap { word => 
    (new MyClass(word)).myFunction().collect() 
} 
+1

Jak to ma działać równolegle? Wszystko, co dzieje się wewnątrz 'wordRDD.map', jest wykonywane w klastrze. Zatem wewnętrzny 'collect' musi wywołać nowe zadanie Sparka z działającego zadania. Podejrzewam, że nie będzie działał w sposób rozproszony. –

+0

Mógłby także zmienić funkcję zwracania tablic zamiast RDD, ale pytanie nie określało rzeczywistej funkcji. –

+0

Ale jego opis mówi, że ma on funkcję, zakładam, że jest to 'myFunction', która tworzy' RDD [String] 'z ciągu/słowa. –

3

Nie można utworzyć obiektu RDD z poziomu innego niż RDD.

Możliwe jest jednak przepisanie funkcji myFunction: String => RDD[String], która generuje wszystkie słowa z wejścia, w którym jedna litera jest usuwana, do innej funkcji modifiedFunction: String => Seq[String], dzięki czemu można z niej korzystać z poziomu RDD. W ten sposób będzie on również wykonywany równolegle w klastrze. Posiadając modifiedFunction możesz uzyskać ostateczne RDD ze wszystkimi słowami, po prostu dzwoniąc pod numer wordRDD.flatMap(modifiedFunction).

Zasadniczą kwestią jest użycie flatMap (aby map i flatten przemiany):

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 

    val input = sc.parallelize(Seq("apple", "ananas", "banana")) 

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...) 
    val result = input.flatMap(modifiedFunction) 
} 

def modifiedFunction(word: String): Seq[String] = { 
    word.indices map { 
    index => word.substring(0, index) + word.substring(index+1) 
    } 
} 
Powiązane problemy