2015-10-14 7 views
5

programowanie z pyspark w klastrze Spark, dane są duże i podzielone na części, więc nie można ich załadować do pamięci ani sprawdzić stanu poprawności dane łatwopyspark: TypeError: IntegerType nie może zaakceptować obiektu w typie <type 'unicode'>

zasadzie wygląda

af.b Current%20events 1 996 
af.b Kategorie:Musiek 1 4468 
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209 
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214 

dane wikipedia:

czytałem ją z AWS S3, a następnie spróbować skonstruować zapłonową Dataframe z następującego kodu Pythona w pyspark intepreter:

parts = data.map(lambda l: l.split()) 
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3])) 


fields = [StructField("project", StringType(), True), 
StructField("title", StringType(), True), 
StructField("count", IntegerType(), True), 
StructField("byte_size", StringType(), True)] 

schema = StructType(fields) 

df = sqlContext.createDataFrame(wikis, schema) 

wszystkie wyglądają w porządku, tylko createDataFrame daje mi błąd

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame 
rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD 
_verify_type(row, schema) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type 
_verify_type(v, f.dataType) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type 
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj))) 
TypeError: IntegerType can not accept object in type <type 'unicode'> 

dlaczego nie mogę ustawić trzecią kolumnę, która należy liczyć się IntegerType? Jak mogę to rozwiązać?

+1

Czy to możliwe, ponieważ twoje Structured byte_size ma typ StringType i powinno być typu IntegerType? – ccheneson

+0

@ ccheneson thx za komentarz –

Odpowiedz

5

Jak zauważyłeś przez ccheneson, przekazujesz błędne typy.

Zakładając data wygląda następująco:

data = sc.parallelize(["af.b Current%20events 1 996"]) 

Po pierwszej mapie masz RDD[List[String]]:

parts = data.map(lambda l: l.split()) 
parts.first() 
## ['af.b', 'Current%20events', '1', '996'] 

Druga mapa konwertuje go na krotki (String, String, String, String):

wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3])) 
wikis.first() 
## ('af.b', 'Current%20events', '1', '996') 

Twój schema stwierdza, że ​​3r d jest liczbą całkowitą kolumny:

[f.dataType for f in schema.fields] 
## [StringType, StringType, IntegerType, StringType] 

Schema jest używany najczęściej w celu uniknięcia pełnego skanowania tabeli wywnioskować typy i nie wykonuje żadnych odlewów typu.

Można też rzucić swoje dane podczas ostatniej mapie:

wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3])) 

lub zdefiniować count jako StringType i rzucać kolumna

fields[2] = StructField("count", StringType(), True) 
schema = StructType(fields) 

wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count") 

Na marginesie count jest zarezerwowana słowo w SQL i powinnaś t być używane jako nazwa kolumny. W Sparku będzie działać zgodnie z oczekiwaniami w niektórych kontekstach, aw innym zakończy się niepowodzeniem.

0

Dzięki apache 2.0 możesz dać iskrowi schemat schematów danych. Ogólnie rzecz biorąc musisz obsłużyć funkcję analizatora składni, jak argumentowano powyżej:

"Kiedy schemat jest Brak, będzie próbował wywnioskować schemat (nazwy kolumn i typy) z danych, które powinny być RDD rzędu, lub o nazwietuple lub dyktować. "

Powiązane problemy