2015-03-31 15 views
7

Mam kilka krotek, które są w postaci złożonych kluczy i wartości. Na przykład,Jak grupować według wielu kluczy w iskrze?

tfile.collect() = [(('id1','pd1','t1'),5.0), 
    (('id2','pd2','t2'),6.0), 
    (('id1','pd1','t2'),7.5), 
    (('id1','pd1','t3'),8.1) ] 

Chcę wykonać sql jak operacje na tej kolekcji, gdzie mogę agregacji informacji w oparciu o id [1..n] lub pd [1..n]. Chcę zaimplementować za pomocą wanilii pyspark apis i nie używając SQLContext. W mojej obecnej implementacji czytam z wielu plików i scalania RDD.

def readfile(): 
    fr = range(6,23) 
    tfile = sc.union([sc.textFile(basepath+str(f)+".txt") 
         .map(lambda view: set_feature(view,f)) 
         .reduceByKey(lambda a, b: a+b) 
         for f in fr]) 
    return tfile 

Zamierzam utworzyć zagregowaną tablicę jako wartość. Na przykład:

agg_tfile = [((id1,pd1),[5.0,7.5,8.1])] 

gdzie 5,0,75,8,1 reprezentuje [t1, t2, t3]. Obecnie osiągam to samo przy użyciu kodu Villa Pythona za pomocą słowników. Działa dobrze dla mniejszych zestawów danych. Ale martwię się, ponieważ może to nie być skalowalne dla większych zestawów danych. Czy istnieje skuteczny sposób osiągnięcia tego samego przy użyciu pyspark apis?

+0

Zamiast ' unii "bardziej wydajne jest ładowanie wszystkich plików z wywołaniem' wholeTextFiles' (jeśli istnieje w PySpark) –

+0

Oto Scala [redukcja agregacji przez multi ple klucze] (http://dmtolpeko.com/2015/02/12/multi-column-key-and-value-reduce--tuple-in-spark/) i Python [union redukcji wielu wartości] (http://stackoverflow.com/questions/30895033/spark-use-reducebykey-instead-of-groupbykey-and-mapbyvalues) – ecoe

Odpowiedz

13

Domyślam się, że chcesz transponować dane według wielu pól.

Prosty sposób polega na łączeniu pól docelowych, które będą grupować, i uczynienia z nich klucza w sparowanym RDD. Na przykład:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1']) 
rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b) 
print rdd.collect() 

Otrzymasz przetransponowany wynik.

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')] 
+0

Jest to zdecydowanie interesujący sposób rozwiązania tego problemu. Wymyśliłem inny sposób osiągnięcia tego samego. Ale myślę, że twoja metoda może być znacznie szybsza niż moja. Dzielę się również własnym rozwiązaniem. – Rahul

+0

Czy PySpark nie ma 'groupByKey'? –

+0

PySpark ma metodę [groupBykey] (https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html). Jednak pytanie ma tendencję do grupowania rekordów w oparciu o dwa pola, zamiast robić agregację, taką jak 'SELECT sum (value) FROM GROUP GROUP id, pd'. Więc 'groupBykey' może nie pomóc. – dapangmao

2

I grupowane ((ID1, T1), ((p1,5.0), (p2,6.0)) i tak dalej ... w mojej funkcji mapy. Później zmniejszyć stosując map_group który tworzy tablicę dla [P1, P2,...] i wypełnia wartości w odpowiednich pozycjach.

def map_group(pgroup): 
    x = np.zeros(19) 
    x[0] = 1 
    value_list = pgroup[1] 
    for val in value_list: 
     fno = val[0].split('.')[0] 
     x[int(fno)-5] = val[1] 
    return x 

tgbr = tfile.map(lambda d: ((d[0][0],d[0][2]),[(d[0][1],d[1])])) \ 
       .reduceByKey(lambda p,q:p+q) \ 
       .map(lambda d: (d[0], map_group(d))) 

To ma ochoty na drogie rozwiązania w zakresie obliczeń. Ale działa teraz.

Powiązane problemy