2014-12-30 15 views
9

Właśnie zacząłem od Spark Streaming i próbuję zbudować przykładową aplikację, która zlicza słowa ze strumienia Kafka. Chociaż kompiluje się z sbt package, po uruchomieniu otrzymuję NoClassDefFoundError. Ten post wydaje się mieć ten sam problem, ale rozwiązanie jest dla Mavena i nie byłem w stanie odtworzyć go z sbt.Nie znaleziono klasy KafkaUtils w Spark streaming

KafkaApp.scala:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 

object KafkaApp { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]") 
    val ssc = new StreamingContext(conf, Seconds(1)) 
    val kafkaParams = Map(
     "zookeeper.connect" -> "localhost:2181", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "group.id" -> "sparkGroup" 
    ) 

    val topics = Map(
     "test" -> 1 
    ) 

    // stream of (topic, ImpressionLog) 
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK) 
    println(s"Number of words: %{messages.count()}") 
    } 
} 

build.sbt:

name := "Simple Project" 

version := "1.1" 

scalaVersion := "2.10.4" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1" 
) 

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" 

I złożyć go z:

bin/spark-submit \ 
    --class "KafkaApp" \ 
    --master local[4] \ 
    target/scala-2.10/simple-project_2.10-1.1.jar 

Błąd:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver 
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ 
    at KafkaApp$.main(KafkaApp.scala:28) 
    at KafkaApp.main(KafkaApp.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 

Odpowiedz

14

spark-submit nie umieszcza automatycznie pakietu zawierającego KafkaUtils. Musisz mieć w swoim projekcie JAR. W tym celu musisz utworzyć all-inclusive, używając sbt assembly. Oto przykład build.sbt.

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

Najwyraźniej też trzeba dodać wtyczkę montażową do SBT.

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project

+0

Jestem również coraz sam problem, a ja jestem używając Mavena. Potem umieściłem "org.apache.maven.plugins" w moim pom.xml, ale problem jest nierozwiązany. Jakiś inny parametr, który muszę sprawdzić? –

+0

ze zmianą, jeśli uruchomię pakiet STB, dostałem błąd. : error: not found: object AssemblyKeys import AssemblyKeys._ ^ [error] Wpisz błąd w wyrażeniu – johnsam

+0

@johnsam Po prostu pomiń pierwszą linię importu i linię "assemblySettings", działa dla mnie. – pederpansen

6

Spróbuj o tym wszystkie słoiki z zależnościami podczas składania aplikacji:

./spark-submit --name "SampleApp" --deploy-mode client--master spark://host:7077 --class com.stackexchange.SampleApp --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar spark-example-1.0-SNAPSHOT.jar

2

Po build.sbt pracował dla mnie. Wymaga to również umieszczenia wtyczki sbt-assembly w pliku pod katalogiem projects/.

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.4.1", 
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",   // kafka 
    "org.apache.hbase" % "hbase" % "0.92.1", 
    "org.apache.hadoop" % "hadoop-core" % "1.0.2", 
    "org.apache.spark" % "spark-mllib_2.10" % "1.3.0" 
) 

mergeStrategy in assembly := { 
    case m if m.toLowerCase.endsWith("manifest.mf")   => MergeStrategy.discard 
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => MergeStrategy.discard 
    case "log4j.properties"         => MergeStrategy.discard 
    case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines 
    case "reference.conf"         => MergeStrategy.concat 
    case _             => MergeStrategy.first 
} 

projekt/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

0

spotkać ten sam problem, rozwiązałem to przez budować słoik z zależnościami.

dodać poniższy kod do pom.xml

<build> 
    <sourceDirectory>src/main/java</sourceDirectory> 
    <testSourceDirectory>src/test/java</testSourceDirectory> 
    <plugins> 
     <!-- 
        Bind the maven-assembly-plugin to the package phase 
     this will create a jar file without the storm dependencies 
     suitable for deployment to a cluster. 
     --> 
     <plugin> 
     <artifactId>maven-assembly-plugin</artifactId> 
     <configuration> 
      <descriptorRefs> 
      <descriptorRef>jar-with-dependencies</descriptorRef> 
      </descriptorRefs> 
      <archive> 
      <manifest> 
       <mainClass></mainClass> 
      </manifest> 
      </archive> 
     </configuration> 
     <executions> 
      <execution> 
      <id>make-assembly</id> 
      <phase>package</phase> 
      <goals> 
       <goal>single</goal> 
      </goals> 
      </execution> 
     </executions> 
     </plugin> 
    </plugins> 
</build>  

mvn pakiet przedłożenie "przykład-Jar-z-dependencies.jar"

0

Dodany zależność zewnętrznie, projekt -> Ustawienia- -> java Ścieżka kompilacji -> Biblioteki -> dodaj zewnętrzne słoiki i dodaj odpowiedni słoik.

to rozwiązało mój problem.

0

Korzystanie Spark 1.6 wykonać zadanie dla mnie bez konieczności obsługi tylu słoików zewnętrznych ... mogą być dość skomplikowane do zarządzania ...

Powiązane problemy