
Concatenate two PySpark dataframes

I am trying to concatenate two PySpark dataframes that share some columns, while other columns exist only in one of them:

from pyspark.sql.functions import randn, rand 

df_1 = sqlContext.range(0, 10) 

+--+ 
|id| 
+--+ 
| 0| 
| 1| 
| 2| 
| 3| 
| 4| 
| 5| 
| 6| 
| 7| 
| 8| 
| 9| 
+--+ 

df_2 = sqlContext.range(11, 20) 

+---+
| id|
+---+
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")) 
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2")) 

and now I want to generate a third dataframe. I would like something like pandas' concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

#do some concatenation here, how? 

df_concat.show()

+---+--------------------+--------------------+------------+
| id|             uniform|              normal|    normal_2|
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714|        None|
|  1|  0.8642043127063618|  0.3900018344856156|        None|
|  2|  0.8292577771850476|  1.8077401259195247|        None|
|  3|   0.198558705368724| -0.4270585782850261|        None|
|  4|0.012661361966674889|   0.702634599720141|        None|
|  5|  0.8535692890157796|-0.42355804115129153|        None|
|  6|  0.3723296190171911|  1.3789648582622995|        None|
|  7|  0.9529794127670571| 0.16238718777444605|        None|
|  8|  0.9746632635918108| 0.02448061333761742|        None|
|  9|   0.513622008243935|  0.7626741803250845|        None|
| 11|  0.3221262660507942|                None|       0.123|
| 12|  0.4030672316912547|                None|     0.12323|
| 13|  0.9690555459609131|                None|       0.123|
| 14|0.011913836266515876|                None|     0.18923|
| 15|  0.9359607054250594|                None|     0.99123|
| 16| 0.45680471157575453|                None|       0.123|
| 17|  0.6411908952297819|                None|       1.123|
| 18|  0.5669232696934479|                None|     0.10023|
| 19|   0.513622008243935|                None| 0.916332123|
+---+--------------------+--------------------+------------+

Is this possible?
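For comparison, the pandas behaviour I mean is roughly the following (toPandas() here is only for illustration; it collects everything to the driver):

import pandas as pd

# pandas concat aligns columns by name and fills the missing ones with NaN
df_concat_pd = pd.concat([df_1.toPandas(), df_2.toPandas()], ignore_index=True)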

Answers

df_concat = df_1.union(df_2) 

The dataframes do need to have identical columns for this to work; in that case you can use withColumn() to create the normal_1 and normal_2 columns where they are missing.
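A minimal sketch of that idea, using the column names from the question (lit(None) is cast explicitly so both schemas agree, which is an assumption about the desired types; union matches columns by position, hence the final select):

from pyspark.sql.functions import lit

# give each dataframe the column it lacks, as a typed null
df_1_padded = df_1.withColumn("normal_2", lit(None).cast("double"))
df_2_padded = df_2.withColumn("normal", lit(None).cast("double"))

# union matches by position, so force a common column order first
cols = ["id", "uniform", "normal", "normal_2"]
df_concat = df_1_padded.select(cols).union(df_2_padded.select(cols))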

+0

Thanks. The problem, as I mentioned above, is that the columns are not identical between the two dataframes. – Ivan


Maybe try creating the missing columns and then calling union (unionAll for Spark 1.6 or earlier):

from pyspark.sql.functions import lit

cols = ['id', 'uniform', 'normal', 'normal_2']

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)

result = df_1_new.union(df_2_new)
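On Spark 3.1 and later, unionByName can do the padding itself, so the manual lit(None) columns are not needed:

# Spark 3.1+: match columns by name, filling the missing ones with null
result = df_1.unionByName(df_2, allowMissingColumns=True)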

'unionAll()' is deprecated in Spark 2.0; use 'union()' instead –


you can rename a column with 'withColumnRenamed': https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=selectexpr#pyspark.sql.DataFrame.withColumnRenamed – Hunle
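For example, if you instead wanted the two normal columns stacked under one name, renaming before the union would do it (an illustrative sketch, not part of the original answers):

# stack df_2's normal_2 under df_1's normal column name
df_2_renamed = df_2.withColumnRenamed("normal_2", "normal")
stacked = df_1.select("id", "uniform", "normal").union(
    df_2_renamed.select("id", "uniform", "normal"))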


Here is one way to do it, in case it is still useful: I ran this in the pyspark shell, with Python version 2.7.12 and a Spark 2.0.1 installation.

PS: I suspect you meant to use different seeds for df_1 and df_2; the code below reflects that.

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))

def get_uniform(df1_uniform, df2_uniform):
    # after the outer join, exactly one side is non-null for each id;
    # compare against None so a legitimate 0.0 is not treated as missing
    if df1_uniform is not None:
        return df1_uniform
    if df2_uniform is not None:
        return df2_uniform

u_get_uniform = F.udf(get_uniform, FloatType())

df_3 = (df_1.join(df_2, on="id", how="outer")
            .select("id",
                    u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"),
                    "normal", "normal_2")
            .orderBy(F.col("id")))
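One caveat: the UDF is declared as FloatType while rand() produces doubles, which is why the uniform column in df_3 below is truncated to single precision. Declaring it as DoubleType would keep the full values:

from pyspark.sql.types import DoubleType

# rand() yields doubles; a DoubleType UDF preserves full precision
u_get_uniform = F.udf(get_uniform, DoubleType())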

Here are the outputs I get:

df_1.show()
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0|0.41371264720975787|  0.5888539012978773|
|  1| 0.7311719281896606|  0.8645537008427937|
|  2| 0.1982919638208397| 0.06157382353970104|
|  3|0.12714181165849525|  0.3623040918178586|
|  4| 0.7604318153406678|-0.49575204523675975|
|  5|0.12030715258495939|  1.0854146699817222|
|  6|0.12131363910425985| -0.5284523629183004|
|  7|0.44292918521277047| -0.4798519469521663|
|  8| 0.8898784253886249| -0.8820294772950535|
|  9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+

df_2.show()
+---+-------------------+--------------------+
| id|            uniform|            normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525|  0.3623040918178586|
| 13|0.12030715258495939|  1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+

df_3.show()
+---+-----------+--------------------+--------------------+
| id|    uniform|              normal|            normal_2|
+---+-----------+--------------------+--------------------+
|  0| 0.41371265|  0.5888539012978773|                null|
|  1|  0.7311719|  0.8645537008427937|                null|
|  2| 0.19829196| 0.06157382353970104|                null|
|  3| 0.12714182|  0.3623040918178586|                null|
|  4|  0.7604318|-0.49575204523675975|                null|
|  5|0.120307155|  1.0854146699817222|                null|
|  6| 0.12131364| -0.5284523629183004|                null|
|  7| 0.44292918| -0.4798519469521663|                null|
|  8| 0.88987845| -0.8820294772950535|                null|
|  9|0.036507078| -2.1591956435415334|                null|
| 11| 0.19829196|                null| 0.06157382353970104|
| 12| 0.12714182|                null|  0.3623040918178586|
| 13|0.120307155|                null|  1.0854146699817222|
| 14| 0.12131364|                null| -0.5284523629183004|
| 15| 0.44292918|                null| -0.4798519469521663|
| 16| 0.88987845|                null| -0.8820294772950535|
| 17| 0.27310732|                null|-0.15116027592854422|
| 18|  0.7784518|                null| -0.3785563841011868|
| 19| 0.43776396|                null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+

This should do it for you...

from pyspark.sql.functions import lit, coalesce, col

df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")

df_1.show()
df_2.show()

df_3 = (df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")
        .select([coalesce(df_1.id, df_2.id).alias("id")] +
                [col("df_1." + c) for c in df_1.columns if c != "id"])
        .sort("id"))
df_3.show()
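Since this example uses no randomness, the result is deterministic; df_3 should come out with ids 0 through 9, and source filled only where df_1 had a row:

+---+------+
| id|source|
+---+------+
|  0|   old|
|  1|   old|
|  2|   old|
|  3|   old|
|  4|   old|
|  5|   old|
|  6|  null|
|  7|  null|
|  8|  null|
|  9|  null|
+---+------+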

The answers above are very elegant. I wrote this function a long time ago, when I was also trying to union two dataframes with non-identical columns.

Suppose you have dataframes sdf1 and sdf2:

from pyspark.sql import functions as F
from pyspark.sql.types import *

def unequal_union_sdf(sdf1, sdf2):
    # schemas as sets of (column name, data type) pairs
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)

    # add to sdf1, as typed nulls, the columns only sdf2 has
    for i, j in s_df2_schema.difference(s_df1_schema):
        sdf1 = sdf1.withColumn(i, F.lit(None).cast(j))

    # and vice versa for sdf2
    for i, j in s_df1_schema.difference(s_df2_schema):
        sdf2 = sdf2.withColumn(i, F.lit(None).cast(j))

    # union matches by position, so select a common column order on both sides
    common_schema_colnames = sdf1.columns
    sdk = sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
    return sdk

sdf_concat = unequal_union_sdf(sdf1, sdf2)
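Applied to the dataframes from the question (assuming df_1 and df_2 as defined above), usage would look like:

# pads each frame with the other's missing column, then unions them
df_concat = unequal_union_sdf(df_1, df_2)
df_concat.orderBy("id").show()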