2013-08-27 15 views
7

Pracuję przez QuickStart kafka:Kafka Quickstart: Jakie zależności są mi potrzebne?

http://kafka.apache.org/07/quickstart.html

i podstawowy przykład grupy konsumentów:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

mam zakodowaną się konsumenta i ConsumerThreadPool jak powyżej:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream m_stream; 
    private Integer m_threadNumber; 

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()) { 
      System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); 

     } 
     System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 
} 

Kilka innych aspektów: Jestem usi ng spring zarządzać moim zookeeperem:

import javax.inject.Named; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.ComponentScan; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
@ComponentScan("com.truecar.inventory.worker.core") 
public class AppConfig { 

    @Bean 
    @Named("consumerConfig") 
    private static ConsumerConfig createConsumerConfig() { 
     String zookeeperAddress = "127.0.0.1:2181"; 
     String groupId = "inventory"; 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeperAddress); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     return new ConsumerConfig(props); 
    } 
} 

I kompiluję się z Mavenem i wtyczką OneJar Maven. Jednak mogę skompilować, a następnie uruchomić wynikowy jeden słoik pojawia się następujący błąd:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters 
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:792) 
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803) 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.getDeclaredMethods0(Native Method) 
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521) 
at java.lang.Class.getDeclaredMethods(Class.java:1845) 
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180) 
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222) 
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165) 
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223) 
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461) 
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) 
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31) 
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20) 
... 6 more 
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 27 more 

Teraz wiem trochę o Kafki, a nic o Scala. Jak to naprawić? Co powinienem spróbować dalej? Czy to znany problem? Czy potrzebuję innych zależności? Oto wersja Kafka w moim pom.xml:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.9.2</artifactId> 
    <version>0.8.0-beta1</version> 
</dependency> 

Aktualizacja: Skontaktowałem się z listy mailingowej Kafka dev, i daj mi znać pewne specyficzne wymagania wersji dla zależności Scala. Istnieje jednak również nieudokumentowana zależność log4j, która powoduje inny czas wykonania, a nie czas kompilacji, wyjątek.

Exception in thread "main" java.lang.reflect.InvocationTargetException 
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V 
at org.apache.log4j.Category.log(Category.java:333) 
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177) 

Kolejna aktualizacja:

znalazłem właściwą zależność log4j:

<dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

Ale teraz mam spotkała się z jeszcze bardziej tajemnicze wyjątkiem wykonywania:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

w tym Punkt mam uczucie WTF. Więc dodałem kolejną zależność:

<dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 

ale to narażone kolejny wyjątek środowiska wykonawczego:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge 
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146) 
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

mam nadzieję być w stanie uzyskać ten przykład dziecka uruchomiony, ale może to jest cena płacić za używanie produktów beta? Może powinienem przełączyć się na Apache Active MQ. Ale to brzmi mniej zabawnie. Czy czegoś brakuje?

Odpowiedz

9

Problem polega na tym, że kafka beta was built in a way that pom generated with a jar isn't valid and maven could not recognize it and parse properly, pobierając w ten sposób zależności przechodnie. Udało nam się złagodzić ten problem, pobierając wszystkie zależności z tej pom (scala, zk, itp.) W naszej definicji pom. Czekamy na kolejne wersje beta kafka, w których problem zostanie naprawiony.

Pełna lista zależności znajduje się poniżej. Zauważ, że musisz zmienić zależność wersji scala odpowiednio do przyrostka twojego artefaktu kafka.

<dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.15</version> 
      <exclusions> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>net.sf.jopt-simple</groupId> 
      <artifactId>jopt-simple</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.6.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-compiler</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.101tec</groupId> 
      <artifactId>zkclient</artifactId> 
      <version>0.3</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-annotation</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.easymock</groupId> 
      <artifactId>easymock</artifactId> 
      <version>3.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest</artifactId> 
      <version>1.2</version> 
      <scope>test</scope> 
     </dependency> 

chodzi o

Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?

Cóż, nie można zapomnieć, że jest to beta uwolnienie? Niektóre złe rzeczy zdarzają się naprawdę, ale obecnie używamy kafka 0.7 bez wszelkich starań.

+0

Awesome, dziękuję za odpowiedź. Chciałbym wypróbować 0,7, ale na Maven dostępne są tylko 0,8 słoiki. Co byś polecił w zakresie dostępu programowego? –

+0

@DavidWilliams używamy [kafka zbudowany przez twitter] (http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar) dla 0.7. Co rozumiesz przez * programowy dostęp *? –

+0

Ah, powinien być bardziej szczegółowy, w Javie. Jedyne artefakty Mavena dla Kafki to 0.8 –

3

Znalazłem to konfiguracja zależności być funkcjonalne:

<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies> 
Powiązane problemy