2015-08-12 13 views
6

Próbuję technologii, które będę używać do budowania potoku danych w czasie rzeczywistym, i mam uruchomić niektóre problemy eksportowania mojej zawartości do pliku.Strumień wyjściowy Apache Spark w Pythonie

Mam zainstalowanego lokalnego klastra kafka i producenta node.js, który wysyła prostą wiadomość tekstową tylko po to, aby przetestować funkcjonalność i uzyskać przybliżoną ocenę złożoności implementacji.

To jest praca ze strumieniem iskier, która czyta kafejkę i próbuję ją napisać do pliku.

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# Create a local StreamingContext with two working thread and batch interval of 1 second 
sc = SparkContext("local[2]", "KafkaStreamingConsumer") 
ssc = StreamingContext(sc, 10) 

kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group", {"test": 1}) 

kafkaStream.saveAsTextFile('out.txt') 

print 'Event recieved in window: ', kafkaStream.pprint() 

ssc.start() 
ssc.awaitTermination() 

Błąd widzę przy składaniu pracy iskra:

kafkaStream.saveAsTextFile('out.txt') 
AttributeError: 'TransformedDStream' object has no attribute 'saveAsTextFile' 

Brak obliczenia lub transformacje są wykonywane na danych, po prostu chcę, aby zbudować przepływu. Co muszę zmienić/dodać, aby móc eksportować dane do pliku?

Odpowiedz