2015-04-25 11 views
9

wpadłem this line w źródle Apache kodu Sparkjak interpretować RDD.treeAggregate

val (gradientSum, lossSum, miniBatchSize) = data 
    .sample(false, miniBatchFraction, 42 + i) 
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
     seqOp = (c, v) => { 
     // c: (grad, loss, count), v: (label, features) 
     val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) 
     (c._1, c._2 + l, c._3 + 1) 
     }, 
     combOp = (c1, c2) => { 
     // c: (grad, loss, count) 
     (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) 
     } 
    ) 

mam wiele kłopotów przeczytaniu tego:

  • pierwsze nie mogę znaleźć nic w internecie, że dokładnie wyjaśnia, jak działa treeAggregate, jakie są znaczenia tych paramerów.
  • Po drugie, tutaj .treeAggregate wydaje się mieć dwa()() następujące po nazwie metody. Co to może znaczyć? Czy to jest jakaś specjalna składnia scala, której nie rozumiem.
  • W końcu widzę, że zarówno seqOp, jak i comboOp zwracają krotkę 3 elementową, która pasuje do oczekiwanej zmiennej po lewej stronie, ale która właściwie jest zwracana?

To oświadczenie musi być naprawdę zaawansowane. Nie mogę zacząć tego odcyfrowywać.

Odpowiedz

13

treeAggregate to wyspecjalizowana implementacja aggregate, która iteracyjnie stosuje funkcję łączenia do podzbioru partycji. Odbywa się to w celu uniemożliwienia zwracania wszystkich częściowych wyników do sterownika, w przypadku których redukcja pojedynczego przejścia miałaby miejsce w przypadku klasycznej wersji aggregate.

Dla wszystkich celów praktycznych, treeAggregate jest zgodna z tą samą zasadą co aggregate wyjaśniona w tej odpowiedzi: Explain the aggregate functionality in Python z tym wyjątkiem, że pobiera dodatkowy parametr wskazujący głębokość poziomu częściowej agregacji.

Pozwól mi spróbować wyjaśnić, co dzieje się tutaj w szczególności:

Na kruszywo, musimy zero, funkcję sumatora oraz funkcję redukcji. aggregate używa currying do określenia wartości zerowej niezależnie od funkcji łączenia i zmniejszania.

Możemy następnie przeanalizować powyższą funkcję w ten sposób. Miejmy nadzieję, że pomaga zrozumienie:

val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L) 
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long) = (c, v) => { 
     // c: (grad, loss, count), v: (label, features) 
     val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) 
     (c._1, c._2 + l, c._3 + 1) 
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => { 
     // c: (grad, loss, count) 
     (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) 
     } 

Wtedy możemy przepisać wezwanie do treeAggregate bardziej trawieniu postaci:

val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction) 

tego formularza „Wyciąg” powstały krotka do wymienionych wartości gradientSum, lossSum, miniBatchSize do dalszego użytkowania .

Należy zauważyć, że pobiera dodatkowy parametr depth, który jest zadeklarowany z wartością domyślną depth = 2, dlatego też, ponieważ nie jest podany w tym konkretnym wywołaniu, przyjmuje tę wartość domyślną.

Powiązane problemy