2015-04-07 16 views
9

Mam RDD (możemy nazwać myrdd), gdzie każdy rekord w RDD jest postaci:Tworzenie DataFrame Spark z RDD list

[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)] 

chciałbym konwersji na DataFrame w pyspark - jaki jest najłatwiejszy sposób na zrobienie tego?

+0

To nie jest do końca jasne z pytaniem gdzie masz problem . Czy to fakt, że masz tyle kolumn? Czy tylko zapisy twojego RDD to listy krotek? –

Odpowiedz

29

Jak o użyciu metody toDF? Potrzebujesz tylko dodać nazwy pól.

df = rdd.toDF(['column', 'value']) 
+0

ta odpowiedź działa, a rozwiązanie, które zamieściłem poniżej (w oparciu o twoją odpowiedź) zamieniłoby RDD, jak opisano powyżej, na DataFrame – mgoldwasser

2

Spójrz na DataFrame documentation, aby ten przykład zadziałał, ale to powinno zadziałać. Jestem zakładając swoją RDD nazywa my_rdd

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

# You have a ton of columns and each one should be an argument to Row 
# Use a dictionary comprehension to make this easier 
def record_to_row(record): 
    schema = {'column{i:d}'.format(i = col_idx):record[col_idx] for col_idx in range(1,100+1)} 
    return Row(**schema) 


row_rdd = my_rdd.map(lambda x: record_to_row(x)) 

# Now infer the schema and you have a DataFrame 
schema_my_rdd = sqlContext.inferSchema(row_rdd) 

# Now you have a DataFrame you can register as a table 
schema_my_rdd.registerTempTable("my_table") 

Nie pracuję dużo z DataFrames w Spark ale to powinno wystarczyć

+0

może być konieczne dodanie wiersza po utworzeniu sqlContext w celu załadowania biblioteki implicits: "import sqlContext .implicits._". Zobacz https://spark.apache.org/docs/1.3.0/sql-programming-guide.html –

+0

Czy to nie jest coś scala? Moja odpowiedź jest napisana w Pythonie –

8

Odpowiedź przez @dapangmao mnie do tego rozwiązania:

my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF() 
1

W pyspark, powiedzmy, że masz dataframe nazwie jak userDF.

>>> type(userDF) 
<class 'pyspark.sql.dataframe.DataFrame'> 

Pozwala właśnie przekonwertować go do RDD (

userRDD = userDF.rdd 
>>> type(userRDD) 
<class 'pyspark.rdd.RDD'> 

a teraz można zrobić pewne manipulacje i nazwać na przykład mapy funkcję:

newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']}) 

Wreszcie, pozwala stworzyć DataFrame z sprężysta rozproszony zbiór danych (RDD) .

newDF = sqlContext.createDataFrame(newRDD, ["food", "name"]) 

>>> type(ffDF) 
<class 'pyspark.sql.dataframe.DataFrame'> 

To wszystko.

byłem uderzenie to ostrzeżenie przed kiedy próbowałem zadzwonić:

newDF = sc.parallelize(newRDD, ["food","name"] : 

.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row inst warnings.warn("Using RDD of dict to inferSchema is deprecated. " 

Więc nie trzeba tego robić ...

Powiązane problemy