2016-09-29 13 views
7

Próbuję napisać funkcję Scala, które można wywnioskować Spark DataTypes na podstawie dostarczonych ciągu wejściowego:Wnioskując Spark TypDanych z napisowych

/** 
* Example: 
* ======== 
* toSparkType("string") => StringType 
* toSparkType("boolean") => BooleanType 
* toSparkType("date") => DateType 
* etc. 
*/ 
def toSparkType(inputType : String) : DataType = { 
    var dt : DataType = null 

    if(matchesStringRegex(inputType)) { 
     dt = StringType 
    } else if(matchesBooleanRegex(inputType)) { 
     dt = BooleanType 
    } else if(matchesDateRegex(inputType)) { 
     dt = DateType 
    } else if(...) { 
     ... 
    } 

    dt 
} 

moim celem jest wspieranie duży podzbiór, jeśli nie wszystkie, dostępny DataTypes. Jak zacząłem realizacji tej funkcji, mam do myślenia: „Spark/Scala prawdopodobnie już metodę pomocnik/util, który zrobi to za mnie.” Po tym wszystkim, wiem, że mogę zrobić coś takiego:

var structType = new StructType() 

structType.add("some_new_string_col", "string", true, Metadata.empty) 
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty) 
structType.add("some_new_date_col", "date", true, Metadata.empty) 

I Scala i/lub Spark będą domyślnie konwertować mój argument "string" na StringType itd. Tak więc pytam: co magia mogę zrobić ze Spark lub Scala, aby pomóc mi wdrożyć moją metodę konwertera?

+1

Jaki jest Twój przypadek użycia dla tej operacji? Spark już wprowadza schematy i typy danych dla większości bazowych źródeł danych. Czy chcesz wdrożyć własne źródło danych? –

+0

Naprawdę nie rozumiem twojego problemu, czy mógłbyś tak jak Sachin powiedzieć wyjaśnić przypadek użycia? Chcesz wywnioskować z String? Nie rozumiem tego. –

Odpowiedz

12

Spark/Scala prawdopodobnie ma już metodę helper/util, która zrobi to za mnie.

Masz rację. Spark ma już swój własny kod schematu i typu danych, którego używa do wnioskowania schematu z bazowych źródeł danych (csv, json itd.). Możesz więc spojrzeć na to, aby zaimplementować własne (rzeczywista implementacja jest oznaczona jako prywatna dla Sparka i jest przywiązany do RDD i klas wewnętrznych, więc nie może być użyty bezpośrednio z kodu spoza Sparka, ale powinien dać ci dobry pomysł, jak go wykonać.)

Biorąc pod uwagę, że csv jest typu płaskiego (i json może mieć zagnieżdżone struktura), wnioskowanie schematu CSV jest względnie prostsze i powinno pomóc w wykonaniu zadania, które chcesz osiągnąć powyżej. Wyjaśnię więc, jak działa wnioskowanie csv (wnioskowanie jsona wymaga uwzględnienia potencjalnie zagnieżdżonej struktury, ale wnioskowanie typu danych jest dość analogiczne).

W tym prologu rzeczą, którą chcesz obejrzeć, jest obiekt CSVInferSchema. W szczególności spójrz na metodę infer, która pobiera RDD[Array[String]] i określa typ danych dla każdego elementu tablicy w całym RDD. Sposób, w jaki to robi, wyznacza każde pole jako NullType na początku, a następnie, gdy przechodzi do następnego rzędu wartości (Array[String]) w RDD, aktualizuje już wywnioskowaną DataType do nowego DataType, jeśli nowy DataType jest bardziej szczegółowy.Dzieje się to here:

val rootTypes: Array[DataType] = 
     tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes) 

Teraz inferRowTypecallsinferField dla każdego pola w wierszu. inferFieldimplementation jest to, czego prawdopodobnie szukasz - przyjmuje typ wywnioskowany do tej pory dla konkretnego pola i wartość ciągu pola dla bieżącego wiersza jako parametru. Następnie zwraca albo istniejący wywnioskowany typ, albo nowy wnioskowany typ jest bardziej szczegółowy niż nowy typ.

odpowiedniej sekcji kodu jest następujący:

typeSoFar match { 
     case NullType => tryParseInteger(field, options) 
     case IntegerType => tryParseInteger(field, options) 
     case LongType => tryParseLong(field, options) 
     case _: DecimalType => tryParseDecimal(field, options) 
     case DoubleType => tryParseDouble(field, options) 
     case TimestampType => tryParseTimestamp(field, options) 
     case BooleanType => tryParseBoolean(field, options) 
     case StringType => StringType 
     case other: DataType => 
      throw new UnsupportedOperationException(s"Unexpected data type $other") 
     } 

Należy pamiętać, że jeśli typeSoFar jest NullType to najpierw stara się analizować je jako Integer ale tryParseInteger wezwanie jest łańcuchem wezwanie do niższego typu parsowania. Jeśli więc nie będzie w stanie przeanalizować wartości jako Integer, wywoła ona tryParseLong, która po niepowodzeniu wywoła tryParseDecimal, która w przypadku niepowodzenia wywoła tryParseDouble w.o.f.w.i. tryParseTimestamp w.o.f.w.i tryParseBoolean w.o.f.w.i. wreszcie stringType.

Można więc użyć niemalże podobnej logiki do wdrożenia dowolnego przypadku użycia. (Jeśli nie musisz łączyć się między wierszami, po prostu zaimplementuj wszystkie metody tryParse* dosłownie i po prostu wywołaj tryParseInteger. Nie musisz wpisywać własnego wyrażenia.)

Mam nadzieję, że to pomoże.

+2

Dobra odpowiedź !!!! – gsamaras

0

Od , nie wydaje się, możesz robić, co chcesz, magicznie, sprawdzić na przykład ten przykład:

import com.scalakata._ 

@instrument class Playground { 
    val x = 5 
    def f[T](v: T) = v 
    f(x) 
    val y = "boolean" 
    f(y) 
    def manOf[T: Manifest](t: T): Manifest[T] = manifest[T] 
    println(manOf(y)) 
} 

którą składa się po przeczytaniu I want to get the type of a variable at runtime.


Teraz z , ponieważ nie mam instalację w miejscu teraz, nie mogę komponować przykład, ale nie ma nic oczywiste w użyciu, więc chciałbym zaproponować Ci kontynuować pisanie toSparkType() jak ty Zaczęło się, ale najpierw spójrz na Source code for pyspark.sql.types.


Widzisz problem polega na tym, że zawsze przekazujesz ciąg znaków.

7

Tak, oczywiście, Spark ma magię, której potrzebujesz.

W obiekcie Spark 2.x jest to obiekt CatalystSqlParser, zdefiniowany jako here.

Na przykład:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser 

CatalystSqlParser.parseDataType("string") // StringType 
CatalystSqlParser.parseDataType("int") // IntegerType 

I tak dalej.

Ale jak rozumiem, nie jest częścią publicznego interfejsu API i dlatego może zmieniać się w następnych wersjach bez ostrzeżeń.

Więc może po prostu zaimplementować metody jak:

def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType) 
+0

Witam Mam ciąg taki jak ten "IntegerType" i chcę utworzyć obiekt DataType. Jak mogę to zrobić? Powyższa metoda nie działa dla mnie – user1870400

Powiązane problemy