2015-03-24 12 views
5

bez ostrzeżeń amortyzacyjne w zapłonowej SQL 1.2.1 następujący kod przestał działać 1,3Czy jest to błąd regresji w programie Spark 1.3?

pracował w 1.2.1 (bez ostrzeżenia amortyzacyjne)

val sqlContext = new HiveContext(sc) 
import sqlContext._ 
val jsonRDD = sqlContext.jsonFile(jsonFilePath) 
jsonRDD.registerTempTable("jsonTable") 

val jsonResult = sql(s"select * from jsonTable") 
val foo = jsonResult.zipWithUniqueId().map { 
    case (Row(...), uniqueId) => // do something useful 
    ... 
} 

foo.registerTempTable("...") 

przestało działać na 1.3.0 (po prostu nie kompiluje, a ja tylko zmiana do 1.3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method 

nie workar pracy ound:

chociaż to może dać mi RDD [wiersz]:

jsonResult.rdd.zipWithUniqueId() 

teraz to nie będzie działać jako RDD[Row] nie posiada registerTempTable metodę oczywiście

 foo.registerTempTable("...") 

Tutaj to moje pytania:

  1. Czy istnieje obejście problemu? (np. czy po prostu robię to źle?)
  2. Czy to błąd? (Myślę, że wszystko, co zatrzymuje kompilację, która działała w poprzedniej wersji, bez ostrzeżenia @deprecated jest oczywiście błędem regresji)

Odpowiedz

5

To nie jest błąd, ale przepraszam za zamieszanie! Aż do wersji Spark 1.3, Spark SQL był oznaczony jako komponent alfa, ponieważ API wciąż ulegało zmianom. Dzięki Spark 1.3 ukończyliśmy i ustabilizowaliśmy API. Pełny opis tego, co musisz zrobić, gdy portowanie znajduje się w the documentation.

mogę też odpowiedzieć na konkretne pytania i dać pewne uzasadnienie, dlaczego zrobiliśmy te zmiany

przestała działać w wersji 1.3.0 (po prostu nie kompiluje, a ja tylko zmiana do 1.3) jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrames są teraz jednym zunifikowanym interfejsem zarówno w Scala jak i Java. Ponieważ jednak musimy zachować kompatybilność z istniejącym interfejsem API RDD do końca 1.X, DataFrames, nie są to RDD s. Aby uzyskać reprezentację RDD można nazwać df.rdd lub df.javaRDD

Dodatkowo, ponieważ baliśmy się trochę zamieszania, które mogą się zdarzyć z niejawne konwersje, zrobiliśmy to tak, że trzeba jawnie wywołać rdd.toDF powoduje konwersję z RDD do pojawić się. Jednak ta konwersja działa automatycznie tylko wtedy, gdy RDD przechowuje obiekty dziedziczące po Product (tj. Krotki lub klasy przypadków).

Wracając do pierwotnego pytania, jeśli chcesz przekształcić wiersze z dowolnym schematem, musisz wyraźnie powiedzieć Spark SQL o strukturze danych po operacji na mapie (ponieważ kompilator nie może).

import org.apache.spark.sql.types._ 
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil)) 
val newSchema = 
    StructType(
    StructField("uniqueId", IntegerType) +: jsonData.schema.fields) 

val augmentedRows = jsonData.rdd.zipWithUniqueId.map { 
    case (row, id) => 
    Row.fromSeq(id +: row.toSeq) 
} 

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema) 
+0

Dzięki! Chyba powinienem najpierw przeczytać podręcznik;) https: // iskra.apache.org/docs/1.3.0/sql-programming-guide.html#interoperating-with-rdds –