Mam środowisko strumieniowania iskrzenia z iskrem 1.2.0, w którym pobieram dane z lokalnego folderu i za każdym razem, gdy znajduję nowy plik dodany do folderu wykonuję transformację.Napisz RDD do HDFS w kontekście strumieniowania iskrowego
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
Aby wykonać moją analizę na danych DStream muszę przekształcić go w tablicy
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
Potem wykorzystać dane uzyskane wyodrębnić informacje chcę i zapisać je na HDFS.
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Ponieważ ja naprawdę potrzebne do manipulowania danymi z tablicy, że to niemożliwe, aby zapisać dane na HDFS z DStream.saveAsTextFiles("...")
(która będzie działać w porządku) i muszę zapisać RDD, ale z tego preocedure końcu mam puste pliki wyjściowe nazwanych part-00000 itp. ...
Dzięki arr.foreach(println)
jestem w stanie zobaczyć poprawne wyniki transofmacji.
Podejrzewam, że ta iskra próbuje przy każdej partii pisać dane w tych samych plikach, usuwając to, co zostało napisane wcześniej. Próbowałem zapisać w dynamicznym folderze nazwanym, takim jak myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
, ale zawsze tworzone jest tylko jedno zagięcie, a pliki wyjściowe są nadal puste.
Jak mogę zapisać RDD w HDFS w kontekście strumieniowania iskier?
Chyba problemem jest to, że ARR nie jest dostępny na wszystkich pracowników. Czy próbowałeś nadać swój arr, a następnie napisać go na hdfs? –
ponieważ muszę monitorować folder i przechwytywać wszystkie nowe przesyłane pliki i iskrzenie strumieniowe brzmi jak dobre rozwiązanie. To nie jest jedna maszyna, ale 2 komputery-klaster. Teraz właśnie piszę pliki jako tekst, ale w przyszłości będę musiał napisać pliki parkietu i to całkiem proste dzięki Spark – drstein
Czy spróbujesz tego? var arr = new ArrayBuffer [String](); val transmitowane = sc.broadcast (ARR) data.foreachRDD { transmitowane ++ = _.collect() } val myRDD = sc.parallelize (transmitowane) myRDD.saveAsTextFile ("katalog HDFS ....") –