Chcę pobierać kolejkę (RabbitMQ) synchronicznie z blokowaniem.Zużycie synchroniczne i blokujące w RabbitMQ przy użyciu pika
Uwaga: poniżej jest pełny kod gotowy do uruchomienia.
System skonfigurował system RabbitMQ jako system kolejkowania, ale w jednym z naszych modułów nie jest potrzebne zużycie asynchroniczne.
Próbowałem za pomocą basic_get na szczycie BlockingConnection, który nie blokuje (powroty (None, None, None)
natychmiast):
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
Próbowałem również użyć consume generator, nie powiedzie się z „Połączenie zamknięte” po długim czasie nie spożywania.
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
Czy istnieje sposób na wykorzystanie RabbitMQ pomocą pika klienta jak bym Queue.Queue
w Pythonie? czy coś podobnego?
Moja opcja w tej chwili jest zajęty-wait (używając basic_get) - ale raczej wykorzystać istniejący system nie zajęty-czekaj, jeśli to możliwe.
Pełny kod:
#!/usr/bin/env python
import pika
import time
TEST_QUEUE = 'test'
def get_connection():
# define connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=YOUR_IP,
port=YOUR_PORT,
credentials=pika.PlainCredentials(
username=YOUR_USER,
password=YOUR_PASSWORD,
)
)
)
return connection
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
print "blocking_get_1"
blocking_get_1()
print "blocking_get_2"
blocking_get_2()
get_connection().channel().queue_delete(TEST_QUEUE)
myślę, że ma też do czynienia z nie wysyłając bicie serca ('consume' ewentualnie bloki nich?) Jak widać tutaj: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –
Opublikowalem moje zdanie na ten temat, ale daj mi znać, jeśli źle zrozumiałem twoje pytanie. :) – eandersson