2015-10-21 6 views

Odpowiedz

7

Jeśli chcesz utworzyć RDD z rekordów w temacie Kafki, użyj statycznego zestawu krotek. dostępne

dokonać wszystkich import

from pyspark.streaming.kafka import KafkaUtils, OffsetRange 

Następnie należy utworzyć słownik Kafki Brokers

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 

Następnie utworzeniu przesunięcia obiektu

start = 0 
until = 10 
partition = 0 
topic = 'topic'  
offset = OffsetRange(topic,partition,start,until) 
offsets = [offset] 

Wreszcie Ci stworzyć RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets) 

Aby utworzyć Stream z przesunięciami trzeba wykonać następujące czynności:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition 
from pyspark.streaming import StreamingContext 

Następnie należy utworzyć kontekst sparkstreaming używając sparkcontext

ssc = StreamingContext(sc, 1) 

Następny Założyliśmy wszystkie nasze parametry

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 
start = 0 
partition = 0 
topic = 'topic'  

Następnie tworzymy naszą fromOffset słownik

topicPartion = TopicAndPartition(topic,partition) 
fromOffset = {topicPartion: long(start)} 
//notice that we must cast the int to long 

końcu tworzymy Stream

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 
fromOffsets=fromOffset) 
+0

ale pojawia się błąd "TypeError: unhashable type:" TopicAndPartition "" – pangpang

+1

To jest nieaktualne dla K afka 0.8 i Spark 2.0+ :( – rjurney

1

można zrobić:

from pyspark.streaming.kafka import TopicAndPartition 
topic = "test" 
brokers = "localhost:9092" 
partition = 0 
start = 0 
topicpartion = TopicAndPartition(topic, partition) 
fromoffset = {topicpartion: int(start)} 
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \ 
     {"metadata.broker.list": brokers}, fromOffsets = fromoffset) 

Uwaga: Spark 2.2.0, Python 3.6

Powiązane problemy