Jak korzystać z KafkaUtils.createDirectStream
z przesunięciami dla określonego Topic
w Pyspark?Jak utworzyć InputDStream z przesunięciami w PySpark (używając KafkaUtils.createDirectStream)?
7
A
Odpowiedz
7
Jeśli chcesz utworzyć RDD z rekordów w temacie Kafki, użyj statycznego zestawu krotek. dostępne
dokonać wszystkich import
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Następnie należy utworzyć słownik Kafki Brokers
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Następnie utworzeniu przesunięcia obiektu
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
Wreszcie Ci stworzyć RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
Aby utworzyć Stream z przesunięciami trzeba wykonać następujące czynności:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Następnie należy utworzyć kontekst sparkstreaming używając sparkcontext
ssc = StreamingContext(sc, 1)
Następny Założyliśmy wszystkie nasze parametry
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Następnie tworzymy naszą fromOffset słownik
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
//notice that we must cast the int to long
końcu tworzymy Stream
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
1
można zrobić:
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = {topicpartion: int(start)}
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
{"metadata.broker.list": brokers}, fromOffsets = fromoffset)
Uwaga: Spark 2.2.0, Python 3.6
Powiązane problemy
- 1. Jak podzielić wektor na kolumny - używając PySpark
- 2. Jak korzystać z klasy Scala w Pyspark
- 3. CSS: Trudność z położeniem w tle różnymi przesunięciami narożników
- 4. filtrowanie Kolumna w PySpark
- 5. Jak działa funkcja Pyspark mapPartitions?
- 6. Jak ustawić spark.sql.parquet.output.committer.class w pyspark
- 7. jak utworzyć pakietowy, działający jar, używając Ant
- 8. jak utworzyć wielowierszowy wykres używając dc.js
- 9. Jak mogę utworzyć strzałkę używając tylko CSS?
- 10. Jak utworzyć nieistniejące podkatalogi rekurencyjnie używając Bash?
- 11. Jak utworzyć wykres 4d używając Pythona z matplotlib
- 12. Jak wyodrębnić identyfikator aplikacji z kontekstu PySpark
- 13. Jak uruchomić graphx z Python/pyspark?
- 14. Eksplodować w PySpark
- 15. Wyodrębnianie słownika z RDD w Pyspark
- 16. Jak utworzyć arrayType dla WSDL w Pythonie (używając suds)?
- 17. Jak utworzyć "niezapisaną grupę" w HTML/używając CSS?
- 18. Budowanie wiersza z dyktanda w pySpark
- 19. Jak utworzyć wiele rodzin kolumn w tabeli HBase używając powłoki
- 20. Jak utworzyć cinemagraphs w Objective-C używając AVFoundation
- 21. Liczby losowe generowanie w PySpark
- 22. Rejestrowanie PySpark?
- 23. Jak zmienić nazwy kolumn danych w pyspark?
- 24. Jak zdobyć liczbę pracowników (executorów) w PySpark?
- 25. uruchamianie skryptu Pyspark na EMR
- 26. Zastosuj StringIndexer kilku kolumn w PySpark Dataframe
- 27. Filtrowanie Pyspark DataFrame z SQL-jak w punkcie
- 28. Pivot wiele kolumn - pyspark
- 29. Pyspark StructType nie jest zdefiniowany
- 30. Running nosetests for pyspark
ale pojawia się błąd "TypeError: unhashable type:" TopicAndPartition "" – pangpang
To jest nieaktualne dla K afka 0.8 i Spark 2.0+ :( – rjurney