UWAGA: To nie jest przeznaczony jako bezpośrednia odpowiedź na pytanie. Jest to raczej rozszerzenie do @TimothyLiu's answer, zakładając, że użytkownik końcowy używa pakietu Boto
(inaczej Boto2), a nie Boto3
. Kod ten jest „Boto-2-zację” wezwania delete_messages
określoną w his answer
Boto
(2) wymagają
delete_message_batch(messages_to_delete)
gdzie
messages_to_delete
jest
dict
obiekt o klucz: wartość odpowiadająca
id
:
receipt_handle
pary powraca
AttributeError: 'dict' object has no attribute 'id'.
Wygląda na to, że delete_message_batch
oczekuje obiektu klasy Message
; Kopiowanie obiektu Boto source for delete_message_batch
i zezwalanie mu na używanie obiektu innego niż Message
(ala boto3) również nie powiedzie się, jeśli usuniesz więcej niż 10 "wiadomości" naraz. Tak więc musiałem użyć następującego obejścia.
ePrint kod z here
from __future__ import print_function
import sys
from itertools import islice
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@static_vars(counter=0)
def take(n, iterable, reset=False):
"Return next n items of the iterable as same type"
if reset: take.counter = 0
take.counter += n
bob = islice(iterable, take.counter-n, take.counter)
if isinstance(iterable, dict): return dict(bob)
elif isinstance(iterable, list): return list(bob)
elif isinstance(iterable, tuple): return tuple(bob)
elif isinstance(iterable, set): return set(bob)
elif isinstance(iterable, file): return file(bob)
else: return bob
def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
"""
Deletes a list of messages from a queue in a single request.
:param cx: A boto connection object.
:param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
:param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
"""
listof10s = []
asSuc, asErr, acS, acE = "","",0,0
res = []
it = tuple(enumerate(messages))
params = {}
tenmsg = take(10,it,True)
while len(tenmsg)>0:
listof10s.append(tenmsg)
tenmsg = take(10,it)
while len(listof10s)>0:
tenmsg = listof10s.pop()
params.clear()
for i, msg in tenmsg: #enumerate(tenmsg):
prefix = 'DeleteMessageBatchRequestEntry'
numb = (i%10)+1
p_name = '%s.%i.Id' % (prefix, numb)
params[p_name] = msg.get('id')
p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
params[p_name] = msg.get('receipt_handle')
try:
go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
(sSuc,cS),(sErr,cE) = tup_result_messages(go)
if cS:
asSuc += ","+sSuc
acS += cS
if cE:
asErr += ","+sErr
acE += cE
except cx.ResponseError:
eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
except:
eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res
def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
if sSuc == "": sSuc="None"
if sErr == "": sErr="None"
if cS == expect: sSuc="All"
if cE == expect: sErr="All"
return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
Nie mogę tego zrobić, ponieważ komunikaty w SQS mieć timeout widoczność, więc gdybym najpierw uzyskać 10 wiadomości, a następnie zapętlić kilka razy, następnym razem, gdy dostanę te same 10 wiadomości od czasu upływu czasu. Zastanawiam się nad użyciem 'dump()', ale będę musiał przeczytać plik po, że wydaje się głupie, jestem coś brakuje? (Mogę ustawić czas widoczności na bardzo długi czas, ale to wydaje się brzydkie). –
@linker - powiedziałeś, że musisz sprawdzić komunikaty "n". Czy to oznacza, że istnieją pewne kryteria dopasowania, do których porównywana jest każda wiadomość? –
Przepraszam, jeśli to było mylące, zaktualizowałem swój wpis. –