2015-02-17 22 views
6

Chciałbym przekazać obiekt z węzła sterownika do innych węzłów, w których znajduje się RDD, tak aby każda partycja RDD mogła uzyskać dostęp do tego obiektu, jak pokazano w poniższym fragmencie.Jak sparować obiekt za pomocą Kryo Spark?

object HelloSpark { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf() 
       .setAppName("Testing HelloSpark") 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       .set("spark.kryo.registrator", "xt.HelloKryoRegistrator") 

     val sc = new SparkContext(conf) 
     val rdd = sc.parallelize(1 to 20, 4) 
     val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test")) 

     rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !") 
      .collect() 
      .foreach(println) 

     sc.stop 
    } 
} 

// My registrator 
class HelloKryoRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) = { 
     kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer()) 
    } 
} 

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] { 
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = { 
     output.writeInt(obj.getLength) 
     output.writeInt(obj.getOffset) 
     output.writeBytes(obj.get(), obj.getOffset, obj.getLength) 
    } 

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = { 
     val length = input.readInt() 
     val offset = input.readInt() 
     val bytes = new Array[Byte](length) 
     input.read(bytes, offset, length) 

     new ImmutableBytesWritable(bytes) 
    } 
} 

we fragmencie powyżej, próbowałem serializacji ImmutableBytesWritable przez Kryo w Spark, więc zrobiłem follwing:

  1. Konfiguracja SparkConf instancja przeszła iskra kontekst, czyli ustawić "spark.serializer" do "org.apache.spark.serializer.KryoSerializer" i ustaw "spark.kryo.registrator" na "xt.HelloKryoRegistrator ";
  2. Napisz niestandardową klasę rejestru Kryo, w której rejestruję klasę ImmutableBytesWritable;
  3. Napisz serializatora dla ImmutableBytesWritable

Jednak, kiedy złożyć mój wniosek iskry w trybie przędzy klienta następujący wyjątek:

Wyjątek w wątku „głównym” org. apache.spark.SparkException: Zadanie nie można serializować w org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) w org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) w org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) w org.apache.spark.rdd.RDD.map (RDD.scala: 270) w xt.HelloSpark $ .main (HelloSpark.scala: 23) przy xt .HelloSpark.main (HelloSpark.scala) na sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) na sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) w sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) w java.lang.reflect.Method.invoke (Method.java:606) at org.apache.spark.deploy.SparkSubmit $ .launch (SparkSubmit.scala: 325) at org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) at org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)spowodowane przez: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable w java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) w java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) w java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) w java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) w java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) w java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) w org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) w org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) w org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 więcej

Wydaje się, że ImmutableBytesWritable nie mogą być szeregowane przez Kryo. Jaki jest więc właściwy sposób, aby Spark serializował obiekt za pomocą Kryo? Czy Kryo może serializować dowolny typ?

+0

To samo dzieje się ze mną, nawet o wiele bardziej prosta konfiguracja (tylko ustawienie config serializer i rejestracji klas). Zanotuj ten wiersz twojego stosu: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', z jakiegoś powodu próbuje użyć serializacji Java, nawet jeśli powiedziałeś mu, żeby tego nie robił. –

+0

Czy udało się rozwiązać ten problem? Mam ten sam problem. – Nilesh

Odpowiedz

0

Dzieje się tak, ponieważ w swoim zamknięciu używasz ImmutableBytesWritable. Spark nie obsługuje serializacji zamknięcia przy użyciu Kryo (tylko obiekty w RDD). Można przyjąć pomocy Sposoby rozwiązania problemu:

Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?

Wystarczy serializacji obiektów przed przejazdem zamknięcia i De-serialize później. Takie podejście działa, nawet jeśli twoje klasy nie są Serializowalne, ponieważ używa Kryo za kulisami. Potrzebujesz tylko curry. ;)

Oto przykładowy szkic:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) 
       (foo: Foo) : Bar = { 
    kryoWrapper.value.apply(foo) 
} 
val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _ 
rdd.flatMap(mapper).collectAsMap() 

object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) { 
    def apply(foo: Foo) : Bar = { //This is the real function } 
} 
Powiązane problemy