2013-03-07 6 views
6

La fase di riduzione del processo non riesce con:La riduzione non riesce a causa del tentativo di attività non è stato segnalato lo stato per 600 secondi. Uccidere! Soluzione?

di attività di riduzione non riuscite ha superato il limite consentito.

Il motivo per cui ogni attività ha esito negativo è:

Task attempt_201301251556_1637_r_000005_0 non è riuscito a riportare lo stato per 600 secondi. Uccidere!

problema in dettaglio:

La fase Map prende in ogni record, che è del formato: tempo, eliminare i dati.

I dati sono del formato: elemento dati e relativo conteggio.

ad esempio: a, 1 b, 4 c, 7 corrisponde ai dati di un record.

Il mapper emette per ogni elemento dati i dati per ogni record. ad esempio:

chiave: (tempo, un,), val: (RID, dati) chiave: (tempo, B,), val: (RID, dati) chiave: (tempo, c,), val : (rid, dati)

Ogni riduzione riceve tutti i dati corrispondenti alla stessa chiave da tutti i record. ad esempio: chiave: (tempo, a), val: (rid1, dati) e chiave: (tempo, a), val: (rid2, dati) raggiungere la stessa istanza di riduzione.

Fa un po 'di elaborazione qui e produce riduzioni simili.

Il mio programma viene eseguito senza problemi per un set di dati di piccole dimensioni come 10 MB. Ma fallisce quando i dati aumentano per dire 1G, con la ragione sopra citata. Non so perché questo succede. Per favore aiuto!

ridurre il codice:

Ci sono due classi di seguito:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0SPlit

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
    // @SuppressWarnings("unchecked") 
     public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

      String key_str = key.toString(); 
      StringTokenizer stk = new StringTokenizer(key_str); 
      String t = stk.nextToken(); 

      HashMap<String, String> hmap = new HashMap<String, String>(); 

      while(values.hasNext()) 
      { 
       StringBuffer sbuf1 = new StringBuffer(); 
       String val = values.next().toString(); 
       StringTokenizer st = new StringTokenizer(val); 

       String uid = st.nextToken(); 

       String data = st.nextToken(); 

        int total_size = 0; 

        StringTokenizer stx = new StringTokenizer(data,"|"); 

        StringBuffer sbuf = new StringBuffer(); 

        while(stx.hasMoreTokens()) 
        { 
         String data_part = stx.nextToken(); 
         String data_freq = stx.nextToken(); 

        // System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq); 
         sbuf.append(data_part); 
         sbuf.append("|"); 
         sbuf.append(data_freq); 
         sbuf.append("|"); 
        } 
       /*  
        for(int i = 0; i<parts.length-1; i++) 
        { 
         System.out.println("data:--------------->"+data); 
         int part_size = Integer.parseInt(parts[i+1]); 
         sbuf.append(parts[i]); 
         sbuf.append("|"); 
         sbuf.append(part_size); 
         sbuf.append("|"); 
         total_size = part_size+total_size; 
         i++; 
        }*/ 

       sbuf1.append(String.valueOf(total_size)); 
       sbuf1.append(","); 
       sbuf1.append(sbuf); 
       if(uid.equals("203664471")){ 
       // System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf); 
       } 
       hmap.put(uid, sbuf1.toString()); 

      } 

      float threshold = (float)0.8; 

      CoreSplit obj = new CoreSplit(); 


      ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold); 

      for(int i = 0; i<al.size(); i++) 
      { 
       CustomMapSimilarity cmaps = al.get(i); 
       String xy_pair = cmaps.getRIDPair(); 
       String similarity = cmaps.getSimilarity(); 
       output.collect(new Text(xy_pair), new Text(similarity)); 
      } 


     } 
    } 

b. coreSplit

package com.a; 

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 
import java.util.Set; 
import java.util.StringTokenizer; 
import java.util.TreeMap; 

import org.apache.commons.collections.map.MultiValueMap; 

public class PPJoinPlusCoreOptNewSplit{ 


    public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t) 
    { 

     ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>(); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap.keySet().iterator(); 

     MultiValueMap index = new MultiValueMap(); 

     String RID; 
     TreeMap<String, Integer> hmap2; 
     Iterator<String> iter1; 

     int size; 
     float prefix_size; 
     HashMap<String, Float> alpha; 
     HashMap<String, CustomMapOverlap> hmap_overlap; 

     String data; 

     while(iter.hasNext()) 
      { 
       RID = (String)iter.next(); 

       String data_val = hmap.get(RID); 

       StringTokenizer st = new StringTokenizer(data_val,","); 
      // System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time); 
       String RIDsize = st.nextToken(); 
       size = Integer.parseInt(RIDsize); 
       data = st.nextToken(); 


       StringTokenizer st1 = new StringTokenizer(data,"\\|"); 


       String[] parts = data.split("\\|"); 

      // hmap2 = (TreeMap<String, Integer>)hmap.get(RID); 
     //  iter1 = hmap2.keySet().iterator(); 

      // size = hmap_size.get(RID); 

       prefix_size = (float)(size-(0.8*size)+1); 

       if(size==1) 
       { 
        prefix_size = 1; 
       } 

       alpha = new HashMap<String, Float>(); 

       hmap_overlap = new HashMap<String, CustomMapOverlap>(); 

     //  Iterator<String> iter2 = hmap2.keySet().iterator(); 

       int prefix_index = 0; 

       int pi=0; 

       for(float j = 0; j<=prefix_size; j++) 
       { 

        boolean prefix_chk = false; 
        prefix_index++; 
        String ptoken = parts[pi]; 
      //  System.out.println("data:---->"+data+" ptoken:---->"+ptoken); 
        float val = Float.parseFloat(parts[pi+1]); 
        float temp_j = j; 
        j = j+val; 
        boolean j_l = false ; 
        float prefix_contri = 0; 
        pi= pi+2; 

        if(j>prefix_size) 
         { 

          // prefix_contri = j-temp_j; 
          prefix_contri = prefix_size-temp_j; 

          if(prefix_contri>0) 
          { 
           j_l = true; 
           prefix_chk = false; 

          } 
          else 
          { 
           prefix_chk = true;        
          } 
         }     


        if(prefix_chk == false){ 


         filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri); 


        CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j); 
        index.put(ptoken, cmapt); 

       } 

      } 


       als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap); 

       for(int i = 0; i<als.size(); i++) 
       { 
        if(als.get(i).getRIDPair()!=null) 
        { 
         alsim.add(als.get(i)); 

        } 
       } 

      } 

     return alsim; 

    } 


    public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri) 
    { 
      @SuppressWarnings("unchecked") 

      ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken); 

      if((positions_list!=null) &&(positions_list.size()!=0)) 
      { 

       CustomMapPrefixTokens cmapt ; 
       String y; 
       Iterator<String> iter3; 
       int y_size = 0; 
       float check_size = 0; 
      // TreeMap<String, Integer> hmapy; 
       float RID_val=0; 
       float y_overlap = 0; 
       float ubound = 0; 
       ArrayList<Float> fl = new ArrayList<Float>(); 

       StringTokenizer st; 

      for(int k = 0; k<positions_list.size(); k++) 
      { 
       cmapt = positions_list.get(k); 

       if(!cmapt.getRID().equals(RID)) 
       { 

       y = hmap.get(cmapt.getRID()); 

       // iter3 = y.keySet().iterator(); 

       String yRID = cmapt.getRID(); 

       st = new StringTokenizer(y,","); 

       y_size = Integer.parseInt(st.nextToken()); 

       check_size = (float)0.8*(size); 

       if(y_size>=check_size) 
       { 

        //hmapy = hmap.get(yRID); 

        String y_data = st.nextToken(); 

        StringTokenizer st1 = new StringTokenizer(y_data,"\\|"); 


        while(st1.hasMoreTokens()) 
        { 
         String token = st1.nextToken(); 
         if(token.equals(ptoken)) 
         { 

          String nxt_token = st1.nextToken(); 
        //  System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token); 
          RID_val = (float)Integer.parseInt(nxt_token); 
          break; 
         } 
        } 

       // RID_val = (float) hmapy.get(ptoken); 
        float alpha1 = (float)(0.8/1.8)*(size+y_size); 

        fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri); 

        ubound = fl.get(0); 
        y_overlap = fl.get(1); 


        positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap); 

        } 

       } 
      } 
     } 



    } 


    public void positionFilter(float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

    float y_overlap_total = 0; 

      if(null!=hmap_overlap.get(cmapt.getRID())) 
      { 

      y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap(); 

      if((y_overlap_total+ubound)>=alpha1) 
      { 

       CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID()); 

       float y_o_t = y_overlap+y_overlap_total; 

       cmap_tmp.setOverlap(y_o_t); 
       hmap_overlap.put(cmapt.getRID(),cmap_tmp); 

      } 
      else 
      { 
       float n = 0; 
       hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n)); 
      } 

      } 
      else 
      { 
       CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap); 
       hmap_overlap.put(cmapt.getRID(), cmap_tmp); 

      } 

    } 

    public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri) 
    { 

      alpha.put(cmapt.getRID(), alpha1); 
      float min1 = y_size-cmapt.getPosition(); 
      float min2 = size-j; 
      float min = 0; 

      float y_overlap = 0; 

      if(min1<min2) 
      { 
       min = min1; 
      } 
      else 
      { 
       min = min2; 
      } 
      if(j_l==true) 
      { 
       val = prefix_contri;  
      }          
      if(RID_val<val) 
      { 
       y_overlap = RID_val; 
      } 
      else 
      { 
       y_overlap = val; 
      } 

      float ubound = y_overlap+min; 

      ArrayList<Float> fl = new ArrayList<Float>(); 
      fl.add(ubound); 
      fl.add(y_overlap); 

      return fl; 

    } 


    public ArrayList<CustomMapSimilarity> calcSimilarity(String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

     float jaccard = 0; 

     CustomMapSimilarity cms = new CustomMapSimilarity(null, null); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap_overlap.keySet().iterator(); 

     while(iter.hasNext()) 
     { 
      String key = (String)iter.next(); 

      CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key); 

      float overlap = (float)val.getOverlap(); 

      if(overlap>0) 
      { 

       String yRID = val.getRID(); 

       String RIDpair = RID+" "+yRID; 

      jaccard = unionIntersection(hmap, RIDpair); 

      if(jaccard>0.8) 
       { 
        cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard)); 
        alsim.add(cms); 
       } 

      } 

     } 

     return alsim; 

    } 


    public float unionIntersection(HashMap<String,String> hmap, String RIDpair) 
    { 


      StringTokenizer st = new StringTokenizer(RIDpair); 

      String xRID = st.nextToken(); 

      String yRID = st.nextToken(); 

      String xdata = hmap.get(xRID); 

      String ydata = hmap.get(yRID); 


      int total_union = 0; 

      int xval = 0; 
      int yval = 0; 
      int part_union = 0; 

      int total_intersect = 0; 

     // System.out.println("xdata:------*************>"+xdata); 

      StringTokenizer xtokenizer = new StringTokenizer(xdata,","); 
      StringTokenizer ytokenizer = new StringTokenizer(ydata,","); 
     // String[] xpart = xdata.split(","); 
     // String[] ypart = ydata.split(","); 

      xtokenizer.nextToken(); 
      ytokenizer.nextToken(); 

      String datax = xtokenizer.nextToken(); 
      String datay = ytokenizer.nextToken(); 


      HashMap<String,Integer> x = new HashMap<String, Integer>(); 
      HashMap<String,Integer> y = new HashMap<String, Integer>(); 


      String [] xparts; 

       xparts = datax.toString().split("\\|"); 


       String [] yparts; 

       yparts = datay.toString().split("\\|"); 


       for(int i = 0; i<xparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(xparts[i+1]); 
        x.put(xparts[i], part_size); 

        i++; 
       } 

       for(int i = 0; i<yparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(yparts[i+1]); 
        y.put(xparts[i], part_size); 

        i++; 
       } 


      Set<String> xset = x.keySet(); 
      Set<String> yset = y.keySet(); 

      for(String elm:xset) 
      { 

       yval = 0; 

       xval = (Integer)x.get(elm); 

       part_union = 0; 
       int part_intersect = 0; 
       if(yset.contains(elm)){ 

        yval = (Integer) y.get(elm); 

       if(xval>yval) 
       { 
        part_union = xval; 
        part_intersect = yval; 
       } 
       else 
       { 
        part_union = yval; 
        part_intersect = xval; 
       } 
       total_intersect = total_intersect+part_intersect; 
       } 
       else 
       { 
        part_union = xval; 
       } 

       total_union = total_union+part_union; 


      } 


      for(String elm: yset) 
      { 
       part_union = 0; 

       if(!xset.contains(elm)) 
       { 
        part_union = (Integer) y.get(elm); 
        total_union = total_union+part_union; 
       } 

      } 

      float jaccard = (float)total_intersect/total_union; 

     return jaccard; 

    } 

} 
+0

potete inserire il vostro codice riduttore? –

+0

Ho aggiunto il codice. Potete suggerirmi se devo cambiare qualcosa in ordine per renderlo più efficiente in termini di CPU e così via. –

risposta

10

Il motivo per i timeout potrebbe essere un calcolo a lungo termine nel riduttore senza riportare i progressi nel framework Hadoop. Questo può essere risolto utilizzando diversi approcci:

I. aumentare il timeout in mapred-site.xml:

<property> 
    <name>mapred.task.timeout</name> 
    <value>1200000</value> 
</property> 

Il valore predefinito è 600000 ms = 600 seconds.

II. Comunicare i progressi ogni x record come nel Reducer example in javadoc:

public void reduce(K key, Iterator<V> values, 
          OutputCollector<K, V> output, 
          Reporter reporter) throws IOException { 
    // report progress 
    if ((noValues%10) == 0) { 
    reporter.progress(); 
    } 

    // ... 
} 

Opzionalmente è possibile incrementare un contatore personalizzato come nel example:

reporter.incrCounter(NUM_RECORDS, 1); 
+0

Ciao, grazie per la risposta !. Ho incollato il mio codice di riduzione qui sopra. Nella mia classe reduce, il calcolo principale inizia dopo che l'intera reduc_value_list è stata letta. In questo caso, il programma viene collegato nel calcolo principale al di fuori dell'elenco dei valori ridotti_valore durante il ciclo, come segnalare i progressi? Inoltre, puoi suggerire un modo efficiente per fare CPU con il codice sopra incollato? Inizialmente, avevo usato le hashmap, che offrivano maggiore efficienza della CPU, ma l'ho rimosso a causa di problemi di memoria. –

+0

Presumo che il calcolo più lungo e più intenso della CPU venga eseguito in 'similarityCalculation()'. Dovresti segnalare i progressi in questo metodo. Dovresti anche prendere in considerazione la possibilità di sostituire tutta la tokenizzazione delle stringhe con classi appropriate, in modo che l'analisi delle stringhe e la tokenizzazione avvengano solo una volta. Questo potrebbe migliorare il tuo algoritmo. – harpun

+0

Un altro approccio sarebbe quello di riscrivere l'algoritmo, in modo che alcuni dei calcoli delle sovrapposizioni vengano eseguiti nei mappatori. Si spera che il calcolo parallelo acceleri l'algoritmo. Tuttavia, questo è ciò che devi capire da solo e convalidare se l'approccio è valido per l'algoritmo specifico che vorresti implementare. – harpun

2

E 'possibile che tu possa aver consumato tutto lo spazio di heap di Java o GC sta accadendo troppo spesso dando alcuna possibilità di riduttore per riportare lo stato di padroneggiare e quindi è ucciso.

Un'altra possibilità è che uno dei riduttori stia ottenendo dati troppo distorti, ad esempio per una particolare riduzione, ci sono molti record.

cercare di aumentare il vostro mucchio Java impostando la seguente configurazione: mapred.child.java.opts

a

-Xmx2048m

Inoltre, cercare di ridurre il numero di riduttori paralleli impostando la seguente configurazione per un più basso valore rispetto a quello attuale (valore predefinito è 2):

mapred.tasktracker.reduce.tasks.maximum

+0

Grazie per la risposta. È stato molto utile Ho incollato il codice di riduzione, puoi suggerire modi più efficienti di implementarlo? –

Problemi correlati