2013-07-18 11 views
5

Próbuję przesłać prostą topologię słów do mojego lokalnego klastra burzowego. Najpierw spróbowałem użyć programu maven, a następnie używając klienta linii poleceń Storm. Stworzyłem plik JAR za pomocą eclipse. Ale rzuca główny wyjątek nie znaleziono klasy. Czy ktoś może mi powiedzieć, jaki może być problem? Załączam poniższy kod i wyjątek.java.lang.ClassNotFoundException: TopologyMain

package com.test.newpackage; 

import com.test.newpackage.WordReader; 
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 
import com.test.newpackage.WordCounter; 
import com.test.newpackage.WordNormalizer; 

public class TopologyMain { 
    public static void main(String[] args) throws InterruptedException { 
     // Topology definition 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("word-reader", new WordReader()); 
     builder.setBolt("word-normalizer", new WordNormalizer()) 
       .shuffleGrouping("word-reader"); 
     builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping(
       "word-normalizer", new Fields("word")); 
     // Configuration 
     Config conf = new Config(); 
     conf.put("wordsFile", args[0]); 
     conf.setDebug(false); 
     // Topology run 
     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("Getting-Started-Toplogie", conf, 
       builder.createTopology()); 
     Thread.sleep(1000); 
     cluster.shutdown(); 
    } 
} 

package com.test.newpackage; 

import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Tuple; 

public class WordCounter implements IRichBolt { 
    Integer id; 
    String name; 
    Map<String, Integer> counters; 
    private OutputCollector collector; 

    /** 
    * At the end of the spout (when the cluster is shutdown We will show the 
    * word counters 
    */ 
    @Override 
    public void cleanup() { 
     System.out.println("-- Word Counter [" + name + "-" + id + "] --"); 
     for (Map.Entry<String, Integer> entry : counters.entrySet()) { 
      System.out.println(entry.getKey() + ": " + entry.getValue()); 
     } 
    } 

    /** 
    * On each word We will count 
    */ 
    @Override 
    public void execute(Tuple input) { 
     String str = input.getString(0); 
     /** 
     * If the word dosn't exist in the map we will create this, if not We 
     * will add 1 
     */ 
     if (!counters.containsKey(str)) { 
      counters.put(str, 1); 
     } else { 
      Integer c = counters.get(str) + 1; 
      counters.put(str, c); 
     } 
     // Set the tuple as Acknowledge 
     collector.ack(input); 
    } 

    /** 
    * On create 
    */ 
    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
      OutputCollector collector) { 
     this.counters = new HashMap<String, Integer>(); 
     this.collector = collector; 
     this.name = context.getThisComponentId(); 
     this.id = context.getThisTaskId(); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     // TODO Auto-generated method stub 
     return null; 
    } 
} 

package com.test.newpackage; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class WordNormalizer implements IRichBolt { 
    private OutputCollector collector; 

    public void cleanup() { 
    } 

    /** 
    * The bolt will receive the line from the words file and process it to 
    * Normalize this line 
    * 
    * The normalize will be put the words in lower case and split the line to 
    * get all words in this 
    */ 
    public void execute(Tuple input) { 
     String sentence = input.getString(0); 
     String[] words = sentence.split(" "); 
     for (String word : words) { 
      word = word.trim(); 
      if (!word.isEmpty()) { 
       word = word.toLowerCase(); 
       // Emit the word 
       List a = new ArrayList(); 
       a.add(input); 
       collector.emit(a, new Values(word)); 
      } 
     } 
     // Acknowledge the tuple 
     collector.ack(input); 
    } 

    public void prepare(Map stormConf, TopologyContext context, 
      OutputCollector collector) { 
     this.collector = collector; 
    } 

    /** 
    * The bolt will only emit the field "word" 
    */ 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("word")); 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     // TODO Auto-generated method stub 
     return null; 
    } 
} 

package com.test.newpackage; 

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.IRichSpout; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class WordReader implements IRichSpout { 
    private SpoutOutputCollector collector; 
    private FileReader fileReader; 
    private boolean completed = false; 
    private TopologyContext context; 

    public boolean isDistributed() { 
     return false; 
    } 

    public void ack(Object msgId) { 
     System.out.println("OK:" + msgId); 
    } 

    public void close() { 
    } 

    public void fail(Object msgId) { 
     System.out.println("FAIL:" + msgId); 
    } 

    /** 
    * The only thing that the methods will do It is emit each file line 
    */ 
    public void nextTuple() { 
     /** 
     * The nextuple it is called forever, so if we have been readed the file 
     * we will wait and then return 
     */ 
     if (completed) { 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       // Do nothing 
      } 
      return; 
     } 
     String str; 
     // Open the reader 
     BufferedReader reader = new BufferedReader(fileReader); 
     try { 
      // Read all lines 
      while ((str = reader.readLine()) != null) { 
       /** 
       * By each line emmit a new value with the line as a their 
       */ 
       this.collector.emit(new Values(str), str); 
      } 
     } catch (Exception e) { 
      throw new RuntimeException("Error reading tuple", e); 
     } finally { 
      completed = true; 
     } 
    } 

    /** 
    * We will create the file and get the collector object 
    */ 
    public void open(Map conf, TopologyContext context, 
      SpoutOutputCollector collector) { 
     try { 
      this.context = context; 
      this.fileReader = new FileReader(conf.get("wordsFile").toString()); 
     } catch (FileNotFoundException e) { 
      throw new RuntimeException("Error reading file[" 
        + conf.get("wordFile") + "]"); 
     } 
     this.collector = collector; 
    } 

    /** 
    * Declare the output field "word" 
    */ 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("line")); 
    } 

    @Override 
    public void activate() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void deactivate() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     // TODO Auto-generated method stub 
     return null; 
    } 
} 

    mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="resources/ 
    words.txt" 

    storm jar TopologyMain.jar TopologyMain wordcount 

    Exception in thread "main" java.lang.NoClassDefFoundError: TopologyMain 
     Caused by: java.lang.ClassNotFoundException: TopologyMain 
      at java.net.URLClassLoader$1.run(URLClassLoader.java:217) 
      at java.security.AccessController.doPrivileged(Native Method) 
      at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
      at java.lang.ClassLoader.loadClass(ClassLoader.java:321) 
      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
      at java.lang.ClassLoader.loadClass(ClassLoader.java:266) 
     Could not find the main class: TopologyMain. Program will exit. 

EDIT Wyjątek:

backtype.storm.daemon.nimbus - Activating word-count: word-count-1-1374756437 
1724 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died 
java.lang.NullPointerException 
    at clojure.lang.Numbers.ops(Numbers.java:942) 
    at clojure.lang.Numbers.isPos(Numbers.java:94) 
    at clojure.core$take$fn__4112.invoke(core.clj:2500) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$concat$fn__3804.invoke(core.clj:662) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$map$fn__4091.invoke(core.clj:2437) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54) 
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13) 
    at clojure.core$reduce.invoke(core.clj:6030) 
    at clojure.core$into.invoke(core.clj:6077) 
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245) 
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408) 
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420) 
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315) 
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54) 
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13) 
    at clojure.core$reduce.invoke(core.clj:6030) 
    at clojure.core$into.invoke(core.clj:6077) 
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625) 
    at clojure.lang.RestFn.invoke(RestFn.java:410) 
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto__$reify__3603.submitTopology(nimbus.clj:898) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:616) 
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) 
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) 
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:227) 
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19) 
    at backtype.storm.LocalCluster.submitTopology(Unknown Source) 
    at storm.starter.WordCountTopology.main(WordCountTopology.java:83) 
11769 [Thread-2] ERROR backtype.storm.daemon.nimbus - Error when processing event 
java.lang.NullPointerException 
    at clojure.lang.Numbers.ops(Numbers.java:942) 
    at clojure.lang.Numbers.isPos(Numbers.java:94) 
    at clojure.core$take$fn__4112.invoke(core.clj:2500) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$concat$fn__3804.invoke(core.clj:662) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core$map$fn__4091.invoke(core.clj:2437) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54) 
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13) 
    at clojure.core$reduce.invoke(core.clj:6030) 
    at clojure.core$into.invoke(core.clj:6077) 
    at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245) 
    at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408) 
    at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420) 
    at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315) 
    at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626) 
    at clojure.lang.LazySeq.sval(LazySeq.java:42) 
    at clojure.lang.LazySeq.seq(LazySeq.java:60) 
    at clojure.lang.RT.seq(RT.java:473) 
    at clojure.core$seq.invoke(core.clj:133) 
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54) 
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13) 
    at clojure.core$reduce.invoke(core.clj:6030) 
    at clojure.core$into.invoke(core.clj:6077) 
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625) 
    at clojure.lang.RestFn.invoke(RestFn.java:410) 
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596$fn__3597.invoke(nimbus.clj:860) 
    at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596.invoke(nimbus.clj:859) 
    at backtype.storm.timer$schedule_recurring$this__1753.invoke(timer.clj:69) 
    at backtype.storm.timer$mk_timer$fn__1736$fn__1737.invoke(timer.clj:33) 
    at backtype.storm.timer$mk_timer$fn__1736.invoke(timer.clj:26) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Thread.java:679) 
11774 [Thread-2] INFO backtype.storm.util - Halting process: ("Error when processing an event") 
+0

W jaki sposób oczekujesz od nas pomocy, jeśli nie podasz więcej szczegółów? Jak stworzyłeś plik jar? Gdzie jest kod? – Doorknob

+0

Mam załączony kod tutaj i stworzyłem JAR przy użyciu zaćmienia. – Naresh

+0

Będziesz musiał być bardziej konkretny niż "za pomocą Eclipse". Co dokładnie zrobiłeś? Czy twoje pliki znajdują się w katalogu 'com/test/newpackage/FileName.class'? – Doorknob

Odpowiedz

4

Naresh, na podstawie tego, co widzę rozwiązanie problemu może leżeć w nazwie klasy użyłeś. Oto argument wiersza polecenia określono:

storm jar TopologyMain.jar TopologyMain wordcount 

odwoływać się do swojej głównej klasie jako „TopologyMain” używając tylko nazwę klasy zamiast pełnej nazwy. Oto moja rewizja twojej próby linii poleceń:

storm jar TopologyMain.jar com.test.newpackage.TopologyMain 

Używam pełnej nazwy pakietu zamiast samej klasy. Zauważ, że usunąłem także odniesienie do "wordcount", ponieważ nie wiem, co on tam robi (w twoim przykładzie kodu nie ma klas, metod ani zmiennych o nazwie "wordcount").

Oto doskonały artykuł na grupach Google, która obejmuje wczesne problemy konfiguracyjne z Burzy: early setup question

znalazłem się przy użyciu tego artykułu przez pierwsze kilka tygodni, kiedy zacząłem używać burzy.

Daj nam znać, jeśli to rozwiąże Twój problem.

+0

Cóż, Thanx Tim dla składni. Ale muszę również podać plik słów, które będą używane jako arg [0]. Jeśli dokładnie zaglądasz do kodu. W przeciwnym razie otrzymam wyjątek java.lang.ArrayIndexOutOfBoundsException. Próbuję przesłać pliki tekstowe przy użyciu polecenia "budzik słoik TopologyMain.jar com.test.newpackage.TopologyMain wordfile.txt". Ale rzuca java.lang.NullPointerException. Czy mógłbyś mi pomóc. Mój plik wordfile.txt znajduje się w folderze nadrzędnym. – Naresh

+0

Naresh, zanim spróbuję skompilować i uruchomić twój kod, czy możesz wkleić więcej informacji o przyczynie wyjątku NullPointerException? Pozwoli mi to zaoszczędzić wiele czasu. –

+0

Dodałem wyjątek, który otrzymuję po przesłaniu słoika. – Naresh

Powiązane problemy