2015-06-29 16 views
9

Zaczynając od Spark DataFrame, aby stworzyć matrycę wektorową do dalszego przetwarzania analitycznego.Iterowanie przez iskrę RDD

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache() 
feature_matrix_vectors.first() 

Dane wyjściowe są tablicą wektorów. Niektóre z tych wektorze mają wartość null w nich

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]) 
... 
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null]) 

Od tego chcę iterację matrycy wektora i utworzyć tablicę LabeledPoint z 0 (zero), gdy wektor zawiera null, inaczej z 1.

def f(row): 
    if row.contain(None): 
     LabeledPoint(1.0,row) 
    else: 
     LabeledPoint(0.0,row) 

próbowałem iterację matrycy wektora za pomocą

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums 
next(feature_matrix_labeledPoint) # Run the iteration protocol 

ale to nie działa.

TypeError: 'PipelinedRDD' object is not iterable 

Każda pomoc będzie wielki

+0

to tak ma odpowiedź szczegóły http://stackoverflow.com/a/25296061/429476 –

Odpowiedz

7

RDDs nie są spadek zamiennik list Pythona. Musisz użyć akcji lub transformacji, które są dostępne na danym RDD. Tutaj można po prostu użyć map:

from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.regression import LabeledPoint 


feature_matrix_vectors = sc.parallelize([ 
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]), 
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None]) 
]) 

(feature_matrix_vectors 
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v)) 
    .collect()) 
Powiązane problemy