2016-01-12 22 views
5

Dwa pytania, odpowiedź na ogólną pozwoli mi zorientować się, jak minimalnie mogę stworzyć MVCE.Serializator Kryo powodujący wyjątek w podstawowej klasie Scala WrappedArray

1) W jaki sposób mogę się zarejestrować, aby zarejestrować WrappedArray z góry (i każdej innej klasy w Scali, z której mógłbym skorzystać)? Czy to normalne, że musisz rejestrować klasy z bibliotek przy pomocy Kryo?

a konkretnie:

2) Jak to naprawić? (Chcę się przyznać, może mam coś innego screwy dzieje, że jeśli odzwierciedla fałszywy błąd tutaj, więc nie zabić siebie próbując odtworzyć tego)

DANE

testowania programu iskry w Javie przy użyciu naszego klienta zajęcia związane z genetyką i statystyki, na Spark 1.4.1, Scala 2.11.5 z następującymi ustawieniami SparkConf:

// for kyro serializer it wants to register all classes that need to be serialized 
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class}; 

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData") 
       <SNIP other settings to declare master> 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       //require registration of all classes with Kryo 
       .set("spark.kryo.registrationRequired", "true") 
       .registerKryoClasses(kryoClassArray); 

otrzymuję ten błąd (powtórzony na końcu długiego błędu w wykazie):

Caused by: java.lang.IllegalArgumentException: Class is not 
registered: scala.collection.mutable.WrappedArray$ofRef Note: To 
register this class use: 
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 

Ale nigdy nie nazywam tej klasy z mojego kodu. Mogę dodać scala.collection.mutable.WrappedArray do kryoClassArray, ale to nie rozwiązuje problemu. Jeśli dodaję scala.collection.mutable.WrappedArray $ ofRef.class (zgodnie z sugestią dotyczącą błędu), który jest błędem składni, widzę, że nie mogę tutaj zadeklarować anonimowej funkcji?

MVCE: Uruchomiłem MVCE, ale problem polega na tym, że wykonanie jednej z naszych klas wymaga zewnętrznych bibliotek i plików tekstowych/danych. Kiedy już usuwam nasze zajęcia, nie mam problemu. Jeśli ktoś mógłby odpowiedzieć na ogólne pytanie, może pomóc w ustaleniu, ile MVCE mogę wymyślić.

Kiedy piszę to pytanie, otrzymałem pozwolenie na aktualizację do wersji 1.5.2, zobaczę, czy jest tam jakakolwiek zmiana i pytanie o aktualizację, jeśli tak.

Krótki z MVCE oto moje deklaracje klasy:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable { 

class PrintHetSharing implements VoidFunction<DropResult> { 

class SparkDoDrop implements Function<Integer, Integer> { 

Pełne błędy:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0 
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it. 
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) 
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) 
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
+0

1. Wiedząc, co wymagać będzie zajęcia serializacji wymaga możesz sprawdzić kod i zrozumieć, co robi (wkleiłeś tylko próbkę conf, a nie użycie). 2. Takie same jak 1, niemożliwe do odpowiedzi bez próbki kodu. –

+0

Pewnie, @DanielL. Będę edytować w jakimś kodzie. Ale brzmi to tak, jak mówisz, że muszę znać każdą podstawową klasę? Ogólna zasada?Piszę Javę, więc nie spodziewałem się, że potrzebuję wiedzy o podstawowych klasach Scala, aby działało Kryo. Dzięki – JimLohse

+0

@ Daniell. Doceniam żądanie MVCE, problem, na który się natknę, polega na tym, że do wykonania jednej z naszych klas wymagane są zewnętrzne biblioteki i pliki tekstowe/danych. Kiedy już usuwam nasze klasy i zapotrzebowanie na nasze pliki, nie mam problemu. Jeśli ktoś mógłby odpowiedzieć na ogólne pytanie, mógłby mi pomóc w ustaleniu, ile MVCE mogę wymyślić. Wdrażam Serializable we wszystkich klasach, jawnie lub poprzez implementację Functions from Spark jak importowanie org.apache.spark.api.java.function.Function i org.apache.spark.api.java.function.VoidFunction – JimLohse

Odpowiedz

6

W Scala należy rozwiązać ten problem, dodając „scala.collection.mutable.WrappedArray.ofRef [_]” jako zarejestrowany klasy jak w poniższym fragmencie:

conf.registerKryoClasses(
    Array(
    ... 
    classOf[Person], 
    classOf[Array[Person]], 
    ... 
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]] 
) 
) 
+0

Najpierw twoja odpowiedź wygląda jak Scala, ale jestem na Jawie, ale mam rację :) Doceniam odpowiedź, ale podstawowe pytanie jest bez odpowiedzi, dlaczego powinienem zadeklarować tę klasę, kiedy będę Używasz go? Nie muszę zadeklarować każdej klasy w Spark, dlaczego ten? Nie próbowałem używać Kryo przez chwilę, powinienem go ponownie wdrożyć, skoro nasze rozwiązanie znajduje się znacznie dalej, a Spark to kilka wersji nowszych. +1, dzięki! – JimLohse

+0

Nie wiem, dlaczego tag java nie był w pytaniu, mój zły, przepraszam, był w pytaniu, ale nie tagach, oops – JimLohse

+2

przyjął tę odpowiedź teraz, gdy ponownie to przeglądam, nie zapewnia kompletnej odpowiedzi jako ten kod scala nie działa w Javie. Mimo to jest bliżej konkretnej odpowiedzi. Mógłbym przysiąc, że ktoś napisał, jak dodać tę klasę Scala do tablicy Javy, używając przykładu .ofRef [] lub Java-esque $ ofRef, które nie działają. Póki co zrelaksowałem ustawienie "wymagane" na Kryo. – JimLohse

2

Nie trzeba zrobić wszystko serializacji, niezależnie od niego jest częścią biblioteki klienta lub nie. Ale musisz zrobić dowolną lambdę, która będzie miała wpływ na wykonawców serializowalnych. Te nie działają na głównym węźle, więc nie ma sposobu, aby zapobiec serializacji (ani nie chcesz, ponieważ cały cel Sparka jest rozproszonym obliczaniem).

Przykłady i takie (a jeśli jeszcze nie jesteś w pełni zrozumiały), sprawdź: the official docs about this.

+0

Dzięki, czyszczenie ogólne pytanie pozwala mi wiedzieć, gdzie skupić się na moich wysiłkach, bardzo przydatne! Nadal jestem nieco zaskoczony, dlaczego klasa scala klasy WrappedArray jest zgłaszana jako ta, której nie można serializować. Rozwiążę mój kod i odłożę go razem. Rozumiem anonimowe funkcje i używam ich podczas korzystania z wbudowanych klas - kiedy używam naszych klas, deklaruję je osobno. Nadal będę pracować nad MVCE ponownie dziękuję – JimLohse

Powiązane problemy