2013-03-07 10 views
6

Faza redukcji zadania kończy się niepowodzeniem z:Ograniczenie nie powiodło się z powodu próby wykonania zadania nie powiodło się zgłoszenie statusu przez 600 sekund. Zabicie! Rozwiązanie?

Nieudana redukcja zadań przekroczyła dozwolony limit.

Powodem każde zadanie nie jest:

attempt_201301251556_1637_r_000005_0 Zadanie nie statusu zgłosić do 600 sekund. Zabicie!

Problem szczegółowo:

Faza Mapa odbywa się każdego rekordu, który jest w formacie: czas pozbyć, dane.

Dane mają format: element danych i jego liczba.

np .: a, 1 b, 4 c, 7 odpowiada danym rekordu.

Dane wyjściowe mapera dla każdego elementu danych dane dla każdego rekordu. np

klucz: (czas, A), Val (RID danych) klucz: (czas, b), Val (RID danych) klucz: (czas, c), Val : (usuń, dane)

Każda redukcja odbiera wszystkie dane odpowiadające temu samemu kluczowi ze wszystkich rekordów. np .: klawisz: (time, a), val: (rid1, data) i klawisz: (time, a), val: (rid2, data) osiągnąć to samo zmniejszenie wystąpienia.

Wykonuje tutaj pewne przetwarzanie i wyświetla podobne rids.

Mój program działa bezproblemowo dla małego zestawu danych, takiego jak 10 MB. Ale zawiedzie, gdy dane wzrosną, aby powiedzieć 1G, z wyżej wymienionego powodu. Nie wiem, dlaczego tak się dzieje. Proszę pomóż!

Zmniejszyć kod:

Istnieją dwie klasy poniżej:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0SPlit

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
    // @SuppressWarnings("unchecked") 
     public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

      String key_str = key.toString(); 
      StringTokenizer stk = new StringTokenizer(key_str); 
      String t = stk.nextToken(); 

      HashMap<String, String> hmap = new HashMap<String, String>(); 

      while(values.hasNext()) 
      { 
       StringBuffer sbuf1 = new StringBuffer(); 
       String val = values.next().toString(); 
       StringTokenizer st = new StringTokenizer(val); 

       String uid = st.nextToken(); 

       String data = st.nextToken(); 

        int total_size = 0; 

        StringTokenizer stx = new StringTokenizer(data,"|"); 

        StringBuffer sbuf = new StringBuffer(); 

        while(stx.hasMoreTokens()) 
        { 
         String data_part = stx.nextToken(); 
         String data_freq = stx.nextToken(); 

        // System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq); 
         sbuf.append(data_part); 
         sbuf.append("|"); 
         sbuf.append(data_freq); 
         sbuf.append("|"); 
        } 
       /*  
        for(int i = 0; i<parts.length-1; i++) 
        { 
         System.out.println("data:--------------->"+data); 
         int part_size = Integer.parseInt(parts[i+1]); 
         sbuf.append(parts[i]); 
         sbuf.append("|"); 
         sbuf.append(part_size); 
         sbuf.append("|"); 
         total_size = part_size+total_size; 
         i++; 
        }*/ 

       sbuf1.append(String.valueOf(total_size)); 
       sbuf1.append(","); 
       sbuf1.append(sbuf); 
       if(uid.equals("203664471")){ 
       // System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf); 
       } 
       hmap.put(uid, sbuf1.toString()); 

      } 

      float threshold = (float)0.8; 

      CoreSplit obj = new CoreSplit(); 


      ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold); 

      for(int i = 0; i<al.size(); i++) 
      { 
       CustomMapSimilarity cmaps = al.get(i); 
       String xy_pair = cmaps.getRIDPair(); 
       String similarity = cmaps.getSimilarity(); 
       output.collect(new Text(xy_pair), new Text(similarity)); 
      } 


     } 
    } 

b. coreSplit

package com.a; 

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 
import java.util.Set; 
import java.util.StringTokenizer; 
import java.util.TreeMap; 

import org.apache.commons.collections.map.MultiValueMap; 

public class PPJoinPlusCoreOptNewSplit{ 


    public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t) 
    { 

     ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>(); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap.keySet().iterator(); 

     MultiValueMap index = new MultiValueMap(); 

     String RID; 
     TreeMap<String, Integer> hmap2; 
     Iterator<String> iter1; 

     int size; 
     float prefix_size; 
     HashMap<String, Float> alpha; 
     HashMap<String, CustomMapOverlap> hmap_overlap; 

     String data; 

     while(iter.hasNext()) 
      { 
       RID = (String)iter.next(); 

       String data_val = hmap.get(RID); 

       StringTokenizer st = new StringTokenizer(data_val,","); 
      // System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time); 
       String RIDsize = st.nextToken(); 
       size = Integer.parseInt(RIDsize); 
       data = st.nextToken(); 


       StringTokenizer st1 = new StringTokenizer(data,"\\|"); 


       String[] parts = data.split("\\|"); 

      // hmap2 = (TreeMap<String, Integer>)hmap.get(RID); 
     //  iter1 = hmap2.keySet().iterator(); 

      // size = hmap_size.get(RID); 

       prefix_size = (float)(size-(0.8*size)+1); 

       if(size==1) 
       { 
        prefix_size = 1; 
       } 

       alpha = new HashMap<String, Float>(); 

       hmap_overlap = new HashMap<String, CustomMapOverlap>(); 

     //  Iterator<String> iter2 = hmap2.keySet().iterator(); 

       int prefix_index = 0; 

       int pi=0; 

       for(float j = 0; j<=prefix_size; j++) 
       { 

        boolean prefix_chk = false; 
        prefix_index++; 
        String ptoken = parts[pi]; 
      //  System.out.println("data:---->"+data+" ptoken:---->"+ptoken); 
        float val = Float.parseFloat(parts[pi+1]); 
        float temp_j = j; 
        j = j+val; 
        boolean j_l = false ; 
        float prefix_contri = 0; 
        pi= pi+2; 

        if(j>prefix_size) 
         { 

          // prefix_contri = j-temp_j; 
          prefix_contri = prefix_size-temp_j; 

          if(prefix_contri>0) 
          { 
           j_l = true; 
           prefix_chk = false; 

          } 
          else 
          { 
           prefix_chk = true;        
          } 
         }     


        if(prefix_chk == false){ 


         filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri); 


        CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j); 
        index.put(ptoken, cmapt); 

       } 

      } 


       als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap); 

       for(int i = 0; i<als.size(); i++) 
       { 
        if(als.get(i).getRIDPair()!=null) 
        { 
         alsim.add(als.get(i)); 

        } 
       } 

      } 

     return alsim; 

    } 


    public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri) 
    { 
      @SuppressWarnings("unchecked") 

      ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken); 

      if((positions_list!=null) &&(positions_list.size()!=0)) 
      { 

       CustomMapPrefixTokens cmapt ; 
       String y; 
       Iterator<String> iter3; 
       int y_size = 0; 
       float check_size = 0; 
      // TreeMap<String, Integer> hmapy; 
       float RID_val=0; 
       float y_overlap = 0; 
       float ubound = 0; 
       ArrayList<Float> fl = new ArrayList<Float>(); 

       StringTokenizer st; 

      for(int k = 0; k<positions_list.size(); k++) 
      { 
       cmapt = positions_list.get(k); 

       if(!cmapt.getRID().equals(RID)) 
       { 

       y = hmap.get(cmapt.getRID()); 

       // iter3 = y.keySet().iterator(); 

       String yRID = cmapt.getRID(); 

       st = new StringTokenizer(y,","); 

       y_size = Integer.parseInt(st.nextToken()); 

       check_size = (float)0.8*(size); 

       if(y_size>=check_size) 
       { 

        //hmapy = hmap.get(yRID); 

        String y_data = st.nextToken(); 

        StringTokenizer st1 = new StringTokenizer(y_data,"\\|"); 


        while(st1.hasMoreTokens()) 
        { 
         String token = st1.nextToken(); 
         if(token.equals(ptoken)) 
         { 

          String nxt_token = st1.nextToken(); 
        //  System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token); 
          RID_val = (float)Integer.parseInt(nxt_token); 
          break; 
         } 
        } 

       // RID_val = (float) hmapy.get(ptoken); 
        float alpha1 = (float)(0.8/1.8)*(size+y_size); 

        fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri); 

        ubound = fl.get(0); 
        y_overlap = fl.get(1); 


        positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap); 

        } 

       } 
      } 
     } 



    } 


    public void positionFilter(float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

    float y_overlap_total = 0; 

      if(null!=hmap_overlap.get(cmapt.getRID())) 
      { 

      y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap(); 

      if((y_overlap_total+ubound)>=alpha1) 
      { 

       CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID()); 

       float y_o_t = y_overlap+y_overlap_total; 

       cmap_tmp.setOverlap(y_o_t); 
       hmap_overlap.put(cmapt.getRID(),cmap_tmp); 

      } 
      else 
      { 
       float n = 0; 
       hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n)); 
      } 

      } 
      else 
      { 
       CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap); 
       hmap_overlap.put(cmapt.getRID(), cmap_tmp); 

      } 

    } 

    public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri) 
    { 

      alpha.put(cmapt.getRID(), alpha1); 
      float min1 = y_size-cmapt.getPosition(); 
      float min2 = size-j; 
      float min = 0; 

      float y_overlap = 0; 

      if(min1<min2) 
      { 
       min = min1; 
      } 
      else 
      { 
       min = min2; 
      } 
      if(j_l==true) 
      { 
       val = prefix_contri;  
      }          
      if(RID_val<val) 
      { 
       y_overlap = RID_val; 
      } 
      else 
      { 
       y_overlap = val; 
      } 

      float ubound = y_overlap+min; 

      ArrayList<Float> fl = new ArrayList<Float>(); 
      fl.add(ubound); 
      fl.add(y_overlap); 

      return fl; 

    } 


    public ArrayList<CustomMapSimilarity> calcSimilarity(String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

     float jaccard = 0; 

     CustomMapSimilarity cms = new CustomMapSimilarity(null, null); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap_overlap.keySet().iterator(); 

     while(iter.hasNext()) 
     { 
      String key = (String)iter.next(); 

      CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key); 

      float overlap = (float)val.getOverlap(); 

      if(overlap>0) 
      { 

       String yRID = val.getRID(); 

       String RIDpair = RID+" "+yRID; 

      jaccard = unionIntersection(hmap, RIDpair); 

      if(jaccard>0.8) 
       { 
        cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard)); 
        alsim.add(cms); 
       } 

      } 

     } 

     return alsim; 

    } 


    public float unionIntersection(HashMap<String,String> hmap, String RIDpair) 
    { 


      StringTokenizer st = new StringTokenizer(RIDpair); 

      String xRID = st.nextToken(); 

      String yRID = st.nextToken(); 

      String xdata = hmap.get(xRID); 

      String ydata = hmap.get(yRID); 


      int total_union = 0; 

      int xval = 0; 
      int yval = 0; 
      int part_union = 0; 

      int total_intersect = 0; 

     // System.out.println("xdata:------*************>"+xdata); 

      StringTokenizer xtokenizer = new StringTokenizer(xdata,","); 
      StringTokenizer ytokenizer = new StringTokenizer(ydata,","); 
     // String[] xpart = xdata.split(","); 
     // String[] ypart = ydata.split(","); 

      xtokenizer.nextToken(); 
      ytokenizer.nextToken(); 

      String datax = xtokenizer.nextToken(); 
      String datay = ytokenizer.nextToken(); 


      HashMap<String,Integer> x = new HashMap<String, Integer>(); 
      HashMap<String,Integer> y = new HashMap<String, Integer>(); 


      String [] xparts; 

       xparts = datax.toString().split("\\|"); 


       String [] yparts; 

       yparts = datay.toString().split("\\|"); 


       for(int i = 0; i<xparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(xparts[i+1]); 
        x.put(xparts[i], part_size); 

        i++; 
       } 

       for(int i = 0; i<yparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(yparts[i+1]); 
        y.put(xparts[i], part_size); 

        i++; 
       } 


      Set<String> xset = x.keySet(); 
      Set<String> yset = y.keySet(); 

      for(String elm:xset) 
      { 

       yval = 0; 

       xval = (Integer)x.get(elm); 

       part_union = 0; 
       int part_intersect = 0; 
       if(yset.contains(elm)){ 

        yval = (Integer) y.get(elm); 

       if(xval>yval) 
       { 
        part_union = xval; 
        part_intersect = yval; 
       } 
       else 
       { 
        part_union = yval; 
        part_intersect = xval; 
       } 
       total_intersect = total_intersect+part_intersect; 
       } 
       else 
       { 
        part_union = xval; 
       } 

       total_union = total_union+part_union; 


      } 


      for(String elm: yset) 
      { 
       part_union = 0; 

       if(!xset.contains(elm)) 
       { 
        part_union = (Integer) y.get(elm); 
        total_union = total_union+part_union; 
       } 

      } 

      float jaccard = (float)total_intersect/total_union; 

     return jaccard; 

    } 

} 
+0

można pisać kod reduktor? –

+0

Dodałem kod. Czy możesz mnie zasugerować, jeśli chcę coś zmienić w celu zwiększenia wydajności procesora i tak dalej. –

Odpowiedz

10

Przyczyną przekroczenia limitu czasu może być długotrwała kalkulacja w reduktorze bez zgłaszania postępu z powrotem do platformy Hadoop. Problem ten można rozwiązać stosując różne podejścia:

I. Zwiększenie limitu czasu w mapred-site.xml:

<property> 
    <name>mapred.task.timeout</name> 
    <value>1200000</value> 
</property> 

Domyślnie jest 600000 ms = 600 seconds.

II. Raportowanie postępu co x zapisy jak w Reducer example in javadoc:

public void reduce(K key, Iterator<V> values, 
          OutputCollector<K, V> output, 
          Reporter reporter) throws IOException { 
    // report progress 
    if ((noValues%10) == 0) { 
    reporter.progress(); 
    } 

    // ... 
} 

ewentualnie można zwiększyć licznika niestandardowego jak w example:

reporter.incrCounter(NUM_RECORDS, 1); 
+0

Cześć, dziękuję za odpowiedź !. Wkleiłem powyższy kod redukujący. W mojej klasie redukcji główne obliczenia rozpoczynają się po przeczytaniu całej listy wartości_mniejszych. W tym przypadku program jest powiązany z głównym obliczeniem poza listą ujemną_wartość podczas pętli, jak zgłaszać postęp? Poza tym, czy możesz zaproponować procesorowi efektywny sposób wykonania powyższego wklejonego kodu? Początkowo użyłem hashmap, które oferowały większą wydajność procesora, ale zostały usunięte z powodu problemów z pamięcią. –

+0

Zakładam, że najdłuższe i najbardziej obciążające procesor obliczenia są wykonywane w funkcji "similarityCalculation()". Powinieneś zgłosić postęp w tej metodzie. Powinieneś również rozważyć zastąpienie wszystkich tokenów ciągów własnymi odpowiednimi klasami, tak aby parsowanie i tokenizacja była wykonywana tylko raz. To może poprawić twój algorytm. – harpun

+0

Innym podejściem byłoby przepisanie algorytmu, aby niektóre obliczenia nakładek zostały wykonane w mapach. Równoległe obliczenia miejmy nadzieję przyspieszyć algorytm. Jednak to jest coś, czego musisz się dowiedzieć i sprawdzić, czy podejście jest właściwe dla konkretnego algorytmu, który chcesz zaimplementować. – harpun

2

Jest możliwe, że może być spożywany całą przestrzeń sterty Java lub GC dzieje się zbyt często nie dając szansy reduktora do stanu opanowania sprawozdania i czy zatem zabity.

Inną możliwością jest to, że jeden z reduktorów pobiera zbyt przekrzywione dane, to znaczy, że dla konkretnego rid, istnieje wiele rekordów.

próbować zwiększyć swój Java sterty przez ustawienie następujące config: mapred.child.java.opts

do

-Xmx2048m

też spróbować zmniejszyć liczbę równoległych reduktorów ustawiając następujące config na niższą wartość niż obecnie (domyślna wartość to 2):

mapred.tasktracker.reduce.tasks.maximum

+0

Dzięki za odpowiedź. To było bardzo pomocne. Wkleiłem kod redukujący, czy możesz zaproponować bardziej efektywne sposoby jego implementacji? –

Powiązane problemy