2013-07-10 12 views
5

Eseguo un'attività mappa su un file di piccole dimensioni (3-4 MB), ma l'output della mappa è relativamente grande (150 MB). Dopo aver visualizzato Mappa al 100%, è necessario molto tempo per completare lo sversamento. Si prega di suggerire come posso ridurre questo periodo. Di seguito sono riportati alcuni registri di esempio ..."Avvio a filo dell'output della mappa" richiede molto tempo nell'attività mappa hadoop

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output 
13/07/10 17:45:32 INFO mapred.JobClient: map 98% reduce 0% 
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient: map 100% reduce 0% 
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0 
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting 
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done. 
............................... 
............................... 
............................... 
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22 
13/07/10 17:47:52 INFO mapred.JobClient: File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Written=13401245 
13/07/10 17:47:52 INFO mapred.JobClient: FileSystemCounters 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_READ=18871098 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_READ=7346566 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=35878426 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_WRITTEN=18621307 
13/07/10 17:47:52 INFO mapred.JobClient: File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Read=2558288 
13/07/10 17:47:52 INFO mapred.JobClient: Map-Reduce Framework 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input groups=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output materialized bytes=13320006 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map input records=71040 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce shuffle bytes=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Spilled Records=1480000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output bytes=119998400 
13/07/10 17:47:52 INFO mapred.JobClient:  CPU time spent (ms)=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Total committed heap usage (bytes)=1178009600 
13/07/10 17:47:52 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine input records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  SPLIT_RAW_BYTES=122 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input records=740000 

Mappa codice attività Origine:

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{ 

    private DoubleWritable distGexpr = new DoubleWritable(); 
    private LongWritable m2keyOut = new LongWritable(); 
    int trMax,tstMax; 

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { 

     Configuration conf =context.getConfiguration(); 
     tstMax = conf.getInt("mtst", 10); 
     trMax = conf.getInt("mtr", 10); 

    } 

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException { 
     String line = values.toString(); 

     double Tij=0.0,TRij=0.0, dist=0; 
     int i=0,j; 
     long m2key=0; 
     String[] SLl = new String[]{}; 

     Configuration conf =context.getConfiguration(); 

     m2key = Long.parseLong(key.toString()); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     j=0; 
     while (tokenizer.hasMoreTokens()) { 

      String test = tokenizer.nextToken(); 
      if(j==0){ 
       Tij = Double.parseDouble(test); 
      } 
      else if(j==1){ 
       TRij = Double.parseDouble(test); 
      } 
      else if(j==2){ 
       SLl = StringUtils.split(conf.get(test),","); 
      } 
      j++; 
     } 
     //Map input ends 

     //Distance Measure function 
     dist = (long)Math.pow((Tij - TRij), 2); 

     //remove gid from key 
     m2key = m2key/100000; 
     //Map2 <key,value> emit starts 
     for(i=0; i<SLl.length;i++){ 
       long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key; 
      m2keyOut.set(m2keyNew); 
      distGexpr.set(dist); 
      context.write(m2keyOut,distGexpr); 
     } 
     //<key,value> emit done 
    } 

} 

Sample Map Input: L'ultima variabile in ogni linea di ottenere un array di interi dalle variabili di trasmissione. Ogni linea produrrà circa 100-200 record di output.

10100014 1356.3238 1181.63 gs-4-56 
10100026 3263.1167 3192.4131 gs-3-21 
10100043 1852.0 1926.3962 gs-4-76 
10100062 1175.5925 983.47125 gs-3-19 
10100066 606.59125 976.26625 gs-8-23 

Campione Mappa uscita:

10101 8633.0 
10102 1822.0 
10103 13832.0 
10104 2726470.0 
10105 1172991.0 
10107 239367.0 
10109 5410384.0 
10111 7698352.0 
10112 6.417 
+1

Puoi pubblicare il codice del tuo mapper (o almeno una descrizione di ciò che fa il tuo mappatore), registrare i record di input e i record di output? Hai un metodo di pulizia? –

+0

Grazie per la risposta. Ho aggiunto il codice sorgente per questa attività sulla mappa e l'input e l'output del campione. Non ho usato alcun metodo di pulizia. In realtà ci sono stati molti sversamenti prima. Così, ho cambiato io.sort.record.percent e poche altre impostazioni. Quindi gli sversamenti sono ridotti al minimo ma il tempo di esecuzione complessivo è rimasto lo stesso. –

risposta

0

suppone che si sia risolto il (2 anni dopo la pubblicazione del messaggio originale), ma solo per chiunque passi lo stesso problema, cercherò fornendo alcuni suggerimenti.

A giudicare dai tuoi contatori, capisco che tu già usi la compressione (poiché il numero di output di mappa materializzati in byte è diverso dal numero di byte di output della mappa), che è una buona cosa. È possibile comprimere ulteriormente l'output del mapper, utilizzando la classe codificata a lunghezza variabile VLongWritable come tipo di chiave di output della mappa. (C'era anche una classe VDoubleWritable, se non sbaglio, ma deve essere stata deprecata ormai).

Nel ciclo for, in cui si emette l'uscita, non è necessario impostare la variabile distGexpr ogni volta. È sempre lo stesso, quindi impostalo appena prima del ciclo for. È anche possibile archiviare a lungo il prodotto di trMax*tstMax fuori dal ciclo e non calcolarlo su ogni iterazione.

Se possibile, impostare il tasto di input LongWritable (dal lavoro precedente), in modo da poter salvare le chiamate Long.parseLong() e Text.toString().

Se possibile (a seconda del riduttore), utilizzare un combinatore per ridurre le dimensioni dei byte versati.

Non sono riuscito a trovare un modo per saltare quella chiamata Integer.parseInt() all'interno del ciclo for, ma sarebbe possibile risparmiare un po 'di tempo se inizialmente fosse possibile caricare SLl come int[].

Problemi correlati