
Hadoop MapReduce: driver for chaining mappers within a MapReduce job

I have a MapReduce job. The code of my Map class:

public static class MapClass extends Mapper<Text, Text, Text, LongWritable> { 

    @Override 
    public void map(Text key, Text value, Context context) 
     throws IOException, InterruptedException { 
    } 
} 

And I want to use ChainMapper:

1. Job job = new Job(conf, "Job with chained tasks"); 
2. job.setJarByClass(MapReduce.class); 
3. job.setInputFormatClass(TextInputFormat.class); 
4. job.setOutputFormatClass(TextOutputFormat.class); 

5. FileInputFormat.setInputPaths(job, new Path(InputFile)); 
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile)); 

7. JobConf map1 = new JobConf(false); 

8. ChainMapper.addMapper(
     job, 
     MapClass.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     Text.class, 
     true, 
     map1 
     ); 

but it reports an error at line 8:

Multiple markers at this line
- Occurrence of 'addMapper'
- The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (Job, Class, Class, Class, Class, Class, boolean, Configuration)
- Debug current instruction pointer
- The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (JobConf, Class, Class, Class, Class, Class, boolean, JobConf)

Answers

Answer (score 0):

You have to use Configuration instead of JobConf. JobConf is a subclass of Configuration, so there should be a constructor for that.
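
If it helps, a minimal sketch of that constructor route (my illustration, not part of the original answer; JobConf(Configuration) is presumably the constructor meant here):

    Configuration conf = new Configuration();
    JobConf jobConf = new JobConf(conf);  // JobConf(Configuration) copy constructor
    JobConf map1 = new JobConf(false);    // empty per-mapper config, no defaults loaded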

Answer (score 0):

For the first argument of your ChainMapper.addMapper() you passed the job object, whereas the function expects an object of type JobConf. Rewriting it to:

 
ChainMapper.addMapper(
      (JobConf)conf, 
      MapClass.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      Text.class, 
      true, 
      map1 
      ); 

should solve the problem.

Comment:

He already has a Job; he needs a Configuration. Casting is not the right choice here. And it's about map1, not conf.

Comment:

Your map class needs to implement org.apache.hadoop.mapred.Mapper, not org.apache.hadoop.mapreduce.Mapper – user864846

Answer (score 7):

After a lot of "kung fu", I was able to get ChainMapper/ChainReducer working. Thanks for the last comment, user864846.

/** 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 

package myPKG; 

/* 
* Ajitsen: Sample program for ChainMapper/ChainReducer. This program is a modified version of the WordCount example that ships with Hadoop 0.18.0, with ChainMapper/ChainReducer added and updated to work on Hadoop 1.0.2. 
*/ 

import java.io.IOException; 
import java.util.Iterator; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapred.lib.ChainMapper; 
import org.apache.hadoop.mapred.lib.ChainReducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class ChainWordCount extends Configured implements Tool { 

    public static class Tokenizer extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, IntWritable> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     public void map(LongWritable key, Text value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String line = value.toString(); 
      System.out.println("Line:"+line); 
      StringTokenizer itr = new StringTokenizer(line); 
      while (itr.hasMoreTokens()) { 
       word.set(itr.nextToken()); 
       output.collect(word, one); 
      } 
     } 
    } 

    public static class UpperCaser extends MapReduceBase 
    implements Mapper<Text, IntWritable, Text, IntWritable> { 

     public void map(Text key, IntWritable value, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      String word = key.toString().toUpperCase(); 
      System.out.println("Upper Case:"+word); 
      output.collect(new Text(word), value);  
     } 
    } 

    public static class Reduce extends MapReduceBase 
    implements Reducer<Text, IntWritable, Text, IntWritable> { 

     public void reduce(Text key, Iterator<IntWritable> values, 
       OutputCollector<Text, IntWritable> output, 
       Reporter reporter) throws IOException { 
      int sum = 0; 
      while (values.hasNext()) { 
       sum += values.next().get(); 
      } 
      System.out.println("Word:"+key.toString()+"\tCount:"+sum); 
      output.collect(key, new IntWritable(sum)); 
     } 
    } 

    static int printUsage() { 
     System.out.println("wordcount <input> <output>"); 
     ToolRunner.printGenericCommandUsage(System.out); 
     return -1; 
    } 

    public int run(String[] args) throws Exception { 
     JobConf conf = new JobConf(getConf(), ChainWordCount.class); 
     conf.setJobName("wordcount"); 

     if (args.length != 2) { 
      System.out.println("ERROR: Wrong number of parameters: " + 
        args.length + " instead of 2."); 
      return printUsage(); 
     } 
     FileInputFormat.setInputPaths(conf, args[0]); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

     JobConf mapAConf = new JobConf(false); 
     ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf); 

     JobConf mapBConf = new JobConf(false); 
     ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf); 

     JobConf reduceConf = new JobConf(false); 
     ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf); 

     JobClient.runJob(conf); 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args); 
     System.exit(res); 
    } 
} 
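
A note on what the chain buys you (my addition, paraphrasing the ChainMapper/ChainReducer javadoc): the chained mappers and the reducer all run inside a single task, in the pattern [MAP+ / REDUCE MAP*], so the output of one mapper is piped straight into the next without any extra shuffle or intermediate files in between. The boolean byValue argument controls whether key/value pairs are passed down the chain by value (cloned, always safe) or by reference (faster, but only valid if the upstream mapper does not reuse its objects and the downstream one does not modify them).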

EDIT: In recent versions (at least from Hadoop 2.6), the true flag in addMapper is no longer needed (in fact, the signature has changed to remove it).

So it becomes just:

JobConf mapAConf = new JobConf(false); 
ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, 
         Text.class, IntWritable.class, mapAConf); 
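
As a side note (my addition, not from the original answers): Hadoop 2.x also ships a new-API chain class, org.apache.hadoop.mapreduce.lib.chain.ChainMapper, whose addMapper takes a Job and a plain Configuration, so a Job-based driver like the one in the question can work there, provided the mapper classes extend org.apache.hadoop.mapreduce.Mapper. A rough sketch, where NewApiTokenizer is a hypothetical Tokenizer rewritten against the new API:

    Configuration mapAConf = new Configuration(false);
    // NewApiTokenizer is hypothetical: a Tokenizer extending org.apache.hadoop.mapreduce.Mapper
    ChainMapper.addMapper(job, NewApiTokenizer.class,
        LongWritable.class, Text.class,  // input key/value types
        Text.class, IntWritable.class,   // output key/value types
        mapAConf);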
Answer (score 0):

Actually, the mapper class must implement the org.apache.hadoop.mapred.Mapper interface. I had the same problem, and this solved it.
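
For concreteness, this is roughly what the MapClass from the question looks like rewritten against the old API (my sketch, keeping the question's type parameters):

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, LongWritable> {

        public void map(Text key, Text value,
            OutputCollector<Text, LongWritable> output,
            Reporter reporter) throws IOException {
            // map logic goes here; emit pairs via output.collect(...)
        }
    }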
