2015-06-03 13 views
18

Ho una discussione che sta aggiornando un elenco chiamato l. Ho ragione nel dire che è sicuro eseguire il thread da un altro thread?Il filtro è thread-safe

filter(lambda x: x[0] == "in", l) 

Se proprio non thread-safe, è questo allora il corretto approccio:

import threading 
import time 
import Queue 

class Logger(threading.Thread): 
    def __init__(self, log): 
     super(Logger, self).__init__() 
     self.log = log 
     self.data = [] 
     self.finished = False 
     self.data_lock = threading.Lock() 

    def run(self): 
     while not self.finished: 
      try: 
       with self.data_lock: 
        self.data.append(self.log.get(block=True, timeout=0.1)) 
      except Queue.Empty: 
       pass 

    def get_data(self, cond): 
     with self.data_lock: 
      d = filter(cond, self.data)  
     return d 

    def stop(self): 
     self.finished = True 
     self.join() 
     print("Logger stopped") 

cui il metodo get_data(self, cond) viene usata per recuperare un piccolo sottoinsieme di dati nel self.data in una cassaforte filo maniera.

+7

Si dovrebbe preoccuparsi della lista non il 'filtro' e no, le liste non sono thread-safe – thefourtheye

+1

Fitler su una copia della lista. –

+4

@thefourtheye http://stackoverflow.com/questions/6319207/are-lists-thread-safe contraddice la tua risposta un po ':) Credo che le liste stesse siano infallibili e il GIL protegga contro la corruzione dei dati in questo modo (* nella maggior parte delle situazioni *). –

risposta

7

Innanzitutto, per rispondere alla domanda nel titolo: filter è solo una funzione. Quindi, la sicurezza del thread si baserà sulla struttura dei dati con cui lo si utilizza.

Come già sottolineato nei commenti, le operazioni di lista stesse sono thread-safe in CPython e protette da GIL, ma questo è probabilmente solo un dettaglio di implementazione di CPython su cui non dovresti fare affidamento. Anche se potessi fare affidamento su di esso, la sicurezza di alcune delle operazioni probabilmente non significa il tipo di sicurezza del thread che intendi:

Il problema è che l'iterazione su una sequenza con filter non è in generale un'operazione atomica. La sequenza potrebbe essere cambiata durante l'iterazione. A seconda della struttura dati sottostante al tuo iteratore, questo potrebbe causare effetti più o meno strani. Un modo per superare questo problema è iterando su una copia della sequenza creata con un'azione atomica. modo più semplice per fare questo per sequenze standard come tuple, list, string è con l'operatore porzione simili:

filter(lambda x: x[0] == "in", l[:]) 

Oltre a questo non necessariamente thread-safe per altri tipi di dati, c'è un problema con questo però : è solo una copia superficiale. Dato che gli elementi della tua lista sembrano essere simili a una lista, un altro thread potrebbe parallelamente fare del l[1000][:] per svuotare una delle liste interne (che sono indicate anche nella tua copia superficiale). Ciò renderebbe impossibile l'espressione del filtro con un IndexError.

Detto questo, non è un peccato usare un lucchetto per proteggere l'accesso al tuo elenco e lo consiglierei sicuramente. A seconda del modo in cui i dati vengono modificati e del modo in cui si utilizzano i dati restituiti, potrebbe essere consigliabile eseguire una copia profonda degli elementi mantenendo il blocco e restituendole. In questo modo è possibile garantire che, una volta restituito, la condizione del filtro non cambierà improvvisamente per gli elementi restituiti.

Wrt. il tuo codice Logger: Non sono sicuro al 100% su come prevedi di usarlo e se è fondamentale eseguire più thread su una coda e join. Quello che mi sembra strano è che non usi mai lo Queue.task_done() (supponendo che il suo self.log sia un Queue). Anche il tuo polling della coda è potenzialmente dispendioso. Se non è necessario il join del filo, io suggerirei almeno trasformare l'acquisizione lock attorno:

class Logger(threading.Thread): 
    def __init__(self, log): 
     super(Logger, self).__init__() 
     self.daemon = True 
     self.log = log 
     self.data = [] 
     self.data_lock = threading.Lock() 

    def run(self): 
     while True: 
      l = self.log.get() # thread will sleep here indefinitely 
      with self.data_lock: 
       self.data.append(l) 
      self.log.task_done() 

    def get_data(self, cond): 
     with self.data_lock: 
      d = filter(cond, self.data) 
      # maybe deepcopy d here 
     return d 

Esternamente si potrebbe ancora fare log.join() per assicurarsi che tutti gli elementi della coda log vengono elaborati.

+1

La protezione GIL non è qualcosa su cui è possibile rispondere e, auspicabilmente, questa cosa dovrebbe essere eliminata in futuro. probabilmente è meglio rispondere alle specifiche? – HuStmpHrrr

+0

Non sono del tutto sicuro di cosa intendi con 'reply on spec', ma ho modificato la risposta per raccomandare di non fare affidamento su GIL –

+1

@ JörnHees L'esistenza di GIL è un dettaglio di implementazione CPython, quindi non devi fare affidamento su (perché non dovresti fare affidamento sui dettagli di implementazione). Ciò che HuStmpHrrr significa è che dovresti invece fare affidamento solo sulle funzionalità fornite dalle specifiche (la specifica del linguaggio o la libreria linguistica). – poke

4

Se un thread scrive su un elenco e un altro thread legge tale elenco, i due devono essere sincronizzati. Non importa per questo aspetto se il lettore utilizza filter(), un indice o iterazione o se il writer utilizza append() o qualsiasi altro metodo.

Nel codice, è possibile ottenere la sincronizzazione necessaria utilizzando uno threading.Lock. Poiché si accede alla lista solo nel contesto di with self.data_lock, gli accessi si escludono a vicenda.

In breve, il codice è formalmente corretto per quanto riguarda la gestione dell'elenco tra thread. Ma:

  • È possibile accedere a self.finished senza il blocco, il che è problematico. L'assegnazione a quel membro cambierà self, ovvero la mappatura dell'oggetto nei membri corrispondenti, quindi questa dovrebbe essere sincronizzata. In effetti, questo non farà male, perché True e False sono costanti globali, nel peggiore dei casi si avrà un breve ritardo tra l'impostazione dello stato in un thread e la visualizzazione dello stato nell'altro. Resta male, perché è assuefazione.
  • Come regola generale, quando si utilizza un blocco, è sempre documentare quali oggetti protegge da questo blocco. Inoltre, documentare a quale oggetto si accede da quale thread. Il fatto che self.finished sia condiviso e richieda la sincronizzazione sarebbe stato ovvio. Inoltre, facendo una distinzione visiva tra funzioni pubbliche e dati e quelli privati ​​(iniziando con uno _underscore, vedi PEP 8) aiuta a tenerne traccia. Aiuta anche altri lettori.
  • Un problema simile è la tua base di riferimento. In generale, ereditare da threading.Thread è una cattiva idea. Piuttosto, includere un'istanza della classe thread e dargli una funzione come self._main_loop per l'esecuzione. La ragione è che tu dici che il tuo Logger è un Thread e che tutti i membri pubblici della classe di base sono anche membri pubblici della tua classe, che probabilmente è un'interfaccia molto più ampia di quella che intendevi.
  • Non si dovrebbe mai bloccare con un blocco trattenuto. Nel tuo codice, blocchi in self.log.get(block=True, timeout=0.1) con il blocco sul mutex. In quel momento, anche se non succede nulla, nessun altro thread sarà in grado di chiamare e completare una chiamata a get_data(). C'è in realtà solo una piccola finestra tra lo sblocco del mutex e il blocco di nuovo in cui un chiamante di get_data()non deve attendere, il che è molto negativo per le prestazioni. Potrei persino immaginare che la tua domanda sia motivata da prestazioni davvero cattive per questa causa. Invece, chiamare log.get(..) senza blocco, non dovrebbe averne bisogno. Quindi, tenendo il blocco, aggiungere i dati a self.data e controllare self.finished.
+0

Grazie per la risposta.Ti dispiacerebbe elaborare il tuo terzo punto sopra perché non lo capisco. Grazie ancora! – Baz

+1

Puoi semplicemente fare in modo che un thread esegua qualche funzione con 'threading.Thread (target = func_name)' come nell'esempio [Queue example] (https://docs.python.org/2/library/queue.html#Queue.Queue .aderire). Quindi potresti rendere a 'Logger' una sottoclasse di' object' e quindi nel suo '__init__' creare un' self.thread = Thread (target = self._main_loop) '. Quindi definisci un metodo '_main_loop' nella tua classe' Logger'. In questo modo non avrai un Logger con tutti i metodi e le proprietà pubbliche di un 'Thread' che non usi. È un po 'più pulito, ma una questione di scelta direi. –

+0

Quando si pensa a un 'Logger', lo si considera come un' Thread' o è il suo punto principale la funzione 'get_data()'? Direi piuttosto che è quest'ultimo e il fatto che usi un thread per eseguire il polling di una coda è irrilevante. Se avesse inserito dei dati, non userebbe un thread (almeno non direttamente) ma l'interfaccia 'get_data()' rimarrebbe. In questa luce, il thread è solo un dettaglio di implementazione che non dovrebbe essere visibile nell'interfaccia pubblica. –

Problemi correlati