Ponieważ twoje pytanie jest raczej niejasne, pomyślmy o ogólnych strategiach, które można wykorzystać do rozwiązania tego problemu.
Standardowym rozwiązaniem jest buforowanie, ale ponieważ wyraźnie chcesz tego uniknąć, zakładam tutaj pewne dodatkowe ograniczenia. Sugeruje to, że pewne podobne rozwiązania, takie
nie przyjęcia. Oznacza to, że musisz znaleźć trochę, aby manipulować samym rurociągiem.
Mimo że wiele transformacji można zgnieść, każda transformacja tworzy nowy RDD. To, w połączeniu z Twoim stwierdzeniem dotyczącym buforowania, określa relatywnie silne ograniczenia dotyczące możliwych rozwiązań.
Zacznijmy od najprostszego możliwego przypadku, w którym wszystkie rurociągi mogą być wyrażone jako jednostopniowe zlecenia. Ogranicza to nasze wybory, aby odwzorowywać jedynie zadania i proste zadania polegające na zmniejszeniu liczby map (takie jak opisane w pytaniu). Rurociągi takie jak ten można łatwo wyrazić jako sekwencję operacji na lokalnych iteratorach. Więc co następuje
import org.apache.spark.util.StatCounter
def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)
def p1(rdd: RDD[Long]) = {
rdd
.filter(isEven _)
.aggregate(StatCounter())(_ merge _, _ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.filter(isOdd _)
.reduce(_ + _)
}
można wyrazić jako:
def p1(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
.collect
.reduce(_ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
.collect
.reduce(_ + _)
// identity _
}
W tym momencie możemy przepisać oddzielne zadania w następujący sposób:
def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = {
rdd.mapPartitions(iter => {
val items = iter.toList
Iterator((f(items.iterator), g(items.iterator)))
})
}
def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}
def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2))
val rdd = sc.range(0L, 100L)
def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)
evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
Największym problemem jest to, że mamy do skwapliwie oceń każdą partycję, aby móc zastosować poszczególne potoki. Oznacza to, że ogólne wymagania dotyczące pamięci mogą być znacznie wyższe w porównaniu do tej samej logiki stosowanej osobno. Bez buforowania * jest również bezużyteczny w przypadku wieloetapowych zadań.
Alternatywnym rozwiązaniem jest przetwarzanie danych elementem mądry ale traktują każdą pozycję jako krotki seqs:
def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) }
}
def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
f: T => Boolean, g: U => Boolean) = {
rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) }
}
def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
(s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
rdd.mapPartitions(iter => {
var accT = zt
var accU = zu
iter.foreach { case (ts, us) => {
accT = ts.foldLeft(accT)(s1)
accU = us.foldLeft(accU)(s2)
}}
Iterator((accT, accU))
}).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) }
}
Z API jak to możemy wyrazić wstępne rurociągi jak:
val rddSeq = rdd.map(x => (Seq(x), Seq(x)))
aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
_ merge _, _ + _, _ merge _, _ + _
)
ten podejście jest nieco silniejsze niż poprzednie (w razie potrzeby można łatwo wdrożyć niektóre podzestawy metod byKey
), a wymagania dotyczące pamięci w typowych potokach powinny być porównywalne do podstawowego interfejsu API, ale jest również znacznie bardziej uciążliwe.
* Można sprawdzić an answer dostarczone przez eje przykłady multipleksowania.
Czy ten rurociąg jest odczytywany wewnątrz pojedynczego zadania Spark? Lub dwie oddzielne oferty pracy? –
Ta sama praca (SparkContext) – IttayD
Czy korzystanie z [IgniteRDD] (https://ignite.apache.org/features/igniterdd.html) działa dla ciebie? Można załadować dane do współużytkowanego RDD, a następnie obrobić oba potoki. – heenenee