NOTA: Questo non è inteso come una risposta diretta alla domanda. Piuttosto si tratta di un aumento a @TimothyLiu's answer, supponendo che l'utente finale stia utilizzando il pacchetto Boto
(noto anche come Boto2) non Boto3
. Questo codice è un "Boto-2-izzazione" della delete_messages
chiamata cui his answer
A
Boto
(2) richiedono
delete_message_batch(messages_to_delete)
dove
messages_to_delete
è un oggetto
dict
con chiave: valore corrispondente a
id
:
receipt_handle
coppie restituisce
AttributeError: 'dict' object has no attribute 'id'.
Sembra che delete_message_batch
si aspetti un oggetto di classe Message
; la copia di Boto source for delete_message_batch
e la possibilità di utilizzare un oggetto non Message
(ala boto3) ha esito negativo anche se si eliminano più di 10 "messaggi" alla volta. Quindi, ho dovuto usare il seguente work-around.
codice ePrint da here
from __future__ import print_function
import sys
from itertools import islice
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@static_vars(counter=0)
def take(n, iterable, reset=False):
"Return next n items of the iterable as same type"
if reset: take.counter = 0
take.counter += n
bob = islice(iterable, take.counter-n, take.counter)
if isinstance(iterable, dict): return dict(bob)
elif isinstance(iterable, list): return list(bob)
elif isinstance(iterable, tuple): return tuple(bob)
elif isinstance(iterable, set): return set(bob)
elif isinstance(iterable, file): return file(bob)
else: return bob
def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
"""
Deletes a list of messages from a queue in a single request.
:param cx: A boto connection object.
:param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
:param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
"""
listof10s = []
asSuc, asErr, acS, acE = "","",0,0
res = []
it = tuple(enumerate(messages))
params = {}
tenmsg = take(10,it,True)
while len(tenmsg)>0:
listof10s.append(tenmsg)
tenmsg = take(10,it)
while len(listof10s)>0:
tenmsg = listof10s.pop()
params.clear()
for i, msg in tenmsg: #enumerate(tenmsg):
prefix = 'DeleteMessageBatchRequestEntry'
numb = (i%10)+1
p_name = '%s.%i.Id' % (prefix, numb)
params[p_name] = msg.get('id')
p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
params[p_name] = msg.get('receipt_handle')
try:
go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
(sSuc,cS),(sErr,cE) = tup_result_messages(go)
if cS:
asSuc += ","+sSuc
acS += cS
if cE:
asErr += ","+sErr
acE += cE
except cx.ResponseError:
eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
except:
eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res
def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
if sSuc == "": sSuc="None"
if sErr == "": sErr="None"
if cS == expect: sSuc="All"
if cE == expect: sErr="All"
return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
non posso farlo, dal momento che i messaggi in SQS hanno un timeout di visibilità, quindi se ho ottenere 10 messaggi, quindi esegui un ciclo alcune volte, la prossima volta potrei ricevere gli stessi 10 messaggi da quando è scaduto il timeout. Sto pensando di usare 'dump()' ma dovrò leggere il file dopo, sembra sciocco, mi manchi qualcosa? (Potrei impostare la visibilità_timeout a un tempo molto lungo, ma sembra brutto). –
@linker - hai detto che devi controllare per i messaggi specifici di "n". Questo significa che ci sono alcuni criteri di corrispondenza a cui stai confrontando ogni messaggio? –
Scusate se ciò è stato fonte di confusione, ho aggiornato il mio post. –