2010-10-06 7 views
9

La documentazione di msdn indica che una coda generica statica è thread-safe. Questo significa che il seguente codice è thread-safe? In altre parole, c'è un problema quando un thread accoda un int e un altro thread rimuove un int allo stesso tempo? Devo bloccare le operazioni di accodamento e rimuovi per sicurezza thread-thread?L'uso di una coda statica è thread-safe?

class Test { 
    public static Queue<int> queue = new Queue<int>(10000); 

    Thread putIntThread; 
    Thread takeIntThread; 

    public Test() { 
     for(int i = 0; i < 5000; ++i) { 
      queue.Enqueue(0); 
     } 
     putIntThread = new Thread(this.PutInt); 
     takeIntThread = new Thread(this.TakeInt); 
     putIntThread.Start(); 
     takeIntThread.Start(); 
    } 

    void PutInt() { 
     while(true) 
     { 
      if(queue.Count < 10000) {//no need to lock here as only itself can change this condition 
       queue.Enqueue(0); 
      } 
     } 
    } 

    void TakeInt() { 
     while(true) { 
      if(queue.Count > 0) {//no need to lock here as only itself can change this condition 
       queue.Dequeue(); 
      } 
     } 
    } 

} 

Edit: Devo usare .NET 3.5

+0

noto il tuo commento sul non voler bloccare perché è già lento e il blocco lo renderà più lento. Se questo codice è già lento, il blocco avrà un piccolo sovraccarico in confronto. Il blocco è in genere molto veloce, ma le persone si aspettano che sia molto lento. Ho visto persone creare serrature di lettori/scrittori di fantasia senza mai fare alcuna misurazione. Quando sono costretti a farlo, il più delle volte quelli di fantasia sono più lenti. Fai il semplice .net locking, poi il profilo per scoprire dove sei lento – pm100

risposta

23

Questo è assolutamente non thread-safe. Dai documenti di Queue<T>.

I membri statici pubblici (Shared in Visual Basic) di questo tipo sono thread-safe. Non è garantito che tutti i membri di istanza siano thread-safe.

A Queue<T> in grado di supportare più lettori contemporaneamente, purché la raccolta non venga modificata. Anche così, enumerare attraverso una collezione non è intrinsecamente una procedura thread-safe. Per garantire la sicurezza dei thread durante l'enumerazione, è possibile bloccare la raccolta durante l'intera enumerazione. Per consentire l'accesso alla raccolta da più thread per la lettura e la scrittura, è necessario implementare la propria sincronizzazione.

Rileggendo la tua domanda, ti sembra di essere confuso circa la frase "membri statici di questo tipo" - non si tratta di "una coda statica" in quanto non esiste una cosa del genere. Un oggetto non è statico o non è un membro. Quando parla di membri statici parla di cose come Encoding.GetEncoding (Queue<T> in realtà non ha membri statici). I membri di istanza sono cose come Enqueue e Dequeue - membri che si riferiscono a un'istanza del tipo piuttosto che al tipo stesso.

Quindi è necessario utilizzare un blocco per ogni azione oppure, se si utilizza .NET 4, utilizzare ConcurrentQueue<T>.

+1

Ogni volta che leggo le tue risposte, imparo qualcosa di nuovo! Grazie per essere qui a SO. :) – Nayan

+0

Grazie. In effetti ho frainteso la frase "membri statici di questo tipo". – blizpasta

1

Sì, è necessario bloccare altrettanto MSDN dice

per consentire la raccolta a cui accedere da più thread per la lettura e la scrittura, è necessario implementare la propria sincronizzazione.

2

Quali stati MSDN afferma che i metodi statici di Coda sono sicuri per i thread, non che i metodi di istanza di un'istanza statica sono thread-safe.

+1

Siamo spiacenti, questo è stato pubblicato tra la 1a e la 2a versione della risposta di Jon. – Timores

+0

Non c'è bisogno di scusarsi, non è un problema. – blizpasta

+0

Non è un problema reale, ma quando ho iniziato a digitare, la mia risposta ha avuto un certo valore. Quando ho finito, Jon aveva aggiornato il suo post, e il mio non aveva più alcun valore :-) – Timores

4

Sì, come già detto un membro di istanza di un'istanza statica non è la stessa di un membro statico, ed è solo quest'ultimo per cui è garantita la sicurezza del thread, quindi è necessario bloccare le operazioni di accodamento e dequeue .

Se bloccaggio stati rivelando un collo di bottiglia, le code sono una delle raccolte più semplici per scrivere in un modo senza serratura, finché uno non richiederanno il pieno ICollection<T> attuazione fornita da Queue<T>:

internal sealed class LockFreeQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private volatile Node _head; 
    private volatile Node _tail; 
    public LockFreeQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    for(;;) 
    { 
     Node curTail = _tail; 
     if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail. 
     { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 
     return; 
     } 
     else 
     { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
     } 
    } 
    }  
    public bool TryDequeue(out T item) 
    { 
    for(;;) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (curHead == curTail) 
     { 
     if (curHeadNext == null) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
      Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
     { 
      return true; 
     } 
     } 
    } 
    } 
#pragma warning restore 420 
} 

Questa coda ha un solo metodi Enqueue e TryDequeue (restituisce false se la coda era vuoto).Aggiungere una proprietà Count con l'uso di incrementi e decrementi interbloccati è banale (assicurati che il campo count venga letto volutamente nella proprietà effettiva), ma oltre a ciò diventa piuttosto complicato aggiungere qualcosa che non può essere scritto come delega a di di i membri già definiti, o come accadono durante la costruzione (nel qual caso avrete un solo thread che lo usa in quel punto, a meno che non facciate qualcosa di veramente strano).

L'implementazione è anche senza attesa, come se le azioni di un thread non impedissero a un altro di progredire (se un thread è a metà della procedura di accodamento quando un secondo thread tenta di farlo, il secondo thread completerà il lavoro del primo thread).

Ancora, aspetterei fino a quando il blocco ha effettivamente dimostrato un collo di bottiglia (a meno che tu non stia solo sperimentando, giocare con l'esotico, lavorare con il familiare). Infatti, in molte situazioni questo si rivelerà più costoso del blocco su un Queue<T>, in particolare perché è meno utile tenere gli articoli vicini l'uno all'altro in memoria, in modo da poter scoprire che molte operazioni in stretta successione sono state meno performanti per questo motivo. Il blocco è normalmente piuttosto economico, purché non vi siano frequenti conflitti di blocco.

Edit:

ho tempo ora di aggiungere note su come funziona di cui sopra. Ho scritto questo leggendo la versione di qualcun altro della stessa idea, scrivendo questo per me stesso per copiare l'idea, e poi confrontando con la versione che ho letto in seguito, e ho trovato un esercizio molto informativo per farlo.

Iniziamo con un'implementazione senza blocco. È una lista collegata singolarmente.

internal sealed class NotLockFreeYetQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next{get;set;} 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private Node _head; 
    private Node _tail; 
    public NotLockFreeYetQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    _tail.Next = newNode; 
    _tail = newNode; 
    } 
    public bool TryDequeue(out T item) 
    { 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = _head.Next.Item; 
     _head = _head.Next; 
     return true; 
     } 
    } 
} 

Alcune note sull'implementazione finora.

Item e Next possono essere ragionevolmente campi o proprietà. Dal momento che si tratta di una semplice classe interiore e uno deve essere readonly mentre l'altro è una lettura-scrittura "stupida" (nessuna logica nel getter o setter) non c'è davvero molto da scegliere tra qui. Ho trasformato la proprietà Next qui semplicemente perché non funzionerà più tardi, e voglio parlarne quando arriveremo.

Avere _head e _tail inizio come punta a una sentinella, piuttosto che null semplifica le cose per non dover avere un caso speciale per una coda vuota.

Quindi, l'accodamento crea un nuovo nodo e lo imposta come proprietà Next di _tail prima di diventare la nuova coda. La dequeuing controlla la presenza di vuoto e, se non è vuota, ottiene il valore dal nodo head e imposta head come il nodo che era la proprietà della vecchia testa Next.

Un'altra cosa da notare a questo punto, è che poiché i nuovi nodi vengono creati in base alle necessità, piuttosto che in un array pre-assegnato, avrà prestazioni meno buone nell'uso normale rispetto a Queue<T>. Questo non migliorerà, e in effetti tutto ciò che faremo ora peggiorerà le prestazioni single-thread. Ancora una volta, è solo in forte contrasto che questo batterà un Queue<T> bloccato.

Facciamo in modo che l'enqueue non si blocchi. Useremo Interlocked.CompareExchange(). Questo confronta il primo parametro con il terzo parametro e imposta il primo parametro come secondo parametro se sono uguali. In ogni caso restituisce il vecchio valore (indipendentemente dal fatto che sia stato sovrascritto o meno). Il confronto e lo scambio avvengono come un'operazione atomica, quindi è di per sé infallibile, ma abbiamo bisogno di un po 'più di lavoro per rendere le combinazioni di tali operazioni anche a prova di thread.

CompareExchange e gli equivalenti in altre lingue sono talvolta abbreviati in CAS (per Confronto-E-Swap).

Un modo comune per utilizzarli è nei cicli, in cui per prima cosa otteniamo il valore che sovrascriveremo attraverso una lettura normale (ricorda che le letture .NET di valori a 32 bit, valori più piccoli e tipi di riferimento sono sempre atomici) e cerchiamo di sovrascrivere se non è cambiato, il ciclo fino a quando ci riusciamo:

private sealed class Node 
{ 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
    Item = item; 
    } 
} 
/* ... */ 
private volatile Node _tail; 
/* ... */ 
public void Enqueue(T item) 
{ 
    Node newNode = new Node(item); 
    for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     _tail = newNode; 
     return; 
    } 
    } 
} 

Vogliamo aggiungere alla coda del Next solo se si tratta di null - se non altro thread come scritto ad esso. Quindi, facciamo un CAS che avrà successo solo se questo è il caso. Se lo è, impostiamo _tail come nuovo nodo, altrimenti riproviamo.

Successivamente doveva essere cambiato per essere un campo per questo lavoro, non possiamo farlo con le proprietà. Facciamo anche volatile in modo che _tail sia fresco in tutte le cache della CPU (CompareExchange ha una semantica volatile, quindi non sarà interrotto dalla mancanza di volatilità, ma potrebbe girare più spesso del necessario, e faremo di più con _tail anche).

Questo è privo di blocco, ma non di attesa. Se un thread arrivava fino al CAS, ma non era ancora stato scritto in _tail, e quindi non aveva tempo CPU per un po ', tutti gli altri thread che cercavano di accodarsi continuavano a fare il ciclo fino a quando non era programmato e riusciva a farlo. Se il thread è stato interrotto o sospeso per un lungo periodo, ciò causerebbe una sorta di livelock permanente.

Quindi, se siamo nella condizione in cui il CAS ha fallito, siamo in una situazione del genere. Siamo in grado di risolvere il problema facendo il lavoro di l'altro thread per esso:

for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 

     return; 
    } 
    else 
    { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
    } 
    } 

Ora, nella maggior parte dei casi il filo che ha scritto al curTail.Next assegnerà il nuovo nodo di _tail - ma attraverso un CAS nel caso in cui è già stato fatto. Tuttavia, un altro thread non riesce a scrivere su curtail.Next, può provare a assegnare curTail.Next a _tail per eseguire il lavoro del primo thread e andare al proprio.

Quindi, un accodamento senza blocco, senza attesa. È ora di lavorare sulla dissecazione. Innanzitutto consideriamo il caso in cui non sospettiamo che la coda sia vuota. Proprio come con l'enqueuing, prima otterremo copie locali dei nodi a cui siamo interessati; _head, _tail e _head.Next (anche in questo caso non utilizzare un capo o una coda null per le code vuote rende la vita più semplice, significa che è possibile leggere in modo sicuro _head.Next in qualsiasi stato). Inoltre, come con l'accodamento, ci dipenderà volatilità, questa volta non solo di _tail, ma di _head, così abbiamo modificarla in:

private volatile Node _head; 

E noi cambiare TryDequeue a:

public bool TryDequeue(out T item) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
      return true; 
     } 
    } 

Il vuoto - Il caso è ora errato, ma torneremo su questo. È possibile impostare l'articolo su curHeadNext.Item come se non completassimo l'operazione, altrimenti lo sovrascriveremo, ma è necessario rendere l'operazione scrivendo a _head atomico e garantito che si verifichi solo se _head non è stato modificato. Se non lo è, allora _head è stato aggiornato da un altro thread e possiamo ripetere il ciclo (non è necessario lavorare per quel thread, è già stato fatto tutto ciò che ci avrà effetto).

Considerare ora cosa succede se _head == _tail. Forse è vuoto, ma probabilmente _tail.Next (che sarà lo stesso di curHeadNext) è stato scritto da un accodamento. In tal caso, ciò che più probabilmente vorremmo non è il risultato di una domanda vuota, ma il risultato della nostra rimozione della voce da quella parte parzialmente accodata. Così, assistiamo quel filo e continuiamo ancora una volta il ciclo:

if (curHead == curTail) 
{ 
    if (curHeadNext == null) 
    { 
     item = default(T); 
     return false; 
    } 
    else 
     Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); 
} 

Infine, l'unico problema è che a sinistra continuiamo a ottenere 420 avvertimenti perché stiamo passando campi volatili byref metodi. Questo spesso interrompe la semantica volatile (da qui l'avvertimento) ma non con CompareExchange (da qui il nostro modo di farlo). Possiamo disabilitare l'avviso, includendo un commento per spiegare perché lo abbiamo fatto (cerco di non disattivare mai un avviso senza un commento giustificativo) e abbiamo il codice che ho dato in precedenza.

Si noti che è importante per questo che lo stiamo facendo in un framework di supporto GC. Se dovessimo gestire anche la deallocazione, sarebbe molto più complicato.

+0

+1: Grazie! Sembra che tu sia in grado di leggere la mia mente, deducendo dalla mia domanda che mi piacerebbe evitare il blocco se possibile. Il mio codice attuale è senza serrature e volevo confermare la mia (errata) interpretazione del msdn doc. È già troppo lento e l'aggiunta di blocchi lo renderà ancora più lento. Potrei provare il tuo suggerimento alla fine se non sono in grado di migliorare la velocità, anche se devo ammettere che non ho ancora capito il tuo codice (qualcosa per me su cui lavorare). – blizpasta

+0

Sarei felice di spiegare il codice qui sopra, anche se forse è abbastanza lontano dall'argomento originale per essere fatto meglio altrove. Sarei ** molto ** scettico sul fatto che questo possa aiutare comunque. Le serrature non sono così costose; le serrature contestate possono essere costose (a volte paralizzanti), ma le serrature che non vengono contestate non saranno un collo di bottiglia (e, naturalmente, quando vengono contestate è proprio quando dovresti essere contento di averle). Nel frattempo, a causa del funzionamento di 'Queue ', è probabile che l'accesso frequente ad esso dalla CPU sia più veloce del codice che ho qui perché ... –

+0

... gli oggetti saranno più vicini l'uno all'altro nella memoria e quindi più probabile che si trovi nella cache della CPU. Anche una semplice lettura o scrittura (come usata in 'Queue ') è più veloce delle operazioni interbloccate usate nella mia classe. In una situazione non contestata, mi aspetterei che 'Coda ' per battere la mia classe sopra a zero, anche con l'aggiunta di blocchi. Quindi, mentre quanto sopra * potrebbe * essere utile, lo considererei solo se la contesa del blocco elevato si rivelasse un vero problema (o per il divertimento di provare nuovi modi di fare le cose, ed è per questo che ho il sopra descritto a mano [basato leggendo il codice di altri, non al 100% originale]) –

Problemi correlati