Zaimplementowałem rozwiązanie do grupy RDD[K, V]
za pomocą klucza i obliczyć dane według każdej grupy (K, RDD[V])
, używając partitionBy
i Partitioner
. Niemniej jednak nie jestem pewien, czy jest to naprawdę skuteczne i chciałbym mieć twój punkt widzenia.Używanie PartitionBy do dzielenia i wydajnego obliczania grup RDD według klucza
Oto przykładowy przypadek: zgodnie z listą [K: Int, V: Int]
, obliczyć V
s oznacza dla każdej grupy K
, wiedząc, że powinien on być rozpowszechniane i że V
wartości mogą być bardzo duże. To powinno dać:
List[K, V] => (K, mean(V))
Prosta klasa partycjonowania:
class MyPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
Kod partycji:
val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()
p.foreachPartition(x => {
try {
val r = sc.parallelize(x.toList)
val id = r.first() //get the K partition id
val v = r.map(x => x._2)
println(id._1 + "->" + mean(v))
} catch {
case e: UnsupportedOperationException => 0
}
})
Wyjście jest:
1->13, 2->4, 3->7
Moje pytania to:
- co tak naprawdę się dzieje, dzwoniąc pod numer
partitionBy
? (przepraszam, nie znalazłem wystarczającej liczby specyfikacji) - Czy to naprawdę wydajne, aby mapować według partycji, wiedząc, że w moim przypadku produkcji nie byłoby zbyt dużo kluczy (jak 50 dla próbki) przez bardzo wiele wartości (jak 1 milion na próbkę)
- Jaki jest koszt
paralellize(x.toList)
? Czy to jest zgodne z tym? (PotrzebujęRDD
na wejściumean()
) - Jak zrobiłbyś to sam?
Pozdrowienia
dziękuję za odpowiedź, oczywiście nie może działać, nie mam całego odruchu sztuczek kodu iskrowego i zostałem rozpieszczony przez mój lokalny jvm. Niemniej jednak, w rzeczywistości nie muszę obliczać średniej, ale złożonej metody ml i potrzebuję RDD [Vector]. Jak mogę uzyskać listę (klucz, RDD [Vector]) z unikalnego RDD [Int, Int]? Nie znalazłem rozwiązania. – Seb
Myślę, że jest to podobny temat: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Nie jestem pewien, jak chcesz zrobić 'Vector's z 'Int's. Ale jeśli chcesz uzyskać jeden RDD na klucz, musisz podzielić oryginalny RDD, a to jest omówione w połączonej odpowiedzi. Jeśli nie da ci odpowiedzi, proponuję zadać kolejne pytanie, być może z jasnym wyjaśnieniem na wysokim poziomie, co chcesz zrobić. –