2016-01-31 17 views
5

Sto lavorando molto con i flussi di lavoro asincroni e gli agenti in F #, mentre andavo un po 'più in profondità in Eventi ho notato che il tipo Event < _>() non è Thread- sicuro. Qui non sto parlando del problema comune di creare un evento. In realtà sto parlando di sottoscrizione e rimozione/smaltimento da un evento. Allo scopo di testing ho scritto questo breve programmaUtilizzo dell'evento F # nel codice multi-thread

let event = Event<int>() 
let sub = event.Publish 

[<EntryPoint>] 
let main argv = 
    let subscribe sub x = async { 
     let mutable disposables = [] 
     for i=0 to x do 
      let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
      disposables <- dis :: disposables 
     for dis in disposables do 
      dis.Dispose() 
    } 

    Async.RunSynchronously(async{ 
     let! x = Async.StartChild (subscribe sub 1000) 
     let! y = Async.StartChild (subscribe sub 1000) 
     do! x 
     do! y 
     event.Trigger 1 
     do! Async.Sleep 2000 
    }) 
    0 

Il programma è semplice, creo un evento e una funzione che sottoscrive una specifica quantità di eventi ad esso, e dopo che smaltire ogni gestore. Io uso un altro calcolo asincrono per generare due istanze di quelle funzioni con Async.StartChild. Al termine di entrambe le funzioni, faccio scattare l'evento per vedere se ci sono ancora alcuni gestori.

Ma quando si chiama event.Trigger(1), il risultato è che ci sono ancora alcuni gestori reigsterati all'evento. Poiché alcuni "1" verranno stampati sulla console. Questo in generale significa che la sottoscrizione e/o lo smaltimento non sono thread-safe.

E questo è quello che non mi aspettavo. Se la sottoscrizione e lo smaltimento non sono thread-safe, in che modo è possibile utilizzare gli eventi in generale in modo sicuro? Gli eventi sicuri possono anche essere utilizzati al di fuori dei thread e un trigger non genera alcuna funzione in parallelo o su thread diversi. Ma è in qualche modo normale per me che gli Eventi siano usati in Async, codice basato su Agent o in generale con Threads. Sono spesso usati come una comunicazione per raccogliere informazioni sui thread di Backroundworker. Con Async.AwaitEvent è possibile iscriversi a un evento. Se la sottoscrizione e lo smaltimento non sono thread-safe, come è possibile utilizzare gli eventi in tale ambiente? E quale scopo ha Async.AwaitEvent? Considerando che un flusso di lavoro asincrono fa Thread sperando di utilizzare Async.AwaitEvent è fondamentalmente "broken by design" se la sottoscrizione/eliminazione di un evento non è thread-safe per impostazione predefinita.

La domanda generale che sto affrontando è. È corretto che l'iscrizione e lo smaltimento non siano thread-safe? Dal mio esempio sembra assomigliare, ma probabilmente mi sono perso alcuni dettagli importanti. Attualmente utilizzo molto l'evento nel mio progetto, di solito ho MailboxProcessors e uso eventi per la notifica. Quindi la domanda è Se gli eventi non sono thread-safe, l'intero progetto che sto usando non è assolutamente sicuro per i thread. Allora, qual è una correzione per questa situazione? Creazione di una nuova implementazione di eventi thread-safe? Esistono già alcune implementazioni che affrontano questo problema? O ci sono altre opzioni per utilizzare Event in modo sicuro in un ambiente altamente threaded?

risposta

3

FYI; l'implementazione per Event<int> può essere trovata here.

La cosa interessante sembra essere:

member e.AddHandler(d) = 
    x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>) 
member e.RemoveHandler(d) = 
    x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>) 

La sottoscrizione di un evento unisce il gestore di eventi corrente con il gestore di eventi passato in sottoscrizione. Questo gestore di eventi combinato sostituisce quello corrente.

Il problema dal punto di vista della concorrenza è che qui abbiamo una condizione di competizione in cui gli abbonati concorrenti potrebbero utilizzare il gestore di eventi venuti corrente da combinare con l'ultimo e quello che ripristina la vittoria dell'handler (l'ultima è una difficoltà concetto in concorrenza in questi giorni ma nvm).

Ciò che è possibile fare qui è introdurre un ciclo CAS utilizzando Interlocked.CompareAndExchange ma che aggiunge un sovraccarico alle prestazioni che danneggia gli utenti non concorrenti. È qualcosa che si potrebbe fare un PR fuori e vedere se è visto favorevolmente dalla comunità di F #.

WRT alla tua seconda domanda su cosa fare al riguardo Posso solo dire cosa farei.Vorrei andare per la possibilità di creare una versione di FSharpEvent che supporta sottoscrizione/annullamento sottoscrizione protetta. Forse basalo su FSharpEvent se la tua politica FOSS della società lo consente. Se si scopre un successo, allora potrebbe formare una futura libra core da PR a F #.

Non conosco le tue esigenze ma è anche possibile che se quello che ti serve siano le coroutine (cioè Async) e non i thread allora è possibile riscrivere il programma per usare solo 1 thread e quindi non sarai influenzato da questa condizione di gara.

+0

Come per la tua ultima frase. Ogni asincrono è iniziato con Async.Start viene eseguito sul pool di thread, ma anche utilizzando let! o fare! può passare l'asincrono ad un altro thread, per esempio usando semplicemente "Async.Sleep" si passa al pool di thread o usando Async.StartChild. In cima sto facendo scattare l'evento da MailboxProcessor e anche quelli che eseguono sempre alcuni thread nel pool di thread. Quindi non vedo come far funzionare tutto su un thread singolo. Ma anche io non vorrei questo comportamento. Mi aspetto questo tipo di comportamento in quanto non voglio comunque eseguire tutto su un thread singolo. –

+0

Sembra che la mancanza di sicurezza del thread sugli eventi sia un problema significativo con la libreria di base F #. Prenderesti in considerazione l'apertura di un problema per questo? –

+0

Puoi arrotolare le tue proprie coroutine o assicurarti che ogni 'let!' 'Do!' Rimandi al "thread principale". Ma come hai detto tu non vuoi comunque questo comportamento, quindi non importa. – FuleSnabel

2

In un primo momento, grazie a FuleSnable per la sua risposta. Mi ha indicato la giusta direzione. Sulla base delle informazioni fornite, ho implementato un tipo ConcurrentEvent da solo. Questo tipo utilizza Interlocked.CompareExchange per aggiungere/rimuovere i suoi gestori in modo che sia privo di blocco e si spera che il modo più veloce per farlo. Ho iniziato l'implementazione copiando il tipo Event dal compilatore F #. (Lascio anche il commento così com'è). L'implementazione attuale è simile a questa.

type ConcurrentEvent<'T> = 
    val mutable multicast : Handler<'T> 
    new() = { multicast = null } 

    member x.Trigger(arg:'T) = 
     match x.multicast with 
     | null ->() 
     | d -> d.Invoke(null,arg) |> ignore 
    member x.Publish = 
     // Note, we implement each interface explicitly: this works around a bug in the CLR 
     // implementation on CompactFramework 3.7, used on Windows Phone 7 
     { new obj() with 
      member x.ToString() = "<published event>" 
      interface IEvent<'T> 
      interface IDelegateEvent<Handler<'T>> with 
      member e.AddHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      member e.RemoveHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      interface System.IObservable<'T> with 
      member e.Subscribe(observer) = 
       let h = new Handler<_>(fun sender args -> observer.OnNext(args)) 
       (e :?> IEvent<_,_>).AddHandler(h) 
       { new System.IDisposable with 
        member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } } 

Alcune note sul design:

  • Ho iniziato con un ciclo ricorsivo. Ma facendo ciò e guardando il codice compilato crea una classe anonima e chiamando AddHandler o RemoveHandler ha creato un oggetto di questo. Con l'implementazione diretta del ciclo while evita l'istanziazione di un oggetto ogni volta che un nuovo gestore viene aggiunto/rimosso.
  • Ho usato esplicitamente obj.ReferenceEquals per evitare l'uguaglianza di hash generica.

Almeno nei miei test Aggiunta/Rimozione di un gestore ora sembra essere thread-safe. ConcurrentEvent può essere semplicemente sostituito con il tipo Event secondo necessità. Se qualcuno ha ulteriori miglioramenti per correttezza o prestazioni, quei commenti sono benvenuti. Altrimenti segnerei questa risposta come soluzione.


Un Benchmark se le persone sono curiose su come molto più lento del ConcurrentEvent saranno confrontati con Event

let stopWatch() = System.Diagnostics.Stopwatch.StartNew() 

let event = Event<int>() 
let sub = event.Publish 

let cevent = ConcurrentEvent<int>() 
let csub = cevent.Publish 

let subscribe sub x = async { 
    let mutable disposables = [] 
    for i=0 to x do 
     let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
     disposables <- dis :: disposables 
    for dis in disposables do 
     dis.Dispose() 
} 

let sw = stopWatch() 
Async.RunSynchronously(async{ 
    // Amount of tries 
    let tries = 10000 

    // benchmarking Event subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe sub tries) 
    let! y = Async.StartChild (subscribe sub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "Event: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    event.Trigger 1 
    do! Async.Sleep 2000 

    // benchmarking ConcurrentEvent subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe csub tries) 
    let! y = Async.StartChild (subscribe csub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "\nConcurrentEvent: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    cevent.Trigger 1 
    do! Async.Sleep 2000 
}) 

Sul mio sottoscrizione sistema/cancellazione 10.000 gestore con la non-thread-safe Event dura circa 1.4 seconds completare.

Il thread-safe ConcurrentEvent richiede circa 1.8 seconds per completare. Quindi penso che il sovraccarico sia piuttosto basso.

+0

Qualcosa da considerare: non sono sicuro che 'CompareExchange' inserisca una barriera di memoria (o che tipo sia) o che tu debba fare una barriera manuale di memoria. – FuleSnabel

+0

Sembra 'CompareExchange' implica una barriera completa (http: // stackoverflow.com/domande/1581718/fa-interbloccate-compareexchange-uso-a-memory-barriera). Le letture di AFAIK x86 usano la barriera 'acquire', quindi dovrebbe implicare che stai bene su x86 ma se vuoi che il codice tocchi ARM/PowerPC potresti dover inserire chiamate' Interlocked' quando leggi 'x.multicast' – FuleSnabel

+0

@FuleSnabel un errore, come ho usato "x.multicast" in "Delegate.Combined" e "Delegate.Remove". L'operazione dovrebbe funzionare meglio sulla variabile 'dels' recuperata. Per quanto mi riguarda, allora ho solo bisogno di un singolo MemoryBarrier prima di leggere da 'x.multicast' per garantire la freschezza di' x.multicast'. –