2011-10-24 19 views
21

voglio usare Redis' PubSub per trasmettere alcuni messaggi, ma non vogliono essere bloccati utilizzando listen, come il codice qui sotto:È possibile eseguire pubub Redis non bloccanti?

import redis 
rc = redis.Redis() 

ps = rc.pubsub() 
ps.subscribe(['foo', 'bar']) 

rc.publish('foo', 'hello world') 

for item in ps.listen(): 
    if item['type'] == 'message': 
     print item['channel'] 
     print item['data'] 

L'ultima sezione for bloccherà. Voglio solo verificare se un determinato canale ha dati, come posso realizzare questo? Esiste un metodo simile a check?

+1

C'è un motivo per cui non vuoi essere bloccato usando l'ascolto? Le connessioni di Redis sono piuttosto economiche ed è in genere tipico generare molti di loro. –

+1

PubSub asincrono in Python utilizzando Redis, ZMQ, Tornado - https://github.com/abhinavsingh/async_pubsub –

+1

usa il metodo .get_message() dell'oggetto pubsub invece di .listen() (c'è un esempio sotto). [Questo metodo potrebbe non essere stato supportato nel driver Redis Python quando è stata pubblicata questa domanda]. –

risposta

7

Non penso che sarebbe possibile. Un canale non ha alcun "dato corrente", ti iscrivi a un canale e inizi a ricevere messaggi che vengono spinti da altri client sul canale, quindi è un'API di blocco. Inoltre, se guardi Redis Commands documentation per pub/sub, lo renderebbe più chiaro.

+0

Penso che questa risposta combinata con l'altra sia piuttosto completa. Potrebbe mettere questo in un thread. Se non desidera un'azione istantanea intrapresa quando un chanel ha attività allora potrebbe memorizzarlo magari in un ditt e avere il proprio metodo di controllo che guarda nel ditt con un mutex di blocco – jdi

1

Per raggiungere un codice di blocco non necessario è necessario eseguire un altro tipo di codice di paradigma. Non è difficile, usare un nuovo thread per ascoltare tutte le modifiche e lasciare che il thread principale faccia un altro.

Inoltre, è necessario un meccanismo per scambiare i dati tra thread principale e thread di sottoscrittore redis.

6

Questo è un esempio pratico per collegare il listener di blocco.

import sys 
import cmd 
import redis 
import threading 


def monitor(): 
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0) 

    channel = sys.argv[1] 
    p = r.pubsub() 
    p.subscribe(channel) 

    print 'monitoring channel', channel 
    for m in p.listen(): 
     print m['data'] 


class my_cmd(cmd.Cmd): 
    """Simple command processor example.""" 

    def do_start(self, line): 
     my_thread.start() 

    def do_EOF(self, line): 
     return True 


if __name__ == '__main__': 
    if len(sys.argv) == 1: 
     print "missing argument! please provide the channel name." 
    else: 
     my_thread = threading.Thread(target=monitor) 
     my_thread.setDaemon(True) 

     my_cmd().cmdloop() 
+0

Il GIL non entra in scena ora? Potremmo forse usare la multiprocessing (http://docs.python.org/2/library/multiprocessing.html)?Ma questi metodi comportano il sovraccarico della creazione di un processo. – Pramod

+0

GIL non arriverà a rovinare la festa perché non si sta operando nella zona della CPU ma si ascolta la rete. – Andrew

0

È possibile utilizzare gevent, patch per scimmia gevent per creare un'app di pubblicazione redub non bloccante.

42

Se si sta pensando a un processo non blocco, asincrono, si sta probabilmente utilizzando (o si dovrebbe usare) il framework/server asincrono.

+2

Questa è la risposta giusta che dovrebbe essere selezionata. Non sono sicuro del motivo per cui le persone reinventerebbero la ruota, c'è un client asincrono già esistente per i redis, la generazione di un nuovo thread non è realmente necessaria nell'esistenza di un tale client. – securecurve

+0

@securecurve: per essere onesti, ho aggiunto quella risposta più di un anno dopo quella contrassegnata. Tuttavia, sia txRedis che brükva (da cui il Tornado-Redis ha biforcuto) hanno entrambi 3 anni, quindi non è una scusa. – vartec

+0

Questa risposta non ha nulla a che fare con la domanda. Come sottolineato nella risposta accettata, redis spinge i messaggi ai clienti che stanno ascoltando. Quindi non c'è modo di chiedere messaggi. – Glaslos

1

L'approccio più efficiente sarebbe basato su Greenlet anziché su thread. Come framework di concorrenza basato su greenlet, gevent è già abbastanza consolidato nel mondo Python. Un'integrazione gevent con redis-py sarebbe quindi meravigliosa. Questo è esattamente ciò che è stato discusso in questa edizione su github: pub/sub

https://github.com/andymccurdy/redis-py/issues/310

0

Redis' invia messaggi ai clienti abbonati (in ascolto) su un canale. Se non stai ascoltando, perderai il messaggio (da qui la chiamata bloccante). Se vuoi che non sia bloccato, ti consiglio di usare una coda (i redis sono abbastanza buoni anche in questo). Se devi utilizzare pub/sub puoi utilizzare come gevent suggerito per avere un listener di blocco asincrono, inviare messaggi a una coda e utilizzare un utente separato per elaborare i messaggi da quella coda in modo non bloccante.

9

La nuova versione di redispy supporta il pubub asincrono, controllare https://github.com/andymccurdy/redis-py per ulteriori dettagli. Ecco un esempio dalla documentazione stessa:

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 
2

Ecco una soluzione bloccante senza discussioni:

fd = ps.connection._sock.fileno(); 
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block 
if rlist: 
    for rfd in rlist: 
     if fd == rfd: 
      message = ps.get_message() 

ps.get_message() è sufficiente di per sé, ma io uso questo metodo in modo che io posso aspettare su più FDS invece della sola connessione redis.

5

La risposta accettata è obsoleta poiché redis-py consiglia di utilizzare il blocco non bloccante get_message(). Ma fornisce anche un modo per usare facilmente i thread.

https://pypi.python.org/pypi/redis

Ci sono tre diverse strategie per la lettura dei messaggi.

Dietro le quinte, get_message() utilizza il modulo 'select' del sistema per eseguire il polling rapido del socket della connessione. Se sono disponibili dati da leggere, get_message() lo leggerà, formatterà il messaggio e lo restituirà o lo passerà a un gestore di messaggi. Se non ci sono dati da leggere, get_message() restituirà immediatamente None. Ciò rende banale l'integrazione in un loop di eventi esistente all'interno dell'applicazione.

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 

Versioni precedenti di redis-py leggono solo i messaggi con pubsub.listen(). listen() è un generatore che blocca fino a quando un messaggio è disponibile. Se la tua applicazione non ha bisogno di fare nient'altro che ricevere e agire sui messaggi ricevuti da redis, listen() è un modo semplice per iniziare a correre.

for message in p.listen(): 
    # do something with the message 

La terza opzione esegue un ciclo di eventi in un thread separato. pubsub.run_in_thread() crea un nuovo thread e avvia il loop degli eventi. L'oggetto thread viene restituito al chiamante di run_in_thread(). Il chiamante può utilizzare il metodo thread.stop() per arrestare il ciclo degli eventi e il thread. Dietro le quinte, questo è semplicemente un wrapper attorno a get_message() che gira in un thread separato, essenzialmente creando per te un piccolo ciclo di eventi non bloccante. run_in_thread() accetta un argomento opzionale sleep_time. Se specificato, il ciclo degli eventi chiamerà time.sleep() con il valore in ogni iterazione del ciclo.

Nota: poiché stiamo eseguendo in un thread separato, non c'è modo di gestire i messaggi che non vengono gestiti automaticamente con i gestori di messaggi registrati. Pertanto, redis-py ti impedisce di chiamare run_in_thread() se sei abbonato a pattern o canali a cui non sono collegati gestori di messaggi.

p.subscribe(**{'my-channel': my_handler}) 
thread = p.run_in_thread(sleep_time=0.001) 
# the event loop is now running in the background processing messages 
# when it's time to shut it down... 
thread.stop() 

Quindi, per rispondere si domanda, basta controllare get_message quando si vuole sapere se un messaggio è arrivato.

Problemi correlati