Chciałbym przekazać obiekt z węzła sterownika do innych węzłów, w których znajduje się RDD, tak aby każda partycja RDD mogła uzyskać dostęp do tego obiektu, jak pokazano w poniższym fragmencie.Jak sparować obiekt za pomocą Kryo Spark?
object HelloSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Testing HelloSpark")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 20, 4)
val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))
rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
.collect()
.foreach(println)
sc.stop
}
}
// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) = {
kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
}
}
//My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
output.writeInt(obj.getLength)
output.writeInt(obj.getOffset)
output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
}
override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
val length = input.readInt()
val offset = input.readInt()
val bytes = new Array[Byte](length)
input.read(bytes, offset, length)
new ImmutableBytesWritable(bytes)
}
}
we fragmencie powyżej, próbowałem serializacji ImmutableBytesWritable przez Kryo w Spark, więc zrobiłem follwing:
- Konfiguracja SparkConf instancja przeszła iskra kontekst, czyli ustawić "spark.serializer" do "org.apache.spark.serializer.KryoSerializer" i ustaw "spark.kryo.registrator" na "xt.HelloKryoRegistrator ";
- Napisz niestandardową klasę rejestru Kryo, w której rejestruję klasę ImmutableBytesWritable;
- Napisz serializatora dla ImmutableBytesWritable
Jednak, kiedy złożyć mój wniosek iskry w trybie przędzy klienta następujący wyjątek:
Wyjątek w wątku „głównym” org. apache.spark.SparkException: Zadanie nie można serializować w org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) w org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) w org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) w org.apache.spark.rdd.RDD.map (RDD.scala: 270) w xt.HelloSpark $ .main (HelloSpark.scala: 23) przy xt .HelloSpark.main (HelloSpark.scala) na sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) na sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) w sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) w java.lang.reflect.Method.invoke (Method.java:606) at org.apache.spark.deploy.SparkSubmit $ .launch (SparkSubmit.scala: 325) at org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) at org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)spowodowane przez: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable w java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) w java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) w java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) w java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) w java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) w java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) w org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) w org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) w org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 więcej
Wydaje się, że ImmutableBytesWritable nie mogą być szeregowane przez Kryo. Jaki jest więc właściwy sposób, aby Spark serializował obiekt za pomocą Kryo? Czy Kryo może serializować dowolny typ?
To samo dzieje się ze mną, nawet o wiele bardziej prosta konfiguracja (tylko ustawienie config serializer i rejestracji klas). Zanotuj ten wiersz twojego stosu: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', z jakiegoś powodu próbuje użyć serializacji Java, nawet jeśli powiedziałeś mu, żeby tego nie robił. –
Czy udało się rozwiązać ten problem? Mam ten sam problem. – Nilesh