2015-04-28 7 views
5

To pytanie jest skierowane do osób znających py4j - i może pomóc w rozwiązaniu błędu trawienia. Próbuję dodać metodę do pyspark PythonMLLibAPI, który akceptuje RDD o nazwietak, działa i zwraca wynik w postaci RDD.Pyspark py4j PickleException: "oczekiwane zero argumentów dla konstrukcji ClassDict"

Metoda ta jest wzorowana sposobu PYthonMLLibAPI.trainALSModel(), który analogicznie istniejące odpowiednie części są:

def trainALSModel(
    ratingsJRDD: JavaRDD[Rating], 
    ..) 

istniejących pyton klasy Ocena wykorzystywane do modelowania nowy kod wynosi:

class Rating(namedtuple("Rating", ["user", "product", "rating"])): 
    def __reduce__(self): 
     return Rating, (int(self.user), int(self.product), float(self.rating)) 

Oto próba Więc oto odpowiednie klasy:

New pyton klasa pyspark.mllib.clustering.MatrixEntry:

from collections import namedtuple 
class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])): 
    def __reduce__(self): 
     return MatrixEntry, (long(self.x), long(self.y), float(self.weight)) 

New metoda foobarRDD W PythonMLLibAPI:

def foobarRdd(
    data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = { 
    val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)} 
    rdd 
    } 

Teraz spróbujmy go:

from pyspark.mllib.clustering import MatrixEntry 

def convert_to_MatrixEntry(tuple): 
    return MatrixEntry(*tuple) 

from pyspark.mllib.clustering import * 
pic = PowerIterationClusteringModel(2) 
tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)] 
trdd = sc.parallelize(map(convert_to_MatrixEntry,tups)) 

# print out the RDD on python side just for validation 
print "%s" %(repr(trdd.collect())) 

from pyspark.mllib.common import callMLlibFunc 
pic = callMLlibFunc("foobar", trdd) 

Istotne fragmenty re sults:

[(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5] 

który pokazuje, że wejście rdd jest "całe". Jednak marynowanie był nieszczęśliwy:

5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14) 
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict 
(for pyspark.mllib.clustering.MatrixEntry) 
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617) 
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170) 
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84) 
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97) 
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167) 
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) 
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:724) 

Poniżej jest wizualnym śladu pyton wywołanie stosu:

enter image description here

+0

Witam, czy to rozwiązałeś? –

Odpowiedz

8

miałem ten sam błąd jak używałem MLlib, a okazało się, że I zwrócił zły typ danych w jednej z moich funkcji. Działa teraz po zwykłym rzutowaniu na zwróconą wartość. Może to nie jest odpowiedź, której szukasz, ale jest to przynajmniej wskazówka, aby kierunek mógł nadążyć.

+0

Nie pracuję już nad tym projektem - i dlatego nie mogę go zweryfikować. Jednak wydaje się, że rozsądne rozważania zostały wznowione. – javadba

1

Otrzymałem ten błąd, używając wersji Spark> = 2.0.

Spark zmienia fikcyjność MLlib na nowszą przestrzeń nazw ML. W rezultacie istnieją dwa rodzaje SparseVector: ml.linalg.SparseVector i mllib.linalg.SparseVector

Niektóre funkcje MLlib nadal oczekują starszej mllib rodzaju

from pyspark.ml.linalg import Vectors 
# convert ML vector to older MLlib vector 
old_vec = Vectors.fromML(new_vec) 

HTH

+0

To było bardzo pomocne - dziękuję! Tyle tylko, że w wersji 2.1.1 'fromML' już nie istnieje, więc musiałem stworzyć obiekt ręcznie, wykonując' pyspark.mllib.linalg.SparseVector (sv.size, sv.indices, sv.values) ' , gdzie 'sv' był moim obiektem' pyspark.ml.linalg.SparseVector'. – LateCoder

1

miał ten sam problem, kilka razy. numpy typy nie zawierają niejawnych konwersji na pyspark.sql.types.

Dokonaj prostej wyraźnej konwersji na natywny system typów. W moim przypadku było to:

float(vector_a.dot(vector_b)