Spark/Scala prawdopodobnie ma już metodę helper/util, która zrobi to za mnie.
Masz rację. Spark ma już swój własny kod schematu i typu danych, którego używa do wnioskowania schematu z bazowych źródeł danych (csv, json itd.). Możesz więc spojrzeć na to, aby zaimplementować własne (rzeczywista implementacja jest oznaczona jako prywatna dla Sparka i jest przywiązany do RDD i klas wewnętrznych, więc nie może być użyty bezpośrednio z kodu spoza Sparka, ale powinien dać ci dobry pomysł, jak go wykonać.)
Biorąc pod uwagę, że csv jest typu płaskiego (i json może mieć zagnieżdżone struktura), wnioskowanie schematu CSV jest względnie prostsze i powinno pomóc w wykonaniu zadania, które chcesz osiągnąć powyżej. Wyjaśnię więc, jak działa wnioskowanie csv (wnioskowanie jsona wymaga uwzględnienia potencjalnie zagnieżdżonej struktury, ale wnioskowanie typu danych jest dość analogiczne).
W tym prologu rzeczą, którą chcesz obejrzeć, jest obiekt CSVInferSchema. W szczególności spójrz na metodę infer
, która pobiera RDD[Array[String]]
i określa typ danych dla każdego elementu tablicy w całym RDD. Sposób, w jaki to robi, wyznacza każde pole jako NullType
na początku, a następnie, gdy przechodzi do następnego rzędu wartości (Array[String]
) w RDD
, aktualizuje już wywnioskowaną DataType
do nowego DataType
, jeśli nowy DataType
jest bardziej szczegółowy.Dzieje się to here:
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
Teraz inferRowType
callsinferField
dla każdego pola w wierszu. inferField
implementation jest to, czego prawdopodobnie szukasz - przyjmuje typ wywnioskowany do tej pory dla konkretnego pola i wartość ciągu pola dla bieżącego wiersza jako parametru. Następnie zwraca albo istniejący wywnioskowany typ, albo nowy wnioskowany typ jest bardziej szczegółowy niż nowy typ.
odpowiedniej sekcji kodu jest następujący:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}
Należy pamiętać, że jeśli typeSoFar
jest NullType to najpierw stara się analizować je jako Integer
ale tryParseInteger
wezwanie jest łańcuchem wezwanie do niższego typu parsowania. Jeśli więc nie będzie w stanie przeanalizować wartości jako Integer, wywoła ona tryParseLong
, która po niepowodzeniu wywoła tryParseDecimal
, która w przypadku niepowodzenia wywoła tryParseDouble
w.o.f.w.i. tryParseTimestamp
w.o.f.w.i tryParseBoolean
w.o.f.w.i. wreszcie stringType
.
Można więc użyć niemalże podobnej logiki do wdrożenia dowolnego przypadku użycia. (Jeśli nie musisz łączyć się między wierszami, po prostu zaimplementuj wszystkie metody tryParse*
dosłownie i po prostu wywołaj tryParseInteger
. Nie musisz wpisywać własnego wyrażenia.)
Mam nadzieję, że to pomoże.
Jaki jest Twój przypadek użycia dla tej operacji? Spark już wprowadza schematy i typy danych dla większości bazowych źródeł danych. Czy chcesz wdrożyć własne źródło danych? –
Naprawdę nie rozumiem twojego problemu, czy mógłbyś tak jak Sachin powiedzieć wyjaśnić przypadek użycia? Chcesz wywnioskować z String? Nie rozumiem tego. –