2013-06-14 13 views
7

Chciałabym wiedzieć, czy istnieje elegancki sposób, aby osiągnąć coś takiego:rozdzielić strumień w wielu

val l = Stream.from(1) 

val parts = l.some_function(3) //any number 

parts.foreach(println(_)) 

> 1,4,7,10... 
> 2,5,8,11... 
> 3,6,9,12... 

Właściwie muszę taką operację na strumienie parallelization - aby podzielić dane na wielu aktorów bez ładowanie całego materiału do pamięci.

Odpowiedz

4

The odpowiedź od Split a scala list into n interleaving lists w pełni spełnia warunki, trochę zmodyfikowane, aby pasowały do ​​strumieni:

def round[A](seq: Iterable[A], n: Int) = { 
    (0 until n).map(i => seq.drop(i).sliding(1, n).flatten) 
} 
round(Stream.from(1),3).foreach(i => println(i.take(3).toList)) 
List(1, 4, 7) 
List(2, 5, 8) 
List(3, 6, 9) 
2

Jedyne co mogę myśleć:

def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p => 
    x.zipWithIndex.collect { 
    case (e,i) if i % n == p => e 
    } 
} 

To trochę brzydki, bo każdy z podstrumieni musi całkowicie przemierzać strumienia głównego. Ale nie sądzę, że można to złagodzić, zachowując (pozorną) niezmienność.

Czy myślałeś o wysyłaniu pojedynczych zadań do aktorów i o "dystrybutorze zadań", który robi dokładnie to?

+0

Tak, pomyślałem o tym. Muszę połączyć wyniki z aktorami, a problem polega na tym, że wyniki pośrednie pochłaniają też dużo pamięci i chcę, żeby było kilku aktorów i tyle samo zadań/wyników.Mimo to mogłem zmodernizować aktorów, aby ponownie wykorzystać wyniki z poprzednich zadań i pójdę tą drogą, jeśli nie ma prostego sposobu na podzielenie strumienia. –

0
scala> (1 to 30 grouped 3).toList.transpose foreach println 
List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28) 
List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29) 
List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30) 
+0

Czy to działa dobrze w przypadku strumieni? – gzm0

+0

zmień 'toList' na' toStream' i przekonaj się sam ... – sschaef

+1

'Stream.from (1) .grouped (3) .toStream.transpose foreach println' wisi w nieskończonej pętli ... – gzm0

2

Proste podejście polega na wygenerowaniu sekwencji arytmetycznej dla indeksów, które chcesz, a następnie odwzorowaniu ich na strumień. Sposób stosuje się wyciągnąć odpowiednie wartości:

def f[A](s:Stream[A], n:Int) = 
    0 until n map (i => Iterator.iterate(0)(_+n) map (s drop i)) 

f(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

bardziej wydajnych rozwiązanie wykorzystują iterację którego Kolejna metoda po prostu powraca do wartości od strumienia podczas następnego indeksu arytmetycznej sekwencji:

def comb[A](s:Stream[A], first:Int, step:Int):Iterator[A] = new Iterator { 
    var i  = first - step 
    def hasNext = true 
    def next = { i += step; s(i) } 
} 
def g[A](s:Stream[A], n:Int) = 
    0 until n map (i => comb(s,i,n)) 

g(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

Wspomniałeś, że to było dla aktorów, chociaż - jeśli to Akka, być może mógłbyś użyć round-robin router.

AKTUALIZACJA: Powyższe (najwyraźniej niepoprawnie) zakłada, że ​​może być więcej pracy do wykonania, dopóki program jest uruchomiony, więc hasNext zawsze zwraca true; zobacz odpowiedź Mikhaila na wersję, która działa również z skończonymi strumieniami.

AKTUALIZACJA: Mikhail orzekł, że w rzeczywistości this answer to a prior StackOverflow question ma odpowiedź, która działa dla strumieni skończonych i nieskończonych (chociaż nie wygląda na to, że działałaby prawie tak dobrze jak iterator).

+0

Tworzenie iteratorów wygląda dobrze. Jedyne, co w twojej implementacji maNastępnie zawsze zwraca true - traktuje tylko nieskończone kolekcje, dla wspólnego przypadku kod będzie bardziej złożony. Użyłem aktorów ze standardowej biblioteki Scala, ale wydaje się, że Akka jest tego warta, dzięki. –

+0

Zobacz także http://stackoverflow.com/questions/11132788/split-a-scala-list-into-n-interleaving-lists?lq=1 dla skończonego przypadku. – AmigoNico

+0

Ouch! Funkcja "przesuwania" z krokiem sprawdziła się. Pasuje również do strumieni. Można więc uniknąć pisania niestandardowego iteratora. –

0

Nie znalazłem żadnej takiej funkcji w bibliotece Scala, więc zmodernizowałem wersję iteratora odpowiedzi AmigoNico. Kod traktuje zbiory zarówno skończone, jak i nieskończone.

def splitRoundRobin[A](s: Iterable[A], n: Int) = { 
    def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] { 
     val iter = s.iterator 
     var nextElem: Option[A] = iterToNext(first) 
     def iterToNext(elemsToSkip: Int) = { 
     iterToNextRec(None, elemsToSkip) 
     } 
     def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match { 
     case 0 => next 
     case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None 
     } 
     def hasNext = nextElem.isDefined || { 
     nextElem = iterToNext(step) 
     nextElem.isDefined 
     } 
     def next = { 
     var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next") 
     nextElem = None 
     result 
     } 
    } 
    0 until n map (i => comb(s, i, n)) 
    } 

    splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(",")) 
// Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 

    splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(",")) 
//> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 
0
def roundRobin[T](n: Int, xs: Stream[T]) = { 
    val groups = xs.grouped(n).map(_.toIndexedSeq).toStream 
    (0 until n).map(i => groups.flatMap(_.lift(i))) 
} 

działa w nieskończonej przypadku:

scala> roundRobin(3, Stream.from(0)).map(_.take(3).force.mkString).mkString(" ") 
res6: String = 036 147 258 

użyciu flatMap/lift zamiast zwykłego map/apply Oznacza to działa nawet jeśli wejście jest skończony, a długość nie jest wielokrotnością z:

scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ") 
res5: String = 0369 147 258