2012-02-16 10 views
13

Sto cercando di costruire la mia indici in Lucene con più thread. Quindi, ho iniziato la mia codifica e ho scritto il seguente codice. Per prima cosa trovo i file e per ogni file, creo un thread per indicizzarlo. Successivamente, aggiungo i thread e ottimizzo gli indici. Funziona ma non sono sicuro ... posso fidarmi di esso su larga scala? C'è un modo per migliorarlo?Migliorare l'indicizzazione multi-thread con Lucene

import java.io.File; 
import java.io.FileFilter; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.File; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import org.apache.lucene.index.IndexWriter; 
import org.apache.lucene.document.Field; 
import org.apache.lucene.document.Document; 
import org.apache.lucene.store.RAMDirectory; 
import org.apache.lucene.analysis.standard.StandardAnalyzer; 
import org.apache.lucene.analysis.StopAnalyzer; 
import org.apache.lucene.index.IndexReader; 
import org.apache.lucene.store.Directory; 
import org.apache.lucene.store.FSDirectory; 
import org.apache.lucene.util.Version; 
import org.apache.lucene.index.TermFreqVector; 

public class mIndexer extends Thread { 

    private File ifile; 
    private static IndexWriter writer; 

    public mIndexer(File f) { 
    ifile = f.getAbsoluteFile(); 
    } 

    public static void main(String args[]) throws Exception { 
    System.out.println("here..."); 

    String indexDir; 
     String dataDir; 
    if (args.length != 2) { 
     dataDir = new String("/home/omid/Ranking/docs/"); 
     indexDir = new String("/home/omid/Ranking/indexes/"); 
    } 
    else { 
     dataDir = args[0]; 
     indexDir = args[1]; 
    } 

    long start = System.currentTimeMillis(); 

    Directory dir = FSDirectory.open(new File(indexDir)); 
    writer = new IndexWriter(dir, 
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")), 
    true, 
    IndexWriter.MaxFieldLength.UNLIMITED); 
    int numIndexed = 0; 
    try { 
     numIndexed = index(dataDir, new TextFilesFilter()); 
    } finally { 
     long end = System.currentTimeMillis(); 
     System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds"); 
     writer.optimize(); 
     System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds"); 
     writer.close(); 
    } 
    System.out.println("Enjoy your day/night"); 
    } 

    public static int index(String dataDir, FileFilter filter) throws Exception { 
    File[] dires = new File(dataDir).listFiles(); 
    for (File d: dires) { 
     if (d.isDirectory()) { 
     File[] files = new File(d.getAbsolutePath()).listFiles(); 
     for (File f: files) { 
      if (!f.isDirectory() && 
      !f.isHidden() && 
      f.exists() && 
      f.canRead() && 
      (filter == null || filter.accept(f))) { 
       Thread t = new mIndexer(f); 
       t.start(); 
       t.join(); 
      } 
     } 
     } 
    } 
    return writer.numDocs(); 
    } 

    private static class TextFilesFilter implements FileFilter { 
    public boolean accept(File path) { 
     return path.getName().toLowerCase().endsWith(".txt"); 
    } 
    } 

    protected Document getDocument() throws Exception { 
    Document doc = new Document(); 
    if (ifile.exists()) { 
     doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES)); 
     doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); 
     String cat = "WIR"; 
     cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1); 
     cat = cat.substring(cat.lastIndexOf('/')+1, cat.length()); 
     //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES)); 
     //System.out.println(cat.subSequence(0, cat.length())); 
    } 
    return doc; 
    } 

    public void run() { 
    try { 
     System.out.println("Indexing " + ifile.getAbsolutePath()); 
     Document doc = getDocument(); 
     writer.addDocument(doc); 
    } catch (Exception e) { 
     System.out.println(e.toString()); 
    } 

    } 
} 

Si considera qualsiasi errore.

risposta

13

Se si vuole parallelizzare l'indicizzazione, ci sono due cose che puoi fare:

  • parallelizzare le chiamate verso AddDocument,
  • aumentando il numero massimo di filo del vostro scheduler unione.

Sei sulla strada giusta per parallelizzare le chiamate ad addDocuments, ma generare un thread per documento non si ridimensiona man mano che il numero di documenti da indicizzare crescerà. Dovresti invece usare una dimensione fissa ThreadPoolExecutor. Dal momento che questo compito è prevalentemente intensivo della CPU (a seconda del vostro analizzatore e il modo di recuperare i dati), l'impostazione del numero di CPU del computer come il numero massimo di thread potrebbe essere un buon inizio.

Per quanto riguarda l'utilità di pianificazione unione, è possibile aumentare il numero massimo di thread che è possibile utilizzare con setMaxThreadCount method of ConcurrentMergeScheduler. Attenzione che i dischi sono molto più bravi a letture sequenziali/scrive di lettura casuale/scrive, come la fissazione di un numero massimo troppo elevato di thread per il programmatore delle merge conseguenza è più probabile che rallentare l'indicizzazione giù che per accelerarlo.

Ma prima di tentare di parallelizzazione del processo di indicizzazione, probabilmente si dovrebbe cercare di trovare dove il collo di bottiglia è. Se il disco è troppo lento, il collo di bottiglia è probabile che sia i passaggi a filo e l'unione, come conseguenza parallelizzare chiamate a AddDocument (che consiste essenzialmente in analisi di un documento e tamponando il risultato dell'analisi in memoria) non migliorerà velocità di indicizzazione affatto.

Alcune note a margine:

  • C'è un po 'lavori in corso nella versione di sviluppo di Lucene al fine di migliorare l'indicizzazione il parallelismo (la parte di lavaggio in particolare, questo blog entry spiega come funziona).

  • Lucene ha una bella pagina wiki su how to improve indexing speed dove si trovano altri modi per migliorare la velocità di indicizzazione.

+0

Apprezzo molto il tuo Il tuo commento sul numero di discussioni è stato davvero utile, non l'ho detto prima ... – orezvani

5

Penso che il modo più moderno per fare ciò sia usare un ThreadPoolExecutor e inviare uno Runnable che sta facendo l'indicizzazione. Puoi aspettare che tutti i thread terminino usando .awaitTermination o CountdownLatch.

Io non sono un grande fan di avere la vostra classe principale estende Thread, basta creare una classe interna eseguibile che prende le sue depdencies in un costruttore. Ciò rende il tuo codice più leggibile, in quanto il lavoro svolto dai thread è chiaramente separato dal codice di configurazione dell'applicazione.

Alcune note sullo stile, non sono un grande fan di avere la tua classe principale lancia Eccezione, questo di solito significa solo che non hai una chiara idea dei diversi casi di eccezione controllati che il codice che stai usando può lanciare . Di solito non è la cosa giusta da fare se non si ha una ragione molto specifica.

+0

Grazie in anticipo. In realtà ho implementato Runnable che è stata una buona idea e ho utilizzato ThreadPoolExecutor che ha risolto un bug reale nel programma menzionato da jpountz. – orezvani

+0

Lo svantaggio di 'awaitTermination' è che non aspetta che tutti i thread finiscano, ma uscirà dopo n unità di tempo. :-(È necessario un ciclo –

+0

d'accordo con questo, questo si rivelerà che IndexWriter non si chiuderà correttamente e il writer_lock esisterà ancora anche l'indice La directory non viene manipolata dall'indicatore – JasonHuang