2014-11-18 15 views
5

W poszukiwaniu sposobu tworzenia tematu Kafka poprzez API, znalazłem ten przykład w Scala:Jak utworzyć Kafka ZKStringSerializer w Javie?

import kafka.admin.AdminUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.ZkClient 

// Create a ZooKeeper client 
val sessionTimeoutMs = 10000 
val connectionTimeoutMs = 10000 
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, 
          connectionTimeoutMs, ZKStringSerializer) 

// Create a topic with 8 partitions and a replication factor of 3 
val topicName = "myTopic" 
val numPartitions = 8 
val replicationFactor = 3 
val topicConfig = new Properties 
AdminUtils.createTopic(zkClient, topicName, 
         numPartitions, replicationFactor, topicConfig) 

Źródło: https://stackoverflow.com/a/23360100/871012

Ostatni arg ZKStringSerializer widocznie obiektem Scala. Nie jest dla mnie jasne, jak sprawić, aby ten przykład działał w Javie.

Ten post How to create a scala object in clojure zadaje to samo pytanie w Clojure i odpowiedź brzmiała:

ZKStringSerializer$/MODULE$ 

które w Javie będzie (chyba) przekładają się na:

ZKStringSerializer$.MODULE$ 

Ale gdy próbuję to (lub dowolna liczba innych odmian), których żaden z nich nie kompiluje.
Błąd kompilacji:

KafkaTopicCreator.java:[16,18] cannot find symbol 
symbol: variable ZKStringSerializer$ 
location: class org.sample.KafkaTopicCreator 

Używam kafka_2.9.2-0.8.1.1 i Java 8.

Odpowiedz

17

Java spróbuj wykonać następujące czynności,

Pierwszy import poniżej rachunku

import kafka.utils.ZKStringSerializer$; 

Utwórz obiekt dla ZkClient w następujący sposób,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181" 
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$); 
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties()); 

Powyższy kod nie zadziała dla kafka> 0,9 od API zostało zmienione, Użyj poniższego kodu do Kafka> 0,9

import java.util.Properties; 
import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

public class KafkaTopicCreationInJava 
{ 
    public static void main(String[] args) throws Exception { 
     ZkClient zkClient = null; 
     ZkUtils zkUtils = null; 
     try { 
      String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; 
      int sessionTimeOutInMs = 15 * 1000; // 15 secs 
      int connectionTimeOutInMs = 10 * 1000; // 10 secs 

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

      String topicName = "testTopic"; 
      int noOfPartitions = 2; 
      int noOfReplication = 3; 
      Properties topicConfiguration = new Properties(); 

      AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       zkClient.close(); 
      } 
     } 
    } 
} 
Powiązane problemy