Jeżeli nie jesteś pewien, co się dzieje, to najlepiej do naśladowania typy. Pomijając niejawny ClassTag
dla zwięzłości zaczniemy coś takiego
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
Jeśli ignorować wszelkie dodatkowe parametry zobaczysz, że aggregate
jest funkcją, która odwzorowuje od RDD[T]
do U
. Oznacza to, że typ wartości na wejściu RDD
nie musi być taki sam jak typ wartości wyjściowej. Tak to jest wyraźnie inny niż na przykład reduce
:
def reduce(func: (T, T) ⇒ T): T
lub fold
:
def fold(zeroValue: T)(op: (T, T) => T): T
samo jak fold
, aggregate
wymaga zeroValue
. Jak wybrać? Powinien to być element tożsamości (neutralny) w odniesieniu do combOp
.
Trzeba też zapewnić dwie funkcje:
seqOp
który odwzorowuje od (U, T)
do U
combOp
odwzorowującej z (U, U)
do U
tylko w oparciu o ten podpisów należy już zobaczyć że tylko seqOp
może uzyskać dostęp do nieprzetworzonych danych. Pobiera pewną wartość typu U
kolejną z typu T
i zwraca wartość typu U
. W Twoim przypadku jest to funkcja z następującym podpisem
((Int, Int), Int) => (Int, Int)
w tym momencie prawdopodobnie podejrzewam, że jest używany do jakiejś składanej jak działania.
Druga funkcja przyjmuje dwa argumenty typu U
i zwraca wartość typu U
. Jak wspomniano wcześniej, powinno być jasne, że nie dotyka on oryginalnych danych i może działać tylko na wartościach już przetworzonych przez seqOp
. W twoim przypadku ta funkcja ma podpis w następujący sposób:
((Int, Int), (Int, Int)) => (Int, Int)
Jak możemy to wszystko zebrać razem?
Najpierw każda strefa jest łączone przy użyciu standardowych Iterator.aggregate
z zeroValue
, seqOp
i combOp
przekazany jako z
, seqop
i combop
respectivelly.Od InterruptibleIterator
stosowany wewnętrznie nie zastępują aggregate
powinien być wykonany jako prosty foldLeft(zeroValue)(seqOp)
Następny wyniki częściowe zebrane z każdej partycji są agregowane przy użyciu combOp
Załóżmy, że wejście RDD posiada trzy partycje z następujących rozkład wartości:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
Można oczekiwać, że wykonanie, ignorując bezwzględny rozkaz, będzie równoznaczne z mniej więcej tak:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft
na jednej partycji może wyglądać następująco:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
i we wszystkich partycjach
Seq((3,2), (6,2), (0,0))
co w połączeniu daje zaobserwowanych wyników:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
Na ogół jest to powszechne sposoby można znaleźć w całym Spark gdzie przechodzą wartości neutralne, funkcja wykorzystywane do przetwarzania wartości w każdej strefie i funkcją służy do scalania częściowych agregacji z różnych partycji. Niektóre inne przykłady obejmują:
aggregateByKey
- User Defined agregujące Funkcje
Aggregators
na Spark Datasets
.