mam dwa RDD jest tak, że chcesz dołączyć i wyglądają tak:Spark: jaka jest najlepsza strategia dołączania do 2-kluczowego RDD z jednym kluczem RDD?
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Zdarza się tak, że kluczowymi wartościami rdd1
są unikalne, a także, że wartości krotkowe kluczu rdd2
są wyjątkowe . Chciałbym przyłączyć się do dwóch zestawów danych tak, aby uzyskać następujące RDD:
val rdd_joined:RDD[((T,W), (U,V))]
Co jest najbardziej skutecznym sposobem osiągnięcia tego celu? Oto kilka pomysłów, o których myślałem.
Wariant 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Opcja 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Wariant 1 będzie zbierać wszystkie dane do pana, prawda? Tak więc nie wydaje się to dobrym rozwiązaniem, jeśli rdd1 jest duży (w moim przypadku jest stosunkowo duży, chociaż o rząd wielkości mniejszy niż rdd2). Opcja 2 to brzydki, wyraźny i kartezjański produkt, który również wydaje się bardzo nieefektywny. Inną możliwością, która przyszła mi do głowy (ale jeszcze nie próbowałem) jest wykonanie opcji 1 i nadanie mapy, chociaż lepiej byłoby nadawać w "inteligentny" sposób, aby klucze mapy znajdowały się w pobliżu klucze od rdd2
.
Czy ktoś wcześniej spotkał się z taką sytuacją? Byłbym szczęśliwy, gdybym miał twoje myśli.
Dzięki!
Myślę, że jesteś drugą opcją, to najprawdopodobniej najłatwiejszy sposób, chociaż restrukturyzacja rdd2 byłaby wygodna. – Noah
Będę musiał dowiedzieć się więcej o działaniu funkcji mapPartitions, ale wygląda na to, czego szukałem. Zgadzam się również, że mogę zmienić strukturę "rdd2" i poprzez serię map powrócić do pierwotnej rzeczy, której pragnąłem. Przyjrzę się obu opcjom i zobaczę, jak dobrze one działają w moim przypadku użycia. Dzięki za sugestie! – RyanH
Dla pierwszej opcji, gdy próbuję val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) zwraca dane niekompletne. Czy istnieje sposób dostosowania pierwszej opcji za pomocą metody collect() zamiast metody collecAsMap()? –