
Reading a file as a single record in Hadoop

I have a huge number of small files, and I want to use CombineFileInputFormat to merge the files so that each file's data comes into my MR job as a single record. I am following http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and have tried to convert it to the new API.

I am facing 2 problems:

a) I am testing with just 2 small files, yet 2 mappers are fired. I expected 1.

b) Each line comes in as a single record, while I want the entire file as a single record.

It may be painful, but please look at the code below. I am still a Hadoop novice.

The driver class:

public class MRDriver extends Configured implements Tool { 


@Override 
public int run(String[] args) throws Exception { 
    FileSystem fs = new Path(".").getFileSystem(getConf()); 
    fs.printStatistics(); 
    Job job = new Job(getConf()); 
    job.setJobName("Enron MR"); 
    job.setMapperClass(EnronMailReadMapper.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    job.setNumReduceTasks(0); 
    job.setJarByClass(EnronMailReadMapper.class); 
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0])); 
    job.setOutputFormatClass(TextOutputFormat.class); 
    TextOutputFormat.setOutputPath(job, new Path(args[1])); 
    return job.waitForCompletion(true) ? 0 :1; 
} 

public static void main(String[] args) throws Exception { 
    int exitCode = ToolRunner.run(new MRDriver(), args); 
    System.exit(exitCode); 
} 

}

The following class is mostly a copy-paste of LineRecordReader, with the initialize() and nextKeyValue() functions modified:

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> { 
    private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class); 

    private long start; 
    private long pos; 
    private long end; 
    private LineReader in; 
    private int maxLineLength; 
    private LongWritable key = null; 
    private Text value = null; 

    public void initialize(InputSplit genericSplit, 
         TaskAttemptContext context) throws IOException { 
    FileSplit split = (FileSplit) genericSplit; 
    Configuration job = context.getConfiguration(); 
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 
            Integer.MAX_VALUE); 
    start = split.getStart(); 
    end = start + split.getLength(); 
    final Path file = split.getPath(); 

    // open the file and seek to the start of the split 
    FileSystem fs = file.getFileSystem(job); 
    FSDataInputStream fileIn = fs.open(split.getPath()); 

     fileIn.seek(start); 
     in = new LineReader(fileIn, job); 
    // If this is not the first split, we always throw away first record 
    // because we always (except the last split) read one extra line in 
    // next() method. 
    if (start != 0) { 
     start += in.readLine(new Text(), 0, maxBytesToConsume(start)); 
    } 
    this.pos = start; 
    } 

    private int maxBytesToConsume(long pos) { 
    return (int) Math.min(Integer.MAX_VALUE, end - pos); 
    } 

    private long getFilePosition() throws IOException { 
    long retVal= pos; 
    return retVal; 
    } 

    public boolean nextKeyValue() throws IOException { 
    if (key == null) { 
     key = new LongWritable(); 
    } 
    key.set(pos); 
    if (value == null) { 
     value = new Text(); 
    } 
    int newSize = 0; 
    StringBuffer totalValue = new StringBuffer(); 
    // We always read one extra line, which lies outside the upper 
    // split limit i.e. (end - 1) 
    while (getFilePosition() <= end) { 
     newSize = in.readLine(value, maxLineLength, 
      Math.max(maxBytesToConsume(pos), maxLineLength)); 
     if (newSize == 0) { 
     break; 
     } 
     totalValue.append(value.toString()+"\n"); 
     pos += newSize; 
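     // Note: maxLineLength is left at Integer.MAX_VALUE above, so the
     // condition below holds for every line read and the loop exits after
     // a single line -- which is why each line arrives as its own record
     // rather than the whole file arriving as one record.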
     if (newSize < maxLineLength) { 
     break; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + 
       (pos - newSize)); 
    } 
    if (newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     value = new Text(totalValue.toString()); 
     return true; 
    } 
    } 

    @Override 
    public LongWritable getCurrentKey() { 
    return key; 
    } 

    @Override 
    public Text getCurrentValue() { 
    return value; 
    } 

    /** 
    * Get the progress within the split 
    */ 
    public float getProgress() throws IOException { 
    if (start == end) { 
     return 0.0f; 
    } else { 
     return Math.min(1.0f, 
     (getFilePosition() - start)/(float)(end - start)); 
    } 
    } 

    public synchronized void close() throws IOException { 
    try { 
     if (in != null) { 
     in.close(); 
     } 
    } finally { 

    } 
    } 

}

Here are the other files:

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{ 

@Override 
public RecordReader<LongWritable, Text> createRecordReader(
     InputSplit split, TaskAttemptContext context) throws IOException { 
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class); 
} 

}
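One thing worth noting: unlike the WholeFileInputFormat in the answer below, this class does not override isSplitable(), so a single file larger than one split can still be broken up. The answer pins this down explicitly; a sketch of the same override here would be:

// Marking files unsplittable guarantees each file stays in one piece
// inside a combined split (as the answer's WholeFileInputFormat does).
@Override
protected boolean isSplitable(JobContext context, Path file) {
    return false;
}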

And:

public class MultiFileRecordReader extends RecordReader < LongWritable, Text > { 

private CombineFileSplit split; 
private TaskAttemptContext context; 
private int index; 
private RecordReader< LongWritable, Text > rr; 

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) { 
    this.split = split; 
    this.context = context; 
    this.index = index; 
    this.rr = new SingleFileRecordReader(); 
} 
@Override 
public void initialize(InputSplit split, TaskAttemptContext context) 
     throws IOException, InterruptedException { 
    this.split = (CombineFileSplit) split; 
     this.context = context; 

     if (null == rr) { 
     rr = new SingleFileRecordReader(); 
     } 

     FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
              this.split.getOffset(index), 
              this.split.getLength(index), 
              this.split.getLocations()); 
     this.rr.initialize(fileSplit, this.context); 

} 

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 
    return this.rr.nextKeyValue(); 
} 

@Override 
public LongWritable getCurrentKey() throws IOException, InterruptedException { 
    return this.rr.getCurrentKey(); 
} 

@Override 
public Text getCurrentValue() throws IOException, InterruptedException { 
    return this.rr.getCurrentValue(); 
} 

@Override 
public float getProgress() throws IOException, InterruptedException { 
    return this.rr.getProgress(); 
} 

@Override 
public void close() throws IOException { 
    if (rr != null) { 
      rr.close(); 
      rr = null; 
    }  
} 

}


You need to set the maximum split size up to which files are combined. You can read about it in the 'CombineFileInputFormat' documentation.
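A minimal sketch of that suggestion (the 64 MB cap is an arbitrary example, not a value from the thread): CombineFileInputFormat exposes a protected setMaxSplitSize(), which a subclass can call in its constructor so that small files are packed into combined splits no larger than the cap.

public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    public RawCombineFileInputFormat() {
        // Pack small files into combined splits of at most 64 MB
        // (example value -- tune it to your cluster's block size).
        setMaxSplitSize(64 * 1024 * 1024);
    }

    // createRecordReader(...) as shown above
}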

Answer


Take a look at this input format. It is an input format for reading multiple files in a single map task. Exactly one (unsplit) file will be read for each record passed to the mapper. The WholeFileRecordReader takes care of emitting one file's contents as a single value. The key returned is NullWritable, and the value is the contents of each file as a whole. Now you can use it, run your MapReduce job, check how many mappers actually run, and verify that the output is correct.

Records are built with WholeFileRecordReaders.

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text>{ 

     @Override 
     protected boolean isSplitable(JobContext context, Path file) { 
      return false; 
     } 

/** 
    * Creates a CombineFileRecordReader to read each file assigned to this InputSplit. 
    * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore 
    * is expected to specify multiple files. 
    * 
    * @param split The InputSplit to read. Throws an IllegalArgumentException if this is 
    *  not a CombineFileSplit. 
    * @param context The context for this task. 
    * @return a CombineFileRecordReader to process each file in split. 
    *   It will read each file with a WholeFileRecordReader. 
    * @throws IOException if there is an error. 
    */ 

    @Override 
    public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException { 

     if (!(split instanceof CombineFileSplit)) { 
       throw new IllegalArgumentException("split must be a CombineFileSplit"); 
      } 
      return new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class); 
    } 

    } 

The WholeFileRecordReader used above looks like this:

public class WholeFileRecordReader extends RecordReader<NullWritable, Text> { 
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class); 

     /** The path to the file to read. */ 
     private final Path mFileToRead; 
     /** The length of this file. */ 
     private final long mFileLength; 

     /** The Configuration. */ 
     private final Configuration mConf; 

     /** Whether this FileSplit has been processed. */ 
     private boolean mProcessed; 
     /** Single Text to store the file name of the current file. */ 
    // private final Text mFileName; 
     /** Single Text to store the value of this file (the value) when it is read. */ 
     private final Text mFileText; 

     /** 
     * Implementation detail: This constructor is built to be called via 
     * reflection from within CombineFileRecordReader. 
     * 
     * @param fileSplit The CombineFileSplit that this will read from. 
     * @param context The context for this task. 
     * @param pathToProcess The path index from the CombineFileSplit to process in this record. 
     */ 
     public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context, 
      Integer pathToProcess) { 
     mProcessed = false; 
     mFileToRead = fileSplit.getPath(pathToProcess); 
     mFileLength = fileSplit.getLength(pathToProcess); 
     mConf = context.getConfiguration(); 

     assert 0 == fileSplit.getOffset(pathToProcess); 
     if (LOG.isDebugEnabled()) { 
      LOG.debug("FileToRead is: " + mFileToRead.toString()); 
      LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths()); 

      try { 
      FileSystem fs = FileSystem.get(mConf); 
      assert fs.getFileStatus(mFileToRead).getLen() == mFileLength; 
      } catch (IOException ioe) { 
      // oh well, I was just testing. 
      } 
     } 

    // mFileName = new Text(); 
     mFileText = new Text(); 
     } 

     /** {@inheritDoc} */ 
     @Override 
     public void close() throws IOException { 
     mFileText.clear(); 
     } 

     /** 
     * Returns the current key, which is always NullWritable for this reader. 
     * 
     * @return NullWritable. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public NullWritable getCurrentKey() throws IOException, InterruptedException { 
     return NullWritable.get(); 
     } 

     /** 
     * <p>Returns the current value. If the file has been read with a call to nextKeyValue(), 
     * this returns the contents of the file as Text. Otherwise, it returns an 
     * empty Text.</p> 
     * 
     * <p>Throws an IllegalStateException if initialize() is not called first.</p> 
     * 
     * @return A Text containing the contents of the file to read. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
     return mFileText; 
     } 

     /** 
     * Returns whether the file has been processed or not. Since only one record 
     * will be generated for a file, progress will be 0.0 if it has not been processed, 
     * and 1.0 if it has. 
     * 
     * @return 0.0 if the file has not been processed. 1.0 if it has. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public float getProgress() throws IOException, InterruptedException { 
     return (mProcessed) ? (float) 1.0 : (float) 0.0; 
     } 

     /** 
     * All of the internal state is already set on instantiation. This is a no-op. 
     * 
     * @param split The InputSplit to read. Unused. 
     * @param context The context for this task. Unused. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // no-op. 
     } 

     /** 
     * <p>If the file has not already been read, this reads it into memory, so that a call 
     * to getCurrentValue() will return the entire contents of this file as Text. Then, returns 
     * true. If it has already been read, then returns false without updating any internal state.</p> 
     * 
     * @return Whether the file was read or not. 
     * @throws IOException if there is an error reading the file. 
     * @throws InterruptedException if there is an error. 
     */ 
     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (!mProcessed) { 
      if (mFileLength > (long) Integer.MAX_VALUE) { 
      throw new IOException("File is longer than Integer.MAX_VALUE."); 
      } 
      byte[] contents = new byte[(int) mFileLength]; 

      FileSystem fs = mFileToRead.getFileSystem(mConf); 
      FSDataInputStream in = null; 
      try { 
      // Set the contents of this file. 
      in = fs.open(mFileToRead); 
      IOUtils.readFully(in, contents, 0, contents.length); 
      mFileText.set(contents, 0, contents.length); 

      } finally { 
      IOUtils.closeStream(in); 
      } 
      mProcessed = true; 
      return true; 
     } 
     return false; 
     } 

} 

Below is the driver code:

public int run(String[] arg) throws Exception { 
    Configuration conf=getConf(); 
    FileSystem fs = FileSystem.get(conf); 
    //estimate reducers 
    Job job = new Job(conf); 
    job.setJarByClass(WholeFileDriver.class); 
    job.setJobName("WholeFile"); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    job.setInputFormatClass(WholeFileInputFormat.class); 
    job.setMapperClass(WholeFileMapper.class); 
    job.setNumReduceTasks(0); 

    FileInputFormat.addInputPath(job, new Path(arg[0])); 
    Path output=new Path(arg[1]); 
    try { 
     fs.delete(output, true); 
    } catch (IOException e) { 
     LOG.warn("Failed to delete temporary path", e); 
    } 
    FileOutputFormat.setOutputPath(job, output); 

    boolean ret = job.waitForCompletion(true); 
    if (!ret) { 
     throw new Exception("Job Failed"); 
    } 
    return 0; 
} 
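The driver references a WholeFileMapper that is not shown in the answer. A minimal sketch of what it might look like, matching the Text/Text output types the driver configures (the emitted key, the file length, is a placeholder of my choosing):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WholeFileMapper extends Mapper<NullWritable, Text, Text, Text> {
    private final Text outKey = new Text();

    @Override
    protected void map(NullWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Called once per file: 'value' holds the entire file contents.
        outKey.set("length=" + value.getLength());
        context.write(outKey, value);
    }
}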

Thank you! I had not added job.setInputFormatClass(WholeFileInputFormat.class). But I wonder: even when I added RawCombineFileInputFormat.addInputPath(job, new Path(args[0])), why was that format not picked up? Any answer?
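A likely explanation, though the thread never confirms it: addInputPath() is a static helper inherited from FileInputFormat and only records the input path in the job configuration; it does not select an input format. Unless the driver also calls setInputFormatClass(), the job falls back to the default TextInputFormat, which is exactly the line-by-line behavior observed:

// Setting the path alone does not select the format; without the first line
// the job silently defaults to TextInputFormat (one record per line).
job.setInputFormatClass(RawCombineFileInputFormat.class);
RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));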