2015-07-24 14 views
12

Obecnie próbuję wyodrębnić bazę danych z MongoDB i użyć Sparka do przetrawienia w ElasticSearch z geo_points.Jak dodać nową kolumnę Struct do DataFrame

Baza danych Mongo ma wartości szerokości i długości geograficznej, ale ElasticSearch wymaga, aby były one rzucane do typu geo_point.

Czy istnieje sposób w Spark, aby skopiować lat i lon kolumny do nowej kolumny, która jest array lub struct?

Każda pomoc jest doceniana!

Odpowiedz

33

Zakładam zacząć od jakiegoś płaskiego schematu jak ten:

root 
|-- lat: double (nullable = false) 
|-- long: double (nullable = false) 
|-- key: string (nullable = false) 

Pierwszy pozwala tworzyć przykładowe dane:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 
import org.apache.spark.sql.types._ 

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil) 

val schema = StructType(
    StructField("lat", DoubleType, false) :: 
    StructField("long", DoubleType, false) :: 
    StructField("key", StringType, false) ::Nil) 

val df = sqlContext.createDataFrame(rdd, schema) 

Łatwym sposobem jest użycie klasy UDF i case:

case class Location(lat: Double, long: Double) 
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long)) 

val dfRes = df. 
    withColumn("location", makeLocation(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long") 

dfRes.printSchema 

i otrzymujemy

root 
|-- key: string (nullable = false) 
|-- location: struct (nullable = true) 
| |-- lat: double (nullable = false) 
| |-- long: double (nullable = false) 

Twarda sposobem jest przekształcenie danych i zastosować schemat potem:

val rddRes = df. 
    map{case Row(lat, long, key) => Row(key, Row(lat, long))} 

val schemaRes = StructType(
    StructField("key", StringType, false) :: 
    StructField("location", StructType(
     StructField("lat", DoubleType, false) :: 
     StructField("long", DoubleType, false) :: Nil 
    ), true) :: Nil 
) 

sqlContext.createDataFrame(rddRes, schemaRes).show 

i uzyskać oczekiwany wynik

+------+-------------+ 
| key|  location| 
+------+-------------+ 
|Warsaw|[52.23,21.01]| 
| Corte| [42.3,9.15]| 
+------+-------------+ 

Tworzenie zagnieżdżonego schemat od zera może być uciążliwe, więc jeśli możesz Poleciłbym pierwsze podejście. Można go łatwo rozszerzyć, jeśli potrzebujesz bardziej zaawansowanego strukturę:

case class Pin(location: Location) 
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)) 

df. 
    withColumn("pin", makePin(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long"). 
    printSchema 

i otrzymujemy oczekiwany wynik:

root 
|-- key: string (nullable = false) 
|-- pin: struct (nullable = true) 
| |-- location: struct (nullable = true) 
| | |-- lat: double (nullable = false) 
| | |-- long: double (nullable = false) 

Niestety nie masz kontroli nad nullable polu więc jeśli jest ważne dla projektu będziesz trzeba określić schemat.

Wreszcie można użyć struct funkcję wprowadzoną w 1.4:

import org.apache.spark.sql.functions.struct 

df.select($"key", struct($"lat", $"long").alias("location")) 
+0

Dzięki @ zero323 do dokładnej odpowiedzi! To pomaga grupie. Czy wiesz, w jaki sposób mogę wykonać to mapowanie rekursywnie dla typów zagnieżdżonych? Te dane są brzydsze, niż się spodziewałem. –

+0

Nie widzę żadnego powodu, dla którego nie możesz. – zero323

+0

Hi @ zero323 - Czy wiesz, czy istnieje sposób użycia metody UDF tworzenia struktury, jeśli w nowej strukturze jest więcej niż 10 kolumn? Wydaje się, że UDF mają ograniczenie na 10 zmiennych wejściowych. –

1

Spróbuj tego:

import org.apache.spark.sql.functions._ 

df.registerTempTable("dt") 

dfres = sql("select struct(lat,lon) as colName from dt") 
Powiązane problemy