2014-08-27 16 views
6

Ho un programma di "copia" relativamente semplice che copia semplicemente tutte le righe di un file su un altro. Sto giocando intorno con il supporto concorrenza di Haskell con TMQueue e STM così ho pensato che mi piacerebbe provare in questo modo:Consumo di memoria enorme per il semplice multithreading Haskell

{-# LANGUAGE BangPatterns #-} 

module Main where 

import Control.Applicative 
import Control.Concurrent.Async    -- from async 
import Control.Concurrent.Chan 
import Control.Concurrent.STM (atomically) 
import Control.Concurrent.STM.TMQueue  -- from stm-chans 
import Control.Monad (replicateM, forM_, forever, unless) 
import qualified Data.ByteString.Char8 as B 
import Data.Function (fix) 
import Data.Maybe (catMaybes, maybe) 
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine) 
import System.IO.Error (catchIOError) 

input = "data.dat" 
output = "out.dat" 
batch = 100 :: Int 

consumer :: TMQueue B.ByteString -> IO() 
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do 
    !items <- catMaybes <$> replicateM batch readitem 
    forM_ items $ B.hPutStrLn fh 
    unless (length items < batch) loop 
    where 
    readitem = do 
     !item <- atomically $ readTMQueue q 
     return item 

producer :: TMQueue B.ByteString -> IO() 
producer q = withFile input ReadMode $ \fh -> 
    (forever (B.hGetLine fh >>= atomically . writeTMQueue q)) 
    `catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done") 

main :: IO() 
main = do 
    q <- atomically newTMQueue 
    thread <- async $ consumer q 
    producer q 
    wait thread 

posso fare un po 'di file di input test come questo

ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))' 

e costruire it Ti piace questa

ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q 

quando l'eseguo in questo modo ./q +RTS -s -prof -hc -L60 -N2, si dice che "la memoria 2117 MB totale in uso"! Ma il file di input è solo 38 MB!

Sono nuovo alla profilazione, ma ho prodotto grafico dopo grafico e non riesco a individuare il mio errore.

+0

Do la colpa alla coda. Se si scambia 'TMQueue' con' TBMQueue' e un limite appropriato (ad esempio, 10 * batch), si ha ~ 3 MB di memoria totale. – Zeta

+0

Cosa hai imparato da '-hc' e cosa mostra' -hy'? Che cosa dice quando si compila senza profiling e si esegue semplicemente con '+ RTS -s -N'? – jberryman

+0

@Zeta Ci proverò. Tuttavia, nella mia situazione di vita reale, non posso permettere al produttore di bloccare. Sono estremamente curioso del perché TMQueue avrebbe avuto un effetto così orribile sulle prestazioni! –

risposta

2

Come sottolinea il PO, ormai posso anche scrivere una vera risposta. Iniziamo con il consumo di memoria.

Due riferimenti utili sono Memory footprint of Haskell data types e http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html. Dovremo anche esaminare le definizioni di alcune delle nostre strutture.

-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html 

data TMQueue a = TMQueue 
    {-# UNPACK #-} !(TVar Bool) 
    {-# UNPACK #-} !(TQueue a) 
    deriving Typeable 


-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html 

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel. 
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a]) 
         {-# UNPACK #-} !(TVar [a]) 

Il TQueue implementazione utilizza una coda funzionale standard con un fine di lettura e scrittura finale.

Impostiamo un limite superiore per l'utilizzo della memoria e presupponiamo che leggiamo l'intero file nello TMQueue prima che l'utente esegua qualcosa. In tal caso, la fine della scrittura del nostro TQueue conterrà un elenco con un elemento per riga di input (memorizzato come un test). Ogni nodo di lista sarà simile a

(:) bytestring tail 

che richiede 3 parole (1 per campo + 1 per il costruttore). Ogni test è di 9 parole, quindi aggiungi i due insieme e ci sono 12 parole di overhead per riga, esclusi i dati effettivi. I tuoi dati di test sono 5 milioni di righe, quindi sono 60 milioni di parole di overhead per l'intero file (più alcune costanti), che su un sistema a 64 bit è di circa 460 MB (presumendo che abbia fatto bene i miei calcoli, sempre discutibile). Aggiungi 40 MB per i dati effettivi e otteniamo valori molto simili a quelli che vedo sul mio sistema.

Quindi, perché l'utilizzo della memoria è vicino a questo limite superiore? Ho una teoria (l'indagine è partita come esercizio!). Innanzitutto, è probabile che il produttore funzioni un po 'più velocemente rispetto al consumatore semplicemente perché la lettura è solitamente più veloce della scrittura (sto usando dischi rotanti, forse un SSD sarebbe diverso). Ecco la definizione di readTQueue:

-- |Read the next value from the 'TQueue'. 
readTQueue :: TQueue a -> STM a 
readTQueue (TQueue read write) = do 
    xs <- readTVar read 
    case xs of 
    (x:xs') -> do writeTVar read xs' 
        return x 
    [] -> do ys <- readTVar write 
      case ys of 
       [] -> retry 
       _ -> case reverse ys of 
         [] -> error "readTQueue" 
         (z:zs) -> do writeTVar write [] 
            writeTVar read zs 
            return z 

Prima di tutto cerchiamo di leggere a partire dalla fine di leggere, e se questo è vuoto cerchiamo di leggere a partire dalla fine di scrittura, dopo l'inversione quella lista.

Quello che penso stia accadendo è questo: quando il consumatore ha bisogno di leggere dalla fine della scrittura, deve attraversare la lista di input all'interno della transazione STM. Questo richiede un po 'di tempo, il che lo farà contendere al produttore. Man mano che il produttore va oltre, questa lista si allunga, facendo sì che la lettura impieghi ancora più tempo, durante la quale il produttore è in grado di scrivere più valori, causando il fallimento della lettura. Questo processo si ripete finché il produttore non finisce e solo allora il consumatore ha la possibilità di elaborare la maggior parte dei dati.Ciò non solo rovina la concorrenza, ma aumenta il sovraccarico della CPU perché la transazione del consumatore viene continuamente riprovata e fallita.

Quindi, che dire di unagi? Ci sono un paio di differenze chiave. Innanzitutto, unagi-chan utilizza gli array internamente anziché le liste. Questo riduce un po 'l'overhead. La maggior parte del sovraccarico proviene dai puntatori ByteString, quindi non molto, ma un po '. In secondo luogo, unagi mantiene blocchi di matrici. Anche se pensiamo pessimisticamente che il produttore vinca sempre delle contese, dopo che la serie si è riempita, viene espulsa dalla parte del produttore del canale. Ora il produttore sta scrivendo su un nuovo array e il consumatore legge dal vecchio array. Questa situazione è quasi ideale; non c'è contesa per le risorse condivise, il consumatore ha una buona localizzazione di riferimento, e poiché il consumatore sta lavorando su una parte diversa della memoria non ci sono problemi con la coerenza della cache. A differenza della mia descrizione teorica dello TMQueue, ora si stanno ottenendo operazioni simultanee, consentendo al produttore di cancellare parte dell'utilizzo della memoria in modo che non raggiunga mai il limite superiore.

Per inciso, penso che il dosaggio del consumatore non sia vantaggioso. Le maniglie sono già bufferizzate dal sottosistema di I/O, quindi non credo che questo guadagni nulla. Per me le prestazioni sono migliorate un po 'quando ho cambiato il consumatore per funzionare line-by-line comunque.

Ora, cosa si può fare per questo problema? Partendo dalla mia ipotesi di lavoro sul fatto che TMQueue soffra di problemi di contesa e dei requisiti specificati, sarà sufficiente utilizzare un altro tipo di coda. Ovviamente unagi funziona abbastanza bene. Ho anche provato lo TMChan, era circa il 25% più lento di unagi ma utilizzava il 45% di memoria in meno, quindi anche questa poteva essere una buona opzione. (questo non è troppo sorprendente, ha una struttura diversa da TMQueue quindi avrà diverse caratteristiche di prestazione)

Si potrebbe anche provare a cambiare il proprio algoritmo in modo che il produttore mandi blocchi multi-linea. Ciò ridurrebbe il sovraccarico della memoria da tutti i ByteStrings.

Quindi, quando è ok usare TMQueue? Se produttore e consumatore hanno la stessa velocità o se il consumatore è più veloce, dovrebbe essere ok. Inoltre, se i tempi di elaborazione non sono uniformi o se il produttore funziona a raffiche, probabilmente otterrai una buona prestazione ammortizzata. Questa è quasi la peggiore situazione e forse dovrebbe essere segnalata come un bug contro stm? Penso che se la funzione di lettura fosse cambiata in

-- |Read the next value from the 'TQueue'. 
readTQueue :: TQueue a -> STM a 
readTQueue (TQueue read write) = do 
    xs <- readTVar read 
    case xs of 
    (x:xs') -> do writeTVar read xs' 
        return x 
    [] -> do ys <- readTVar write 
      case ys of 
       [] -> retry 
       _ -> do writeTVar write [] 
         let (z:zs) = reverse ys 
         writeTVar read zs 
         return z 

eviterebbe questo problema. Ora i binding e zs devono essere entrambi valutati pigramente, quindi l'attraversamento dell'elenco avverrebbe al di fuori di questa transazione, consentendo talvolta il corretto funzionamento dell'operazione di lettura in conflitto. Supponendo che io sia corretto riguardo al problema, ovviamente (e che questa definizione sia abbastanza pigra). Ci potrebbero essere altri aspetti negativi inaspettati però.

+0

Risposta fenomenale! Molto grato per la tua analisi approfondita da tutti i diversi angoli. Hai considerato di archiviare il tuo 'readTQueue 'alternativo come un potenziale miglioramento di' stm'? –

Problemi correlati