2010-08-04 8 views
7

Sto provando a scrivere un crawler Web con multithreading.Utilizzo Java ThreadPool

La mia classe di ingresso principale ha il seguente codice:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null) 
     return; 
exec.execute(new URLCrawler(this, url)); 
} 

L'URLCrawler recupera l'URL specificato, analizza il codice HTML estratti i link da esso, e gli orari collegamenti invisibili indietro alla frontiera.

Una frontiera è una coda di URL non distribuiti. Il problema è come scrivere il metodo get(). Se la coda è vuota, è necessario attendere la fine di qualsiasi URLCrawler e riprovare. Dovrebbe restituire null solo quando la coda è vuota e URLCrawler non è attivo.

La mia prima idea era di usare un oggetto AtomicInteger per contare il numero corrente di URLCrawlers funzionanti e un oggetto ausiliario per le chiamate notifyAll()/wait(). Ogni crawler all'avvio incrementa il numero di URLCrawlers funzionanti correnti e all'uscita li decrementa e notifica all'oggetto che è stato completato.

Ma ho letto che notify()/notifyAll() e wait() sono metodi un po 'deprecati per fare comunicazioni thread.

Che cosa dovrei usare in questo modello di lavoro? È simile ai produttori M e N consumatori, la questione è come affrontare l'esaurimento dei produttori.

risposta

1

Penso che l'uso di attesa/notifica sia giustificato in questo caso. Non riesco a pensare a un modo semplice per farlo usando j.u.c.
In una classe, chiamiamolo Coordinatore:

private final int numOfCrawlers; 
private int waiting; 

public boolean shouldTryAgain(){ 
    synchronized(this){ 
     waiting++; 
     if(waiting>=numOfCrawlers){ 
      //Everybody is waiting, terminate 
      return false; 
     }else{ 
      wait();//spurious wake up is okay 
      //waked up for whatever reason. Try again 
      waiting--; 
      return true; 
     } 
    } 

public void hasEnqueued(){ 
    synchronized(this){ 
     notifyAll(); 
    } 
} 

poi,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null){ 
     if(!coordinator.shouldTryAgain()){ 
      //all threads are waiting. No possibility of new jobs. 
      return; 
     }else{ 
      //Possible that there are other jobs. Try again 
      continue; 
     } 
    } 
    exec.execute(new URLCrawler(this, url)); 
}//while(true) 
3

io non sono sicuro di aver capito il vostro disegno, ma questo può essere un lavoro per una soluzione Semaphore

3

Uno è quello di rendere "frontiera" una coda di blocco, quindi ogni filo cercare di "ottenere" da esso bloccherà . Non appena un altro URLCrawler mette gli oggetti in quella coda, qualsiasi altro thread verrà automaticamente notificato (con l'oggetto disattivato)

+0

Sì, questa è una soluzione per uno stato stazionario. Ma come affrontare allora la situazione quando nessuno degli URLCrawlers mette in coda gli URL? Con una coda bloccante la frontiera bloccherà all'infinito. –

+0

In tal caso è possibile avere un metodo crawlerDone() sull'oggetto di frontiera che viene chiamato ogni volta che un UrlCrawler termina il lavoro. Questo metodo insieme all'approccio contatore che hai suggerito, puoi testare (nel tuo metodo di frontiera) se tutti i crawler hanno finito. Se ciò è vero, get() può restituire null senza bloccare la frontiera – naikus

+0

può essere una coda di blocco della capacità fissa. un buon candidato per quella capacità è il numero di Crawlers –

2

Penso che un blocco di base per il vostro caso d'uso è un "latch", simile a CountDownLatch, ma a differenza di CountDownLatch, quello che consente di incrementare il numero incrementando anche il conteggio.

Un'interfaccia per un tale dispositivo di chiusura potrebbe essere

public interface Latch { 
    public void countDown(); 
    public void countUp(); 
    public void await() throws InterruptedException; 
    public int getCount(); 
} 

valori legali per i conteggi sarebbero 0 e fino. Il metodo await() ti consente di bloccare finché il conteggio non scende a zero.

Se si dispone di un tale dispositivo di chiusura, il caso d'uso può essere descritto abbastanza facilmente. Sospetto anche che la coda (frontiera) possa essere eliminata in questa soluzione (l'executor ne fornisce uno comunque è in qualche modo ridondante).Vorrei riscrivere la routine principale come

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers); 
Latch latch = ...; // instantiate a latch 
URL[] initialUrls = ...; 
for (URL url: initialUrls) { 
    executor.execute(new URLCrawler(this, url, latch)); 
} 
// now wait for all crawling tasks to finish 
latch.await(); 

tuo URLCrawler avrebbe utilizzato il fermo in questo modo:

public class URLCrawler implements Runnable { 
    private final Latch latch; 

    public URLCrawler(..., Latch l) { 
     ... 
     latch = l; 
     latch.countUp(); // increment the count as early as possible 
    } 

    public void run() { 
     try { 
      List<URL> secondaryUrls = crawl(); 
      for (URL url: secondaryUrls) { 
       // submit new tasks directly 
       executor.execute(new URLCrawler(..., latch)); 
      } 
     } finally { 
      // as a last step, decrement the count 
      latch.countDown(); 
     } 
    } 
} 

Per quanto riguarda le implementazioni fermo, non ci può essere una serie di possibili implementazioni, che vanno da uno che è basato su wait() e notifyAll(), uno che utilizza Lock e Condition, a un'implementazione che utilizza AbstractQueuedSynchronizer. Tutte queste implementazioni penso che sarebbero piuttosto semplici. Si noti che la versione wait() - notifyAll() e la versione Lock-Condition si baserebbero sull'esclusione reciproca, mentre la versione AQS utilizzerebbe CAS (compare-and-swap), e quindi potrebbe scalare meglio in determinate situazioni.

+0

Il tuo lucchetto personalizzato assomiglia molto a un semaforo ... Perché non usarne uno? – assylias

+0

Sì, ci sono sicuramente delle similitudini. Una cosa che manca dal semaforo della vaniglia è il metodo await() al di sopra del quale il termine semaforo può bloccare fino a quando non vengono rilasciati tutti i permessi.Probabilmente si può creare questo combinando un semaforo e un latch per il conto alla rovescia. – sjlee

0

Mi piacerebbe suggerire un AdaptiveExecuter. Sulla base di un valore caratteristico, è possibile scegliere di serializzare o parallelizzare un thread per l'esecuzione. Nell'esempio seguente, PUID è una stringa/oggetto che volevo utilizzare per prendere questa decisione. È possibile modificare la logica in base al proprio codice. Alcune parti del codice sono commentate per consentire ulteriori esperimenti.

classe AdaptiveExecutor implementa Executor { final Queue tasks = new LinkedBlockingQueue(); Runnable attivo; // ExecutorService threadExecutor = Executors.newCachedThreadPool(); static ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() { 
    System.out.println("Initial Queue Size=" + tasks.size()); 
} 

public void execute(final Runnable r) { 
    /* if immediate start is needed do either of below two 
    new Thread(r).start(); 

    try { 
     threadExecutor.execute(r); 
    } catch(RejectedExecutionException rEE) { 
     System.out.println("Thread Rejected " + new Thread(r).getName()); 
    } 

    */ 


    tasks.offer(r); // otherwise, queue them up 
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel. 
    /* 
    tasks.offer(new Runnable() { 
     public void run() { 
      try { 
       r.run(); 
      } finally { 
       scheduleNext(); 
      } 
     } 
    }); 
    */ 
    if ((active == null)&& !tasks.isEmpty()) { 
     active = tasks.poll(); 
     try { 
      threadExecutor.submit(active); 
     } catch (RejectedExecutionException rEE) { 
      System.out.println("Thread Rejected " + new Thread(r).getName()); 
     } 
    } 

    /* 
    if ((active == null)&& !tasks.isEmpty()) { 
     scheduleNext(); 
    } else tasks.offer(r); 
    */ 
    //tasks.offer(r); 

    //System.out.println("Queue Size=" + tasks.size()); 

} 

private void serialize(Thread th) { 
    try { 
     Thread activeThread = new Thread(active); 

     th.wait(200); 
     threadExecutor.submit(th); 
    } catch (InterruptedException iEx) { 

    } 
    /* 
    active=tasks.poll(); 
    System.out.println("active thread is " + active.toString()); 
    threadExecutor.execute(active); 
    */ 
} 

private void parallalize() { 
    if(null!=active) 
     threadExecutor.submit(active); 
} 

protected void scheduleNext(Thread r) { 
    //System.out.println("scheduleNext called") ; 
    if(false==compareKeys(r,new Thread(active))) 
     parallalize(); 
    else serialize(r); 
} 

private boolean compareKeys(Thread r, Thread active) { 
    // TODO: obtain names of threads. If they contain same PUID, serialize them. 
    if(null==active) 
     return true; // first thread should be serialized 
    else return false; //rest all go parallel, unless logic controlls it 
} 

}

2

la questione è un po 'vecchio, ma penso di aver trovato qualche semplice, soluzione di lavoro:

estendere la classe ThreadPoolExecutor come qui di seguito. La nuova funzionalità sta mantenendo il conteggio attività attivo (purtroppo, fornito getActiveCount() non è affidabile). Se taskCount.get() == 0 e non ci sono più attività in coda, significa che non c'è nulla da fare e l'executor si spegne. Hai i tuoi criteri di uscita. Inoltre, se si crea vostro esecutore, ma non riescono a presentare eventuali compiti, esso non bloccare:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor { 

    private final AtomicInteger taskCount = new AtomicInteger(); 

    public CrawlingThreadPoolExecutor() { 
     super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 

     super.beforeExecute(t, r); 
     taskCount.incrementAndGet(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     taskCount.decrementAndGet(); 
     if (getQueue().isEmpty() && taskCount.get() == 0) { 
      shutdown(); 
     } 
    } 
} 

Una cosa che dovete fare è implementare il Runnable in modo mantiene riferimento alla Executor si utilizza per poter inviare nuovi compiti. Ecco una simulazione:

public class MockFetcher implements Runnable { 

    private final String url; 
    private final Executor e; 

    public MockFetcher(final Executor e, final String url) { 
     this.e = e; 
     this.url = url; 
    } 

    @Override 
    public void run() { 
     final List<String> newUrls = new ArrayList<>(); 
     // Parse doc and build url list, and then: 
     for (final String newUrl : newUrls) { 
      e.execute(new MockFetcher(this.e, newUrl)); 
     } 
    } 
}