2016-11-22 16 views
5

Mając Dataset<Row> z pojedynczej kolumny ciągów json:analizowania zestawu danych kolumnę JSON do DataSet <Row>

+--------------------+ 
|    value| 
+--------------------+ 
|{"Context":"00AA0...| 
+--------------------+ 

Json próbki:

{"Context":"00AA00AA","MessageType":"1010","Module":"1200"} 

Jak mogę najefektywniej dotrzeć Dataset<Row>, który wygląda tak:

+--------+-----------+------+ 
| Context|MessageType|Module| 
+--------+-----------+------+ 
|00AA00AA|  1010| 1200| 
+--------+-----------+------+ 

Przetwarzam te dane w stre am, wiem, że iskra może to zrobić za niego siebie, gdy czytam go z pliku:

spark 
.readStream() 
.schema(MyPojo.getSchema()) 
.json("src/myinput") 

ale teraz czytam dane z Kafki i to daje mi dane w innej formie. Wiem, że mogę używać niektórych parserów takich jak Gson, ale chciałbym, aby iskra zrobiła to za mnie.

Odpowiedz

1

Wypróbuj tę próbkę.

public class SparkJSONValueDataset { 
    public static void main(String[] args) { 
     SparkSession spark = SparkSession 
       .builder() 
       .appName("SparkJSONValueDataset") 
       .config("spark.sql.warehouse.dir", "/file:C:/temp") 
       .master("local") 
       .getOrCreate(); 

     //Prepare data Dataset<Row> 
     List<String> data = Arrays.asList("{\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"}"); 
     Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value"); 
     df.show(); 

     //convert to Dataset<String> and Read 
     Dataset<String> df1 = df.as(Encoders.STRING()); 
     Dataset<Row> df2 = spark.read().json(df1.javaRDD()); 
     df2.show(); 
     spark.stop(); 
    } 
} 
+0

ty za odpowiedź. Domyślam się, że to zadziała, ale naprawdę nie podoba mi się pomysł wysłania DF ponownie do czytnika :( –

Powiązane problemy