2015-04-15 8 views
9

Wystąpił problem z próbą parsowania json w mojej pracy iskry. Używam spark 1.1.0, json4s i Cassandra Spark Connector. Wyjątkiem jest rzucony:Wyjątek niezeralizowalny Spark podczas analizowania JSON z json4s

java.io.NotSerializableException: org.json4s.DefaultFormats

Badanie DefaultFormats towarzysz obiekt, a ten stack pytanie, jasne jest, że nie można DefaultFormats odcinkach. Teraz pozostaje pytanie, co robić.

Widzę, że to ticket najwyraźniej rozwiązało ten problem w bazie kodu iskrownika, dodając słowo kluczowe przejściowy, ale nie jestem pewien dokładnie, jak lub gdzie zastosować go w mojej sprawie. Czy rozwiązaniem jest tylko tworzenie instancji klasy DefaultFormats na executorach, aby uniknąć serializacji razem? Czy istnieje inna biblioteka analizowania JSON dla scala/iskry, której używają ludzie? Początkowo próbowałem używać samego jacksona, ale natrafiłem na kilka błędów z adnotacjami, których nie mogłem łatwo rozwiązać, a json4s działało po wyjęciu z pudełka. Oto mój kod:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 
implicit val formats = DefaultFormats 

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

Robię moje json parsowanie w funkcji checkUa. Starałem się sprawić, by leniwy był leniwy, w nadziei, że to jakoś opóźni egzekucję, ale nie przyniosło to efektu. Być może przeniesienie niejawnego val wewnątrz checkUA? Każda rada bardzo doceniona.

Odpowiedz

12

To już zostało odebrane w an open ticket with json4s. Rozwiązaniem jest umieszczenie deklaracji implicit wewnątrz funkcji

val count = rdd 
       .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)}) 
       .reduce((x, y) => x + y) 
+0

Dziękuję Ci odpowiedzieć, to działało jak czar. Oto kolejne iskrowe pytanie JSON4s, które mam ... utknąłem ponownie. http://stackoverflow.com/questions/29666487/json4s-cant-find-constructor-w-spark – worker1138

+0

To nie wydaje się działać dla mnie: https://stackoverflow.com/questions/48454611/spark-using- json4s-has-serialization-failed – pferrel

2

Miałem ten sam błąd, gdy mogę umieścić deklarację implicit val formats = ... wewnątrz metody, która zawiera analizowania, zamiast deklarowania go na klasy (obiektu).

Więc byłoby to wygeneruje błąd:

object Application { 

    //... Lots of other code here, which eventually calls 
    // setupStream(...) 

    def setupStream(streamingContext: StreamingContext, 
          brokers: String, 
          topologyTopicName: String) = { 
    implicit val formats = DefaultFormats 
    _createDStream(streamingContext, brokers, topologyTopicName) 
     // Remove the message key, which is always null in our case 
     .map(_._2) 
     .map((json: String) => parse(json).camelizeKeys 
     .extract[Record[TopologyMetadata, Unused]]) 
     .print() 
} 

Ale to byłoby w porządku:

object Application { 

    implicit val formats = DefaultFormats 

    //... Lots of other code here, which eventually calls 
    // setupStream(...) 

    def setupStream(streamingContext: StreamingContext, 
          brokers: String, 
          topologyTopicName: String) = { 
    _createDStream(streamingContext, brokers, topologyTopicName) 
     // Remove the message key, which is always null in our case 
     .map(_._2) 
     .map((json: String) => parse(json).camelizeKeys 
     .extract[Record[TopologyMetadata, Unused]]) 
     .print() 
} 
+0

Czy możesz udostępnić swój kod? – worker1138

+0

Oczywiście, edytowałem swoją odpowiedź za pomocą przykładowego kodu –

+0

Nie wydaje mi się, że to działa: https://stackoverflow.com/questions/48454611/spark-using-json4s-has-serialization-fails – pferrel

Powiązane problemy