2012-04-16 12 views
9

Sto lavorando su un'applicazione il cui flusso di lavoro è gestito passando messaggi in SQS, usando boto.Come ottenere tutti i messaggi nella coda di Amazon SQS utilizzando la libreria boto in Python?

La coda di SQS sta crescendo gradualmente e non ho modo di controllare quanti elementi deve contenere.

Ora ho un daemon che esegue periodicamente il polling della coda e controlla se ho un set di elementi a dimensione fissa. Ad esempio, si consideri il seguente "coda":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

Ora voglio verificare se ho "msg1_comp1", "msg2_comp1" e "msg3_comp1" in coda insieme a un certo punto nel tempo, ma I don' so la dimensione della coda.

Dopo guardando attraverso l'API, a quanto pare è possibile ottenere solo 1 elemento, o di un numero fisso di elementi nella coda, ma non tutti:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Un suggerimento proposto nelle risposte sarebbe quello di ottenere ad esempio 10 messaggi in un ciclo fino a quando non ottengo nulla indietro, ma i messaggi in SQS hanno un timeout di visibilità, il che significa che se scrivo elementi dalla coda, non verranno realmente rimossi, saranno invisibili solo per un breve periodo di tempo.

C'è un modo semplice per ottenere tutti i messaggi in coda, senza sapere quanti ce ne sono?

risposta

13

Metti la tua chiamata a q.get_messages(n) all'interno ciclo while:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Inoltre, dump won't support more than 10 messages sia:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

non posso farlo, dal momento che i messaggi in SQS hanno un timeout di visibilità, quindi se ho ottenere 10 messaggi, quindi esegui un ciclo alcune volte, la prossima volta potrei ricevere gli stessi 10 messaggi da quando è scaduto il timeout. Sto pensando di usare 'dump()' ma dovrò leggere il file dopo, sembra sciocco, mi manchi qualcosa? (Potrei impostare la visibilità_timeout a un tempo molto lungo, ma sembra brutto). –

+0

@linker - hai detto che devi controllare per i messaggi specifici di "n". Questo significa che ci sono alcuni criteri di corrispondenza a cui stai confrontando ogni messaggio? –

+0

Scusate se ciò è stato fonte di confusione, ho aggiornato il mio post. –

5

mia comprensione è che la natura distribuita del servizio SQS rende praticamente il vostro disegno impraticabile. Ogni volta che chiami get_messages stai parlando con un diverso set di server, che avrà alcuni ma non tutti i tuoi messaggi. Pertanto non è possibile "effettuare il check-in di volta in volta" per impostare se un particolare gruppo di messaggi è pronto e quindi accettarli.

Ciò che è necessario eseguire è il polling continuo, prendere tutti i messaggi non appena arrivano e memorizzarli localmente nelle proprie strutture dati. Dopo ogni raccolta riuscita è possibile controllare le strutture dati per vedere se è stata raccolta una serie completa di messaggi.

Tenete a mente che i messaggi verranno arrivano in ordine, e alcuni messaggi saranno essere consegnati due volte, come eliminazioni devono propagarsi a tutti i server SQS, ma le successive richieste di ottenere a volte battono i messaggi di eliminazione.

0

Qualcosa come il seguente codice dovrebbe fare il trucco. Scusa è in C#, ma non dovrebbe essere difficile convertire in python. Il dizionario è usato per estirpare i duplicati.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

ho lavorato con le code AWS SQS per fornire notifiche istantanee, quindi ho bisogno di essere l'elaborazione di tutti i messaggi in tempo reale. Il seguente codice ti aiuterà a rimuovere in modo efficiente tutti i messaggi (tutti) e a gestire eventuali errori durante la rimozione.

Nota: per rimuovere i messaggi dalla coda è necessario eliminarli.Sto utilizzando la versione aggiornata boto3 AWS pitone SDK, biblioteca JSON, e i seguenti valori di default:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Un adattamento per i pacchetti v2 'Boto' su" backport "la funzione' delete_messages' da 'Boto3' è [qui] (http://stackoverflow.com/a/40638174/4228193). Il 'Boto' (2) predefinito' delete_message_batch' ha una limitazione di 10 messaggi E richiede pieni oggetti 'Message'-classe, piuttosto che solo' ID' e 'ReceiptHandles' in un oggetto. – mpag

0

NOTA: Questo non è inteso come una risposta diretta alla domanda. Piuttosto si tratta di un aumento a @TimothyLiu's answer, supponendo che l'utente finale stia utilizzando il pacchetto Boto (noto anche come Boto2) non Boto3. Questo codice è un "Boto-2-izzazione" della delete_messages chiamata cui his answer


A Boto (2) richiedono delete_message_batch(messages_to_delete) dove messages_to_delete è un oggetto dict con chiave: valore corrispondente a id: receipt_handle coppie restituisce

AttributeError: 'dict' object has no attribute 'id'.

Sembra che delete_message_batch si aspetti un oggetto di classe Message; la copia di Boto source for delete_message_batch e la possibilità di utilizzare un oggetto non Message (ala boto3) ha esito negativo anche se si eliminano più di 10 "messaggi" alla volta. Quindi, ho dovuto usare il seguente work-around.

codice ePrint da here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

eseguo questo in un cronjob

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Problemi correlati