Sì, come già detto un membro di istanza di un'istanza statica non è la stessa di un membro statico, ed è solo quest'ultimo per cui è garantita la sicurezza del thread, quindi è necessario bloccare le operazioni di accodamento e dequeue .
Se bloccaggio stati rivelando un collo di bottiglia, le code sono una delle raccolte più semplici per scrivere in un modo senza serratura, finché uno non richiederanno il pieno ICollection<T>
attuazione fornita da Queue<T>
:
internal sealed class LockFreeQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
private volatile Node _head;
private volatile Node _tail;
public LockFreeQueue()
{
_head = _tail = new Node(default(T));
}
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail.
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
}
public bool TryDequeue(out T item)
{
for(;;)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
{
return true;
}
}
}
}
#pragma warning restore 420
}
Questa coda ha un solo metodi Enqueue
e TryDequeue
(restituisce false se la coda era vuoto).Aggiungere una proprietà Count con l'uso di incrementi e decrementi interbloccati è banale (assicurati che il campo count venga letto volutamente nella proprietà effettiva), ma oltre a ciò diventa piuttosto complicato aggiungere qualcosa che non può essere scritto come delega a di di i membri già definiti, o come accadono durante la costruzione (nel qual caso avrete un solo thread che lo usa in quel punto, a meno che non facciate qualcosa di veramente strano).
L'implementazione è anche senza attesa, come se le azioni di un thread non impedissero a un altro di progredire (se un thread è a metà della procedura di accodamento quando un secondo thread tenta di farlo, il secondo thread completerà il lavoro del primo thread).
Ancora, aspetterei fino a quando il blocco ha effettivamente dimostrato un collo di bottiglia (a meno che tu non stia solo sperimentando, giocare con l'esotico, lavorare con il familiare). Infatti, in molte situazioni questo si rivelerà più costoso del blocco su un Queue<T>
, in particolare perché è meno utile tenere gli articoli vicini l'uno all'altro in memoria, in modo da poter scoprire che molte operazioni in stretta successione sono state meno performanti per questo motivo. Il blocco è normalmente piuttosto economico, purché non vi siano frequenti conflitti di blocco.
Edit:
ho tempo ora di aggiungere note su come funziona di cui sopra. Ho scritto questo leggendo la versione di qualcun altro della stessa idea, scrivendo questo per me stesso per copiare l'idea, e poi confrontando con la versione che ho letto in seguito, e ho trovato un esercizio molto informativo per farlo.
Iniziamo con un'implementazione senza blocco. È una lista collegata singolarmente.
internal sealed class NotLockFreeYetQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next{get;set;}
public Node(T item)
{
Item = item;
}
}
private Node _head;
private Node _tail;
public NotLockFreeYetQueue()
{
_head = _tail = new Node(default(T));
}
public void Enqueue(T item)
{
Node newNode = new Node(item);
_tail.Next = newNode;
_tail = newNode;
}
public bool TryDequeue(out T item)
{
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = _head.Next.Item;
_head = _head.Next;
return true;
}
}
}
Alcune note sull'implementazione finora.
Item
e Next
possono essere ragionevolmente campi o proprietà. Dal momento che si tratta di una semplice classe interiore e uno deve essere readonly
mentre l'altro è una lettura-scrittura "stupida" (nessuna logica nel getter o setter) non c'è davvero molto da scegliere tra qui. Ho trasformato la proprietà Next
qui semplicemente perché non funzionerà più tardi, e voglio parlarne quando arriveremo.
Avere _head
e _tail
inizio come punta a una sentinella, piuttosto che null
semplifica le cose per non dover avere un caso speciale per una coda vuota.
Quindi, l'accodamento crea un nuovo nodo e lo imposta come proprietà Next
di _tail
prima di diventare la nuova coda. La dequeuing controlla la presenza di vuoto e, se non è vuota, ottiene il valore dal nodo head e imposta head come il nodo che era la proprietà della vecchia testa Next
.
Un'altra cosa da notare a questo punto, è che poiché i nuovi nodi vengono creati in base alle necessità, piuttosto che in un array pre-assegnato, avrà prestazioni meno buone nell'uso normale rispetto a Queue<T>
. Questo non migliorerà, e in effetti tutto ciò che faremo ora peggiorerà le prestazioni single-thread. Ancora una volta, è solo in forte contrasto che questo batterà un Queue<T>
bloccato.
Facciamo in modo che l'enqueue non si blocchi. Useremo Interlocked.CompareExchange()
. Questo confronta il primo parametro con il terzo parametro e imposta il primo parametro come secondo parametro se sono uguali. In ogni caso restituisce il vecchio valore (indipendentemente dal fatto che sia stato sovrascritto o meno). Il confronto e lo scambio avvengono come un'operazione atomica, quindi è di per sé infallibile, ma abbiamo bisogno di un po 'più di lavoro per rendere le combinazioni di tali operazioni anche a prova di thread.
CompareExchange e gli equivalenti in altre lingue sono talvolta abbreviati in CAS (per Confronto-E-Swap).
Un modo comune per utilizzarli è nei cicli, in cui per prima cosa otteniamo il valore che sovrascriveremo attraverso una lettura normale (ricorda che le letture .NET di valori a 32 bit, valori più piccoli e tipi di riferimento sono sempre atomici) e cerchiamo di sovrascrivere se non è cambiato, il ciclo fino a quando ci riusciamo:
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
/* ... */
private volatile Node _tail;
/* ... */
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
_tail = newNode;
return;
}
}
}
Vogliamo aggiungere alla coda del Next
solo se si tratta di null
- se non altro thread come scritto ad esso. Quindi, facciamo un CAS che avrà successo solo se questo è il caso. Se lo è, impostiamo _tail
come nuovo nodo, altrimenti riproviamo.
Successivamente doveva essere cambiato per essere un campo per questo lavoro, non possiamo farlo con le proprietà. Facciamo anche volatile
in modo che _tail
sia fresco in tutte le cache della CPU (CompareExchange
ha una semantica volatile, quindi non sarà interrotto dalla mancanza di volatilità, ma potrebbe girare più spesso del necessario, e faremo di più con _tail
anche).
Questo è privo di blocco, ma non di attesa. Se un thread arrivava fino al CAS, ma non era ancora stato scritto in _tail, e quindi non aveva tempo CPU per un po ', tutti gli altri thread che cercavano di accodarsi continuavano a fare il ciclo fino a quando non era programmato e riusciva a farlo. Se il thread è stato interrotto o sospeso per un lungo periodo, ciò causerebbe una sorta di livelock permanente.
Quindi, se siamo nella condizione in cui il CAS ha fallito, siamo in una situazione del genere. Siamo in grado di risolvere il problema facendo il lavoro di l'altro thread per esso:
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
Ora, nella maggior parte dei casi il filo che ha scritto al curTail.Next
assegnerà il nuovo nodo di _tail
- ma attraverso un CAS nel caso in cui è già stato fatto. Tuttavia, un altro thread non riesce a scrivere su curtail.Next
, può provare a assegnare curTail.Next
a _tail
per eseguire il lavoro del primo thread e andare al proprio.
Quindi, un accodamento senza blocco, senza attesa. È ora di lavorare sulla dissecazione. Innanzitutto consideriamo il caso in cui non sospettiamo che la coda sia vuota. Proprio come con l'enqueuing, prima otterremo copie locali dei nodi a cui siamo interessati; _head
, _tail
e _head.Next
(anche in questo caso non utilizzare un capo o una coda null per le code vuote rende la vita più semplice, significa che è possibile leggere in modo sicuro _head.Next
in qualsiasi stato). Inoltre, come con l'accodamento, ci dipenderà volatilità, questa volta non solo di _tail
, ma di _head
, così abbiamo modificarla in:
private volatile Node _head;
E noi cambiare TryDequeue a:
public bool TryDequeue(out T item)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
return true;
}
}
Il vuoto - Il caso è ora errato, ma torneremo su questo. È possibile impostare l'articolo su curHeadNext.Item
come se non completassimo l'operazione, altrimenti lo sovrascriveremo, ma è necessario rendere l'operazione scrivendo a _head
atomico e garantito che si verifichi solo se _head
non è stato modificato. Se non lo è, allora _head
è stato aggiornato da un altro thread e possiamo ripetere il ciclo (non è necessario lavorare per quel thread, è già stato fatto tutto ciò che ci avrà effetto).
Considerare ora cosa succede se _head == _tail
. Forse è vuoto, ma probabilmente _tail.Next
(che sarà lo stesso di curHeadNext
) è stato scritto da un accodamento. In tal caso, ciò che più probabilmente vorremmo non è il risultato di una domanda vuota, ma il risultato della nostra rimozione della voce da quella parte parzialmente accodata. Così, assistiamo quel filo e continuiamo ancora una volta il ciclo:
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);
}
Infine, l'unico problema è che a sinistra continuiamo a ottenere 420 avvertimenti perché stiamo passando campi volatili byref
metodi. Questo spesso interrompe la semantica volatile (da qui l'avvertimento) ma non con CompareExchange
(da qui il nostro modo di farlo). Possiamo disabilitare l'avviso, includendo un commento per spiegare perché lo abbiamo fatto (cerco di non disattivare mai un avviso senza un commento giustificativo) e abbiamo il codice che ho dato in precedenza.
Si noti che è importante per questo che lo stiamo facendo in un framework di supporto GC. Se dovessimo gestire anche la deallocazione, sarebbe molto più complicato.
noto il tuo commento sul non voler bloccare perché è già lento e il blocco lo renderà più lento. Se questo codice è già lento, il blocco avrà un piccolo sovraccarico in confronto. Il blocco è in genere molto veloce, ma le persone si aspettano che sia molto lento. Ho visto persone creare serrature di lettori/scrittori di fantasia senza mai fare alcuna misurazione. Quando sono costretti a farlo, il più delle volte quelli di fantasia sono più lenti. Fai il semplice .net locking, poi il profilo per scoprire dove sei lento – pm100