2013-07-12 15 views
45

mam dwa RDD jest tak, że chcesz dołączyć i wyglądają tak:Spark: jaka jest najlepsza strategia dołączania do 2-kluczowego RDD z jednym kluczem RDD?

val rdd1:RDD[(T,U)] 
val rdd2:RDD[((T,W), V)] 

Zdarza się tak, że kluczowymi wartościami rdd1 są unikalne, a także, że wartości krotkowe kluczu rdd2 są wyjątkowe . Chciałbym przyłączyć się do dwóch zestawów danych tak, aby uzyskać następujące RDD:

val rdd_joined:RDD[((T,W), (U,V))] 

Co jest najbardziej skutecznym sposobem osiągnięcia tego celu? Oto kilka pomysłów, o których myślałem.

Wariant 1:

val m = rdd1.collectAsMap 
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))}) 

Opcja 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct 
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2) 

Wariant 1 będzie zbierać wszystkie dane do pana, prawda? Tak więc nie wydaje się to dobrym rozwiązaniem, jeśli rdd1 jest duży (w moim przypadku jest stosunkowo duży, chociaż o rząd wielkości mniejszy niż rdd2). Opcja 2 to brzydki, wyraźny i kartezjański produkt, który również wydaje się bardzo nieefektywny. Inną możliwością, która przyszła mi do głowy (ale jeszcze nie próbowałem) jest wykonanie opcji 1 i nadanie mapy, chociaż lepiej byłoby nadawać w "inteligentny" sposób, aby klucze mapy znajdowały się w pobliżu klucze od rdd2.

Czy ktoś wcześniej spotkał się z taką sytuacją? Byłbym szczęśliwy, gdybym miał twoje myśli.

Dzięki!

Odpowiedz

56

Jedną opcją jest wykonanie połączenia broadcast, poprzez pobranie rdd1 do sterownika i nadanie go wszystkim twórcom map; wykonano prawidłowo, to niech nam uniknąć kosztownych Shuffle dużej rdd2 RDD:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) 

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) 
val joined = rdd2.mapPartitions({ iter => 
    val m = rdd1Broadcast.value 
    for { 
    ((t, w), u) <- iter 
    if m.contains(t) 
    } yield ((t, w), (u, m.get(t).get)) 
}, preservesPartitioning = true) 

preservesPartitioning = true mówi Spark, że funkcja ta mapa nie zmienia klucze rdd2; pozwoli to Sparkowi uniknąć ponownej partycjonowania rdd2 dla wszystkich kolejnych operacji, które łączą się w oparciu o klucz (t, w).

Ta transmisja może być nieskuteczna, ponieważ wiąże się z wąskim gardłem komunikacyjnym u kierowcy. Zasadniczo możliwe jest nadawanie jednego RDD innemu bez udziału kierowcy; Mam prototyp tego, co chciałbym uogólnić i dodać do Sparka.

Inną opcją jest ponowne odwzorowanie klawiszy rdd2 i użycie metody Spark join; będzie to wymagało pełnego Shuffle rdd2 (i ewentualnie rdd1):

rdd1.join(rdd2.map { 
    case ((t, w), u) => (t, (w, u)) 
}).map { 
    case (t, (v, (w, u))) => ((t, w), (u, v)) 
}.collect() 

Na moim wejściu próbki, zarówno z tych sposobów uzyskania takiego samego rezultatu:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

Trzecią opcją byłoby restrukturyzacji rdd2 więc kluczem jest t, a następnie wykonaj powyższe sprzężenie.

+1

Myślę, że jesteś drugą opcją, to najprawdopodobniej najłatwiejszy sposób, chociaż restrukturyzacja rdd2 byłaby wygodna. – Noah

+0

Będę musiał dowiedzieć się więcej o działaniu funkcji mapPartitions, ale wygląda na to, czego szukałem. Zgadzam się również, że mogę zmienić strukturę "rdd2" i poprzez serię map powrócić do pierwotnej rzeczy, której pragnąłem. Przyjrzę się obu opcjom i zobaczę, jak dobrze one działają w moim przypadku użycia. Dzięki za sugestie! – RyanH

+0

Dla pierwszej opcji, gdy próbuję val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) zwraca dane niekompletne. Czy istnieje sposób dostosowania pierwszej opcji za pomocą metody collect() zamiast metody collecAsMap()? –

12

Innym sposobem na to jest utworzenie niestandardowego programu do partycjonowania, a następnie użycie zipPartitions do przyłączenia się do RDD.

import org.apache.spark.HashPartitioner 

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { 

    override def getPartition(key: Any): Int = key match { 
    case k: Tuple2[Int, String] => super.getPartition(k._1) 
    case _ => super.getPartition(key) 
    } 

} 

val numSplits = 8 
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) 

val result = rdd2.zipPartitions(rdd1)(
    (iter2, iter1) => { 
    val m = iter1.toMap 
    for { 
     ((t: Int, w), u) <- iter2 
     if m.contains(t) 
     } yield ((t, w), (u, m.get(t).get)) 
    } 
).partitionBy(new HashPartitioner(numSplits)) 

result.glom.collect 
Powiązane problemy