2012-06-24 7 views
5

Otrzymuję NullPointerException podczas uruchamiania zadania MapReduce. Jest on wyrzucany przez metodę SerializationFactory 's getSerializer(). Używam niestandardowej klasy wartości InputSplit, InputFormat,i .Wyjątek NullPointerException z JobSplitWriter/SerializationFao firmy Hadoop podczas wywoływania metody InputClick getClass()

Wiem, że błąd jest generowany jakiś czas po tym, jak podziały są tworzone przez moją klasę InputFormat, ale przed utworzeniem RecordReader. O ile wiem, pojawia się bezpośrednio po "czyszczeniu obszaru przemieszczania".

Sprawdzając źródło Hadoop w miejscach wskazanych przez ślad stosu, wygląda na to, że wystąpił błąd, gdy getSerialization() otrzymuje zerowy wskaźnik getSerialization(). W JobClient za writeNewSplits() wywołania tej metody tak:

Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass()); 

Tak więc zakładam, że gdy getClass() jest nazywany na moich zwyczaj InputSplit obiektów, to zwracanie null wskaźnik, ale to jest po prostu zdumiewające. Jakieś pomysły?

Pełne ślad stosu od błędu następuje:

 
12/06/24 14:26:49 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035 
Exception in thread "main" java.lang.NullPointerException 
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) 
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:123) 
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:74) 
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:968) 
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:979) 
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897) 
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850) 
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500) 
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530) 
    at edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start(CuratorJob.java:94) 
    at edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main(HadoopInterface.java:58) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156) 

Dzięki!

EDIT: Mój kod dla niestandardowego InputSplit następująco:

import . . . 

/** 
* A document directory within the input directory. 
* Returned by DirectoryInputFormat.getSplits() 
* and passed to DirectoryInputFormat.createRecordReader(). 
* 
* Represents the data to be processed by an individual Map process. 
*/ 
public class DirectorySplit extends InputSplit { 
    /** 
    * Constructs a DirectorySplit object 
    * @param docDirectoryInHDFS The location (in HDFS) of this 
    *   document's directory, complete with all annotations. 
    * @param fs The filesystem associated with this job 
    */ 
    public DirectorySplit(Path docDirectoryInHDFS, FileSystem fs) 
      throws IOException { 
     this.inputPath = docDirectoryInHDFS; 
     hash = FileSystemHandler.getFileNameFromPath(inputPath); 
     this.fs = fs; 
    } 

    /** 
    * Get the size of the split so that the input splits can be sorted by size. 
    * Here, we calculate the size to be the number of bytes in the original 
    * document (i.e., ignoring all annotations). 
    * 
    * @return The number of characters in the original document 
    */ 
    @Override 
    public long getLength() throws IOException, InterruptedException { 
     Path origTxt = new Path(inputPath, "original.txt"); 
     HadoopInterface.logger.log(msg); 
     return FileSystemHandler.getFileSizeInBytes(origTxt, fs); 
    } 

    /** 
    * Get the list of nodes where the data for this split would be local. 
    * This list includes all nodes that contain any of the required data---it's 
    * up to Hadoop to decide which one to use. 
    * 
    * @return An array of the nodes for whom the split is local 
    * @throws IOException 
    * @throws InterruptedException 
    */ 
    @Override 
    public String[] getLocations() throws IOException, InterruptedException { 
     FileStatus status = fs.getFileStatus(inputPath); 

     BlockLocation[] blockLocs = fs.getFileBlockLocations(status, 0, 
                   status.getLen()); 

     HashSet<String> allBlockHosts = new HashSet<String>(); 
     for(BlockLocation blockLoc : blockLocs) { 
      allBlockHosts.addAll(Arrays.asList(blockLoc.getHosts())); 
     } 

     return (String[])allBlockHosts.toArray(); 
    } 

    /** 
    * @return The hash of the document that this split handles 
    */ 
    public String toString() { 
     return hash; 
    } 

    private Path inputPath; 
    private String hash; 
    private FileSystem fs; 
} 
+0

można pisać kod dla własnych InputSplit - czy rozszerza się Zapisywalny? domyślam się, że nie jest to kod –

+0

. Przyjąłem, że ponieważ rozszerzałem InputSplit, który implementuje Writable, nie musiałbym bezpośrednio implementować Writable. Czy tak nie jest? – s3cur3

Odpowiedz

5

InputSplit nie rozciąga zapisywalny, trzeba jawnie deklarować, że rozłam wejście realizuje Writable

+0

Tak, naprawiłem błąd. Dziękuję bardzo za Twoją pomoc! – s3cur3

+1

Dodam również, że twoja wersja InputSplit musi mieć domyślny konstruktor, którego Hadoop używa do tworzenia instancji klasy. – pedorro

Powiązane problemy