2014-12-23 12 views
6

Jestem nowy dla rabbitmq i pika i mam problem z zatrzymaniem konsumpcji.pika, stop_consuming nie działa

kanał kolejek ustawienie:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True) 

Zasadniczo konsumentów i producentów, są jak poniżej:

konsumenta:

def task(task_id): 
    def callback(channel, method, properties, body): 
     if body != "quit": 
      print(body) 
     else: 
      print(body) 
      channel.stop_consuming(task_id) 

    channel.basic_consume(callback, queue=task_id, no_ack=True) 
    channel.start_consuming() 
    print("finish") 
    return "finish" 

producent:

proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE) 
while proc.returncode is None: # running 
    line = proc.stdout.readline() 
    if line: 
     channel.basic_publish(
      exchange='', 
      routing_key=self.request.id, 
      body=line 
     ) 
    else: 
     channel.basic_publish(
      exchange='', 
      routing_key=self.request.id, 
      body="quit" 
     ) 
     break 

konsument task dała mi wynik:

# ... output from sample.sh, as expected 

quit 
�}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU 
1419350416qUchildrenq]u. 

Jednak "finish" nie wydrukowany, więc zgaduję, że to dlatego channel.stop_consuming(task_id) nie powstrzymało czasochłonne. Jeśli tak, jaki jest właściwy sposób postępowania? Dziękuję Ci.

+0

Czy jesteś pewien, że stop_consuming pobiera nazywane? – eandersson

+0

@eandersson Tak, jestem pewien. – laike9m

+0

Ok. Czy jesteś pewien, że podajesz prawidłowy identyfikator do stop_consuming? Spróbuj po prostu użyć '' 'channel.stop_consuming()' '' – eandersson

Odpowiedz

5

Miałem ten sam problem. Wydaje się, że jest to spowodowane faktem, że wewnętrznie, start_consuming dzwoni do start_consuming. Ta time_limit=None powoduje, że zawiesza się.

udało mi się rozwiązać ten problem poprzez zastąpienie wywołanie channel.start_consuming() z jego implementacja, posiekany:

while channel._consumer_infos: 
    channel.connection.process_data_events(time_limit=1) # 1 second 
+0

Nie mam warunku, aby potwierdzić twoją odpowiedź teraz, w każdym razie, przyjmuję to. – laike9m

+2

shx2, masz rację, myślę. Przynajmniej to zrozumiałem, patrząc na źródło. Wydaje się również, że rozwiązuje problem. Otworzę bilet. –

+1

Dla odniesienia: https://github.com/pika/pika/issues/770 –