2013-01-11 13 views
5

Czy ktoś może wskazać, gdzie mogę znaleźć implementację dla CombineFileInputFormat (org., Używając Hadoop 0.20.205 ?, aby utworzyć duże podziały z bardzo małych plików dziennika (tekst w liniach) za pomocą EMRImplementacja dla CombineFileInputFormat Hadoop 0.20.205

Zaskakujące jest to, że Hadoop nie ma domyślnej implementacji dla tej klasy stworzonej specjalnie do tego celu i googlowanie wygląda na to, że nie jestem jedynym, który jest zdezorientowany przez to. Muszę skompilować klasę i zlepić ją w słoik do streamowania mufami, z ograniczoną znajomością Javy jest to pewne wyzwanie:

Edytuj: Próbowałem już przykładu yetitrails, z ne cessary import, ale dostaję błąd kompilatora dla następnej metody.

+0

Jakie błędy nie masz z yeti szlaki przykład? Wydaje mi się, że jestem w porządku. –

+0

Widziałeś to: http://yaseminavcular.blogspot.com/2011/03/many-small-input-files.html –

+0

@Charles, dziękuję, błąd, który dostaję jest "błąd: MyKeyValueLineRecordReader nie jest abstrakcyjna i nie nie zastąpić abstrakcyjnej metody następnej (obiekt, obiekt) w RecordReader " –

Odpowiedz

14

Oto implementacja mam dla Ciebie:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.LineRecordReader; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.lib.CombineFileInputFormat; 
import org.apache.hadoop.mapred.lib.CombineFileRecordReader; 
import org.apache.hadoop.mapred.lib.CombineFileSplit; 

@SuppressWarnings("deprecation") 
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> { 

    @SuppressWarnings({ "unchecked", "rawtypes" }) 
    @Override 
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { 

     return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); 
    } 

    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> { 
     private final LineRecordReader linerecord; 

     public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { 
      FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); 
      linerecord = new LineRecordReader(conf, filesplit); 
     } 

     @Override 
     public void close() throws IOException { 
      linerecord.close(); 

     } 

     @Override 
     public LongWritable createKey() { 
      // TODO Auto-generated method stub 
      return linerecord.createKey(); 
     } 

     @Override 
     public Text createValue() { 
      // TODO Auto-generated method stub 
      return linerecord.createValue(); 
     } 

     @Override 
     public long getPos() throws IOException { 
      // TODO Auto-generated method stub 
      return linerecord.getPos(); 
     } 

     @Override 
     public float getProgress() throws IOException { 
      // TODO Auto-generated method stub 
      return linerecord.getProgress(); 
     } 

     @Override 
     public boolean next(LongWritable key, Text value) throws IOException { 

      // TODO Auto-generated method stub 
      return linerecord.next(key, value); 
     } 

    } 
} 

W swojej pracy najpierw ustawić parametr mapred.max.split.size w zależności od wielkości chcesz, aby pliki wejściowe mają być połączone w. Zrobić coś jak następuje w biegu ():

... 
      if (argument != null) { 
       conf.set("mapred.max.split.size", argument); 
      } else { 
       conf.set("mapred.max.split.size", "134217728"); // 128 MB 
      } 
... 

      conf.setInputFormat(CombinedInputFormat.class); 
... 
+0

Schludny, kompiluje się bez problemów, dzięki Amar! –

+0

Witam @Amar, skąd się bierze indeks? – learninghuman

+0

@ManikandanKannan: nie musisz martwić się o indeks podczas korzystania z tego InputFormat. Aby go użyć, po prostu wykonaj 'conf.setInputFormat (CombinedInputFormat.class);' jak pokazano powyżej. Jednak indeks FYI jest prawdopodobnie liczbą podzieloną, która jest przekazywana podczas wewnętrznego inicjowania czytnika rekordów przez hadoop. – Amar