2011-09-26 16 views
10

Ho appena scoperto che ha solo una struttura NIO, Java NIO Pipe progettata per il trasferimento dei dati tra i thread. C'è qualche vantaggio nell'usare questo meccanismo sul messaggio più convenzionale che passa su una coda, come un ArrayBlockingQueue?Java NIO Pipe vs BlockingQueue

+0

I pipe passano attraverso il kernel, raramente utile in quanto il selettore si è svegliato ... implementato tramite pipe su linux ... – bestsss

+0

@bestsss cura di elaborare? puoi registrare pipe con selettori per ricevere notifiche, qual è il problema? – raffian

+0

@raffian, per dirla semplicemente - non puoi davvero usare Pipes per IPC e all'interno del processo c'è un modo molto più efficiente per passare le informazioni. – bestsss

risposta

6

Di solito il modo più semplice per passare i dati per un altro thread da elaborare è utilizzare un ExecutorService. Questo include una coda e un pool di thread (può avere un thread)

È possibile utilizzare un tubo quando si dispone di una libreria che supporta i canali NIO. È anche utile se si desidera passare ByteBuffer di dati tra i thread.

Altrimenti è solitamente semplice/veloce utilizzare un oggetto ArrayBlockingQueue.

Se volete un modo più veloce per lo scambio di dati tra le discussioni vi consiglio di guardare al Exchanger tuttavia non è scopo più generale di un ArrayBlockingQueue.

The Exchanger and GC-less Java

+0

Grazie, non ho mai preso in considerazione il fatto che l'uso dello scambiatore riduce al minimo l'overhead di GC. Tuttavia, il lato negativo dello scambiatore è che è sincrono. Di solito vuoi semplicemente pompare dati in un altro thread senza dover aspettare che venga raccolto. – Maxaon3000

+0

Una pipe è una dimensione fissa. Il problema è lo stesso se il produttore sta producendo per digiunare deve fermarsi. Se il produttore non riempie mai il buffer prima che il consumatore finisca, non deve fermarsi (in entrambi i casi) –

+1

I pipe vengono utilizzati per implementare Selector.wakeup, oltre a non essere molto utili, in quanto le sole soluzioni di memoria sono più efficaci e non passare attraverso il kernel. – bestsss

1

suppongo il tubo avrà una latenza meglio come si potrebbe molto probabilmente essere implementato con coroutine dietro le quinte. Pertanto, il produttore restituisce immediatamente al consumatore quando i dati sono disponibili, non quando decide lo scheduler dei thread.

I tubi in genere rappresentano un problema produttore-consumatore e molto probabilmente verranno implementati in questo modo in modo che entrambi i thread cooperino e non vengano anticipati esternamente.

2

Credo che un tubo NIO sia stato progettato in modo da poter inviare dati a un canale all'interno del ciclo di selezione in modo thread-safe, in altre parole, qualsiasi thread può scrivere sulla pipe e i dati verranno gestiti nell'altro estremo del tubo, all'interno del loop del selettore. Quando scrivi su una pipe, puoi rendere leggibile il canale sull'altro lato.

+0

Mi chiedo quali siano le caratteristiche delle prestazioni del passaggio dei dati tra i thread utilizzando un ciclo di selezione su un semplice polling di coda. Inoltre, il passaggio di dati attraverso una pipe sembra comportare l'inconveniente di dover passare byte anziché oggetti agli altri thread. In altre parole, ti obbliga a sviluppare un protocollo wire per lo scambio di dati tra thread. – Maxaon3000

+0

Intendi su un ConcurrentLinkedQueue, giusto? Questa è una grande domanda. Scommetto i miei chip su ConcurrentLinkedQueue. :) Ma un vantaggio che vedo delle pipe è: si invia un messaggio come fanno tutti gli altri, in altre parole, si legge da un canale invece di recuperare un oggetto dalla coda. – chrisapotek

3

Quindi, dopo aver avuto un sacco di problemi con il tubo (check here), ho deciso di favorire le code concorrenti non bloccanti su pipe NIO. Così ho fatto alcuni benchmark su ConcurrentLinkedQueue di Java. Vedi sotto:

public static void main(String[] args) throws Exception { 

    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); 

    // first test nothing: 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 

     for (int i = 0; i < 1000000; i++) { 
      bench.mark(); 
      // s = queue.poll(); 
      bench.measure(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // first test empty queue: 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 

     for (int i = 0; i < 1000000; i++) { 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // now test polling one element on a queue with size one 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 
     String x = "pela"; 

     for (int i = 0; i < 1000000; i++) { 
      queue.offer(x); 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
      if (s != x) throw new Exception("bad!"); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // now test polling one element on a queue with size two 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 
     String x = "pela"; 

     for (int i = 0; i < 1000000; i++) { 
      queue.offer(x); 
      queue.offer(x); 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
      if (s != x) throw new Exception("bad!"); 
      queue.poll(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 
} 

I risultati:

totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos) 

Conclusione:

Il MaxTime può essere spaventoso, ma penso che sia sicuro di concludere che siamo negli anni '50 nano gamma per il polling un concomitante coda.