Próbuję technologii, które będę używać do budowania potoku danych w czasie rzeczywistym, i mam uruchomić niektóre problemy eksportowania mojej zawartości do pliku.Strumień wyjściowy Apache Spark w Pythonie
Mam zainstalowanego lokalnego klastra kafka i producenta node.js, który wysyła prostą wiadomość tekstową tylko po to, aby przetestować funkcjonalność i uzyskać przybliżoną ocenę złożoności implementacji.
To jest praca ze strumieniem iskier, która czyta kafejkę i próbuję ją napisać do pliku.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "KafkaStreamingConsumer")
ssc = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group", {"test": 1})
kafkaStream.saveAsTextFile('out.txt')
print 'Event recieved in window: ', kafkaStream.pprint()
ssc.start()
ssc.awaitTermination()
Błąd widzę przy składaniu pracy iskra:
kafkaStream.saveAsTextFile('out.txt')
AttributeError: 'TransformedDStream' object has no attribute 'saveAsTextFile'
Brak obliczenia lub transformacje są wykonywane na danych, po prostu chcę, aby zbudować przepływu. Co muszę zmienić/dodać, aby móc eksportować dane do pliku?
dziękuję, działało, moje złe, bo nie dokładnie sprawdzam dokumentów –