2015-04-24 14 views
15

Mam plik CSV, w którym pole jest datetime w określonym formacie. Nie mogę zaimportować go bezpośrednio do Mojego Dataframe, ponieważ musi to być znacznik czasu. Więc zaimportować go jako ciąg znaków i przekształcić go w Timestamp jak tenLepszy sposób konwertowania pola ciągu na znacznik czasu w Spark

import java.sql.Timestamp 
import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.Row 

def getTimestamp(x:Any) : Timestamp = { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    if (x.toString() == "") 
    return null 
    else { 
     val d = format.parse(x.toString()); 
     val t = new Timestamp(d.getTime()); 
     return t 
    } 
} 

def convert(row : Row) : Row = { 
    val d1 = getTimestamp(row(3)) 
    return Row(row(0),row(1),row(2),d1) 
} 

Czy istnieje lepszy, bardziej zwięzły sposób, aby to zrobić, z API Dataframe lub iskrą-SQL? Powyższa metoda wymaga utworzenia RDD i ponownego nadania schematu DataFame.

Odpowiedz

6

nie grałem z Spark SQL jeszcze, ale myślę, że to byłoby bardziej idiomatyczne Scala (null wykorzystanie nie jest uważane za dobre praktyki):

def getTimestamp(s: String) : Option[Timestamp] = s match { 
    case "" => None 
    case _ => { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    Try(new Timestamp(format.parse(s).getTime)) match { 
     case Success(t) => Some(t) 
     case Failure(_) => None 
    }  
    } 
} 

Proszę zauważyć Zakładam wiesz elementy rodzaje uprzednio (jeśli czytasz go z pliku csv, wszystkie są String), dlatego używam odpowiedniego typu, takiego jak String, a nie Any (wszystko jest podtypem Any).

To zależy również od sposobu obsługi wyjątków podczas analizowania. W takim przypadku, jeśli wystąpi wyjątek analizowania, po prostu zwracana jest wartość None.

Można go używać dalej z:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3)) 
+0

Zrobiłem to już wcześniej. Czułem, że powinienem zająć się podstawową kwestią, zanim przejdziemy do takich drobiazgów. Jeśli istnieje lepsze rozwiązanie, może wcale nie być konieczne. Problem dotyczy pliku rows.map, który zwraca rdd i będzie musiał zostać przekonwertowany na ddf. Tak może być, że brakuje ddf api lub nie wiem jak to zrobić. – user568109

+0

Nie wiem, czy jest inny sposób, ale można przekonwertować dowolny RDD na DF bez problemu. W tym konkretnym przykładzie z 'sqlContext.createDataFrame (rowRDD, schema)'. Dla mnie iskr sql jest przyjemny do zapytania twoich danych w sposób podobny do SQL, a nie do parsowania samych danych (na takie rzeczy, użyj prostych RDD). – jarandaf

+0

Spróbuj (nowy znacznik czasu (format.parse (s) .getTime)) toOption – nont

1

Chciałbym przenieść metoda getTimeStamp pisał przez was do mapPartitions RDD i ponowne GenericMutableRow między rzędami w iteratora:

val strRdd = sc.textFile("hdfs://path/to/cvs-file") 
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => 
    new Iterator[Row] { 
    val row = new GenericMutableRow(4) 
    var current: Array[String] = _ 

    def hasNext = iter.hasNext 
    def next() = { 
     current = iter.next() 
     row(0) = current(0) 
     row(1) = current(1) 
     row(2) = current(2) 

     val ts = getTimestamp(current(3)) 
     if(ts != null) { 
     row.update(3, ts) 
     } else { 
     row.setNullAt(3) 
     } 
     row 
    } 
    } 
} 

I nadal należy używać schematu do generowania obiektu DataFrame Wykorzystanie GenericMutableRow wewnątrz implementacji iteratora może być znaleźć w Aggregate Operator, InMemoryColumnarTableScan, ParquetTableOperations itp

+0

Jest bardzo blisko mojego aktualnego kodu. Również jeśli chcesz sparsować plik csv, prawdopodobnie powinieneś użyć iskry-csv zamiast podziału. Punkt, który chciałem zrobić, to dodawanie i mutowanie kolumn, które zwrócą ci rdd, który znowu będzie musiał zostać przekonwertowany na ddf przez podanie schematu. Czy istnieje krótsza trasa? – user568109

+0

@ user568109, Nie sądzę, że istnieje. Ponieważ isksk-sql potrzebuje schematu, musi jakoś go zdobyć. Jeśli użyjesz RDD [CaseClassX], iskr-sql automatycznie wyprowadzi dla ciebie schemat, z definicji klasy sprawy. Ale używasz tutaj wiersza (Array [Any]), żadna wnioskowanie DataType nie może tam iść, więc po prostu przekazać jeden. –

+0

Myślę, że używanie jednego odniesienia, mutowanie go za każdym razem i zwracanie go jako odniesienia jest receptą na katastrofę. Czy faktycznie zastosowałeś to podejście? – maasg

1

mam ISO8601 znacznik czasu w moim zbiorze i musiałem go do formatu „yyyy-MM-dd” przerobić. Oto co zrobiłem:

import org.joda.time.{DateTime, DateTimeZone} 
object DateUtils extends Serializable { 
    def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) 
    def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) 
} 

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd")) 

I możesz po prostu użyć UDF w zapytaniu iskrowym SQL.

31

Spark> = 2,2

import org.apache.spark.sql.functions.to_timestamp 

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") 
df.withColumn("ts", ts).show(2, false) 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+-------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+-------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| 
// |2 |#[email protected]#@#    |null    | 
// +---+-------------------+-------------------+ 

Spark> = 1,6, < 2,2

Można użyć funkcji data przetwarzania, które zostały wprowadzone w Spark 1.5. Zakładając, że następujące dane:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts") 

Można użyć unix_timestamp do analizowania ciągów i oddać go do timestamp

import org.apache.spark.sql.functions.unix_timestamp 

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+---------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+---------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| 
// |2 |#[email protected]#@#    |null     | 
// +---+-------------------+---------------------+ 

Jak widać to obejmuje zarówno analizowania i obsługi błędów.

Spark> = 1.5, < 1,6

Będziesz musiał użyć użyć czegoś takiego:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

lub

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

powodu SPARK-11724.

Spark < 1,5

powinieneś być w stanie z nich korzystać z expr i HiveContext.

0

użyłbym https://github.com/databricks/spark-csv

Będzie to wywnioskować znaczników czasu dla ciebie.

import com.databricks.spark.csv._ 
val rdd: RDD[String] = sc.textFile("csvfile.csv") 

val df : DataFrame = new CsvParser().withDelimiter('|') 
     .withInferSchema(true) 
     .withParseMode("DROPMALFORMED") 
     .csvRdd(sqlContext, rdd) 
Powiązane problemy