2016-05-31 11 views
6

Mam iskrę DF z rzędami Seq[(String, String, String)]. Staram się zrobić jakąś flatMap z tym, ale wszystko staram kończy się wyrzuceniemWyodrębnianie `Seq [(String, String, String)]` od iskry DataFrame

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema nie mogą być oddane do scala.Tuple3

mogę się pojedynczy rząd albo wiele rzędów od DF dobrze

df.map{ r => r.getSeq[Feature](1)}.first 

powraca

Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] ..... 

i typ danych RDD wydaje się być poprawny.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

Schemat DF jest

root 
|-- article_id: long (nullable = true) 
|-- content_processed: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- lemma: string (nullable = true) 
| | |-- pos_tag: string (nullable = true) 
| | |-- ne_tag: string (nullable = true) 

Znam ten problem jest związany z iskra sql leczenia wiersze RDD jak org.apache.spark.sql.Row choć idiotycznie powiedzieć, że jest to Seq[(String, String, String)]. Jest powiązane pytanie (link poniżej), ale odpowiedź na to pytanie nie działa dla mnie. Nie jestem też dość znany iskrowi, aby dowiedzieć się, jak przekształcić go w działające rozwiązanie.

Czy wiersze są Row[Seq[(String, String, String)]] lub Row[(String, String, String)] lub Seq[Row[(String, String, String)]] lub coś jeszcze bardziej szalonego.

Próbuję zrobić coś takiego

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1) 

który wydaje się działać, ale faktycznie nie

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first 

rzuca powyższy błąd. Więc jak mam (na przykład) uzyskać pierwszy element drugiej krotki w każdym rzędzie?

Również WHY iskra została zaprojektowana, aby to zrobić, wydaje się idiotyczne twierdzić, że coś jest jednego rodzaju, podczas gdy w rzeczywistości nie jest i nie może zostać przekonwertowane na zastrzegany typ.


Powiązane pytanie: GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

Powiązane raport o błędzie: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

+0

Czy mogę zapytać, kto zagłosował na to pytanie i dlaczego? – eliasah

+0

związany z tym raport o błędzie był dla mnie rozwiązaniem –

Odpowiedz

8

Dobrze, że nie twierdzi, że jest to krotka. twierdzi, że jest to struct który mapuje :

import org.apache.spark.sql.Row 

case class Feature(lemma: String, pos_tag: String, ne_tag: String) 
case class Record(id: Long, content_processed: Seq[Feature]) 

val df = Seq(
    Record(1L, Seq(
    Feature("ancient", "jj", "o"), 
    Feature("olympia_greece", "nn", "location") 
)) 
).toDF 

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0)) 

Musisz znaleźć dokładne zasady mapowania w Spark SQL programming guide.

Od nie jest dokładnie ładna struktura prawdopodobnie będziesz chciał mapować go do czegoś przydatne:

content.map(_.map { 
    case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag) 
}) 

czyli

content.map(_.map (row => (
    row.getAs[String]("lemma"), 
    row.getAs[String]("pos_tag"), 
    row.getAs[String]("ne_tag") 
))) 

Wreszcie nieco bardziej zwięzłe podejście z Datasets:

df.as[Record].rdd.map(_.content_processed) 

lub

df.select($"content_processed").as[Seq[(String, String, String)]] 

chociaż wydaje się, że w tej chwili jest nieco błędny.

Istnieje ważna różnica w pierwszym podejściu (Row.getAs), a drugim (Dataset.as). Ten pierwszy wyodrębnia obiekty jako Any i stosuje się do nich: Ta ostatnia używa enkoderów do transformacji między typami wewnętrznymi i pożądaną reprezentacją.

+0

Dobrze, nie zdawałem sobie sprawy, że mogę poprosić o treść z powrotem jako w zasadzie wszystko i będzie to sprawdzane tylko w czasie wykonywania (jestem pewien, że jest to wspomniane gdzieś w dokumentach) . 'df.select ($" content_processed "). map (_. getSeq [(String, String)] (0))' zwraca również _something_, ale tak naprawdę nie działa. Zakładam, że to nie jest artefakt wyciągania danych z Cassandry, ale coś nieodłącznego dla DataFrames? –

+0

Cóż, tak. 'Row' to po prostu kolekcja' Any'. Wszystko, co dzieje się w 'getSeq' i innych metodach, jest równoważne' (wiersz (i): Any) .asInstanceOf [T] ' – zero323

Powiązane problemy