2013-07-29 10 views
5

scrivo un programma che conta le frequenze di NGrams in un corpus. Ho già una funzione che consuma un flusso di gettoni e produce NGrams di un unico ordine:Conduit: più consumatori Ruscello

ngram :: Monad m => Int -> Conduit t m [t] 
trigrams = ngram 3 
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int) 

Al momento posso solo collegare un flusso dei consumatori ad una fonte di flusso:

tokens --- trigrams --- countFreq 

Come fare Collego più utenti dello stream alla stessa sorgente di streaming? Mi piacerebbe avere qualcosa di simile:

  .--- unigrams --- countFreq 
      |--- bigrams --- countFreq 
tokens ----|--- trigrams --- countFreq 
      '--- ...  --- countFreq 

Un plus sarebbe quella di eseguire ogni consumatore in parallelo

EDIT: Grazie a Petr mi si avvicinò con questa soluzione

spawnMultiple orders = do 
    chan <- atomically newBroadcastTMChan 

    results <- forM orders $ \_ -> newEmptyMVar 
    threads <- forM (zip results orders) $ 
         forkIO . uncurry (sink chan) 

    forkIO . runResourceT $ sourceFile "test.txt" 
         $$ javascriptTokenizer 
         =$ sinkTMChan chan 

    forM results readMVar 

    where 
     sink chan result n = do 
      chan' <- atomically $ dupTMChan chan 
      freqs <- runResourceT $ sourceTMChan chan' 
           $$ ngram n 
           =$ frequencies 
      putMVar result freqs 
+0

Si desidera che quando "token" restituisce un valore, tutti i "... grammi" lo ricevono? –

risposta

5

Suppongo che tu voglia che tutti i tuoi lavandini ricevano tutti i valori.

Io suggerirei:

  1. Usa newBroadcastTMChan per creare un nuovo canale Control.Concurrent.STM.TMChan (STM-Chans).
  2. Usare questo canale per costruire un lavandino utilizzando sinkTBMChan da Data.Conduit.TMChan (STM-conduit) per il principale produttore.
  3. Per utilizzare ogni cliente dupTMChan per creare la propria copia per la lettura. Inizia una nuova discussione che leggerà questa copia usando sourceTBMChan.
  4. raccogliere i risultati dai vostri thread.
  5. essere che i vostri clienti possono leggere i dati più velocemente come sono prodotte, in caso contrario è possibile ottenere heap overflow.

(non ho provato, fateci sapere come funziona.)


Aggiornamento: Un modo come si potrebbe raccogliere i risultati è quello di creare un MVar per ogni thread consumatore . Ognuno di loro sarebbe putMVar il suo risultato dopo che è finito. E il tuo thread principale sarebbe takeMVar su tutti questi MVar s, in attesa quindi per ogni thread per finire. Ad esempio se vars è un elenco dei tuoi MVar s, il thread principale emetterebbe mapM takeMVar vars per raccogliere tutti i risultati.

+0

Grazie per la risposta, come faccio a raccogliere i risultati se spawn i thread con forkIO? – SvenK

+0

@SvenK Ho aggiornato la risposta con un'idea su come raccogliere i risultati. –

+0

Perché TMChan ha una versione broadcast e TBMChan no, dove posso trovare un 'newBroadcastTBMChan'? – CMCDragonkai

Problemi correlati