2016-05-04 26 views
9

Jestem uczniem Apache Spark i natknąłem się na RDD akcję aggregate, której nie mam pojęcia, jak to działa. Może ktoś przeliterować i szczegółowo wyjaśnić krok po kroku w jaki sposób dochodzimy do poniżej wyniku za kod tutajRDD Agregacja w iskrze

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

Dzięki

Odpowiedz

18

Jeżeli nie jesteś pewien, co się dzieje, to najlepiej do naśladowania typy. Pomijając niejawny ClassTag dla zwięzłości zaczniemy coś takiego

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

Jeśli ignorować wszelkie dodatkowe parametry zobaczysz, że aggregate jest funkcją, która odwzorowuje od RDD[T] do U. Oznacza to, że typ wartości na wejściu RDD nie musi być taki sam jak typ wartości wyjściowej. Tak to jest wyraźnie inny niż na przykład reduce:

def reduce(func: (T, T) ⇒ T): T 

lub fold:

def fold(zeroValue: T)(op: (T, T) => T): T 

samo jak fold, aggregate wymaga zeroValue. Jak wybrać? Powinien to być element tożsamości (neutralny) w odniesieniu do combOp.

Trzeba też zapewnić dwie funkcje:

  • seqOp który odwzorowuje od (U, T) do U
  • combOp odwzorowującej z (U, U) do U

tylko w oparciu o ten podpisów należy już zobaczyć że tylko seqOp może uzyskać dostęp do nieprzetworzonych danych. Pobiera pewną wartość typu U kolejną z typu T i zwraca wartość typu U. W Twoim przypadku jest to funkcja z następującym podpisem

((Int, Int), Int) => (Int, Int) 

w tym momencie prawdopodobnie podejrzewam, że jest używany do jakiejś składanej jak działania.

Druga funkcja przyjmuje dwa argumenty typu U i zwraca wartość typu U. Jak wspomniano wcześniej, powinno być jasne, że nie dotyka on oryginalnych danych i może działać tylko na wartościach już przetworzonych przez seqOp. W twoim przypadku ta funkcja ma podpis w następujący sposób:

((Int, Int), (Int, Int)) => (Int, Int) 

Jak możemy to wszystko zebrać razem?

  1. Najpierw każda strefa jest łączone przy użyciu standardowych Iterator.aggregate z zeroValue, seqOp i combOp przekazany jako z, seqop i combop respectivelly.Od InterruptibleIterator stosowany wewnętrznie nie zastępują aggregate powinien być wykonany jako prosty foldLeft(zeroValue)(seqOp)

  2. Następny wyniki częściowe zebrane z każdej partycji są agregowane przy użyciu combOp

Załóżmy, że wejście RDD posiada trzy partycje z następujących rozkład wartości:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

Można oczekiwać, że wykonanie, ignorując bezwzględny rozkaz, będzie równoznaczne z mniej więcej tak:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 

Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
    .map(_.foldLeft((0, 0))(seqOp)) 
    .reduce(combOp) 

foldLeft na jednej partycji może wyglądać następująco:

Iterator(1, 2).foldLeft((0, 0))(seqOp) 
Iterator(2).foldLeft((1, 1))(seqOp) 
(3, 2) 

i we wszystkich partycjach

Seq((3,2), (6,2), (0,0)) 

co w połączeniu daje zaobserwowanych wyników:

(3 + 6 + 0, 2 + 2 + 0) 
(9, 4) 

Na ogół jest to powszechne sposoby można znaleźć w całym Spark gdzie przechodzą wartości neutralne, funkcja wykorzystywane do przetwarzania wartości w każdej strefie i funkcją służy do scalania częściowych agregacji z różnych partycji. Niektóre inne przykłady obejmują:

  • aggregateByKey
  • User Defined agregujące Funkcje
  • Aggregators na Spark Datasets.
1

Oto moje zrozumienie dla odniesienia:

Wyobraź sobie, że dwa węzły, brać wejście z dwóch pierwszych elementów listy {1,2}, a drugi bierze {3, 3}. (Partycja jest tu tylko na wygodne)

w pierwszym węźle: "(x, y) => (x._1 x._2 + r, + 1)" pierwszy x oznacza (0 , 0) jak podano, a y jest twoim pierwszym elementem 1, a będziesz miał wyjście (0 + 1, 0 + 1), następnie pojawi się twój drugi element y = 2 i wyjście (1 + 2, 1 + 1), który jest (3, 2)

W drugim węźle ta sama procedura dzieje się równolegle, a będziesz miał (6, 2).

"(x, y) => (x._1 + y._1, x._2 + y._2)", mówi, aby połączyć dwa węzły, a otrzymasz (9,4)


jedno uwagę zasługuje (0,0) jest faktycznie dodana do wyniku długości (RDD) +1 razy.

"scala> rdd.aggregate ((1,1)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x. _1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9) "