2016-02-18 10 views
6

Chciałbym utworzyć RDD do zbierania wyników obliczeń iteracyjnych.Tworzenie RDD do zbierania wyników obliczeń iteracyjnych

Jak można użyć pętli (lub alternatywnego) wymienić następujące kod:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(tworzenie RDD kroku n i skompresowanie następnie razem w celu byłoby również ok jak długo 50 ZOPS tworzone są iteracyjnie respektować nasion = (etap (n-1) .max) warunek)

+0

będzie używać 'Stream.unfold' z scalaz celu wygenerowania strumień kroków, a następnie zip to sam ze sobą i/lub scanRight .. – Reactormonk

Odpowiedz

6

funkcji rekurencyjnej zadziała:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

Rekursja ogona nie ochroni cię przed linią RDD dmuchającą w stos :) – zero323

+0

@ zero323 Zgoda. Jednak ten problem jest związany z wymaganiami pytania. Każda odpowiedź miałaby podobny problem. –

+0

Chciałem tylko wskazać, że budujesz rekurencyjną strukturę danych za kulisami, która nie zostanie zoptymalizowana pod kątem ogona. Nic więcej :) I faktycznie możesz go rozwiązać i uniknąć problemu za pomocą punktów kontrolnych. Jest nawet rozwiązalny bez jednego zip :) – zero323

Powiązane problemy