2014-07-23 8 views
11

Używam wersji kafka 0.8 i bardzo wiele nowych.Jak uzyskać listę tematów z serwera kafka w Javie

Chcę poznać listę tematów utworzonych w kafka server wraz z jej metadanymi . Czy jest dostępne jakieś API, aby się tego dowiedzieć?

Zasadniczo, muszę napisać konsumenta Java, który powinien automatycznie odkryć każdy temat w kafka server .There jest API do pobierania TopicMetadata, ale to wymaga nazwę tematu na wejściu trzeba parameters.I informacje o wszystkich tematów obecnych w serwerze .

+0

jeśli szukasz aplikacji Java api, niestety obecnie nie ma nikogo poza wymienionym numerem – user2720864

Odpowiedz

3

Dobrym miejscem na rozpoczęcie będzie przykładowy skrypt powłoki dostarczany z Kafką. W katalogu/bin dystrybucji znajduje się kilka skryptów powłoki, z których jeden to ./kafka-topic-list.sh Jeśli uruchomisz to bez określania tematu, zwróci on wszystkie tematy wraz z ich metadanymi. Patrz: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

Że skrypt z kolei seriach: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

Powyższy są zarówno odniesienia do wersji 0.8 Kafka, więc jeśli używasz innej wersji (nawet różnica punktowa), bądź pamiętaj, aby zastosować odpowiednią gałąź/etykietę na github

0

można użyć Heca API, aby uzyskać listę maklerów jak wymienione poniżej:

ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null); 
    List<String> ids = zk.getChildren("/brokers/ids", false); 
    List<Map> brokerList = new ArrayList<>(); 
    ObjectMapper objectMapper = new ObjectMapper(); 

    for (String id : ids) { 
     Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class); 
     brokerList.add(map); 
    } 

użyć tej listy broker dostać cały wątek korzystając z poniższego linku

https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader

+0

Program OP zapytał o listę tematów nie brokerów – Andrejs

+0

Otrzymujesz listę brokerów, a następnie dostajesz tematy, co jest nie tak z powyższym podejściem. –

+1

Po prostu, że fragment kodu nie zawiera odpowiedzi, a link ma więcej niż to, o co został poproszony. Zamiast linku lepiej umieścić rozwiązanie w swojej odpowiedzi. – Andrejs

17

z Kafki 0.9.0

można notować tematów na serwerze za pomocą dostarczonych metoda listTopics konsumpcyjnych();

np.

Map<String, List<PartitionInfo> > topics; 

Properties props = new Properties(); 
props.put("bootstrap.servers", "1.2.3.4:9092"); 
props.put("group.id", "test-consumer-group"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
topics = consumer.listTopics(); 
consumer.close(); 
1

Jeśli chcesz ciągnąć brokera lub w inny Kafka informacje z Heca następnie kafka.utils.ZkUtils zapewnia ładny interfejs. Oto kod mam do listy wszystkich brokerów Heca (istnieje mnóstwo innych metod tam):

List<Broker> listBrokers() { 

     final ZkConnection zkConnection = new ZkConnection(connectionString); 
     final int sessionTimeoutMs = 10 * 1000; 
     final int connectionTimeoutMs = 20 * 1000; 
     final ZkClient zkClient = new ZkClient(connectionString, 
               sessionTimeoutMs, 
               connectionTimeoutMs, 
               ZKStringSerializer$.MODULE$); 

     final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); 

     scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); 
} 
+0

OP poprosił o listę tematów nie brokerów – Andrejs

1

myślę, że to jest najlepszy sposób:

ZkClient zkClient = new ZkClient("zkHost:zkPort"); 
List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient)); 
+0

Po nieco prostszej wersji 0.9.x – fengkb

1

Korzystanie Scala:

import java.util.{Properties} 
import org.apache.kafka.clients.consumer.KafkaConsumer 

object KafkaTest { 
    def main(args: Array[String]): Unit = { 

    val brokers = args(0) 
    val props = new Properties(); 
    props.put("bootstrap.servers", brokers); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    val consumer = new KafkaConsumer[String, String](props); 
    val topics = consumer.listTopics().keySet(); 

    println(topics) 
    } 
} 
Powiązane problemy