2017-01-28 9 views
6

Jak mogę odczytać plik jako strumień z hdfs używając Apache Spark Java? Nie chcę czytać całego pliku, chcę mieć strumień plików, aby zatrzymać czytanie pliku, gdy spełniony jest jakiś warunek, jak mogę to zrobić za pomocą Apache Spark?Apache Spark odczytuje plik jako strumień z HDFS

+0

Zobacz te: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala – yoga

+0

Ten przykład nie jest związane z moim pytaniem. – Maksym

+0

Czy możesz wyjaśnić lepiej, co próbujesz osiągnąć? Dlaczego potrzebujesz go jako strumienia (w przeciwieństwie do zwykłego czytania go jako RDD/Dataframe)? Czy pytasz, jak mieć iskrową transmisję strumieniową, przeczytać zawartość katalogu HDFS i zatrzymać się po zakończeniu (zamiast czekać na następny okres czasu)? Czy mówisz również o DStream lub strumieniowaniu strukturalnym? –

Odpowiedz

1

Można użyć pliku strumieniowego HDFS stosując metodę SSC

val SSC = new StreamingContext (sparkConf, Sekundy (batchTime))

val dStream = ssc.fileStream [LongWritable, Tekst, TextInputFormat] ( streamDirectory , (x: ścieżka) => prawda newFilesOnly = fałsz)

Stosując wyżej API filtra param funkcji filtrowania do ścieżki przetwarzania.

Jeśli twój warunek nie ma ścieżki/nazwy pliku i zależy od danych, musisz zatrzymać kontekst przesyłania strumieniowego, jeśli spełniony jest warunek.

W tym celu należy użyć implementacji wątku, 1) W jednym wątku należy sprawdzić, czy kontekst przesyłania strumieniowego jest zatrzymany, a jeśli ssc został zatrzymany, powiadomić inny wątek, aby poczekał i utworzyć nowy kontekst przesyłania strumieniowego.

2) W drugim wątku należy sprawdzić pod kątem warunku, a jeśli warunek spełni, przerwać kontekst przesyłania strumieniowego.

Proszę dać mi znać, jeśli potrzebujesz wyjaśnienia.

+0

Problem, który mam, np. Dwa tysiące plików i chcę odczytać tylko N wierszy (od kilku do miliardów). Twoje rozwiązanie będzie kosztowne. – Maksym