2014-10-21 8 views
7

Chcę pobierać kolejkę (RabbitMQ) synchronicznie z blokowaniem.Zużycie synchroniczne i blokujące w RabbitMQ przy użyciu pika

Uwaga: poniżej jest pełny kod gotowy do uruchomienia.

System skonfigurował system RabbitMQ jako system kolejkowania, ale w jednym z naszych modułów nie jest potrzebne zużycie asynchroniczne.

Próbowałem za pomocą basic_get na szczycie BlockingConnection, który nie blokuje (powroty (None, None, None) natychmiast):

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

Próbowałem również użyć consume generator, nie powiedzie się z „Połączenie zamknięte” po długim czasie nie spożywania.

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 

Czy istnieje sposób na wykorzystanie RabbitMQ pomocą pika klienta jak bym Queue.Queue w Pythonie? czy coś podobnego?

Moja opcja w tej chwili jest zajęty-wait (używając basic_get) - ale raczej wykorzystać istniejący system nie zajęty-czekaj, jeśli to możliwe.

Pełny kod:

#!/usr/bin/env python 
import pika 
import time 

TEST_QUEUE = 'test' 
def get_connection(): 
     # define connection 
     connection = pika.BlockingConnection(
       pika.ConnectionParameters(
         host=YOUR_IP, 
         port=YOUR_PORT, 
         credentials=pika.PlainCredentials(
           username=YOUR_USER, 
           password=YOUR_PASSWORD, 
         ) 
       ) 
     ) 
     return connection 

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 


print "blocking_get_1" 
blocking_get_1() 

print "blocking_get_2" 
blocking_get_2() 

get_connection().channel().queue_delete(TEST_QUEUE) 
+0

myślę, że ma też do czynienia z nie wysyłając bicie serca ('consume' ewentualnie bloki nich?) Jak widać tutaj: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –

+1

Opublikowalem moje zdanie na ten temat, ale daj mi znać, jeśli źle zrozumiałem twoje pytanie. :) – eandersson

Odpowiedz

10

Powszechnym problemem z Pika jest to, że nie jest obecnie obsługę przychodzących zdarzeń w tle. Zasadniczo oznacza to, że w wielu sytuacjach będziesz musiał okresowo dzwonić pod numer connection.process_data_events(), aby upewnić się, że nie przepadnie za uderzeniami serca.

Oznacza to również, że jeśli śpisz na dłuższy okres czasu, pika nie będzie obsługi danych przychodzących i ostatecznie umrzeć, gdyż nie reaguje pulsem. Opcją tutaj jest wyłączenie pulsu.

zwykle rozwiązać ten problem poprzez gwint w czeku tła dla nowych wydarzeń, jak widać na this przykład.

Jeśli chcesz zablokować całkowicie chciałbym zrobić coś takiego (na podstawie własnej biblioteki AMQP-Storm).

while True: 
    result = channel.basic.get(queue='simple_queue', no_ack=False) 
    if result: 
     print("Message:", result['body']) 
     channel.basic.ack(result['method']['delivery_tag']) 
    else: 
     print("Channel Empty.") 
     sleep(1) 

Jest to oparte na znalezionym przykładzie: here.

+0

Pamiętam, że miałem problemy podczas uzyskiwania dostępu do połączenia z dwóch wątków. Komunikacja między wątkami dodaje nad głową, więc poczekam na sposób, aby to zrobić bez niego. Dam to jeszcze raz i zaktualizuję tutaj. –

+2

Tak, jeśli używasz pika może to być trudne. Nie jest przeznaczony do wątków, ale przykład, który łączyłem, może obsługiwać wiele jednoczesnych wiadomości. Z drugiej strony moja biblioteka amqp-storm powinna ułatwić to zadanie, ponieważ jest bezpieczna dla wątków. – eandersson

Powiązane problemy