2016-03-22 10 views
7

Chciałbym utworzyć JSON z ramki danych Spark v.1.6 (przy użyciu scala). Wiem, że istnieje proste rozwiązanie polegające na wykonaniu df.toJSON.Spark Row to JSON

Jednak mój problem wygląda nieco inaczej. Rozważmy na przykład dataframe z następującymi kolumnami:

| A |  B  | C1 |  C2 | C3 | 
------------------------------------------- 
| 1 | test  | ab | 22 | TRUE | 
| 2 | mytest | gh | 17 | FALSE | 

chciałbym mieć w końcu do dataframe z

| A |  B  |      C     | 
---------------------------------------------------------------- 
| 1 | test  | { "c1" : "ab", "c2" : 22, "c3" : TRUE } | 
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } | 

gdzie C jest JSON zawierający C1, C2, C3. Niestety, podczas kompilacji nie wiem, jak wygląda ramka danych (z wyjątkiem kolumn A i B, które są zawsze "naprawione").

Co do tego, dlaczego potrzebuję tego: Używam Protobuf do wysyłania wyników. Niestety, moja ramka danych czasami ma więcej kolumn niż się spodziewano i nadal wysyłałbym je przez Protobuf, ale nie chcę określać wszystkich kolumn w definicji.

Jak mogę to osiągnąć?

+0

jeszcze dataframe – navige

+0

Nie, przepraszam, mam na myśli raczej jak dodać 'C1, C2, C3' jako kolumna JSON ciąg do istniejącego dataframe. Zaktualizowałem post, aby wyjaśnić wersję Spark i scala jako języka. – navige

+0

Przepraszamy! Oczywiście, właśnie zaktualizowałem pytanie (wraz z powodem, dla którego chciałbym to osiągnąć) i dodałem przykład. – navige

Odpowiedz

7

Spark 2.1 powinna mieć natywne wsparcie dla tego przypadku użycia (patrz #15354).

import org.apache.spark.sql.functions.to_json 
df.select(to_json(struct($"c1", $"c2", $"c3"))) 
4

Pierwszy pozwala konwertować C do struct:

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C")) 

to struktura może być przekształcony JSONL użyciu toJSON jak poprzednio:

dfStruct.toJSON.collect 
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}}) 

nie jestem świadomy każdej wbudowanej metody, która można przekonwertować pojedynczą kolumnę, ale można ją przekonwertować pojedynczo i join lub użyć ulubionego parsera JSON w UDF.

case class C(C1: String, C2: Int, C3: Boolean) 

object CJsonizer { 
    import org.json4s._ 
    import org.json4s.JsonDSL._ 
    import org.json4s.jackson.Serialization 
    import org.json4s.jackson.Serialization.write 

    implicit val formats = Serialization.formats(org.json4s.NoTypeHints) 

    def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3)) 
} 


val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
    CJsonizer.toJSON(c1, c2, c3)) 

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3")) 
+0

Właściwie to moje pytanie dotyczy tak naprawdę drugiej części konwersji poszczególnych kolumn na JSON. Wspomniałeś o "łączeniu" kolumn, ale to nie działa tak jak mam z jednej strony 'RDD [String]', az drugiej strony 'DataFrame' – navige

+1

Jak mówi, po prostu użyj' UDF '. Nie musisz nawet używać pełnowymiarowego parsera JSON w 'UDF' - możesz po prostu stworzyć ciąg JSON w locie używając' map' i 'mkString'. Prawdopodobnie będziesz musiał użyć 'DataFrame.columns' lub ewentualnie' DataFrame.dtypes', aby utworzyć instrukcję 'select' i jako podstawę' map' w 'UDF'. –

+0

Zgadzam się z @DavidGriffin - udf może być tutaj najprostszym rozwiązaniem. A Jackson i json4s są już przeciągnięci z innymi zależnościami. – zero323

3

Tutaj nie parser JSON, i dostosowuje się do schematu:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit} 

df.select(
    col(df.columns(0)), 
    col(df.columns(1)), 
    concat(
    lit("{"), 
    concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => { 
     val c = dt._1; 
     val t = dt._2; 
     concat(
     lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ), 
     col(c), 
     lit(if(t=="StringType") "\""; else "") 
    ) 
    }):_*), 
    lit("}") 
) as "C" 
).collect() 
+0

wygląda trochę hacky, ale działa :-) – navige

+1

Yup i tak. JSON jest ogólnie hakowaty, jeśli mnie pytasz. –