2015-09-08 13 views
13

mam główny, który tworzy kontekst zapłonową:Spark sql Dataframe - import sqlContext.implicits._

val sc = new SparkContext(sparkConf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

Następnie tworzy dataframe i robi filtry i walidacje na dataframe.

val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00") 

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0)) 
    // record length cannot be < 2 
    .na.drop(3) 
    // round to hours 
    .withColumn("time",convertToHourly($"time")) 

Działa to świetnie.

ale gdy próbuję porusza moje walidacji do innego pliku, wysyłając dataframe do

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 

że dostaje Dataframe & robi walidacji i przekształceń: Wydaje się, że muszę

import sqlContext.implicits._ 

To avoid the error: “value $ is not a member of StringContext” that happens on line: .withColumn("time",convertToHourly($"time"))

Ale do korzystania z import sqlContext.implicits._ potrzebuję również sqlContext zdefiniowanego w nowym złożyć tak:

val sc = new SparkContext(sparkConf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

lub wysłać go do

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 
function 

czuję się separacji próbuję zrobić do 2 plików (główny & walidacji) nie jest wykonane prawidłowo ...

Każdy pomysł, jak to zaprojektować? Lub po prostu wyślij sqlContext do funkcji?

Dzięki!

+0

Kiedy chcę, aby oddzielić takie rzeczy po prostu przejść SqlContext w konstruktorze nowej klasy, a następnie importować sqlContext.implicits._ raz w każdej klasie. Nie mogłem wymyślić nic lepszego, więc głosuję na to pytanie i czekam na lepsze sugestie. – Niemand

Odpowiedz

11

Można pracować z pojedynczą instancją klasy SQLContext. Można spojrzeć na ten przykład w spark repository

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 

    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 
... 
//And wherever you want you can do 
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
import sqlContext.implicits._ 
+1

Dzięki! Użyłem obiektu singleton, ale w moim przypadku chcę go utworzyć tylko raz, tak zrobiłem: obiekt SQLContextSingleton { @transient var instancja: SQLContext = _ } następnie zainicjalizowany z głównej i użyty w walidacjach. Dzięki za pomoc! –

Powiązane problemy