2016-03-01 10 views
6

Jestem newbie na Spark (Moja wersja to 1.6.0) i teraz staram się rozwiązać problem poniżej:Jak wykonać „Lookup” operacja na dataframes Spark podano wiele warunków

Załóżmy, że istnieje są dwa pliki źródłowe:

  • Pierwszy (w skrócie) jest dużym, który zawiera kolumny o nazwach A1, B1, C1 i inne 80 kolumn. Wewnątrz są zapisy 230K.
  • Drugi (B za krótki) jest małą tablicą przeglądową, która zawiera kolumny o nazwach A2, B2, C2 i D2. Wewnątrz znajduje się 250 rekordów.

Teraz musimy wstawić nową kolumnę do, podane poniżej logika:

  • pierwsze wyszukiwanie A1, B1 i C1 w B (odpowiadające kolumny są A2, B2 i C2), czy udana , zwróć D2 jako wartość nowo dodanej kolumny. Jeśli nic nie znalazłem ...
  • Następnie wyszukaj A1, B1 w B. Jeśli się powiedzie, zwróć D2. Jeśli nic nie znaleziono ...
  • Ustaw domyślną wartość „NA”

już przeczytać w plikach i przekształca je w ramkach danych. W pierwszej sytuacji uzyskałem wynik, lewy zewnętrzny łącząc je razem. Ale nie mogę znaleźć dobrego sposobu na następny krok.

Obecnie próbuję zbudować nową ramkę danych, łącząc A i B, stosując mniej rygorystyczny stan. Jednak nie mam pojęcia, jak zaktualizować bieżącą ramkę danych z drugiej. Czy istnieje jakaś bardziej intuicyjna i skuteczna metoda rozwiązania tego problemu?

Dzięki za wszystkie odpowiedzi.

----------------------------- Aktualizacja na 20160309 -------------- ------------------

Ostatecznie przyjęta odpowiedź @mlk. Ciągle wielkie dzięki @ zero323 za jego/jej świetne komentarze na temat UDF kontra join, generowanie kodu Tungsten to naprawdę kolejny problem, z którym mamy do czynienia. Ale ponieważ musimy zrobić dziesiątki odnośnika i średnia 4 warunki dla każdego odnośnika, byłego rozwiązanie jest bardziej odpowiedni ...

Ostateczne rozwiązanie jest jakoś wygląda jak poniżej fragment kodu:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

Odpowiedz

4

Ponieważ B jest małe Myślę, że najlepszym sposobem na zrobienie tego będzie zmienna rozgłaszania i funkcja zdefiniowana przez użytkownika.

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

To prawdopodobnie droga. Dostarczyłem również alternatywne rozwiązanie z 'joinami '. – zero323

+0

Dzięki mlk. Jeśli tabela odnośników jest duża (500K * 50), czy nadal dobrze jest ją transmitować? –

+0

I moje inne pytanie brzmi, przypuśćmy, że muszę wykonać 30 odnośników na różnych kolumnach i napisać 50 UDF, czy wpłynie to na wydajność? –

2

Tylko dla odniesienia rozwiązanie bez UDF:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

Zakłada istnieje co najwyżej jeden mecz w każdej kategorii (wszystkie trzy kolumny, albo dwa pierwsze) lub powielić wiersze wyjścia są pożądany.

UDF vs DOŁĄCZ:

Istnieje wiele czynników do rozważenia i nie ma prostej odpowiedzi tutaj:

Wady:

  • broadcast joins wymagają przekazywania danych dwa razy do węzły robocze. Na razie tabele broadcasted nie są buforowane (SPARK-3863) i jest mało prawdopodobne, aby uległy zmianie w najbliższej przyszłości (rozdzielczość: później).
  • join operacja jest stosowana dwukrotnie, nawet jeśli jest pełna zgodność.

Plusy:

  • join i coalesce są przezroczyste dla optymalizatora podczas UDF nie są.
  • Działając bezpośrednio z wyrażeniami SQL, można korzystać ze wszystkich optymalizacji wolframu, w tym generowania kodu, podczas gdy UDF nie.