2015-06-15 6 views
5

Ho difficoltà a imparare l'idea dietro Fibers \ coroutines e l'implementazione in Crystal.Crystal converte l'idea che sta dietro il pool di thread in Fibre/spawn

Spero che questo è il posto giusto per chiedere a questa, io assolutamente accettare una risposta "non qui" :)

Questo è il mio solito modo di gestire il multi-threading in Ruby:

threads = [] 
max_threads = 10 

loop do 
    begin 
    threads << Thread.new do 
     helper_method(1,2,3,4) 
    end 
    rescue Exception => e 
    puts "Error Starting thread" 
    end 

    begin 
    threads = threads.select { |t| t.alive? ? true : (t.join; false) } 
    while threads.size >= max_threads 
     puts 'Got Maximum threads' 
     sleep 1 
     threads = threads.select { |t| t.alive? ? true : (t.join; false) } 
    end 
    rescue Exception => e 
    puts e 
    end 
end 

In questo modo, apro una nuova discussione, di solito di una connessione in entrata o qualche altra cosa, aggiungo la discussione a una matrice di thread, quindi verifica che non abbia più thread e quindi quello che volevo.

Quale sarebbe un buon modo per implementare qualcosa di simile in Crystal usando spawn \ channels \ fiber ecc.?

risposta

12

Qualcosa di simile a questo:

require "socket" 

ch = Channel(TCPSocket).new 

10.times do 
    spawn do 
    loop do 
     socket = ch.receive 
     socket.puts "Hi!" 
     socket.close 
    end 
    end 
end 

server = TCPServer.new(1234) 
loop do 
    socket = server.accept 
    ch.send socket 
end 

Questo codice pre-uova 10 fibre per assistere le richieste. Il canale non è bloccato, quindi le connessioni non saranno in coda se non possono essere supportate da alcuna fibra.

+1

Esattamente quello che stavo cercando, grazie! – Ba7a7chy

5

Non è possibile replicare il modo in cui funziona per i thread. spawn non restituisce un oggetto di coroutine e non c'è alcun modo per le coroutine di join.

Eppure possiamo aprire un canale per comunicare tra le coroutine e il gestore del pool. Questo manager può essere eseguito all'interno della propria coroutine o essere la principale coroutine, in modo da impedire l'uscita del processo.

Ecco un esempio di lavoro, con un metodo worker(&block) che generare un coroutine, e aprire un canale di restituire lo stato (non è riuscito o terminato), e un metodo pool(&block) che manterrà un pool di tali lavoratori e leggere dai canali dei risultati per conoscere lo stato delle coroutine e continuare a generarne di nuove.

def worker(&block) 
    result = UnbufferedChannel(Exception?).new 

    ::spawn do 
    begin 
     block.call 
    rescue ex 
     result.send(ex) 
    else 
     result.send(nil) 
    end 
    end 

    result 
end 

def pool(size, &block) 
    counter = 0 
    results = [] of UnbufferedChannel(Exception?) 

    loop do 
    while counter < size 
     counter += 1 
     puts "spawning worker" 
     results << worker(&block) 
    end 

    result = Channel.select(results) 
    counter -= 1 
    results.delete(result) 

    if ex = result.receive 
     puts "ERROR: #{ex.message}" 
    else 
     puts "worker terminated" 
    end 
    end 
end 

pool(5) do 
    loop { helper_method(1, 2, 3, 4) } 
end 
Problemi correlati