2012-02-13 11 views
7

che sto cercando di realizzare il seguente:Perl Code e filettatura

  1. Avere un filo che legge i dati da un file molto grande dire su 10GB e spingerli nella coda. (Non voglio per la coda per ottenere molto grande o)

  2. Mentre il filo buildQueue sta spingendo i dati alla coda allo stesso tempo avere discussioni circa 5 di lavoro de-coda e dati di processo.

Ho fatto un tentativo, ma i miei altri thread sono irraggiungibili a causa di un ciclo continuo nel mio buildQueue thread.

Il mio approccio potrebbe essere completamente sbagliato. Grazie per l'aiuto, è molto apprezzato.

Ecco il codice per buildQueue:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

E, come ho aspettato quando questa discussione viene eseguito nient'altro dopo verrà eseguito perché questa discussione non finire.

my $builder = new Thread(&buildQueue); 

Poiché il thread del builder verrà eseguito per un lungo periodo di tempo, non riesco mai a creare thread di lavoro.

Ecco l'intero codice:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

Un paio di domande: si parla che il thread coda-builder non finire, ma fa fare qualcosa? La dimensione della coda non scende mai sotto 100 o supera lo 0? Inoltre, [non sono sicuro che stai creando i tuoi thread correttamente] (http://perldoc.perl.org/perlthrtut.html). Non dovrebbe essere 'my $ builder = threads-> create (\ & buildQueue);'? –

+0

Il programma di creazione coda è adatto ma, poiché i thread di lavoro non sono stati raggiunti per essere creati, non possono rimuovere nulla dalla coda, quindi la coda è bloccata a 100 mentre la coda di build è ancora in esecuzione a causa del ciclo continuo. – Sinista

+0

Hmmm, avrò bisogno di vedere più codice per stabilire il contesto, specialmente dove si creano i thread. Non state 'join'ing o' disconnettete il queue builder prima di creare i thread worker, giusto? –

risposta

10

Avrete bisogno di segnare quando si desidera che le discussioni per uscire (sia tramite joinor detach). Anche il fatto di avere loop infiniti senza le dichiarazioni last di uscire da loro è un problema.

Modifica: Ho anche dimenticato una parte molto importante! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue. Di qui perché accodiamo specificatamente undef una volta per ogni thread dopo che il programma di creazione della coda è terminato.

Prova:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

Sembra che il problema fosse il modulo Thread deprecato che ho passato al modulo threads e il mio codice funziona come dovrebbe ora. Grazie a Jack Many per avermi indicato nella direzione giusta. – Sinista

0

Un approccio diverso: È inoltre possibile utilizzare user_tasks in MCE 1.2+ e creare due multi-lavoratoretasks, un compito per la lettura (in quanto si tratta di un file di grandi dimensioni, si potrebbe anche beneficiare di lettura parallela, preservando file letto cercare) e un compito per l'elaborazione, ecc.

Il codice seguente utilizza ancora Thread::Queue per gestire la coda del buffer.

Il sottomenu buildQueue ha il controllo della dimensione della coda e invia i dati direttamente al processo di gestione '$ R_QUEUE poiché abbiamo utilizzato i thread, quindi ha accesso allo spazio di memoria del genitore. Se invece si desidera utilizzare le forche, è ancora possibile accedere alla coda tramite una funzione di richiamata. Ma qui ho scelto semplicemente di spingere in coda.

Il sub processQueue eseguirà semplicemente il de-code di qualsiasi cosa si trovi nella coda fino a quando non c'è nulla di più in sospeso.

Il sottoindirizzo task_end in ogni attività viene eseguito una sola volta dal processo di gestione alla fine di ogni attività, quindi lo usiamo per segnalare un arresto ai nostri processi di lavoro.

Ovviamente, c'è un sacco di libertà nel modo in cui si desidera pezzo i vostri dati per i lavoratori, in modo da poter decidere le dimensioni del pezzo o anche come trangugiare i dati in.

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

Il Il modulo MCE per Perl ama i file di grandi dimensioni. Con MCE, si possono ridimensionare molte righe in una volta, ingurgitare un grosso pezzo come una stringa scalare o leggere 1 riga alla volta. Chunking di molte righe contemporaneamente riduce il sovraccarico per IPC.

MCE 1.504 è ora disponibile. Fornisce MCE :: Queue con supporto per i processi figlio compresi i thread. Inoltre, la versione 1.5 viene fornita con 5 modelli (MCE :: Flow, MCE :: Grep, MCE :: Loop, MCE :: Map e MCE :: Stream) che si occupano dell'istanziazione dell'istanza MCE e dell'auto- tuning max_workers e chunk_size. Si può sovrascrivere queste opzioni btw.

Sotto, MCE :: Loop viene utilizzato per la dimostrazione.

Se si desidera specificare il numero di worker e/o chunk_size, ci sono 2 modi per farlo.

use MCE::Loop max_workers => 5, chunk_size => 300000; 

Oppure ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

Anche se la suddivisione in blocchi è preferito per i file di grandi dimensioni, si può confrontare il tempo con la suddivisione in blocchi di una riga alla volta. Si può omettere la prima riga all'interno del blocco (commentata). Nota come non è necessario un ciclo interno per. $ chunk_ref è ancora un ref di array contenente 1 riga. L'input scalare $ _ contiene la riga quando chunk_size è uguale a 1, altrimenti punta a $ chunk_ref.

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

Spero che questa dimostrazione sia stata utile per le persone che desiderano elaborare un file in parallelo.

:) mario