2013-02-27 14 views
8

Utilizzando Python RQ, stiamo provando a gestire dinamicamente i processi di lavoro. Usiamo uno script operaio su misura, che (in forma semplificata) è la seguente:Come arrestare correttamente i processi di lavoro Python RQ in modo dinamico?

from rq import Connection, Worker 

queues_to_listen_on = get_queues_to_listen_on() 

with Connection(connection = get_worker_connection()): 
    w = Worker(queues_to_listen_on) 
    w.work() 

Siamo particolarmente interessati alla chiusura di lavoratori. La preoccupazione principale che abbiamo è come fare un arresto regolare di un lavoratore, in un modo che consenta di terminare il lavoro corrente prima di arrestarlo. Il gestore di segnale request_stop(...) sull'oggetto appropriato Worker sembra fare ciò di cui abbiamo bisogno, ma non sembra esserci alcun modo (almeno a mia conoscenza) di emetterlo a meno che non sia attraverso il premere CTRL+C sul processo di lavoro in esecuzione nel terminale.

come la vedo io, ci sono due soluzioni probabili (ci potrebbe sicuramente essere di più) - in ordine di preferenza:

  1. livello di programmazione, utilizzando la libreria rq, inviare il segnale al request_stop e quindi innescare arresto regolare .
  2. In qualche modo acquisire il pid del processo corretto (non sono sicuro se il processo di workhorse o il processo del listener worker) e utilizzando un altro metodo, inviare il segnale appropriato a tale processo. Abbiamo alcuni modi in cui ciò potrebbe essere fatto, ma molto probabilmente richiederebbe più lavoro e introdurre altre variabili al problema che preferirei essere lasciato fuori (ad esempio, usando Fabric per eseguire un comando remoto o qualcosa del genere).

Se c'è un modo migliore per risolvere questo problema o un'alternativa diversa che raggiungerebbe lo stesso obiettivo, gradirei i vostri suggerimenti.

+0

Se è necessario il PID, in realtà è possibile scaricarlo da w.pid – Borys

risposta

4

L'opzione 1 è decisamente migliore in termini di design.

Tuttavia, per risolvere il tuo problema particolare di dover utilizzare CTRL + C per uscire dal processo (io odio che troppo), è possibile utilizzare la seguente strategia per i vostri operai:

# WORKER_NAME.py 
import os 

PID = os.getpid() 

@atexit.register 
def clean_shut(): 
    print "Clean shut performed" 

    try: 
     os.unlink("WORKER_NAME.%d" % PID) 
    except: 
     pass 

# Worker main 
def main(): 
    f = open("WORKER_NAME.%d" % PID, "w") 
    f.write("Delete this to end WORKER_NAME gracefully") 
    f.close() 

    while os.path.exists("WORKER_NAME.%d" % PID): 
     # Worker working 

E nello script padrone, ottiene il PID lavoratore come suggerito da @Borys, invia la richiesta di arresto caldo e os.unlink("path/to/WORKER_NAME.%d" % worker_PID) per garantire l'arresto regolare :)

Ciò si applica solo ai lavoratori che eseguono il ciclo indefinito. Se il processo di lavoro richiama elementi che bloccano anche il semplice lavoro in sequenza sequenziale, è necessario risalire alla possibile routine di blocco da risolvere, ad esempio applicare una sorta di strategia di timeout.

Problemi correlati