2016-01-10 20 views
12

Sto scrivendo uno strumento che si collega al numero X di socket UNIX, invia un comando e salva l'output nel file system locale. Funziona questo ogni X secondi. Per eseguire un po 'di pulizia quando lo strumento riceve i segnali di terminazione, registro una funzione (spegnimento) per segnalare i segnali SIGHUP e signal.SIGTERM. Questa funzione annulla tutte le attività e quindi chiude il ciclo degli eventi.Modo corretto per arrestare le attività asyncio

Il mio problema è che ottengo

RuntimeError: Event loop stopped before Future completed

quando invio signal.SIGTERM (uccidere 'pid'). Ho letto la documentazione sull'annullamento delle attività due volte ma non ho notato cosa sto facendo male qui.

Ho notato anche qualcosa di strano, quando invio il segnale di terminazione il programma è in modalità sospensione e vedo nel registro che si sveglia la coroutine pull_stats(), è possibile vederlo nelle prime 2 righe del registro .

Log:

21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs** 
21:53:45,857 [23031] [MainThread:pull_stats ] INFO  pull statistics 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,859 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  received stop signal, cancelling tasks... 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  stopping event loop 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  bye, exiting... 
Traceback (most recent call last): 
    File "./pull.py", line 249, in <module> 
    main() 
    File "./pull.py", line 245, in main 
    supervisor(loop, config) 
    File "./pull.py", line 161, in supervisor 
    config['pull']['socket-dir'], storage_dir, loop)) 
    File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 
RuntimeError: Event loop stopped before Future completed. 

Ecco il codice:

def shutdown(loop): 
    LOGGER.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     LOGGER.info(task.cancel()) 
    LOGGER.info('stopping event loop') 
    loop.stop() 
    LOGGER.info('bye, exiting...') 


def write_file(filename, data): 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     return False 
    else: 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop): 
    connect = asyncio.open_unix_connection(socket_file) 
    reader, writer = yield from asyncio.wait_for(connect, 1) 

    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1] 
    filename = os.path.join(storage_dir, filename) 
    result = yield from loop.run_in_executor(None, write_file, filename, data) 

    return result 


@asyncio.coroutine 
def pull_stats(socket_dir, storage_dir, loop): 
    socket_files = glob.glob(socket_dir + '/*sock*') 
    coroutines = [get(socket_file, cmd, storage_dir, loop) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    status = yield from asyncio.gather(*coroutines) 

    if len(set(status)) == 1 and True in set(status): 
     return True 
    else: 
     return False 


def supervisor(loop, config): 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 

    while True: 
     start_time = int(time.time()) 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to create directory {d}:{e}".format(d=storage_dir, 
                   e=exc) 
      LOGGER.critical(msg) 

     # Launch all connections. 
     result = loop.run_until_complete(pull_stats(
      config['pull']['socket-dir'], storage_dir, loop)) 

     if result: 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       LOGGER.critical("failed to move %s to %s: %s", storage_dir, 
           dst_dir, exc) 
       break 
      else: 
       LOGGER.info('statistics are saved in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      LOGGER.critical('failed to pull stats') 
      shutil.rmtree(storage_dir) 

     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      time.sleep(sleep) 
    loop.close() 
    sys.exit(1) 


def main(): 
    args = docopt(__doc__, version=VERSION) 
    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    config.read(args['--file']) 

    loop = asyncio.get_event_loop() 

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None) 
    LOGGER.setLevel(num_level) 

    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 

risposta

6

La cancellazione non è immediata e richiede ioloop esecuzione da risolvere con l'eccezione CancelledError. Rimuovere ioloop.stop dallo spegnimento e gestire l'eccezione nel supervisore, per far funzionare le cose. Sotto l'esempio semplificato.

È importante, tuttavia, è possibile annullare Task, si ferma solo a guardare/in attesa di fine/risultati e ciclo non gestirà ulteriori eventi per esso. Ma la richiesta/pipe non verrà interrotta.

esempio semplificato:

import asyncio 
import functools 
import logging 
import signal 
import sys 
from concurrent.futures import CancelledError 


def shutdown(loop): 
    logging.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     task.cancel() 
    logging.info('bye, exiting in a minute...')  


@asyncio.coroutine 
def get(i): 
    logging.info('sleep for %d', i) 
    yield from asyncio.sleep(i)  


@asyncio.coroutine 
def pull_stats(): 
    coroutines = [get(i) for i in range(10,20)] 
    status = yield from asyncio.gather(*coroutines) 


def supervisor(loop): 
    try: 
     while True: 
      result = loop.run_until_complete(pull_stats()) 
    except CancelledError: 
     logging.info('CancelledError') 
    loop.close() 
    sys.exit(1) 


def main(): 
    logging.getLogger().setLevel(logging.INFO) 
    loop = asyncio.get_event_loop() 
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 
    supervisor(loop) 


if __name__ == '__main__': 
    main() 

nota, che se si annulla solo gather's futuro, tutti i bambini saranno impostati come cancellata pure.

E il sonno cosa

Qualsiasi ricezione di un segnale o interruzione fa sì che il programma per riprendere l'esecuzione. Quindi, quando il processo riceve SIGTERM e il gestore è impostato, python consente di gestirlo, per fare questo thread viene ripreso e viene chiamato il siglerler. A causa dell'implementazione di ioloop e della sua gestione del segnale, continua a funzionare dopo la veglia.

+0

ho modificato il codice come lei ha suggerito e che cattura l'eccezione, ma vedo ancora che pull_stats() per essere svegliato quando invio il segnale TERM. Nel tuo esempio di codice non vedo che ciò accada. Non capisco la tua affermazione sul sonno. Stai suggerendo che il sonno impedisce che il filo venga fermato? Inoltre, come faccio a propagare l'annullamento in tutte le coroutine, così posso eseguire dei passaggi di pulizia? Grazie mille @kwarunek per la tua risposta e il tuo tempo per fornire un esempio di codice, molto apprezzato –

+1

Ho modificato un po 'la parte di SIGTERM, inoltre non è coperto nell'esempio. – kwarunek

+0

@kwarunke, ha senso ora. Quindi, quando viene inviato l'annullamento, viene ripreso un compito all'ultima riga di rendimento in cui la coroutine è attualmente sospesa. Nel mio caso sono nella linea di sonno, il segnale entra, il thread principale si sveglia dal sonno, mentre True lancia tutti i futures che poi ricevono la cancellazione ma coroutines pull_stats e si svegliano ma non procedono mentre vengono cancellati. Sto ancora cercando di trovare un modo per catturare l'annullamento quando il programma è in fase di connessione/ricezione/scrittura, come vorrei fare un po 'di pulizia. Ancora una volta grazie mille per l'assistenza. –

0

Aggiornamento: Il codice funziona come previsto su python 3.4.4, vedere il mio commento qui sotto. @kwarunek, quando hai menzionato il tuo ultimo commento su ioloop continua a funzionare, non ho capito come funzionava il mio codice, uccidere il processo invia una cancellazione a tutte le attività che sono attive. Ma ora vedo il tuo punto perché l'annullamento delle attività non è attivato con 3.4.4, con 3.4.2 va bene.

21:28:09,004 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:28:11,826 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:11,827 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:11,828 [59441] [MainThread:shutdown] INFO  received stop signal 
21:28:11,828 [59441] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /opt/blue-python/3.4/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:28:11,829 [59441] [MainThread:shutdown] INFO  cancelling task 
21:28:11,829 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
21:28:11,829 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
21:28:21,009 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:21,010 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:21,011 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
2016-01-30 21:28:21,011 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 

mentre in pitone 3.4.2

21:23:51,015 [10219] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:23:55,737 [10219] [MainThread:supervisor] INFO  starting while loop 
21:23:55,737 [10219] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:23:55,740 [10219] [MainThread:shutdown] INFO  received stop signal 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,740 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(7)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(4)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(5)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(6)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<pull_stats() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:150> wait_for=<_GatheringFuture pending cb=[Task._wakeup()]> cb=[_raise_stop_error() at /usr/lib/python3.4/asyncio/base_events.py:101]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(2)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,753 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,754 [10219] [MainThread:supervisor] INFO  Received CancelledError exception 
21:23:55,754 [10219] [MainThread:supervisor] INFO  waiting for threads to finish any pending IO tasks 
21:23:55,754 [10219] [MainThread:supervisor] INFO  closing our asyncio loop 
21:23:55,755 [10219] [MainThread:supervisor] INFO  exiting with status 0 

La differenza principale è quando shutdown() invia la cancellazione non vi sono attività svegliata e di conseguenza il ciclo while non viene bloccato dalla prova a bloccare il blocco che gestisce la cancellazione. Come risolvo questo ora ?!

Ecco il codice

def shutdown(): 
    """Performs a clean shutdown""" 
    log.info('received stop signal') 
    for task in asyncio.Task.all_tasks(): 
     log.info(task) 
     log.info('cancelling task') 
     task.cancel() 


def write_file(filename, data): 
    """Writes data to a file. 

    Returns: 
     True if succeeds False otherwise. 
    """ 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     log.critical('failed to write data %s', exc) 
     return False 
    else: 
     log.debug('data saved in %s', filename) 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop, executor, timeout): 
    """Fetches data from a UNIX socket. 

    Sends a command to HAProxy over UNIX socket, reads the response and then 
    offloads the writing of the received data to a thread, so we don't block 
    this coroutine. 

    Arguments: 
     socket_file (str): The full path of the UNIX socket file to connect to. 
     cmd (str): The command to send. 
     storage_dir (str): The full path of the directory to save the response. 
     loop (obj): A base event loop from asyncio module. 
     executor (obj): A Threader executor to execute calls asynchronously. 
     timeout (int): Timeout for the connection to the socket. 

    Returns: 
     True if statistics from a UNIX sockets are save False otherwise. 
    """ 
    # try to connect to the UNIX socket 
    connect = asyncio.open_unix_connection(socket_file) 
    log.debug('connecting to UNIX socket %s', socket_file) 
    try: 
     reader, writer = yield from asyncio.wait_for(connect, timeout) 
    except (ConnectionRefusedError, PermissionError, OSError) as exc: 
     log.critical(exc) 
     return False 
    else: 
     log.debug('connection established to UNIX socket %s', socket_file) 

    log.debug('sending command "%s" to UNIX socket %s', cmd, socket_file) 
    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    if len(data) == 0: 
     log.critical('received zero data') 
     return False 

    log.debug('received data from UNIX socket %s', socket_file) 

    suffix = CMD_SUFFIX_MAP.get(cmd.split()[1]) 
    filename = os.path.basename(socket_file) + suffix 
    filename = os.path.join(storage_dir, filename) 
    log.debug('going to save data to %s', filename) 
    # Offload the writing to a thread so we don't block ourselves. 
    result = yield from loop.run_in_executor(executor, 
              write_file, 
              filename, 
              data) 

    return result 


@asyncio.coroutine 
def pull_stats(config, storage_dir, loop, executor): 
    """Launches coroutines for pulling statistics from UNIX sockets. 

    This a delegating routine. 

    Arguments: 
     config (obj): A configParser object which holds configuration. 
     storage_dir (str): The absolute directory path to save the statistics. 
     loop (obj): A base event loop. 
     executor(obj): A ThreadPoolExecutor object. 

    Returns: 
     True if statistics from *all* UNIX sockets are fetched False otherwise. 
    """ 
    # absolute directory path which contains UNIX socket files. 
    socket_dir = config.get('pull', 'socket-dir') 
    timeout = config.getint('pull', 'timeout') 
    socket_files = [f for f in glob.glob(socket_dir + '/*') 
        if is_unix_socket(f)] 

    log.debug('pull statistics') 
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, timeout) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    # Launch all connections. 
    status = yield from asyncio.gather(*coroutines) 

    return len(set(status)) == 1 and True in set(status) 


def supervisor(loop, config): 
    """Coordinates the pulling of HAProxy statistics from UNIX sockets. 

    This is the client routine which launches requests to all HAProxy 
    UNIX sockets for retrieving statistics and save them to file-system. 
    It runs indefinitely until main program is terminated. 

    Arguments: 
     loop (obj): A base event loop from asyncio module. 
     config (obj): A configParser object which holds configuration. 
    """ 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 
    executor = ThreadPoolExecutor(max_workers=config.getint('pull', 'workers')) 
    exit_code = 1 

    while True: 
     log.info('starting while loop') 
     start_time = int(time.time()) 
     # HAProxy statistics are stored in a directory and we use retrieval 
     # time(seconds since the Epoch) as a name of the directory. 
     # We first store them in a temporary place until we receive statistics 
     # from all UNIX sockets. 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     # If our storage directory can't be created we can't do much, thus 
     # abort main program. 
     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to make directory {d}:{e}".format(d=storage_dir, 
                  e=exc) 
      log.critical(msg) 
      log.critical('a fatal error has occurred, exiting..') 
      break 

     try: 
      log.info('launch the delegating coroutine') 
      result = loop.run_until_complete(pull_stats(config, storage_dir, 
                 loop, executor)) 
      log.info('delegating coroutine finished') 
     except asyncio.CancelledError: 
      log.info('Received CancelledError exception') 
      exit_code = 0 
      break 

     # if and only if we received statistics from all sockets then move 
     # statistics to the permanent directory. 
     # NOTE: when temporary and permanent storage directory are on the same 
     # file-system the move is actual a rename, which is an atomic 
     # operation. 
     if result: 
      log.debug('move %s to %s', storage_dir, dst_dir) 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       log.critical("failed to move %s to %s: %s", storage_dir, 
          dst_dir, exc) 
       log.critical('a fatal error has occurred, exiting..') 
       break 
      else: 
       log.info('statistics are stored in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      log.critical('failed to pull stats') 
      log.debug('removing temporary directory %s', storage_dir) 
      shutil.rmtree(storage_dir) 

     # calculate sleep time which is interval minus elapsed time. 
     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      log.debug('sleeping for %.3fs secs', sleep) 
      time.sleep(sleep) 

    # It is very unlikely that threads haven't finished their job by now, but 
    # they perform disk IO operations which can take some time in certain 
    # situations, thus we want to wait for them in order to perform a clean 
    # shutdown. 
    log.info('waiting for threads to finish any pending IO tasks') 
    executor.shutdown(wait=True) 
    log.info('closing our asyncio loop') 
    loop.close() 
    log.info('exiting with status %s', exit_code) 
    sys.exit(exit_code) 


def main(): 
    """Parses CLI arguments and launches main program.""" 
    args = docopt(__doc__, version=VERSION) 

    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    # Set defaults for all sections 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    # Load configuration from a file. NOTE: ConfigParser doesn't warn if user 
    # sets a filename which doesn't exist, in this case defaults will be used. 
    config.read(args['--file']) 

    if args['--print']: 
     for section in sorted(DEFAULT_OPTIONS): 
      print("[{}]".format(section)) 
      for key, value in sorted(DEFAULT_OPTIONS[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 
    if args['--print-conf']: 
     for section in sorted(config): 
      print("[{}]".format(section)) 
      for key, value in sorted(config[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 

    log.setLevel(getattr(logging, config.get('pull', 'loglevel').upper(), 
         None)) 
    # Setup our event loop 
    loop = asyncio.get_event_loop() 

    # Register shutdown to signals 
    loop.add_signal_handler(signal.SIGHUP, shutdown) 
    loop.add_signal_handler(signal.SIGTERM, shutdown) 

    # a temporary directory to store fetched data 
    tmp_dst_dir = config['pull']['tmp-dst-dir'] 
    # a permanent directory to move data from the temporary directory. Data are 
    # picked up by the process daemon from that directory. 
    dst_dir = config['pull']['dst-dir'] 
    for directory in dst_dir, tmp_dst_dir: 
     try: 
      os.makedirs(directory) 
     except OSError as exc: 
      # errno 17 => file exists 
      if exc.errno != 17: 
       sys.exit("failed to make directory {d}:{e}".format(d=directory, 
                    e=exc)) 
    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 
+0

Trovato il problema. nel sistema in cui ho usato python 3.4.4 la coroutine pull_stats non pianifica le coroutine di get come l'elenco socket_files è vuoto. Questo spiega il messaggio [MainThread: shutdown] INFO risultato = Falso> Poiché l'attività è completata, l'annullamento non ha luogo e, di conseguenza, il tentativo di cattura non riceve mai l'eccezione per causare l'uscita del programma. Su un'altra scatola con 3.4.4 dove l'elenco socket_files è * non * vuoto la cancellazione funziona –

Problemi correlati