Actual Design:Eventlet può gestire una connessione AMQP con messaggi che passano in modo asincrono sia in entrata che in uscita?
Per coloro che ritornano a questa domanda, la risposta utile al di sotto mi ha spinto verso un design funzionale che esegue bene. Tre intuizioni erano chiave:
- Eventlet è un ambiente molto sicuro - se due greenlets entrambi cercano
recv()
o entrambi cercanosend()
dalla stessa presa contemporaneamente, quindi Eventlet uccide elegantemente la seconda greenlet con un'eccezione. Questo è magnifico e significa che semplici eccezioni, e non impossibili da riprodurre errori di interleaving dei dati, risulteranno se i "verdi"amqplib
non sono soddisfacenti. - I
amqplib
metodi rientrano grosso modo in due gruppi:wait()
loop all'interno direcv()
fino a quando un messaggio di AMQP è assemblato, mentre altri metodisend()
messaggi avanti e non tenterà la propriarecv()
. Questo è incredibilmente in buona fortuna, dato che gli autori diamqplib
non avevano idea che qualcuno avrebbe cercato di "rendere verde" la loro libreria! Significa che l'invio di messaggi non è solo sicuro dalla richiamata invocata dawait()
, ma che i messaggi possono anche essere inviati in modo sicuro da altri greenlet che sono completamente fuori dal controllo del ciclowait()
. Questi metodi sicuri - che possono essere chiamati da qualsiasi greenlet, non solo dalwait()
callback - sono:basic_ack
basic_consume
connowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
connowait=True
exchange_delete
connowait=True
queue_bind
connowait=True
queue_unbind
connowait=True
queue_declare
connowait=True
queue_delete
connowait=True
queue_purge
connowait=True
- I semafori possono essere utilizzati come blocchi: inizializzare il semaforo con il conteggio
1
e quindiacquire()
erelease()
per bloccare e sbloccare. Tutti i miei greenlet asincroni che desiderano scrivere messaggi possono utilizzare tale blocco per evitare di avere chiamate separate interleave e rovinare il protocollo AMQP.
Quindi il mio codice è più o meno in questo modo:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
Enjoy!
domanda iniziale:
Immaginate centinaia di messaggi AMQP che arrivano ogni minuto in un piccolo pitone Eventlet applicazione, che ogni hanno bisogno di essere elaborati e ha risposto - in cui il sovraccarico della CPU del trattamento sarà minimo, ma potrebbe coinvolgere in attesa di risposte da altri servizi e prese.
Per consentire, ad esempio, 100 messaggi da elaborare in una sola volta, potrei ovviamente creare 100 connessioni TCP separate a RabbitMQ e disporre di un worker per ogni connessione che riceve, elabora e risponde a singoli messaggi in blocco. Ma per conservare le connessioni TCP preferirei creare una sola connessione AMQP, permettere a RabbitMQ di trasmettere i messaggi a tutta velocità, consegnare tali compiti ai lavoratori e rispedire le risposte al completamento di ogni lavoratore:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
si osservi che:
- Un Eventlet queue potrebbe elegantemente distribuire il lavoro in entrata tra i lavoratori man mano che diventano disponibili per più lavoro.
- Il controllo del flusso da RabbitMQ potrebbe anche essere possibile: Posso inviare messaggi ACK solo fino a quando i miei dipendenti sono tutti occupati, quindi attendere prima di inviare altri ACK finché la coda non inizia a svuotarsi.
- Il lavoro sarà quasi certamente completato fuori ordine: una richiesta potrebbe terminare rapidamente mentre un altro evento che è arrivato prima richiede molto più tempo; e alcune richieste potrebbero non essere mai completate affatto; così gli operai consegneranno le risposte in un ordine imprevedibile e asincrono.
mi aveva intenzione di scrivere questo utilizzando Eventlet e py-amqplib dopo aver visto questo post del blog interessante su come facilmente quella libreria AMQP potrebbe essere tirato nel modello di elaborazione Eventlet:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
Il mio problema è che, dopo aver letto la documentazione per entrambe le librerie, il codice sorgente amqplib e gran parte del codice sorgente di Eventlet, non riesco a capire come posso insegnare l'eventlet che possiede la connessione AMQP - l'eventlet chiamato connect_to_host()
nel post del blog - a anche si sveglia quando un operatore completa il proprio lavoro e genera una risposta. Il metodo wait()
in amqplib può essere risvegliato solo attraverso l'attività sul socket AMQP. Anche se sembra che dovrei essere in grado di far scrivere ai lavoratori le loro risposte in una coda e fare in modo che l'eventlet connect_to_host()
attivi lo o quando arriva un nuovo messaggio o quando un lavoratore è pronto con una risposta da inviare, non riesco a trovare alcun modo per un eventlet dire
e mi venne in mente che i lavoratori potessero provare requisire l'oggetto di connessione AMQP “svegliatemi quando uno di queste cose accade.” - o anche il raw socket - e scrivere i propri messaggi su TCP; ma sembra che i blocchi siano necessari per impedire che i messaggi dei lavoratori in uscita vengano intercalati l'uno con l'altro o con i messaggi ACK scritti dall'evento principale del listener principale, e non riesco a trovare i lock in cui sia disponibile l'Eventlet.
Tutto ciò mi fa sentire quasi certo che sto cercando di affrontare questo problema in qualche modo esattamente all'indietro.Un problema come questo - lasciare che una singola connessione sia condivisa in modo sicuro tra un listener-dispatcher e molti lavoratori - semplicemente non esegue il mapping a un modello di coroutine e richiede una libreria asincrona completa? (Nel qual caso: c'è uno che consiglieresti per questo problema, e come si svolgerà il multiplexing tra i messaggi in arrivo e le risposte dei lavoratori in uscita? Non ho trovato nessuna soluzione pulita prima di provare combinazioni come Pika + ioloop - anche se ne ho appena visto un altro libreria, stormed_amqp, che potrebbe fare meglio di Pika.) O devo effettivamente ripiegare su veri thread Python dal vivo se voglio codice pulito e gestibile che possa mettere in atto questo modello? Sono aperto a tutte le opzioni.
Grazie per qualsiasi aiuto o idee! Continuo a pensare di avere l'intera concurrency-in-Python piuttosto giù, poi imparo ancora una volta che non lo faccio. :) E spero che ti sia piaciuta l'arte ASCII sopra in ogni caso.
Questa è una domanda molto utile, anche tre anni più tardi. Perché il blocco che acquisisci intorno a 'basic_publish' non è richiesto per' basic_ack', 'basic_reject' e gli altri metodi" sicuri "che elencherai? È perché, senza un corpo da inviare, il messaggio è così piccolo che si è certi di essere in grado di completare la scrittura sul socket senza cedere a un altro greenlet? – Matt