2014-07-21 10 views
11

Planujemy przenieść kod świnki Apache do nowej platformy Spark.Jak zaimplementować "Łączenie krzyżowe" w Spark?

Świnia ma koncepcję "Bag/Tuple/Field" i zachowuje się podobnie do relacyjnej bazy danych. Pig zapewnia obsługę połączeń CROSS/INNER/OUTER.

Dla CROSS JOIN, możemy użyć alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

Ale jak przejść do platformy Spark nie mogłem znaleźć żadnego odpowiednika w API Spark. Masz jakiś pomysł?

+0

To nie jest jeszcze gotowy, ale Spork (wieprzowych na iskry) jest budowany obecnie, więc nie musisz zmieniać żadnego kodu – aaronman

Odpowiedz

18

To jest oneRDD.cartesian(anotherRDD).

+0

Dzięki, cartesian join to pseudonim cross join –

2

Oto zalecana wersja iskry zbiorów danych 2.x oraz DataFrames:

scala> val ds1 = spark.range(10) 
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint] 

scala> ds1.cache.count 
res1: Long = 10 

scala> val ds2 = spark.range(10) 
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint] 

scala> ds2.cache.count 
res2: Long = 10 

scala> val crossDS1DS2 = ds1.crossJoin(ds2) 
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint] 

scala> crossDS1DS2.count 
res3: Long = 100 

Alternatywnie możliwe jest użycie składni tradycyjnej przyłączyć się nie przyłączyć warunek. Użyj tej opcji konfiguracji, aby uniknąć następującego błędu.

spark.conf.set("spark.sql.crossJoin.enabled", true) 

Błąd przy tej konfiguracji jest pominięty (za pomocą "join" składni specjalnie):

scala> val crossDS1DS2 = ds1.join(ds2) 
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint] 

scala> crossDS1DS2.count 
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans 
... 
Join condition is missing or trivial. 
Use the CROSS JOIN syntax to allow cartesian products between these relations.; 

Powiązane: spark.sql.crossJoin.enabled for Spark 2.x