2015-02-09 10 views
7

Zaimplementowałem rozwiązanie do grupy RDD[K, V] za pomocą klucza i obliczyć dane według każdej grupy (K, RDD[V]), używając partitionBy i Partitioner. Niemniej jednak nie jestem pewien, czy jest to naprawdę skuteczne i chciałbym mieć twój punkt widzenia.Używanie PartitionBy do dzielenia i wydajnego obliczania grup RDD według klucza

Oto przykładowy przypadek: zgodnie z listą [K: Int, V: Int], obliczyć V s oznacza dla każdej grupy K, wiedząc, że powinien on być rozpowszechniane i że V wartości mogą być bardzo duże. To powinno dać:

List[K, V] => (K, mean(V)) 

Prosta klasa partycjonowania:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

Kod partycji:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

Wyjście jest:

1->13, 2->4, 3->7

Moje pytania to:

  1. co tak naprawdę się dzieje, dzwoniąc pod numer partitionBy? (przepraszam, nie znalazłem wystarczającej liczby specyfikacji)
  2. Czy to naprawdę wydajne, aby mapować według partycji, wiedząc, że w moim przypadku produkcji nie byłoby zbyt dużo kluczy (jak 50 dla próbki) przez bardzo wiele wartości (jak 1 milion na próbkę)
  3. Jaki jest koszt paralellize(x.toList)? Czy to jest zgodne z tym? (Potrzebuję RDD na wejściu mean())
  4. Jak zrobiłbyś to sam?

Pozdrowienia

Odpowiedz

4

Twój kod nie powinien działać. Nie można przekazać obiektu SparkContext do executorów. (To nie jest Serializable.) Również nie rozumiem, dlaczego musiałbyś.

Aby obliczyć średnią, należy obliczyć sumę i liczbę oraz przyjąć ich stosunek. Domyślny program partycjonujący zrobi wszystko dobrze.

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

Jest to wydajne, jednoprzebiegowe obliczenie, które dobrze się generalizuje.

+0

dziękuję za odpowiedź, oczywiście nie może działać, nie mam całego odruchu sztuczek kodu iskrowego i zostałem rozpieszczony przez mój lokalny jvm. Niemniej jednak, w rzeczywistości nie muszę obliczać średniej, ale złożonej metody ml i potrzebuję RDD [Vector]. Jak mogę uzyskać listę (klucz, RDD [Vector]) z unikalnego RDD [Int, Int]? Nie znalazłem rozwiązania. – Seb

+0

Myślę, że jest to podobny temat: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Nie jestem pewien, jak chcesz zrobić 'Vector's z 'Int's. Ale jeśli chcesz uzyskać jeden RDD na klucz, musisz podzielić oryginalny RDD, a to jest omówione w połączonej odpowiedzi. Jeśli nie da ci odpowiedzi, proponuję zadać kolejne pytanie, być może z jasnym wyjaśnieniem na wysokim poziomie, co chcesz zrobić. –

Powiązane problemy