2016-06-16 22 views
14

Po to, aby wszystko było trudne, chciałbym spożywać wiadomości z kolejki rabbitMQ. Teraz wiem, że istnieje wtyczka do MQTT na królika (https://www.rabbitmq.com/mqtt.html).SparkStreaming, RabbitMQ i MQTT w Pythonie za pomocą pika

Jednak nie mogę zrobić przykładu pracy, w której Spark zużywa wiadomość, która została wyprodukowana z pika.

Na przykład używam prostego programu wordcount.py Here (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html), aby zobaczyć, czy mogę widzę komunikat producent w następujący sposób:

import sys 
import pika 
import json 
import future 
import pprofile 

def sendJson(json): 

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    channel = connection.channel() 

    channel.queue_declare(queue='analytics', durable=True) 
    channel.queue_bind(exchange='analytics_exchange', 
         queue='analytics') 

    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json) 
    connection.close() 

if __name__ == "__main__": 
    with open(sys.argv[1],'r') as json_file: 
    sendJson(json_file.read()) 

sparkstreaming konsument jest następujące:

import sys 
import operator 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.mqtt import MQTTUtils 

sc = SparkContext(appName="SS") 
sc.setLogLevel("ERROR") 
ssc = StreamingContext(sc, 1) 
ssc.checkpoint("checkpoint") 
#ssc.setLogLevel("ERROR") 


#RabbitMQ 

"""EXCHANGE = 'analytics_exchange' 
EXCHANGE_TYPE = 'direct' 
QUEUE = 'analytics' 
ROUTING_KEY = 'analytics' 
RESPONSE_ROUTING_KEY = 'analytics-response' 
""" 


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883" 
topic = "analytics" 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
#dummy functions - nothing interesting... 
words = mqttStream.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word, 1)) 
wordCounts = pairs.reduceByKey(lambda x, y: x + y) 

wordCounts.pprint() 
ssc.start() 
ssc.awaitTermination() 

jednak w przeciwieństwie do prostego WordCount przykład, nie mogę uzyskać to do pracy i pojawia się następujący błąd:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8) 
java.lang.NullPointerException 
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457) 
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273) 

Więc moje pytania, jakie powinny być ustawienia w kategoriach MQTTUtils.createStream(ssc, brokerUrl, topic) aby słuchać w kolejce i czy są jakieś bardziej pełniejsze przykłady i sposoby ich map na tych RabbitMQ.

używam mojego kodu konsumentów z: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

I zostały zaktualizowane kodem producenta następująco parametrów TCP jak sugeruje jeden komentarz:

url_location = 'tcp://localhost' 
url = os.environ.get('', url_location) 
params = pika.URLParameters(url) 
connection = pika.BlockingConnection(params) 

i iskra strumieniowe jak:

brokerUrl = "tcp://127.0.0.1:5672" 
topic = "#" #all messages 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
records = mqttStream.flatMap(lambda line: json.loads(line)) 
count = records.map(lambda rec: len(rec)) 
total = count.reduce(lambda a, b: a + b) 
total.pprint() 

Odpowiedz

2

Wygląda używasz niewłaściwy numer portu. Zakładając, że:

  • masz lokalną instancję RabbitMQ działa z ustawieniami domyślnymi i masz włączone MQTT wtyczki (rabbitmq-plugins enable rabbitmq_mqtt) i ponownie uruchomić serwer RabbitMQ
  • wliczone spark-streaming-mqtt podczas wykonywania spark-submit/pyspark (albo z packages lub jars/driver-class-path)

można połączyć za pomocą protokołu TCP z tcp://localhost:1883. Musisz także pamiętać, że MQTT używa amq.topic.

Skrócona:

  • tworzyć Dockerfile o następującej treści:

    FROM rabbitmq:3-management 
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt 
    
  • budować Docker obraz:

    docker build -t rabbit_mqtt . 
    
  • początkowy obraz i poczekać, aż serwer jest rea dy:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • tworzyć producer.py z następującej treści:

    import pika 
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost')) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='amq.topic', 
           type='topic', durable=True) 
    
    for i in range(1000): 
        channel.basic_publish(
         exchange='amq.topic', # amq.topic as exchange 
         routing_key='hello', # Routing key used by producer 
         body='Hello World {0}'.format(i) 
        ) 
        time.sleep(3) 
    
    connection.close() 
    
  • producent początek

    python producer.py 
    

    i odwiedzić konsola zarządzania http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    , aby zobaczyć, że komunikaty są odbierane.

  • tworzyć consumer.py o następującej treści:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    from pyspark.streaming.mqtt import MQTTUtils 
    
    sc = SparkContext() 
    ssc = StreamingContext(sc, 10) 
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883", # Note both port number and protocol 
        "hello"     # The same routing key as used by producer 
    ) 
    mqttStream.count().pprint() 
    ssc.start() 
    ssc.awaitTermination() 
    ssc.stop() 
    
  • pobieranie Zależności (dostosowanie wersji Scala do stosowanego do budowy Spark oraz wersję Spark):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 
    
  • upewnić SPARK_HOME i PYTHONPATH wskaż poprawne katalogi.

  • złożyć consumer.py z (wyregulować wersje jak poprzednio):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py 
    

Jeśli wykonałeś wszystkie kroki powinieneś zobaczyć Witam wiadomości świat w dzienniku Spark.

+0

Dzięki. Spojrzę na to. Czy to działa z bezpośrednim i dobrym tematem? – disruptive

+0

Wtyczka MQTT [może być skonfigurowana] (https://www.rabbitmq.com/mqtt.html#config) do korzystania z innej giełdy, ale o ile mogę to powiedzieć. Protokół MQTT nie jest dużo bogatszy niż ten. – zero323

+0

Czy istnieje sposób skonfigurowania tego bez okna dokowanego - na przykład przy użyciu pliku .config. Próbowałem z domyślnymi ustawieniami w https://www.rabbitmq.com/mqtt.html. Ale to w ogóle nie działa. Bez ustawień mój słuchacz iskier może łączyć się z następującymi: = RAPORT INFORMACYJNY ==== 5-lip-2016 :: 11: 52: 08 === przyjmowanie połączenia MQTT <0.321.0> (127.0.0.1:47868 -> 127.0. 0,1: 1883). Ale jak sprawić, aby wygenerowane komunikaty były mapowane na ten port? – disruptive

2

Z identyfikatora Javadoc MqttAsyncClient, identyfikator URI serwera musi mieć jeden z następujących schematów: tcp://, ssl:// lub local://. Musisz zmienić powyższe brokerUrl, aby uzyskać jeden z tych schematów.

Aby uzyskać więcej informacji, oto link do źródła dla MqttAsyncClient:

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272

+1

Podjęto próbę zmiany producenta, aby używał tcp zamiast http, jednak odkryłem, że mam teraz problem z połączeniem następujących: ERROR ReceiverSupervisorImpl: Zatrzymany odbiornik z błędem: Połączenie utracone (32109) - java.net.SocketException: Reset połączenia – disruptive

Powiązane problemy