2012-08-23 13 views
6

Se questa è una domanda idiota, mi scuso e andrò nascondere la mia testa per la vergogna, ma:Python/RQ - monitoraggio dello stato operaio

sto usando RQ fare la fila posti di lavoro in Python. Voglio che funzioni in questo modo:

  1. Il lavoro A inizia. Job A acquisisce i dati tramite l'API Web e li archivia.
  2. Il lavoro A funziona.
  3. Il lavoro A è completo.
  4. Al termine di A, viene avviato il lavoro B. Il lavoro B controlla ogni record archiviato dal lavoro A e aggiunge alcuni dati di risposta aggiuntivi.
  5. Al termine del lavoro B, l'utente riceve una e-mail felice che dice che il rapporto è pronto.

Il mio codice finora:

redis_conn = Redis() 
use_connection(redis_conn) 
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later 
w = Worker(q) 
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid) 
w.work() 

ho assunto la mia soluzione migliore era quella di avere 2 lavoratori, uno per lavoro A e uno per la B. Il lavoratore lavoro B potrebbe Job Monitor A e, quando il lavoro A è stato fatto, inizia il lavoro B.

Quello che non riesco a capire per salvarmi è come ottengo un lavoratore per monitorare lo stato di un altro. Posso prendere l'ID del lavoro dal lavoro A con job.id. Posso prendere il nome del lavoratore con w.name. Ma non ho il più pallido su come passare nessuna di queste informazioni all'altro lavoratore.

Oppure, c'è un modo molto più semplice per farlo che mi manca del tutto?

+1

Se il lavoro B non può essere eseguito finché il lavoro A non è completo (il che implica che non possono essere eseguiti in parallelo), perché utilizzare rq? Eseguili in ordine sequenziale (in un thread o processo separato se non vuoi bloccare la tua applicazione) –

+0

I lavori per A e B occupano un tempo molto lungo e possono svolgersi separatamente, quindi mi piacerebbe essere in grado di Continui a fare un sacco di lavoro A indipendente dal lavoro B. Se è troppo difficile, posso comunque arrendermi. – user1066609

+0

Avete coppie A e B che vanno insieme, o una B può dipendere da qualsiasi A? Perché in quest'ultimo caso hai un problema di sincronizzazione. :-) –

risposta

0

Probabilmente sei troppo in profondità nel tuo progetto per passare, ma in caso contrario, dai un'occhiata a Twisted. http://twistedmatrix.com/trac/ Lo sto usando in questo momento per un progetto che colpisce le API, raschia il contenuto Web, ecc. Esegue più lavori in parallelo e organizza determinati lavori in ordine, quindi Job B non viene eseguito fino a quando non viene eseguito il lavoro A.

Questo è il miglior tutorial per l'apprendimento di Twisted se si desidera tentare. http://krondo.com/?page_id=1327

0

Unire le cose che il lavoro A e il lavoro B fanno in una funzione, e quindi utilizzare per es. multiprocessing.Pool (è il metodo map_async) per eseguirlo su più processi.

Non ho familiarità con rq, ma multiprocessing è una parte della libreria standard. Di default usa tanti processi quanti sono i core della CPU, che nella mia esperienza di solito è sufficiente per saturare la macchina.

2

Da this page sulle rq documenti, sembra che ogni oggetto ha un attributo jobresult, richiamabile da job.result, che è possibile controllare. Se il lavoro non è terminato, sarà None, ma se ti assicuri che il tuo lavoro restituisca un valore (anche solo "Done"), puoi fare in modo che l'altro lavoratore controlli il risultato del primo lavoro e poi inizi a lavorare solo quando job.result ha un valore, ovvero il primo worker è stato completato.

6

Aggiornamento Gennaio 2015, questa richiesta di pull è ora fusa, e il parametro viene rinominato depends_on, vale a dire:

second_job = q.enqueue(email_customer, depends_on=first_job) 

Il post originale lasciato intatto per le persone che eseguono versioni precedenti e così via:

Ho inviato una richiesta pull (https://github.com/nvie/rq/pull/207) per gestire le dipendenze del lavoro in RQ. Quando questa richiesta di pull viene fusa in, sarete in grado di fare:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

first_job = q.enqueue(generate_report) 
second_job = q.enqueue(email_customer, after=first_job) 
# In the second enqueue call, job is created, 
# but only moved into queue after first_job finishes 

Per ora, vi suggerisco di scrivere una funzione wrapper per eseguire in sequenza i lavori. Ad esempio:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

def generate_report_and_email(): 
    generate_report() 
    email_customer() # You can also enqueue this function, if you really want to 

# Somewhere else 
q.enqueue(generate_report_and_email) 
Problemi correlati