2013-01-13 11 views
5

Szukałem od kilku dni, próbując znaleźć sposób, używając zredukowanych danych do dalszego mapowania w hadoopie. Mam obiekty klasy A jako dane wejściowe i obiekty klasy B jako dane wyjściowe. Problem polega na tym, że podczas mapowania generowane są również tylko nowe .Podział zmniejszonych danych na dane wyjściowe i nowe dane wejściowe w Hadoop

Oto, co chciałbym osiągnąć:

1.1 input: a list of As 
1.2 map result: for each A a list of new As and a list of Bs is generated 
1.3 reduce: filtered Bs are saved as output, filtered As are added to the map jobs 

2.1 input: a list of As produced by the first map/reduce 
2.2 map result: for each A a list of new As and a list of Bs is generated 
2.3 ... 

3.1 ... 

Należy uzyskać podstawowe pojęcia.

Czytałem dużo o łańcuchach, ale nie jestem pewien, jak połączyć ChainReducer i ChainMapper, a nawet, czy byłoby to właściwe podejście.

Oto moje pytanie: Jak podzielić zmapowane dane podczas zmniejszania, aby zapisać jedną część jako wynik, a drugą część jako nowe dane wejściowe.

Odpowiedz

2

Spróbuj użyć MultipleOutputs. Jak to sugeruje Javadoc:

W MultipleOutputs klasa upraszcza zapisywania danych wyjściowych do wielokrotności wyjścia

jednym przypadku: pisanie dodatkowych wyjść innych niż domyślny praca wyjściu. Każde dodatkowe wyjście lub nazwane wyjście można skonfigurować z własnym OutputFormat, z własną klasą klucza i własną klasą wartości .

Przypadek drugi: do zapisu danych do różnych plików dostarczonych przez użytkownika

Wykorzystanie wzoru składania praca:

Job job = new Job(); 

FileInputFormat.setInputPath(job, inDir); 
FileOutputFormat.setOutputPath(job, outDir); 

job.setMapperClass(MOMap.class); 
job.setReducerClass(MOReduce.class); 
... 

// Defines additional single text based output 'text' for the job 
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, 
LongWritable.class, Text.class); 

// Defines additional sequence-file based output 'sequence' for the job 
MultipleOutputs.addNamedOutput(job, "seq", 
    SequenceFileOutputFormat.class, 
    LongWritable.class, Text.class); 
... 

job.waitForCompletion(true); 
... 

Wykorzystanie w Reduktor

String generateFileName(K k, V v) { 
    return k.toString() + "_" + v.toString(); 
} 

public class MOReduce extends 
    Reducer<WritableComparable, Writable,WritableComparable, Writable> { 
private MultipleOutputs mos; 
public void setup(Context context) { 
... 
mos = new MultipleOutputs(context); 
} 

public void reduce(WritableComparable key, Iterator<Writable> values, 
Context context) 
throws IOException { 
... 
mos.write("text", , key, new Text("Hello")); 
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); 
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); 
mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); 
... 
} 

public void cleanup(Context) throws IOException { 
mos.close(); 
... 
} 

} 
+0

proszę zauważ, że te przykłady kodu dotyczą Hadoop 0. *, ale nie 1.0 .4. Ponieważ pracuję z wersją 1.0.4, interfejsy uległy niewielkiej zmianie. Ale podstawową ideą było to, czego szukałem. Dziękuję Ci! – Mennny

+0

Tak, to prawda. to było za 0,20 – Amar

Powiązane problemy