2012-04-16 10 views
9

Pracuję nad aplikacją, której przepływ pracy jest zarządzany przez przekazywanie wiadomości w SQS przy użyciu boto.Jak uzyskać wszystkie wiadomości w kolejce Amazon SQS za pomocą biblioteki boto w Pythonie?

Moja kolejka SQS rośnie stopniowo i nie mam możliwości sprawdzenia, ile elementów ma zawierać.

Mam teraz demona, który okresowo odpytuje kolejkę i sprawdza, czy mam zestaw elementów o stałym rozmiarze. Na przykład, rozważmy następującą „kolejkę”:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

teraz chcę sprawdzić, czy mam „msg1_comp1”, „msg2_comp1” i „msg3_comp1” w kolejce razem w pewnym momencie, ale don” t znać rozmiar kolejki.

Po patrząc przez API, wydaje się, można albo dostać tylko 1 element, lub określonej liczby elementów w kolejce, ale nie wszystkie:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Sugestia zaproponowano w odpowiedzi byłoby otrzymasz na przykład 10 wiadomości w pętli, dopóki nic nie otrzymam, ale wiadomości w SQS mają limit czasu widoczności, co oznacza, że ​​jeśli wybiorę elementy z kolejki, nie zostaną one faktycznie usunięte, będą niewidoczne tylko przez krótki czas czasu.

Czy istnieje prosty sposób na pobranie wszystkich wiadomości w kolejce, nie wiedząc, ile ich jest?

Odpowiedz

13

Umieść swoje wezwanie do q.get_messages(n) wewnątrz podczas pętli:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Dodatkowo dump won't support more than 10 messages albo:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

Nie mogę tego zrobić, ponieważ komunikaty w SQS mieć timeout widoczność, więc gdybym najpierw uzyskać 10 wiadomości, a następnie zapętlić kilka razy, następnym razem, gdy dostanę te same 10 wiadomości od czasu upływu czasu. Zastanawiam się nad użyciem 'dump()', ale będę musiał przeczytać plik po, że wydaje się głupie, jestem coś brakuje? (Mogę ustawić czas widoczności na bardzo długi czas, ale to wydaje się brzydkie). –

+0

@linker - powiedziałeś, że musisz sprawdzić komunikaty "n". Czy to oznacza, że ​​istnieją pewne kryteria dopasowania, do których porównywana jest każda wiadomość? –

+0

Przepraszam, jeśli to było mylące, zaktualizowałem swój wpis. –

5

moim rozumieniu jest to, że rozproszony charakter służby SQS dość dużo sprawia, że ​​konstrukcja niewykonalne. Za każdym razem, gdy wywołujesz get_messages, rozmawiasz z innym zestawem serwerów, który będzie zawierał niektóre, ale nie wszystkie wiadomości. W związku z tym nie jest możliwe "od czasu do czasu" sprawdzanie, czy określona grupa wiadomości jest gotowa, a następnie po prostu je zaakceptować.

Co trzeba zrobić, to sondować w sposób ciągły, odbierać wszystkie wiadomości po ich otrzymaniu i przechowywać je lokalnie we własnych strukturach danych. Po każdym pomyślnym pobraniu możesz sprawdzić struktury danych, aby sprawdzić, czy kompletny zestaw wiadomości został zebrany.

Należy pamiętać, że wiadomości będzie przyjechać poza kolejnością, a niektóre komunikaty będzie być dostarczone dwa razy, jak Usuwa mają propagować na wszystkich serwerach SQS, ale późniejsze uzyskać wnioski czasami pokonać się usunąć wiadomości.

0

Coś jak poniższy kod powinno wystarczyć. Niestety jest w języku C#, ale nie powinno być trudno przekonwertować na pytona. Słownik służy do usuwania duplikatów.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

pracuję z kolejek AWS SQS, aby zapewnić natychmiastowe powiadomienia, więc muszę być przetwarzanie wszystkich wiadomości w czasie rzeczywistym. Poniższy kod pomoże ci efektywnie usunąć (wszystkie) wiadomości i poradzić sobie z błędami podczas usuwania.

Uwaga: aby usunąć wiadomości z kolejki, musisz je usunąć.Używam Pythona aktualizowane boto3 AWS SDK, biblioteka json oraz następujące wartości domyślne:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Adaptacja dla pakietów v2 'Boto' do" backport "funkcji' delete_messages' z 'Boto3' to [tutaj] (http://stackoverflow.com/a/40638174/4228193). Wbudowana 'Boto' (2)' delete_message_batch' ma ograniczenie 10 komunikatów I wymaga pełnych obiektów klasy "Message", a nie tylko 'ID' i' ReceiptHandles' w obiekcie. – mpag

0

UWAGA: To nie jest przeznaczony jako bezpośrednia odpowiedź na pytanie. Jest to raczej rozszerzenie do @TimothyLiu's answer, zakładając, że użytkownik końcowy używa pakietu Boto (inaczej Boto2), a nie Boto3. Kod ten jest „Boto-2-zację” wezwania delete_messages określoną w his answer


Boto (2) wymagają delete_message_batch(messages_to_delete) gdzie messages_to_delete jest dict obiekt o klucz: wartość odpowiadająca id: receipt_handle pary powraca

AttributeError: 'dict' object has no attribute 'id'.

Wygląda na to, że delete_message_batch oczekuje obiektu klasy Message; Kopiowanie obiektu Boto source for delete_message_batch i zezwalanie mu na używanie obiektu innego niż Message (ala boto3) również nie powiedzie się, jeśli usuniesz więcej niż 10 "wiadomości" naraz. Tak więc musiałem użyć następującego obejścia.

ePrint kod z here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

mogę wykonać to w cronjob

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Powiązane problemy