2013-07-10 10 views
5

Wykonuję zadanie mapy na małym pliku (3-4 MB), ale wynik na mapie jest stosunkowo duży (150 MB). Po wyświetleniu mapy 100% ukończenie wycieku zajmuje dużo czasu. Proszę zasugerować, jak zmniejszyć ten okres. Poniżej przedstawiono kilka przykładowych logi ..."Rozpoczęcie wydruku mapy" zajmuje bardzo dużo czasu w zadaniu mapy hadoopu

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output 
13/07/10 17:45:32 INFO mapred.JobClient: map 98% reduce 0% 
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient: map 100% reduce 0% 
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0 
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting 
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done. 
............................... 
............................... 
............................... 
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22 
13/07/10 17:47:52 INFO mapred.JobClient: File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Written=13401245 
13/07/10 17:47:52 INFO mapred.JobClient: FileSystemCounters 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_READ=18871098 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_READ=7346566 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=35878426 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_WRITTEN=18621307 
13/07/10 17:47:52 INFO mapred.JobClient: File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Read=2558288 
13/07/10 17:47:52 INFO mapred.JobClient: Map-Reduce Framework 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input groups=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output materialized bytes=13320006 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map input records=71040 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce shuffle bytes=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Spilled Records=1480000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output bytes=119998400 
13/07/10 17:47:52 INFO mapred.JobClient:  CPU time spent (ms)=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Total committed heap usage (bytes)=1178009600 
13/07/10 17:47:52 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine input records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  SPLIT_RAW_BYTES=122 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input records=740000 

Mapa kod źródłowy Zadanie:

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{ 

    private DoubleWritable distGexpr = new DoubleWritable(); 
    private LongWritable m2keyOut = new LongWritable(); 
    int trMax,tstMax; 

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { 

     Configuration conf =context.getConfiguration(); 
     tstMax = conf.getInt("mtst", 10); 
     trMax = conf.getInt("mtr", 10); 

    } 

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException { 
     String line = values.toString(); 

     double Tij=0.0,TRij=0.0, dist=0; 
     int i=0,j; 
     long m2key=0; 
     String[] SLl = new String[]{}; 

     Configuration conf =context.getConfiguration(); 

     m2key = Long.parseLong(key.toString()); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     j=0; 
     while (tokenizer.hasMoreTokens()) { 

      String test = tokenizer.nextToken(); 
      if(j==0){ 
       Tij = Double.parseDouble(test); 
      } 
      else if(j==1){ 
       TRij = Double.parseDouble(test); 
      } 
      else if(j==2){ 
       SLl = StringUtils.split(conf.get(test),","); 
      } 
      j++; 
     } 
     //Map input ends 

     //Distance Measure function 
     dist = (long)Math.pow((Tij - TRij), 2); 

     //remove gid from key 
     m2key = m2key/100000; 
     //Map2 <key,value> emit starts 
     for(i=0; i<SLl.length;i++){ 
       long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key; 
      m2keyOut.set(m2keyNew); 
      distGexpr.set(dist); 
      context.write(m2keyOut,distGexpr); 
     } 
     //<key,value> emit done 
    } 

} 

Przykładowa mapa Wejście: Ostatnią zmienną w każdym wierszu uzyskać tablicę całkowitą od zmiennych transmisji. Każda linia będzie produkować około 100-200 rekordów wyjściowych.

10100014 1356.3238 1181.63 gs-4-56 
10100026 3263.1167 3192.4131 gs-3-21 
10100043 1852.0 1926.3962 gs-4-76 
10100062 1175.5925 983.47125 gs-3-19 
10100066 606.59125 976.26625 gs-8-23 

Przykładowa mapa wyjściowa:

10101 8633.0 
10102 1822.0 
10103 13832.0 
10104 2726470.0 
10105 1172991.0 
10107 239367.0 
10109 5410384.0 
10111 7698352.0 
10112 6.417 
+1

Czy możesz zamieścić swój kod mapera (lub przynajmniej opis działania funkcjonalnego mapera), przykładowy rekord wejściowy i rekord wyjściowy? Czy masz metodę oczyszczania? –

+0

Dziękuję za odpowiedź. Dodałem kod źródłowy do tego zadania mapy i przykładowego wejścia i wyjścia. Nie użyłem żadnej metody czyszczenia. W rzeczywistości było dużo wycieków wcześniej. Zmieniłem więc io.sort.record.percent i kilka innych ustawień. Następnie wycieki są minimalizowane, ale ogólny czas wykonania pozostaje taki sam. –

Odpowiedz

0

Przypuszczam, że rozwiązali że (2 lata po zaksięgowaniu oryginalną wiadomość), ale po prostu dla każdego, kto wchodzi do tego samego problemu, postaram dostarczanie pewnych sugestii.

Sądząc po twoich licznikach, rozumiem, że używasz już kompresji (ponieważ liczba zmaterializowanych bajtów wyjściowych mapy różni się od liczby bajtów wyjściowych mapy), co jest dobrą rzeczą. Można dodatkowo skompresować dane wyjściowe programu odwzorowującego, używając klasy VLongWritable zakodowanej o zmiennej długości, jako typu klucza wyjściowego mapy. (Kiedyś nie miałem do czynienia z klasą VDoubleWritable, jeśli się nie mylę, ale do tej pory musiałem ją wycofać).

W pętli for, w której emitowane jest wyjście, nie ma potrzeby ustawiania zmiennej distGexpr za każdym razem. Jest zawsze taki sam, więc ustaw go tuż przed pętlą for. Możesz również przechowywać długą z produktem trMax*tstMax poza pętlą i nie obliczać go w każdej iteracji.

Jeśli to możliwe, wprowadź klucz wejściowy LongWritable (z poprzedniego zadania), aby móc zapisać inwokacje Long.parseLong() i Text.toString().

Jeśli to możliwe (w zależności od reduktora), użyj kombinacji, aby zmniejszyć rozmiar rozlanych bajtów.

Nie mogłem znaleźć sposobu na pominięcie tego połączenia Integer.parseInt() w pętli for, ale zaoszczędziłoby to trochę czasu, gdyby początkowo można było załadować SLl jako int[].

Powiązane problemy