2015-07-16 12 views

Odpowiedz

69

z surowego SQL można używać CONCAT:

  • W Pythonie

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    
  • W Scala

    import sqlContext.implicits._ 
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    

Od Spark 1.5.0 można użyć concat funkcja wit H DataFrame API

  • W Pythonie

    from pyspark.sql.functions import concat, col, lit 
    
    df.select(concat(col("k"), lit(" "), col("v"))) 
    
  • Scala:

    import org.apache.spark.sql.functions.{concat, lit} 
    
    df.select(concat($"k", lit(" "), $"v")) 
    

Istnieje również concat_ws funkcja, która bierze separator ciągu jako pierwszy argument.

+0

Co się stanie, jeśli dataFrame ma wartość pustą? jak to df = sqlContext.createDataFrame ([("foo", 1), ("bar", 2), ("check", null)], ("k", "v")) –

+1

@TarunKumar Czy masz na myśli coś w stylu [this] (http://stackoverflow.com/a/33152113/1560062)? – zero323

+0

to jest to, co chciałem. dziękuje –

14

Jeśli chcesz zrobić to za pomocą DF, możesz użyć udf, aby dodać nową kolumnę na podstawie istniejących kolumn.

val sqlContext = new SQLContext(sc) 
case class MyDf(col1: String, col2: String) 

//here is our dataframe 
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")) 
)) 

//Define a udf to concatenate two passed in string values 
val getConcatenated = udf((first: String, second: String) => { first + " " + second }) 

//use withColumn method to add a new column called newColName 
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show() 
+0

Czy istnieje sposób dynamicznego łączenia kolumn z wejściowego łańcucha? – ashK

+0

To nie jest optymalne, w porównaniu do DataFrame.concat_ws, ponieważ Spark nie optymalizuje bardzo udfs/w ogóle. Oczywiście w momencie, gdy potrzebujesz niestandardowej logiki w swojej konkatenacji, nie będziesz w stanie uniknąć udf. –

4

Oto kolejny sposób to zrobić za pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit 

#Create your data frame 
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa']) 

#Use select, concat, and lit functions to do the concatenation 
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African')) 

#Show the new data frame 
personDF.show() 

----------RESULT------------------------- 

84 
+------------+ 
|East African| 
+------------+ 
| Ethiopian| 
|  Kenyan| 
|  Ugandan| 
|  Rwandan| 
+------------+ 
0

Innym sposobem, aby to zrobić w pySpark użyciu SqlContext ...

#Suppose we have a dataframe: 
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2']) 

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname')) 
13

Oto jak można zrobić niestandardowe nazewnictwo:

import pyspark 
from pyspark.sql import functions as sf 
sc = pyspark.SparkContext() 
sqlc = pyspark.SQLContext(sc) 
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2']) 
df.show() 

daje,

+--------+--------+ 
|colname1|colname2| 
+--------+--------+ 
| row11| row12| 
| row21| row22| 
+--------+--------+ 

utworzyć nową kolumnę przez złączenie:

df = df.withColumn('joined_column', 
        sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2'))) 
df.show() 

+--------+--------+-------------+ 
|colname1|colname2|joined_column| 
+--------+--------+-------------+ 
| row11| row12| row11_row12| 
| row21| row22| row21_row22| 
+--------+--------+-------------+ 
+0

Dlaczego nazywacie 'sf.lit ('_')' a nie tylko ''_''? –

+2

'świeci' tworzy kolumnę' _' – muon

3

Oto propozycja dla kiedy nie znasz numeru lub nazwy kolumn w Dataframe.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*)) 
Powiązane problemy