2015-04-18 10 views
37

Próbuję dowiedzieć się, dlaczego mój groupByKey wraca następujące:PySpark groupByKey powrocie pyspark.resultiterable.ResultIterable

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)] 

Mam flatMapped wartości, które wyglądają tak:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')] 

I Mam tylko proste:

groupRDD = columnRDD.groupByKey() 

Odpowiedz

53

To, co otrzymujesz, to obiekt, który pozwala ci na powtarzanie ove r wyniki. Wyniki grupy groupByKey można przekształcić w listę, wywołując listę() wartości, np.

example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')]) 

example.groupByKey().collect() 
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......] 

example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect() 
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])] 
+23

'przykład.groupByKey(). MapValues ​​(list) .collect()' jest krótszy i działa również –

+0

Jak mogę mapować za pomocą 'ResultIterable' type? – xxx222

15

można również użyć

example.groupByKey().mapValues(list) 
1

zamiast korzystania groupByKey(), proponuję użyć cogroup(). Możesz odnieść się do poniższego przykładu.

[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] 

Przykład:

>>> x = sc.parallelize([("foo", 1), ("bar", 4)]) 
>>> y = sc.parallelize([("foo", -1)]) 
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] 
>>> print(z) 

Powinieneś uzyskać żądaną moc ...

1

Przykład:

r1 = sc.parallelize([('a',1),('b',2)]) 
r2 = sc.parallelize([('b',1),('d',2)]) 
r1.cogroup(r2).mapValues(lambdax:tuple(reduce(add,__builtin__.map(list,x)))) 

Wynik:

[('d', (2,)), ('b', (2, 1)), ('a', (1,))]