2014-10-06 13 views
12

Chciałbym zatrzymać tylko tych pracowników, którzy mają identyfikator oddziału wymieniony w drugiej tabeli.Filtr oparty na innym RDD w Spark

Employee table 
LastName DepartmentID 
Rafferty 31 
Jones 33 
Heisenberg 33 
Robinson 34 
Smith 34 

Department table 
DepartmentID 
31 
33 

Próbowałem następujący kod, który nie działa:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
employee.filter(lambda e: e[1] in department).collect() 

Py4JError: An error occurred while calling o344.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

pomysłów? Używam Spark 1.1.0 z Python. Jednak zaakceptowałbym odpowiedź Scala lub Pythona.

+0

Czy wymagają listę dział być RDD? – maasg

+0

Niezupełnie. Lista departamentów jest ładowana z HDFS, ale nie jest zbyt duża. – poiuytrez

Odpowiedz

19

w tym przypadku, co chcesz osiągnąć jest filtrowanie na każdej partycji z danymi zawartymi w tabeli dział: byłoby to podstawowe rozwiązanie:

val dept = deptRdd.collect.toSet 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

Jeśli dane dział jest duża, zmienna emisja zwiększy wydajność poprzez dostarczanie danych raz do wszystkich węzłów zamiast szeregować je z każdego zadania

val deptBC = sc.broadcast(deptRdd.collect.toSet) 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 

Mimo że użycie sprzężenia zadziała, jest to bardzo kosztowne rozwiązanie, ponieważ wymaga rozproszonego przetasowania danych (byKey), aby uzyskać połączenie. Biorąc pod uwagę, że wymóg jest prosty filtr, wysyłanie danych do każdej partycji (jak pokazano powyżej) zapewni znacznie lepszą wydajność.

+0

wybacz mi, jeśli się mylę, ale czy partycjaBy() nie rozwiąże losowania rozdzielonego według klucza? Nie mówienie, że to rozwiąże problem łączenia jest droższe, ponieważ nie sądzę, by to było, po prostu stwierdziłem, że join nie wymaga shuffle w 100% przypadków. – TurnipEntropy

10

W końcu zaimplementowałem rozwiązanie przy użyciu sprzężenia. Musiałem dodać wartość 0 do działu w celu uniknięcia wyjątek od Spark:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
# invert id and name to get id as the key 
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) 
# add a 0 value to avoid an exception 
department = sc.parallelize(department).map(lambda d: (d,0)) 

employee.join(department).map(lambda e: (e[1][0], e[0])).collect() 

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 
0

Filtrowanie wiele wartości w wielu kolumnach:

W przypadku, gdy jesteś ciągnięcie danych z bazy danych (Hive lub SQL dB dla tego przykładu) i trzeba filtrować na wielu kolumnach, to może po prostu być łatwiej załadować tabelę z pierwszego filtra, następnie iteracyjne filtry przez RDD (wielu małych iteracji jest zachęcać sposób programowania Spark):

{ 
    import org.apache.spark.sql.hive.HiveContext 
    val hc = new HiveContext(sc) 

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") 
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") 
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") 

} 

oczywiście trzeba znać swoje dane trochę do filtrowania odpowiednie wartości, ale jest to część procesu analizy.

0

dla tego samego exm powyżej, chciałbym zachować tylko pracowników, które zawierały lub w identyfikatorze departamentu, o którym mowa w drugiej tabeli. ale to musi być nie dołączyć operację, chciałbym zobaczyć go w „zawarty” lub „na” mam na myśli 33 to „w” 334 i 335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
Powiązane problemy