2016-12-22 20 views
5

Muszę dodać nową kolumnę Spark DF MapType w oparciu o istniejące kolumny, w których nazwa kolumny jest kluczem, a wartość jest wartością.pyspark: Utwórz kolumnę MapType z istniejących kolumn

Jako przykład - Mam ten DF:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6), 
         ('d23d', 1.5, 2.0, 2.2), 
         ('as3d', 2.2, 4.3, 9.0) 
          ]) 
schema = StructType([StructField('key', StringType(), True), 
        StructField('metric1', FloatType(), True), 
        StructField('metric2', FloatType(), True), 
        StructField('metric3', FloatType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

+----+-------+-------+-------+ 
| key|metric1|metric2|metric3| 
+----+-------+-------+-------+ 
|123k| 1.3| 6.3| 7.6| 
|d23d| 1.5| 2.0| 2.2| 
|as3d| 2.2| 4.3| 9.0| 
+----+-------+-------+-------+ 

jestem już tak daleko, że można stworzyć structType z tego:

nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric") 
df2 = df.select("key", nameCol) 

+----+-------------+ 
| key|  metric| 
+----+-------------+ 
|123k|[1.3,6.3,7.6]| 
|d23d|[1.5,2.0,2.2]| 
|as3d|[2.2,4.3,9.0]| 
+----+-------------+ 

Ale to, co jest mi potrzebne kolumna metryczny z am MapType, gdzie klucz jest nazwą kolumny:

+----+-------------------------+ 
| key|     metric| 
+----+-------------------------+ 
|123k|Map(metric1 -> 1.3, me...| 
|d23d|Map(metric1 -> 1.5, me...| 
|as3d|Map(metric1 -> 2.2, me...| 
+----+-------------------------+ 

Jakieś wskazówki, w jaki sposób mogę przekształcić dane?

Dzięki!

Odpowiedz

8

W oprogramowaniu Spark 2.0 lub nowszym można użyć create_map. Pierwsze kilka import:

from pyspark.sql.functions import lit, col, create_map 
from itertools import chain 

create_map oczekuje przepleciony sekwencję keys i values które mogą być tworzone na przykład tak:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name 
)))).alias("metric") 

i są wykorzystywane select:

df.select("key", metric) 

Z przykład dane wynik to:

+----+---------------------------------------------------------+ 
|key |metric             | 
+----+---------------------------------------------------------+ 
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)  | 
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)  | 
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)  | 
+----+---------------------------------------------------------+ 

Jeśli używasz starszej wersji Spark będziesz musiał użyć UDF:

from pyspark.sql import Column 
from pyspark.sql.functions import struct 
from pyspark.sql.types import DataType 

def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column: 
    args = [struct(lit(name), col(name)) for name in cols] 
    as_map_ = udf(
     lambda *args: dict(args), 
     MapType(StringType(), key_type) 
    ) 
    return as_map_(*args) 

które mogłyby być wykorzystane w następujący sposób:

df.select("key", 
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric")) 
+0

wasze rozwiązania ładnie wygląda, to może być wykorzystane aby odpowiedzieć: https://stackoverflow.com/questions/45445077/pyspark-spark-dataframe-aggregate-columns-in-map-type? –

Powiązane problemy