2013-06-30 11 views
6

Czy istnieje prosty sposób wykorzystania równoległych kolekcji scala bez załadowania pełnej kolekcji do pamięci?Równoległe przetwarzanie danych większych niż rozmiar pamięci

Na przykład mam dużą kolekcję i chciałbym wykonać określoną operację (zwijanie) równolegle tylko na małej porcji, która mieści się w pamięci, niż na innym kawałku itd., A na końcu rekombinować wyniki z wszystkie kawałki.

Wiem, że aktorzy mogą być wykorzystywani, ale byłoby naprawdę miło korzystać z par-kolekcji.

Pisałem rozwiązanie, ale nie jest to miłe:

def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = { 
    new Iterator[Iterable[A]] { 
     var rest = list 
     def hasNext = !rest.isEmpty 
     def next = { 
     val chunk = rest.take(chunkSize) 
     rest = rest.drop(chunkSize) 
     chunk 
     } 
    }.toIterable 
    }            

    def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = { 
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize) 
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) } 
    chunks.foldLeft(acc)(combineChunk) 
    }            

    val chunkSize = 10000000       
    val x = 1 to chunkSize*10     

    def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n } 

    foldPar(0)(x,chunkSize,sum) 
+1

Powiedziałbym, że poprawny model obliczeniowy tutaj będzie * mapa zredukować * (a więc może to być [Spark] (http://spark-project.org/examples/)), a nie aktorzy per se. –

+0

Formalnie - tak, ale czas przetwarzania nie jest rozsądny w tym przypadku, więc można całkowicie uruchomić na jednej maszynie. –

Odpowiedz

4

Twój pomysł jest bardzo schludny i szkoda nie ma takiej funkcji już dostępne (AFAIK).

Właśnie przeobraziłem twój pomysł w nieco krótszy kod. Po pierwsze, uważam, że przy równoległym składaniu przydatne jest użycie pojęcia monoid - jest to struktura z operacją asocjacyjną i elementem zerowym. Łączność jest ważna, ponieważ nie znamy kolejności, w której łączymy wyniki, które są obliczane równolegle. I element zerowy jest ważny, abyśmy mogli podzielić obliczenia na bloki i zacząć składać je od zera. Nie ma w tym nic nowego, jest tylko to, czego oczekuje się od kolekcji Scali.

// The function defined by Monoid's apply must be associative 
// and zero its identity element. 
trait Monoid[A] 
    extends Function2[A,A,A] 
{ 
    val zero: A 
} 

Następnie Scala Iterator s już przydatny sposób grouped(Int): GroupedIterator[Seq[A]] który tnie iteracyjnej w sekwencji o stałej wielkości. Jest bardzo podobny do twojego split. To pozwala nam odciąć zasilanie na bloki o stałym rozmiarze, a następnie zastosować równoległe metody gromadzenia Scala na nich:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A = 
    c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid)) 
         .fold(monoid.zero)(monoid); 

falcuje każdy blok przy użyciu ramy kolekcje równoległe, a następnie (bez jakiejkolwiek parallelization) połączyć wyniki pośrednie.

Przykład:

// Example: 
object SumMonoid extends Monoid[Long] { 
    override val zero: Long = 0; 
    override def apply(x: Long, y: Long) = x + y; 
} 
val it = Iterator.range(1, 10000001).map(_.toLong) 
println(parFold(it, 100000)(SumMonoid)); 
+0

Przyjemne użycie monoidów, nigdy tego nie wiadomo. Odnośnie zgrupowanej metody miałem wątpliwości, że może załadować całe rzeczy do pamięci, ale to znaczy, że tak nie jest. –

+0

Przetestuję twoje rozwiązanie nieco później, ale wygląda na to, że powinno działać i jest dużo bardziej zwięzłe. Wielkie dzięki! –

+0

@MikhailGolubtsov Proszę dać mi znać, jak idzie testowanie, jestem też ciekawy. Zrobiłem sobie tylko kilka podstawowych testów. –

Powiązane problemy