2016-03-22 22 views
8

Mam klasę, która implementuje niestandardowy serializator Kryo, implementując metody read() i write() z com.esotericsoftware.kryo.Serializer (patrz przykład poniżej). Jak mogę zarejestrować ten niestandardowy serializator za pomocą Sparka?Spark Kryo: Zarejestruj niestandardowy serializator

Oto przykład kodu sudo, co mam:

class A() 

CustomASerializer extends com.esotericsoftware.kryo.Serializer[A]{ 
    override def write(kryo: Kryo, output: Output, a: A): Unit = ??? 
    override def read(kryo: Kryo, input: Input, t: Class[A]): A = ??? 
} 

val kryo: Kryo = ... 
kryo.register(classOf[A], new CustomASerializer()); // I can register my serializer 

Teraz w Spark:

val sparkConf = new SparkConf() 
sparkConf.registerKryoClasses(Array(classOf[A])) 

Niestety, Spark nie daje mi możliwość zarejestrowania mój serializatora niestandardowego . Masz pomysł, czy jest jakiś sposób na zrobienie tego?

+0

Spójrz na spark.kryo.classesToRegister – Sohaib

+1

[Ta odpowiedź] (http://stackoverflow.com/questions/32667068/save-spark-dataframe-into-elasticsearch-can-t-handle-type - wyjątek) nie jest bezpośrednią odpowiedzią na twoje pytanie, ale podane wyjaśnienie da ci więcej szczegółów na temat niestandardowej rejestracji serializatora w iskrze. – eliasah

Odpowiedz

11

Stwórz własną KryoRegistrator z tym niestandardowym serializatora zarejestrowany:

package com.acme 

class MyRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) { 
    kryo.register(classOf[A], new CustomASerializer()) 
    } 
} 

Następnie ustaw spark.kryo.registrator do pełni kwalifikowaną nazwą Twojego użytkownika, np Registrator com.acme.MyRegistrator:

val conf = new SparkConf() 
conf.set("spark.kryo.registrator", "com.acme.KryoRegistrator") 
+0

Rozwiązuje to problem. Dzięki! – marios

+1

To nie jest bardzo jasne w dokumentacji iskry, ale to absolutnie działa. Jeśli napotkasz problemy z tym, że Kryo nie może serializować klasy za pomocą konstruktora no-arg w Spark (dla mnie był to org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering), to może rozwiązać problem za pomocą kryo.register (LazilyGeneratedOrdering.class, new JavaSerializer()); Dziękuję Ci! – jhnclvr

Powiązane problemy