2010-06-06 14 views
6

ho un algoritmo che passerà attraverso un grande set di dati, leggere alcuni file di testo e cercare termini specifici in quelle linee. L'ho implementato in Java, ma non volevo postare codice in modo che non sembri che cerco qualcuno che lo implementa per me, ma è vero che ho davvero bisogno di molto aiuto !!! Questo non era previsto per il mio progetto, ma il set di dati si è rivelato enorme, quindi l'insegnante mi ha detto che devo farlo in questo modo.Ho bisogno di aiuto per implementare questo algoritmo con la mappa Hadoop MapReduce

EDIT (non ho chiarito i consigli precedenti versioni) L'insieme di dati che ho è su un cluster Hadoop, e dovrei fare la sua attuazione MapReduce

stavo leggendo su MapReduce e insegnati che ho faccio il implementazione standard e quindi sarà più o meno più facile farlo con mapreduce. Ma non è successo, dato che l'algoritmo è abbastanza stupido e niente di speciale, e la mappa si riduce ... non posso girarci intorno.

ecco poco pseudo codice del mio algoritmo

LIST termList (there is method that creates this list from lucene index) 
FOLDER topFolder 

INPUT topFolder 
IF it is folder and not empty 
    list files (there are 30 sub folders inside) 
    FOR EACH sub folder 
     GET file "CheckedFile.txt" 
     analyze(CheckedFile) 
    ENDFOR 
END IF 


Method ANALYZE(CheckedFile) 

read CheckedFile 
WHILE CheckedFile has next line 
    GET line 
    FOR(loops through termList) 
      GET third word from line 
      IF third word = term from list 
     append whole line to string buffer 
    ENDIF 
ENDFOR 
END WHILE 
OUTPUT string buffer to file 

Inoltre, come si può vedere, ogni volta che "analizzare" si chiama, nuovo file deve essere creato, ho capito che mappa a ridurre è difficile scrivere a molte uscite ???

Capisco mapreduce intuition, e il mio esempio sembra perfettamente adatto per mapreduce, ma quando si tratta di fare questo, ovviamente non ne so abbastanza e sono STUCK!

Si prega di aiutare.

risposta

3

È possibile utilizzare solo un riduttore vuoto e partizionare il lavoro per eseguire un singolo mapper per file. Ogni mapper creerà il proprio file di output nella cartella di output.

+0

Ciao! Grazie per la risposta !!! Ma non sono sicuro di aver capito:/puoi darmi qualche informazione in più? Hai forse qualche esempio del genere ??? – Julia

2

Map Reduce è facilmente implementabile utilizzando alcune interessanti funzionalità di concorrenza di Java 6, in particolare Future, Callable ed ExecutorService.

ho creato una Callable che analizzerà un file nel modo in cui è stato specificato

public class FileAnalyser implements Callable<String> { 

    private Scanner scanner; 
    private List<String> termList; 

    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException { 
    this.termList = termList; 
    scanner = new Scanner(new File(filename)); 
    } 

    @Override 
    public String call() throws Exception { 
    StringBuilder buffer = new StringBuilder(); 
    while (scanner.hasNextLine()) { 
     String line = scanner.nextLine(); 
     String[] tokens = line.split(" "); 
     if ((tokens.length >= 3) && (inTermList(tokens[2]))) 
     buffer.append(line); 
    } 
    return buffer.toString(); 
    } 

    private boolean inTermList(String term) { 
    return termList.contains(term); 
    } 
} 

Abbiamo bisogno di creare un nuovo richiamabile per ogni file trovato e la trasmette al servizio esecutore. Il risultato della submission è un Future che possiamo usare in seguito per ottenere il risultato dell'analisi del file.

public class Analayser { 

    private static final int THREAD_COUNT = 10; 

    public static void main(String[] args) { 

    //All callables will be submitted to this executor service 
    //Play around with THREAD_COUNT for optimum performance 
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); 

    //Store all futures in this list so we can refer to them easily 
    List<Future<String>> futureList = new ArrayList<Future<String>>(); 

    //Some random term list, I don't know what you're using. 
    List<String> termList = new ArrayList<String>(); 
    termList.add("terma"); 
    termList.add("termb"); 

    //For each file you find, create a new FileAnalyser callable and submit 
    //this to the executor service. Add the future to the list 
    //so we can check back on the result later 
    for each filename in all files { 
     try { 
     Callable<String> worker = new FileAnalyser(filename, termList); 
     Future<String> future = executor.submit(worker); 
     futureList.add(future); 
     } 
     catch (FileNotFoundException fnfe) { 
     //If the file doesn't exist at this point we can probably ignore, 
     //but I'll leave that for you to decide. 
     System.err.println("Unable to create future for " + filename); 
     fnfe.printStackTrace(System.err); 
     } 
    } 

    //You may want to wait at this point, until all threads have finished 
    //You could maybe loop through each future until allDone() holds true 
    //for each of them. 

    //Loop over all finished futures and do something with the result 
    //from each 
    for (Future<String> current : futureList) { 
     String result = current.get(); 
     //Do something with the result from this future 
    } 
    } 
} 

Il mio esempio qui è tutt'altro che completo e tutt'altro che efficiente. Non ho considerato la dimensione del campione, se è davvero enorme si poteva tenere in loop sul futureList, la rimozione di elementi che hanno finito, qualcosa di simile a:

while (futureList.size() > 0) { 
     for (Future<String> current : futureList) { 
     if (current.isDone()) { 
      String result = current.get(); 
      //Do something with result 
      futureList.remove(current); 
      break; //We have modified the list during iteration, best break out of for-loop 
     } 
     } 
} 

In alternativa si potrebbe implementare un tipo di installazione produttore-consumatore in cui il il produttore invia callables al servizio executor e produce un futuro e il consumatore prende il risultato del futuro e scarta poi il futuro.

Ciò richiederebbe probabilmente che il prodotto e il consumatore siano thread stessi e una lista sincronizzata per aggiungere/rimuovere future.

Qualsiasi domanda chieda.

+0

Ciao! Grazie mille per la soluzione proposta !!Mi dispiace probabilmente non ho specificato chiaramente il problema, anche se ho provato. Il mio errore, ho appena citato Hadoop nel titolo, ma il mio set di dati è su un cluster che esegue hadoop, quindi dovrei implementarlo secondo Hadoop MaPreduce frameork ... Ora modificherò il mio post.Il set di dati che sto analizzando è 6GB :/Troppo per la concorrenza per farcela ????? – Julia

+0

Oops, sono un noob qui: D Per riscattarmi leggermente ho eseguito il mio codice su 100 file, ~ 61 MB ciascuno, ~ 6 GB in totale. Non sono del tutto sicuro di ciò che fa il parser dei file, così ho omesso i dettagli cruenti e ho semplicemente scansionato ogni riga e restituito una stringa vuota. Un po 'inventato, lo so. Le prestazioni non erano troppo terribili, la dimensione del pool di thread era 100, quindi tutti i 100 file sono stati analizzati senza essere accodati dal servizio executor. Il tempo di esecuzione totale era di 17 minuti sul mio processore Atom. Mi spiace di non poter rispondere correttamente alla tua domanda. Non ho esperienza con Hadoop ma dopo aver letto la risposta di SquareCog ha senso. –

+0

Ciao! Grazie mille, mi hai aiutato molto, perché non riesco a farmi fronteggiare la MR con il cervello e il tempo che ho. Avrò pochi altri algoritmi simili da implementare, quindi devo provarlo in modo che io sia in grado di farlo. Non posso ottenere aiuto hadoop da nessuna parte:/ Quindi il tuo codice che ho adottato e sul mio Intel 2Ghz, con il pool di thread 42 impiegarono circa 20 minuti per analizzare e produrre risultati in nuovi file, ma solo su 200Mb di dati (42 file). Ancora una volta, devo fare alcune modifiche al parser, deve fare un po 'più stretto matching, non puro termine "contiene", quindi quando eseguo tutto, vi faccio sapere i risultati :) – Julia

Problemi correlati