Jestem newbie na Spark (Moja wersja to 1.6.0) i teraz staram się rozwiązać problem poniżej:Jak wykonać „Lookup” operacja na dataframes Spark podano wiele warunków
Załóżmy, że istnieje są dwa pliki źródłowe:
- Pierwszy (w skrócie) jest dużym, który zawiera kolumny o nazwach A1, B1, C1 i inne 80 kolumn. Wewnątrz są zapisy 230K.
- Drugi (B za krótki) jest małą tablicą przeglądową, która zawiera kolumny o nazwach A2, B2, C2 i D2. Wewnątrz znajduje się 250 rekordów.
Teraz musimy wstawić nową kolumnę do, podane poniżej logika:
- pierwsze wyszukiwanie A1, B1 i C1 w B (odpowiadające kolumny są A2, B2 i C2), czy udana , zwróć D2 jako wartość nowo dodanej kolumny. Jeśli nic nie znalazłem ...
- Następnie wyszukaj A1, B1 w B. Jeśli się powiedzie, zwróć D2. Jeśli nic nie znaleziono ...
- Ustaw domyślną wartość „NA”
już przeczytać w plikach i przekształca je w ramkach danych. W pierwszej sytuacji uzyskałem wynik, lewy zewnętrzny łącząc je razem. Ale nie mogę znaleźć dobrego sposobu na następny krok.
Obecnie próbuję zbudować nową ramkę danych, łącząc A i B, stosując mniej rygorystyczny stan. Jednak nie mam pojęcia, jak zaktualizować bieżącą ramkę danych z drugiej. Czy istnieje jakaś bardziej intuicyjna i skuteczna metoda rozwiązania tego problemu?
Dzięki za wszystkie odpowiedzi.
----------------------------- Aktualizacja na 20160309 -------------- ------------------
Ostatecznie przyjęta odpowiedź @mlk. Ciągle wielkie dzięki @ zero323 za jego/jej świetne komentarze na temat UDF kontra join, generowanie kodu Tungsten to naprawdę kolejny problem, z którym mamy do czynienia. Ale ponieważ musimy zrobić dziesiątki odnośnika i średnia 4 warunki dla każdego odnośnika, byłego rozwiązanie jest bardziej odpowiedni ...
Ostateczne rozwiązanie jest jakoś wygląda jak poniżej fragment kodu:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
To prawdopodobnie droga. Dostarczyłem również alternatywne rozwiązanie z 'joinami '. – zero323
Dzięki mlk. Jeśli tabela odnośników jest duża (500K * 50), czy nadal dobrze jest ją transmitować? –
I moje inne pytanie brzmi, przypuśćmy, że muszę wykonać 30 odnośników na różnych kolumnach i napisać 50 UDF, czy wpłynie to na wydajność? –