2015-07-02 6 views
7

Mam środowisko strumieniowania iskrzenia z iskrem 1.2.0, w którym pobieram dane z lokalnego folderu i za każdym razem, gdy znajduję nowy plik dodany do folderu wykonuję transformację.Napisz RDD do HDFS w kontekście strumieniowania iskrowego

val ssc = new StreamingContext(sc, Seconds(10)) 
val data = ssc.textFileStream(directory) 

Aby wykonać moją analizę na danych DStream muszę przekształcić go w tablicy

var arr = new ArrayBuffer[String](); 
    data.foreachRDD { 
    arr ++= _.collect() 
} 

Potem wykorzystać dane uzyskane wyodrębnić informacje chcę i zapisać je na HDFS.

val myRDD = sc.parallelize(arr) 
myRDD.saveAsTextFile("hdfs directory....") 

Ponieważ ja naprawdę potrzebne do manipulowania danymi z tablicy, że to niemożliwe, aby zapisać dane na HDFS z DStream.saveAsTextFiles("...") (która będzie działać w porządku) i muszę zapisać RDD, ale z tego preocedure końcu mam puste pliki wyjściowe nazwanych part-00000 itp. ...

Dzięki arr.foreach(println) jestem w stanie zobaczyć poprawne wyniki transofmacji.

Podejrzewam, że ta iskra próbuje przy każdej partii pisać dane w tych samych plikach, usuwając to, co zostało napisane wcześniej. Próbowałem zapisać w dynamicznym folderze nazwanym, takim jak myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()), ale zawsze tworzone jest tylko jedno zagięcie, a pliki wyjściowe są nadal puste.

Jak mogę zapisać RDD w HDFS w kontekście strumieniowania iskier?

+0

Chyba problemem jest to, że ARR nie jest dostępny na wszystkich pracowników. Czy próbowałeś nadać swój arr, a następnie napisać go na hdfs? –

+0

ponieważ muszę monitorować folder i przechwytywać wszystkie nowe przesyłane pliki i iskrzenie strumieniowe brzmi jak dobre rozwiązanie. To nie jest jedna maszyna, ale 2 komputery-klaster. Teraz właśnie piszę pliki jako tekst, ale w przyszłości będę musiał napisać pliki parkietu i to całkiem proste dzięki Spark – drstein

+0

Czy spróbujesz tego? var arr = new ArrayBuffer [String](); val transmitowane = sc.broadcast (ARR) data.foreachRDD { transmitowane ++ = _.collect() } val myRDD = sc.parallelize (transmitowane) myRDD.saveAsTextFile ("katalog HDFS ....") –

Odpowiedz

5

Używasz Spark Streaming w sposób, w jaki nie został zaprojektowany. Chciałbym polecić upuszczenie za pomocą Sparka do twojego przypadku użycia lub zaadaptować twój kod tak, aby działał w sposób Sparka. Zbieranie tablicy do sterownika jest sprzeczne z celem używania rozproszonego silnika i sprawia, że ​​twoja aplikacja efektywnie działa na pojedynczej maszynie (dwa urządzenia będą również powodowały więcej narzutów niż tylko przetwarzanie danych na jednej maszynie).

Wszystko, co możesz zrobić z tablicą, możesz zrobić ze Spark. Po prostu uruchom obliczenia w strumieniu rozproszonym na robotach i napisz swoje wyniki za pomocą DStream.saveAsTextFiles(). Możesz użyć foreachRDD + saveAsParquet(path, overwrite = true), aby napisać do pojedynczego pliku.

+0

Dzięki, całkowicie rozumiem, postaram się zmienić logikę trasform, aby użyć DStream. Czy wiesz, czy możliwe jest strumieniowanie iskier w każdej partii, aby zapisać zapisy w tym samym pliku? W tej chwili otrzymuję nowy folder z nowymi plikami co każdy interwał wsadowy. – drstein

+1

Tak, z foreachRDD + saveAsParquet istnieje opcja nadpisania. –

+0

@MariusSoutier możesz mi pomóc z tym 'http: // stackoverflow.com/questions/39363586/problem-podczas przechowywania danych z iskry strumieniowej do cassanadra' – Naresh

2

@vzamboni: Spark 1.5+ dataframes api ma tę funkcję:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);