2014-09-05 28 views
11

Questo dovrebbe essere molto semplice e sono molto sorpreso di non essere stato in grado di trovare le risposte a queste domande già su StackOverflow.Gestione del segnale in Python multi-thread

Ho un programma simile a un demone che deve rispondere ai segnali SIGTERM e SIGINT per funzionare bene con upstart. Ho letto che il modo migliore per farlo è quello di eseguire il ciclo principale del programma in un thread separato dal thread principale e lasciare che il thread principale gestisca i segnali. Quindi, quando viene ricevuto un segnale, il gestore di segnali dovrebbe dire al loop principale di uscire impostando un flag sentinella che viene regolarmente verificato nel loop principale.

Ho provato a farlo ma non funziona come mi aspettavo. Vedere il codice qui sotto:

from threading import Thread 
import signal 
import time 
import sys 

stop_requested = False  

def sig_handler(signum, frame): 
    sys.stdout.write("handling signal: %s\n" % signum) 
    sys.stdout.flush() 

    global stop_requested 
    stop_requested = True  

def run(): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not stop_requested: 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

signal.signal(signal.SIGTERM, sig_handler) 
signal.signal(signal.SIGINT, sig_handler) 

t = Thread(target=run) 
t.start() 
t.join() 
sys.stdout.write("join completed\n") 
sys.stdout.flush() 

Ho provato questo in due modi seguenti:

1)

$ python main.py > output.txt& 
[2] 3204 
$ kill -15 3204 

2)

$ python main.py 
ctrl+c 

In entrambi i casi mi aspetto che questo scritto all'uscita:

run started 
handling signal: 15 
run exited 
join completed 

Nel primo caso il programma si chiude, ma tutto quello che vedo è:

run started 

Nel secondo caso il segnale SIGTERM viene apparentemente ignorato quando si preme CTRL + C e il programma non esce.

Cosa mi manca qui?

+3

Provare a sostituire 't.join()' con 'mentre t.is_alive(): t.join (1)'. Il tuo thread principale si sveglia ogni secondo per verificare i segnali. – roippi

+2

Più lettura: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html – roippi

risposta

18

Il problema è che, come spiegato in Execution of Python signal handlers:

gestore di segnale un pitone non viene eseguito all'interno del gestore di segnale a basso livello (C). Invece, il gestore di segnale di basso livello imposta un flag che indica la macchina virtuale per eseguire il gestore segnali Python corrispondente ad un punto successivo (per esempio alla successiva istruzione bytecode)

...

Un calcolo esecuzione prolungata implementato esclusivamente in C (come la corrispondenza di espressioni regolari su un grande corpo di testo) può essere eseguito senza interruzioni per una quantità arbitraria di tempo, indipendentemente da qualsiasi segnale ricevuto. I gestori di segnale Python verranno chiamati al termine del calcolo.

tuo thread principale è bloccato su threading.Thread.join, che in ultima analisi significa che è bloccato in C su una chiamata pthread_join. Ovviamente questo non è un "calcolo a lungo termine", è un blocco su un syscall ... ma tuttavia, fino al termine di tale chiamata, il tuo gestore di segnale non può funzionare.

E, mentre su alcune piattaforme pthread_join non riuscirà con EINTR su un segnale, su altri non lo farà. Su Linux, credo che dipenda dal fatto che si selezioni lo stile BSD o il comportamento predefinito siginterrupt, ma il valore predefinito è no.


Quindi, cosa si può fare su di esso?

Beh, sono abbastanza sicuro che lo changes to signal handling in Python 3.3 abbia effettivamente modificato il comportamento predefinito su Linux, quindi non sarà necessario fare nulla se si aggiorna; basta eseguire meno di 3.3 e il tuo codice funzionerà come ti aspetti. Almeno lo fa per me con CPython 3.4 su OS X e 3.3 su Linux. (Se ho torto su questo, non sono sicuro che si tratti di un bug in CPython o meno, quindi potresti volerlo sollevare su python-list invece di aprire un problema ...)

D'altra parte, pre-3.3, il modulo signal sicuramente non espone gli strumenti necessari per risolvere da soli questo problema. Pertanto, se non è possibile eseguire l'aggiornamento a 3.3, la soluzione è attendere qualcosa di interrompibile, ad esempio Condition o Event. Il thread secondario notifica l'evento prima che si chiuda e il thread principale attende l'evento prima che si unisca al thread secondario. Questo è decisamente hacky. E non riesco a trovare nulla che garantisca che farà la differenza; mi capita di lavorare per me in varie versioni di CPython 2.7 e 3.2 su OS X e 2.6 e 2.7 su Linux ...

+0

"Questo è sicuramente hacky" - non direi che in genere. sincronizzare i thread su un livello più alto di astrazione rispetto al semplice uso di 'join' è ragionevole. Se il tuo obiettivo è aspettare che il thread esca (come questo specifico esempio) allora "join" è lo strumento giusto; se vuoi aspettare che il carico di lavoro finisca, il 'condition' ecc. ha più senso. Il carico di lavoro potrebbe essere eseguito (ad esempio) in un thread in pool che non si chiude immediatamente, dopo tutto. –

8

La risposta di abarnert era azzeccata. Sto ancora usando Python 2.7 comunque. Per risolvere questo problema per me stesso ho scritto una lezione di InterruptableThread.

Al momento non consente il passaggio di ulteriori argomenti alla destinazione del thread. Join non accetta neanche un parametro di timeout. Questo è solo perché non ho bisogno di farlo. Puoi aggiungerlo se vuoi. Probabilmente vorrai rimuovere le istruzioni di output se lo usi tu stesso. Sono solo lì come un modo di commentare e testare.

import threading 
import signal 
import sys 

class InvalidOperationException(Exception): 
    pass  

# noinspection PyClassHasNoInit 
class GlobalInterruptableThreadHandler: 
    threads = [] 
    initialized = False 

    @staticmethod 
    def initialize(): 
     signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler) 
     signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler) 
     GlobalInterruptableThreadHandler.initialized = True 

    @staticmethod 
    def add_thread(thread): 
     if threading.current_thread().name != 'MainThread': 
      raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.") 

     if not GlobalInterruptableThreadHandler.initialized: 
      GlobalInterruptableThreadHandler.initialize() 

     GlobalInterruptableThreadHandler.threads.append(thread) 

    @staticmethod 
    def sig_handler(signum, frame): 
     sys.stdout.write("handling signal: %s\n" % signum) 
     sys.stdout.flush() 

     for thread in GlobalInterruptableThreadHandler.threads: 
      thread.stop() 

     GlobalInterruptableThreadHandler.threads = []  

class InterruptableThread: 
    def __init__(self, target=None): 
     self.stop_requested = threading.Event() 
     self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run) 

    def run(self): 
     pass 

    def start(self): 
     GlobalInterruptableThreadHandler.add_thread(self) 
     self.t.start() 

    def stop(self): 
     self.stop_requested.set() 

    def is_stop_requested(self): 
     return self.stop_requested.is_set() 

    def join(self): 
     try: 
      while self.t.is_alive(): 
       self.t.join(timeout=1) 
     except (KeyboardInterrupt, SystemExit): 
      self.stop_requested.set() 
      self.t.join() 

     sys.stdout.write("join completed\n") 
     sys.stdout.flush() 

La classe può essere utilizzata in due modi diversi. È possibile sottoclasse InterruptableThread:

import time 
import sys 
from interruptable_thread import InterruptableThread 

class Foo(InterruptableThread): 
    def __init__(self): 
     InterruptableThread.__init__(self) 

    def run(self): 
     sys.stdout.write("run started\n") 
     sys.stdout.flush() 
     while not self.is_stop_requested(): 
      time.sleep(2) 

     sys.stdout.write("run exited\n") 
     sys.stdout.flush() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

foo = Foo() 
foo2 = Foo() 
foo.start() 
foo2.start() 
foo.join() 
foo2.join() 

Oppure si può utilizzare più come il modo in cui funziona threading.thread. Tuttavia, il metodo run deve prendere l'oggetto InterruptableThread come parametro.

import time 
import sys 
from interruptable_thread import InterruptableThread 

def run(t): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not t.is_stop_requested(): 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

t1 = InterruptableThread(run) 
t2 = InterruptableThread(run) 
t1.start() 
t2.start() 
t1.join() 
t2.join() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

Fai ciò che vuoi.

1

Ho affrontato lo stesso problema qui signal not handled when multiple threads join. Dopo aver letto la risposta di abarnert, ho cambiato in Python 3 e ho risolto il problema. Ma mi piace cambiare tutto il mio programma in python 3. Così, ho risolto il mio programma evitando di chiamare thread join() prima del segnale inviato. Di seguito è il mio codice.

non è molto buona, ma risolto il mio programma in python 2.7. La mia domanda è stata contrassegnata come duplicata, quindi ho messo la mia soluzione qui.

import threading, signal, time, os 


RUNNING = True 
threads = [] 

def monitoring(tid, itemId=None, threshold=None): 
    global RUNNING 
    while(RUNNING): 
     print "PID=", os.getpid(), ";id=", tid 
     time.sleep(2) 
    print "Thread stopped:", tid 


def handler(signum, frame): 
    print "Signal is received:" + str(signum) 
    global RUNNING 
    RUNNING=False 
    #global threads 

if __name__ == '__main__': 
    signal.signal(signal.SIGUSR1, handler) 
    signal.signal(signal.SIGUSR2, handler) 
    signal.signal(signal.SIGALRM, handler) 
    signal.signal(signal.SIGINT, handler) 
    signal.signal(signal.SIGQUIT, handler) 

    print "Starting all threads..." 
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60}) 
    thread1.start() 
    threads.append(thread1) 
    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60}) 
    thread2.start() 
    threads.append(thread2) 
    while(RUNNING): 
     print "Main program is sleeping." 
     time.sleep(30) 
    for thread in threads: 
     thread.join() 

    print "All threads stopped."