2010-05-18 11 views
11

La forza di Twisted (per python) è il suo framework asincrono (credo). Ho scritto un server di elaborazione delle immagini che accetta le richieste tramite Prospettive Broker. Funziona benissimo finché lo nutro meno di un paio di centinaia di immagini alla volta. Tuttavia, a volte viene spinato con centinaia di immagini praticamente alla stessa ora. Poiché tenta di elaborarli tutti contemporaneamente, il server si arresta in modo anomalo.Chiamate remote in coda a un broker prospettiva Twisted di Python?

Come soluzione mi piacerebbe mettere in coda le remote_calls sul server in modo che elabora solo ~ 100 immagini alla volta. Sembra che questo potrebbe essere qualcosa che Twisted già fa, ma non riesco a trovarlo. Qualche idea su come iniziare a implementare questo? Un punto nella giusta direzione? Grazie!

risposta

29

Un'opzione ready-made che potrebbe aiutare con questo è twisted.internet.defer.DeferredSemaphore. Questa è la versione asincrona del semaforo normale (conteggio) che potreste già sapere se avete fatto molta programmazione in thread.

Un semaforo (di conteggio) è molto simile a un mutex (un blocco). Ma dove un mutex può essere acquisito solo una volta fino a una versione corrispondente, un semaforo (di conteggio) può essere configurato per consentire un numero arbitrario (ma specificato) di acquisizioni per avere successo prima che siano richieste eventuali versioni corrispondenti.

Ecco un esempio di utilizzo DeferredSemaphore per eseguire dieci operazioni asincrone, ma per eseguire più di tre in una volta:

from twisted.internet.defer import DeferredSemaphore, gatherResults 
from twisted.internet.task import deferLater 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def main(): 
    sem = DeferredSemaphore(3) 

    jobs = [] 
    for i in range(10): 
     jobs.append(sem.run(async, i)) 

    d = gatherResults(jobs) 
    d.addCallback(lambda ignored: reactor.stop()) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

DeferredSemaphore dispone anche espliciti acquire e release metodi, ma il metodo run è così conveniente è quasi sempre quello che vuoi. Chiama il metodo acquire, che restituisce un valore Deferred. A quel primo Deferred, aggiunge un callback che chiama la funzione che hai passato (insieme a qualsiasi argomento posizionale o parola chiave). Se tale funzione restituisce un valore Deferred, viene aggiunto un callback che chiama il metodo .

Il caso sincrono viene gestito anche chiamando immediatamente release. Anche gli errori vengono gestiti, consentendo loro di propagarsi ma assicurandosi che sia necessario il release necessario per lasciare lo DeferredSemaphore in uno stato coerente. Il risultato della funzione passata a (o il risultato dello Deferred restituito) diventa il risultato dello Deferred restituito da run.

Un altro approccio possibile potrebbe essere basato su DeferredQueue e cooperate. DeferredQueue è come una normale coda, ma il suo metodo get restituisce un valore Deferred. Se non ci sono elementi in coda al momento della chiamata, lo Deferred non si attiva fino a quando non viene aggiunto un articolo.

Ecco un esempio:

from random import randrange 

from twisted.internet.defer import DeferredQueue 
from twisted.internet.task import deferLater, cooperate 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def assign(jobs): 
    # Create new jobs to be processed 
    jobs.put(randrange(10)) 
    reactor.callLater(randrange(10), assign, jobs) 


def worker(jobs): 
    while True: 
     yield jobs.get().addCallback(async) 


def main(): 
    jobs = DeferredQueue() 

    for i in range(10): 
     jobs.put(i) 

    assign(jobs) 

    for i in range(3): 
     cooperate(worker(jobs)) 

    reactor.run() 


if __name__ == '__main__': 
    main() 

Nota che la funzione operaio async è lo stesso di quello del primo esempio. Tuttavia, questa volta, è disponibile anche una funzione worker che estrae in modo esplicito i lavori da DeferredQueue e li elabora con async (aggiungendo async come richiamata allo Deferred restituito da get). Il generatore worker è gestito da cooperate, che lo itera una volta dopo ogni Deferred produce incendi.Il ciclo principale, quindi, avvia tre di questi generatori di lavoratori in modo che tre lavori siano in corso in qualsiasi momento.

Questo approccio coinvolge un po 'più di codice rispetto all'approccio DeferredSemaphore, ma ha alcuni vantaggi che possono essere interessanti. Innanzitutto, cooperate restituisce un'istanza CooperativeTask che ha metodi utili come pause, resume e un altro paio. Inoltre, tutti i lavori assegnati allo stesso collaboratore saranno cooperano l'uno con l'altro nella pianificazione, in modo da non sovraccaricare il ciclo degli eventi (e questo è ciò che dà il nome dell'API). Sul lato DeferredQueue, è anche possibile impostare dei limiti sul numero di articoli in attesa di elaborazione, in modo da evitare di sovraccaricare completamente il server (ad esempio, se i processori di immagine rimangono bloccati e smettono di completare le attività). Se il codice che chiama put gestisce l'eccezione di overflow della coda, è possibile utilizzarla come pressione per cercare di interrompere l'accettazione di nuovi lavori (magari la loro deviazione su un altro server o la segnalazione di un amministratore). Fare cose simili con DeferredSemaphore è un po 'più complicato, dal momento che non c'è modo di limitare il numero di lavori in attesa di essere in grado di acquisire il semaforo.

+0

Cool, ho davvero apprezzato queste idee. In risposta all'idea di utilizzare un DeferredSemaphore. Ciò sarebbe molto utile se esistessero lotti distinti di lavori che dovevano essere completati. Se un batch ha troppi lavori da fare, esegue solo alcuni lavori contemporaneamente e quindi quando tutti i lavori sono completi, il batch viene raccolto. Questo ha il rovescio della medaglia che nessun risultato viene restituito fino a quando l'intero lotto non termina correttamente? E penso che questo svantaggio sia risolto usando un DeferredQueue ... – agartland

+1

L'approccio con un DeferredQueue e cooperare è intelligente. In realtà mi darà più controllo in futuro per quanto riguarda il ridimensionamento del processore. Non penso nemmeno che sia necessariamente più complicato. Grazie. – agartland

-2

Potrebbe piacerti anche il txRDQ (Coda di invio ridimensionabile) che ho scritto. Google, è nella collezione tx su LaunchPad. Scusa, non ho più tempo per rispondere, per andare sul palco.

Terry