2011-11-02 14 views
8

Actual Design:Eventlet può gestire una connessione AMQP con messaggi che passano in modo asincrono sia in entrata che in uscita?

Per coloro che ritornano a questa domanda, la risposta utile al di sotto mi ha spinto verso un design funzionale che esegue bene. Tre intuizioni erano chiave:

  1. Eventlet è un ambiente molto sicuro - se due greenlets entrambi cercano recv() o entrambi cercano send() dalla stessa presa contemporaneamente, quindi Eventlet uccide elegantemente la seconda greenlet con un'eccezione. Questo è magnifico e significa che semplici eccezioni, e non impossibili da riprodurre errori di interleaving dei dati, risulteranno se i "verdi" amqplib non sono soddisfacenti.
  2. I amqplib metodi rientrano grosso modo in due gruppi: wait() loop all'interno di recv() fino a quando un messaggio di AMQP è assemblato, mentre altri metodi send() messaggi avanti e non tenterà la propria recv(). Questo è incredibilmente in buona fortuna, dato che gli autori di amqplib non avevano idea che qualcuno avrebbe cercato di "rendere verde" la loro libreria! Significa che l'invio di messaggi non è solo sicuro dalla richiamata invocata da wait(), ma che i messaggi possono anche essere inviati in modo sicuro da altri greenlet che sono completamente fuori dal controllo del ciclo wait(). Questi metodi sicuri - che possono essere chiamati da qualsiasi greenlet, non solo dal wait() callback - sono:
    1. basic_ack
    2. basic_consume con nowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare con nowait=True
    7. exchange_delete con nowait=True
    8. queue_bind con nowait=True
    9. queue_unbind con nowait=True
    10. queue_declare con nowait=True
    11. queue_delete con nowait=True
    12. queue_purge con nowait=True
  3. I semafori possono essere utilizzati come blocchi: inizializzare il semaforo con il conteggio 1 e quindi acquire() e release() per bloccare e sbloccare. Tutti i miei greenlet asincroni che desiderano scrivere messaggi possono utilizzare tale blocco per evitare di avere chiamate separate interleave e rovinare il protocollo AMQP.

Quindi il mio codice è più o meno in questo modo:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8') 

class Processor(object): 
    def __init__(self): 
     write_lock = eventlet.semaphore.Semaphore(1) 

    def listening_greenlet(channel): 
     # start this using eventlet.spawn_n() 
     # create Connection and self.channel 
     self.channel.basic_consume(queue, callback=self.consume) 
     while True: 
      self.channel.wait() 

    def safe_publish(channel, *args, **kw): 
     with write_lock: # yes, Eventlet supports this! 
      channel.basic_publish(*args, **kw)  

    def consume(message): 
     # Returning immediately frees the wait() loop 
     eventlet.spawn_n(self.process, message) 

    def process(message): 
     # do whatever I want 
     # whenever I am done, I can async reply: 
     self.safe_publish(...) 

Enjoy!

domanda iniziale:

Immaginate centinaia di messaggi AMQP che arrivano ogni minuto in un piccolo pitone Eventlet applicazione, che ogni hanno bisogno di essere elaborati e ha risposto - in cui il sovraccarico della CPU del trattamento sarà minimo, ma potrebbe coinvolgere in attesa di risposte da altri servizi e prese.

Per consentire, ad esempio, 100 messaggi da elaborare in una sola volta, potrei ovviamente creare 100 connessioni TCP separate a RabbitMQ e disporre di un worker per ogni connessione che riceve, elabora e risponde a singoli messaggi in blocco. Ma per conservare le connessioni TCP preferirei creare una sola connessione AMQP, permettere a RabbitMQ di trasmettere i messaggi a tutta velocità, consegnare tali compiti ai lavoratori e rispedire le risposte al completamento di ogni lavoratore:

         +--------+ 
           +------| worker | <-+ 
           |  +--------+ | 
           |  +--------+ | 
           | +----| worker | <-+ 
           | | +--------+ | 
           | | +--------+ | 
           | | +--| worker | <-+ 
           | | | +--------+ | 
           v v v    | 
          +------------+   | 
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+ 
          +------------+ 

si osservi che:

  • Un Eventlet queue potrebbe elegantemente distribuire il lavoro in entrata tra i lavoratori man mano che diventano disponibili per più lavoro.
  • Il controllo del flusso da RabbitMQ potrebbe anche essere possibile: Posso inviare messaggi ACK solo fino a quando i miei dipendenti sono tutti occupati, quindi attendere prima di inviare altri ACK finché la coda non inizia a svuotarsi.
  • Il lavoro sarà quasi certamente completato fuori ordine: una richiesta potrebbe terminare rapidamente mentre un altro evento che è arrivato prima richiede molto più tempo; e alcune richieste potrebbero non essere mai completate affatto; così gli operai consegneranno le risposte in un ordine imprevedibile e asincrono.

mi aveva intenzione di scrivere questo utilizzando Eventlet e py-amqplib dopo aver visto questo post del blog interessante su come facilmente quella libreria AMQP potrebbe essere tirato nel modello di elaborazione Eventlet:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

Il mio problema è che, dopo aver letto la documentazione per entrambe le librerie, il codice sorgente amqplib e gran parte del codice sorgente di Eventlet, non riesco a capire come posso insegnare l'eventlet che possiede la connessione AMQP - l'eventlet chiamato connect_to_host() nel post del blog - a anche si sveglia quando un operatore completa il proprio lavoro e genera una risposta. Il metodo wait() in amqplib può essere risvegliato solo attraverso l'attività sul socket AMQP. Anche se sembra che dovrei essere in grado di far scrivere ai lavoratori le loro risposte in una coda e fare in modo che l'eventlet connect_to_host() attivi lo o quando arriva un nuovo messaggio o quando un lavoratore è pronto con una risposta da inviare, non riesco a trovare alcun modo per un eventlet dire

e mi venne in mente che i lavoratori potessero provare requisire l'oggetto di connessione AMQP “svegliatemi quando uno di queste cose accade.” - o anche il raw socket - e scrivere i propri messaggi su TCP; ma sembra che i blocchi siano necessari per impedire che i messaggi dei lavoratori in uscita vengano intercalati l'uno con l'altro o con i messaggi ACK scritti dall'evento principale del listener principale, e non riesco a trovare i lock in cui sia disponibile l'Eventlet.

Tutto ciò mi fa sentire quasi certo che sto cercando di affrontare questo problema in qualche modo esattamente all'indietro.Un problema come questo - lasciare che una singola connessione sia condivisa in modo sicuro tra un listener-dispatcher e molti lavoratori - semplicemente non esegue il mapping a un modello di coroutine e richiede una libreria asincrona completa? (Nel qual caso: c'è uno che consiglieresti per questo problema, e come si svolgerà il multiplexing tra i messaggi in arrivo e le risposte dei lavoratori in uscita? Non ho trovato nessuna soluzione pulita prima di provare combinazioni come Pika + ioloop - anche se ne ho appena visto un altro libreria, stormed_amqp, che potrebbe fare meglio di Pika.) O devo effettivamente ripiegare su veri thread Python dal vivo se voglio codice pulito e gestibile che possa mettere in atto questo modello? Sono aperto a tutte le opzioni.

Grazie per qualsiasi aiuto o idee! Continuo a pensare di avere l'intera concurrency-in-Python piuttosto giù, poi imparo ancora una volta che non lo faccio. :) E spero che ti sia piaciuta l'arte ASCII sopra in ogni caso.

+0

Questa è una domanda molto utile, anche tre anni più tardi. Perché il blocco che acquisisci intorno a 'basic_publish' non è richiesto per' basic_ack', 'basic_reject' e gli altri metodi" sicuri "che elencherai? È perché, senza un corpo da inviare, il messaggio è così piccolo che si è certi di essere in grado di completare la scrittura sul socket senza cedere a un altro greenlet? – Matt

risposta

5

Dopo aver letto il tuo post e lavorare con gevent un libary simile a eventlet alcune cose è diventato chiaro per me perché ho appena risolto un problema simile

In generale non v'è alcuna necessità di bloccare in quanto non v'è mai una sola eventlet o greenlet in esecuzione contemporaneamente ma nessuno di essi blocca tutto sembra funzionare contemporaneamente. MA non si desidera inviare i dati a un socket mentre un altro greenlet sta inviando. hai ragione e in effetti bisogno di un blocco per questo.

Se ho domande come queste nella documentazione non è sufficiente .. vai a cercare nella fonte! la sua opensource comunque impara un sacco di più guardando il codice di altre persone.

Ecco un esempio di codice semplificato che potrebbe chiarire le cose per voi.

nel dispatcher hanno

self.worker_queue = Queue() # queue for messages to workers 
self.server_queue = Queue() # queue for messages to ampq server 

avere lavoratori 2 coda di mettere il loro risultato sulla coda del server.

il codice di invio e la ricezione

def send_into_ampq(): 
    while True: 
     message = dispatcher.get_workger_msg() 

     try: 
      connection.send(self.encode(message)) 
     except: 
      connection.kill() 

def read_from_ampq(): 
    while True: 
     message = connection.wait() 

     dispatcher.put_ampq_msg(self.decode(message)) 

in del codice di connessione funzione di invio

self._writelock = Semaphore(1) 
# this is a gevent locking thing. eventlet must have something like this too.. 
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until 
# 0 agian.... why not google it i though.. and yes im right: 
# eventlet.semaphore.Semaphore(value=1) 

def send(self, message): 
    """ 
    you need a write lock to prevent more greenlets 
    sending more messages when previous sent is not done yet. 
    """ 

    with self._writelock: 
     self.socket.sendall(message) 
+0

GRAZIE - Avevo dimenticato di cercare meccanismi più generali che potessero essere usati come serrature e che stavo cercando "blocco" nella documentazione. Quindi le regole assomigliano a questo? (1) SOLO lasciare un eventlet 'wait()'. (2) Altri eventlet devono usare 'nowait = True' se fanno cose come creare o eliminare le code, quindi due eventlet non ascoltano i dati in arrivo in una sola volta e possibilmente intercalano frammenti di messaggi in arrivo. (3) Le operazioni che scrivono i dati OUTGOING devono essere protette l'una dall'altra con un blocco semaforo. Destra? –

+0

1. lasciare che un solo eventlet attenda un nuovo messaggio completo dal server. 2. quando i lavoratori mettono i messaggi sulla coda del server usano nowait = True. 3. Per non avere più messaggi simulanomemente inviando un socket usare il semaforo. – Stephan

+0

Oh, wow - se dimentico 'nowait = True', Eventlet si lamenta," RuntimeError: seconda lettura simultanea su fileno 3 rilevata "- quindi Eventlet è molto più sicuro di quanto immaginassi. –

Problemi correlati