2013-07-18 13 views
6

Sto lavorando a un'applicazione di rete haskell e utilizzo il pattern attore per gestire il multithreading. Una cosa che ho trovato è come archiviare per esempio un set di socket/handle client. Che, naturalmente, deve essere accessibile a tutti i thread e può cambiare quando i client si collegano/disattivano.Haskell - Mutazione basata su attore

Dato che io vengo dal mondo imperativo ho pensato a una sorta di lock-meccanismo, ma quando ho notato quanto brutto questo è ho pensato mutevolezza "puro", ben in realtà è una specie di pura:

import Control.Concurrent 
import Control.Monad 
import Network 
import System.IO 
import Data.List 
import Data.Maybe 
import System.Environment 
import Control.Exception 


newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a)) 
newStorage = do 
    q <- newChan 
    forkIO $ storage [] q 
    return q 


newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle)) 
newHandleStorage = newStorage 


storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO() 
storage s q = do 
    let loop = (`storage` q) 
    (req, reply, d) <- readChan q 
    print ("processing " ++ show(d)) 
    case req of 
    "add" -> loop ((fromJust d) : s) 
    "remove" -> loop (delete (fromJust d) s) 
    "get" -> do 
     writeChan (fromJust reply) s 
     loop s 


store s d = writeChan s ("add", Nothing, Just d) 
unstore s d = writeChan s ("remove", Nothing, Just d) 
request s = do 
    chan <- newChan 
    writeChan s ("get", Just chan, Nothing) 
    readChan chan 

Il punto è che un thread (attore) gestisce un elenco di elementi e modifica l'elenco in base alle richieste in arrivo. Poiché i thread sono davvero economici, ho pensato che potesse essere un'alternativa funzionale davvero interessante.

Ovviamente questo è solo un prototipo (una rapida prova del concetto). Quindi la mia domanda è:

  1. È questo un modo "buono" della gestione delle variabili mutabili condivise (nel mondo attore)?
  2. Esiste già una libreria per questo modello? (Ho già cercato, ma non ho trovato nulla)

saluti, Chris

+3

Se sei disposto ad esplorare alternative al modello dell'attore, ti suggerisco di provare la [Memoria transazionale software] di Haskell (https://en.wikipedia.org/wiki/Software_transactional_memory). È un meccanismo meraviglioso simile alle transazioni del database. Vedi [Capitolo 28] (http://book.realworldhaskell.org/read/software-transactional-memory.html) in The Real World Haskell. –

+0

Tecnicamente un'ottima scelta, ma ho sentito che usare STM con un numero elevato di thread (un thread per client che è standard in haskell) e operazioni relativamente lunghe (eliminare un elemento da un elenco è O (n), ovviamente hash sets/le mappe potrebbero aiutare qui) potrebbe ridurre le prestazioni di STM di un numero elevato. E naturalmente il canale MVar potrebbe essere sostituito dal canale STM che significa utilizzare la migliore delle due tecniche. EDIT: Il pattern degli attori è generalmente molto bello in una situazione del genere, perché l'eliminazione o l'aggiunta di un elemento è O (1) (è sufficiente inviare un messaggio) Il lavoro effettivo è fatto in una discussione ... – Kr0e

+0

Hai ragione. Con STM può succedere che le transazioni vengano riavviate più volte, portando a prestazioni ridotte. Ma se le tue operazioni sincronizzate richiedono molto tempo, puoi anche avere problemi simili con gli attori - se ci sono più messaggi di quanti ne possa gestire, il suo stato resterà indietro rispetto alla realtà. Quindi usare gli alberi bilanciati ('Map' /' Set') o i set di hash 'ST/IO' sarebbero sicuramente d'aiuto. –

risposta

6

Ecco un esempio veloce e sporco utilizzando stm e pipes-network. Questo configurerà un semplice server che consente ai client di connettersi e incrementare o decrementare un contatore. Visualizzerà una barra di stato molto semplice che mostra i racconti correnti di tutti i client connessi e rimuoverà i commenti dei clienti dalla barra quando si disconnetteranno.

Prima Inizierò con il server, e ho generosamente commentato il codice per spiegare come funziona:

import Control.Concurrent.STM (STM, atomically) 
import Control.Concurrent.STM.TVar 
import qualified Data.HashMap.Strict as H 
import Data.Foldable (forM_) 

import Control.Concurrent (forkIO, threadDelay) 
import Control.Monad (unless) 
import Control.Monad.Trans.State.Strict 
import qualified Data.ByteString.Char8 as B 
import Control.Proxy 
import Control.Proxy.TCP 
import System.IO 

main = do 
    hSetBuffering stdout NoBuffering 

    {- These are the internal data structures. They should be an implementation 
     detail and you should never expose these references to the 
     "business logic" part of the application. -} 
    -- I use nRef to keep track of creating fresh Ints (which identify users) 
    nRef <- newTVarIO 0  :: IO (TVar Int) 
    {- hMap associates every user (i.e. Int) with a counter 

     Notice how I've "striped" the hash map by storing STM references to the 
     values instead of storing the values directly. This means that I only 
     actually write the hashmap when adding or removing users, which reduces 
     contention for the hash map. 

     Since each user gets their own unique STM reference for their counter, 
     modifying counters does not cause contention with other counters or 
     contention with the hash map. -} 
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int))) 

    {- The following code makes heavy use of Haskell's pure closures. Each 
     'let' binding closes over its current environment, which is safe since 
     Haskell is pure. -} 

    let {- 'getCounters' is the only server-facing command in our STM API. The 
      only permitted operation is retrieving the current set of user 
      counters. 

      'getCounters' closes over the 'hMap' reference currently in scope so 
      that the server never needs to be aware about our internal 
      implementation. -} 
     getCounters :: STM [Int] 
     getCounters = do 
      refs <- fmap H.elems (readTVar hMap) 
      mapM readTVar refs 

     {- 'init' is the only client-facing command in our STM API. It 
      initializes the client's entry in the hash map and returns two 
      commands: the first command is what the client calls to 'increment' 
      their counter and the second command is what the client calls to log 
      off and delete 
      'delete' command. 

      Notice that those two returned commands each close over the client's 
      unique STM reference so the client never needs to be aware of how 
      exactly 'init' is implemented under the hood. -} 
     init :: STM (STM(), STM()) 
     init = do 
      n <- readTVar nRef 
      writeTVar nRef $! n + 1 

      ref <- newTVar 0 
      modifyTVar' hMap (H.insert n ref) 

      let incrementRef :: STM() 
       incrementRef = do 
        mRef <- fmap (H.lookup n) (readTVar hMap) 
        forM_ mRef $ \ref -> modifyTVar' ref (+ 1) 

       deleteRef :: STM() 
       deleteRef = modifyTVar' hMap (H.delete n) 

      return (incrementRef, deleteRef) 

    {- Now for the actual program logic. Everything past this point only uses 
     the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I 
     could factor the above approved STM API into a separate module to enforce 
     the encapsulation boundary, but I am lazy. -} 

    {- Fork a thread which polls the current state of the counters and displays 
     it to the console. There is a way to implement this without polling but 
     this gets the job done for now. 

     Most of what it is doing is just some simple tricks to reuse the same 
     console line instead of outputting a stream of lines. Otherwise it 
     would be just: 

     forkIO $ forever $ do 
      ns <- atomically getCounters 
      print ns 
    -} 
    forkIO $ (`evalStateT` 0) $ forever $ do 
     del <- get 
     lift $ do 
      putStr (replicate del '\b') 
      putStr (replicate del ' ') 
      putStr (replicate del '\b') 
     ns <- lift $ atomically getCounters 
     let str = show ns 
     lift $ putStr str 
     put $! length str 
     lift $ threadDelay 10000 

    {- Fork a thread for each incoming connection, which listens to the client's 
     commands and translates them into 'STM' actions -} 
    serve HostAny "8080" $ \(socket, _) -> do 
     (increment, delete) <- atomically init 

     {- Right now, just do the dumb thing and convert all keypresses into 
      increment commands, with the exception of the 'q' key, which will 
      quit -} 
     let handler :: (Proxy p) =>() -> Consumer p Char IO() 
      handler() = runIdentityP loop 
       where 
       loop = do 
        c <- request() 
        unless (c == 'q') $ do 
         lift $ atomically increment 
         loop 

     {- This uses my 'pipes' library. It basically is a high-level way to 
      say: 

      * Read binary packets from the socket no bigger than 4096 bytes 

      * Get the first character from each packet and discard the rest 

      * Handle the character using the above 'handler' function -} 
     runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler 

     {- The above pipeline finishes either when the socket closes or 
      'handler' stops looping because it received a 'q'. Either case means 
      that the client is done so we log them out using 'delete'. -} 
     atomically delete 

Il prossimo è il cliente, che si apre semplicemente un connessioni e inoltra tutti i tasti premuti come pacchetti singoli:

import Control.Monad 
import Control.Proxy 
import Control.Proxy.Safe 
import Control.Proxy.TCP.Safe 
import Data.ByteString.Char8 (pack) 
import System.IO 

main = do 
    hSetBuffering stdin NoBuffering 
    hSetEcho  stdin False 

    {- Again, this uses my 'pipes' library. It basically says: 

     * Read characters from the console using 'commands' 

     * Pack them into a binary format 

     * send them to a server running at 127.0.0.1:8080 

     This finishes looping when the user types a 'q' or the connection is 
     closed for whatever reason. 
    -} 
    runSafeIO $ runProxy $ runEitherK $ 
     try . commands 
    >-> mapD (\c -> pack [c]) 
    >-> connectWriteD Nothing "127.0.0.1" "8080" 

commands :: (Proxy p) =>() -> Producer p Char IO() 
commands() = runIdentityP loop 
    where 
    loop = do 
     c <- lift getChar 
     respond c 
     unless (c == 'q') loop 

e 'piuttosto semplice: commands genera un flusso di Char s, che poi vengono convertiti in ByteString s e poi inviati come pacchetti al server.

se si esegue il server e un paio di clienti e li hanno ogni tipo in pochi tasti, il server di visualizzazione volontà uscita una lista che mostra quante chiavi ogni cliente digitato:

[1,6,4] 

... e se alcuni dei clienti scollegare saranno rimossi dalla lista:

[1,4] 

nota che la componente pipes di questi esempi semplificherà notevolmente nel prossimo pipes-4.0.0 rilascio, ma la corrente pipes ecosistema ancora g Rende il lavoro come è.

+0

Soluzione eccezionale, ci penserò sicuramente;) – Kr0e

+0

Solo per la mia comprensione: STM è considerato puro? Immagino che non lo sia dato che riguarda la mutabilità senza usare un meccanismo di blocco, giusto? – Kr0e

+2

@ Kr0e Destra. Pensa a STM come a riferimenti di memoria mutabili configurabili e thread-safe. –

3

Per prima cosa, ti consiglio di utilizzare il tuo tipo di dati specifico per rappresentare i comandi. Quando si utilizza (String, Maybe (Chan [a]), Maybe a) un client buggato può mandare in crash il proprio attore semplicemente inviando un comando sconosciuto o inviando ("add", Nothing, Nothing), ecc.Io suggerirei qualcosa come

data Command a = Add a | Remove a | Get (Chan [a]) 

Quindi è possibile abbinare modello su comandi in storage in modo risparmiare.

Gli attori hanno i loro vantaggi, ma anche io sento che hanno alcuni inconvenienti. Ad esempio, ottenere una risposta da un attore richiede l'invio di un comando e quindi attendere una risposta. E il cliente non può essere completamente sicuro che abbia una risposta e che la risposta sia di un tipo specifico - non si può dire che voglio solo risposte di questo tipo (e quante di esse) per questo particolare comando.

Quindi, ad esempio, fornirò una soluzione STM semplice. Sarebbe meglio usare una tabella hash o un set (albero bilanciato), ma dal momento che gli strumenti Handle non sono né OrdHashable, non possiamo usare queste strutture di dati, quindi continuerò a utilizzare le liste.

module ThreadSet (
    TSet, add, remove, get 
) where 

import Control.Monad 
import Control.Monad.STM 
import Control.Concurrent.STM.TVar 
import Data.List (delete) 

newtype TSet a = TSet (TVar [a]) 

add :: (Eq a) => a -> TSet a -> STM() 
add x (TSet v) = readTVar v >>= writeTVar v . (x :) 

remove :: (Eq a) => a -> TSet a -> STM() 
remove x (TSet v) = readTVar v >>= writeTVar v . delete x 

get :: (Eq a) => TSet a -> STM [a] 
get (TSet v) = readTVar v 

Questo modulo implementa un set in base STM di elementi arbitrari. Puoi avere più di questi set e usarli insieme in una singola transazione STM che ha esito positivo o negativo in una sola volta. Per esempio

-- | Ensures that there is exactly one element `x` in the set. 
add1 :: (Eq a) => a -> TSet a -> STM() 
add1 x v = remove x v >> add x v 

Questo sarebbe difficile con gli attori, dovreste aggiungerlo come un altro comando per l'attore, non è possibile comporla delle azioni esistenti e avere ancora atomicità.

Aggiornamento: C'è un interessante article che spiega perché i progettisti Clojure hanno scelto di non utilizzare gli attori. Ad esempio, usando gli attori, anche se hai molte letture e solo poche scritture su una struttura mutevole, sono tutte serializzate, il che può avere un impatto notevole sulle prestazioni.

+0

Beh, serializzare/deserializzare costa molto, questo è vero. CloudHaskell ha lo stesso "overhead di serializzazione", lo chiamano una funzionalità. Ma recentemente hanno aggiunto una funzione di invio non sicura che trasmette il messaggio senza ser./deser. che è un ordine di grandezza più veloce. In teoria il passaggio dei messaggi dovrebbe essere economico come una semplice chiamata di funzione per rendere il modello attore una vera alternativa, il che, ovviamente, non è il caso, ma in Erlang lo è. Penso che STM sia davvero una grande funzionalità, forse usare entrambe le tecniche è la strada da percorrere, dal momento che STM è davvero di basso livello rispetto al modello dell'attore. – Kr0e