2015-05-05 11 views
7

Mam zadanie, które zapisuje dane wyjściowe avro w wielu katalogach zorganizowanych przez kilka pól rekordów wejściowych.Hadoop wiele wyjść z wykonywaniem spekulacji

 
For example : 
Process records of countries across years 
and write in a directory structure of country/year 
eg: 
outputs/usa/2015/outputs_usa_2015.avro 
outputs/uk/2014/outputs_uk_2014.avro 
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context); 
.... 
.... 
    multipleOutputs.write("output", avroKey, NullWritable.get(), 
      OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear()); 

Co wyjście commiter byłoby poniżej użycie kodu napisać output.Is nie można bezpiecznie stosować z realizacją spekulacyjnego? z realizacją spekulacyjnego powoduje (może powodować) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException

W tym poście Hadoop Reducer: How can I output to multiple directories using speculative execution? Jest zalecane, aby użyć committer zwyczaj wyjściowy

Poniższy kod z Hadoop AvroMultipleOutputs nie określa żadnego problemu z realizacją spekulacyjnego

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, 
      String baseFileName) throws IOException, InterruptedException { 

    writer = 
       ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), 
        taskContext.getConfiguration())).getRecordWriter(taskContext); 
... 
} 

Ani czy dokument metoda zapisu żadnych problemów jeśli baseoutput ścieżka jest poza katalogiem pracy

public void write(String namedOutput, Object key, Object value, String baseOutputPath) 

Czy istnieje problem z AvroMultipleOutputs (inne wyjścia) z wykonywaniem spekulatywnym podczas pisania poza katalogiem zadań? Jeśli więc jak mogę nadpisać AvroMultipleOutputs mieć swoje własne wyjście committer.I nie widzę żadnego outputformat wewnątrz AvroMultipleOutputs którego wyjście committer używa

+0

Czy napisałeś własną implementację? Mam to samo pytanie: – tesnik03

+0

W Kiedy powiesz "Z egzekucją spekulacyjną to powoduje (może powodować) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException", widziałeś to udokumentowane w dowolnym miejscu, lub mówisz z doświadczenia. Widzimy to samo zachowanie, ale nie znaleźliśmy żadnych wyraźnych odniesień do wyłączenia spekulatywnej realizacji przy użyciu wielu wyników. – ioss

+0

Tak, jest to udokumentowane. Ostrzegamy o tym tutaj http://archive.cloudera.com/cdh5/cdh/5/hadoop/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html – bl3e

Odpowiedz

1

AvroMultipleOutputs użyje OutputFormat, które zostały zarejestrowane w konfiguracjach Pracy dodając nazwie wyjście np. przy użyciu addNamedOutput API od AvroMultipleOutputs (np. AvroKeyValueOutputFormat).

Przy użyciu AvroMultipleOutputs może nie być możliwe korzystanie z funkcji wykonywania zadań spekulacyjnych. Nawet przesłonięcie go albo nie pomoże, albo nie będzie proste.

Zamiast tego powinno się pisać własne OutputFormat (najprawdopodobniej rozszerzenie jednego z dostępnych formatów wyjściowych Avro np AvroKeyValueOutputFormat) i nadpisanie/realizować swoją getRecordWriter API, gdzie byłoby powrócić jeden RecordWriter wystąpienie powiedzieć MainRecordWriter (tylko dla odniesienia).

Ta wersja MainRecordWriter zachowałaby mapę z instancjami RecordWriter (np. AvroKeyValueRecordWriter). Każda z tych instancji RecordWriter należałaby do jednego z plików wyjściowych. W interfejsie API write z MainRecordWriter otrzymasz rzeczywistą instancję z mapy (w oparciu o rekord, który zamierzasz napisać), i zapisz rekord za pomocą tego programu zapisującego zapis. Tak więc MainRecordWriter będzie działać tylko jako opakowanie dla wielu instancji RecordWriter.

Dla podobnej implementacji możesz chcieć zapoznać się z kodem klasy MultiStorage z biblioteki .

0

Po dodaniu nazwie wyjście AvroMultipleOutputs, to zadzwoń albo AvroKeyOutputFormat.getRecordWriter() lub AvroKeyValueOutputFormat.getRecordWriter(), który nazywamy AvroOutputFormatBase.getAvroFileOutputStream(), którego treść jest

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { 
    Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(), 
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT)); 
    return path.getFileSystem(context.getConfiguration()).create(path); 
} 

I AvroOutputFormatBase rozciąga FileOutputFormat (The getOutputCommitter() w powyższej metodzie jest w rzeczywistości zadzwoń pod numer FileOutputFormat.getOutputCommitter(). Dlatego też AvroMultipleOutputs powinien mieć te same ograniczenia, co MultipleOutputs.