2014-09-24 10 views

Odpowiedz

1

Wydaje się, że nie ma api serwer Kafka stworzyć wątek, więc trzeba użyć temat automatycznego tworzenia lub narzędzia wiersza poleceń:

bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test 
1

Wygląda na to, można użyć następujących do upewnić się, że temat już istnieje (zakładam, że używasz następujące kafka python wdrożenia):

client = KafkaClient(...) 
producer = KafkaProducer(...) 
client.ensure_topic_exists('my_new_topic') 
producer.send_messages('my_new_topic', ...) 
+3

To nie będzie działać . 'ensure_topic_exists' działa tylko z włączonym automatycznym tworzeniem tematu. https://github.com/mumrah/kafka-python/blob/cd81cf0ec8c1b7e7651374c5d1cbd105d003d352/kafka/client.py#L305-L306 – zackdever

0

jest już zbyt późno. Nie wiem o poleceniu do jawnego tworzenia tematów, ale poniższe tworzy i dodaje komunikaty.

I stworzył producent pyton Kafka:

prod = KafkaProducer(bootstrap_servers='localhost:9092') 
for i in xrange(1000): 
    prod.send('xyz', str(i)) 

Na liście tematów Kafka xyz nie było tam wcześniej. kiedy wykonałem powyższą metodę, klient Python-kafka utworzył go i dodał do niego wiadomości.

+1

Właściwie broker utworzył temat i tylko dlatego, że auto.topic.create.enable zostało ustawione na "true" . Wszystkie utworzone w ten sposób tematy będą miały domyślną konfigurację, ale mogą nie być dobre dla Twojego przypadku użycia. –

0

AdminClient API potrzebne do zrobienia programowe tworzenie i konfigurację temat został właśnie dodany w Kafka 0,11 (początkowo dla Java)

Zobacz https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

Oczekuje się, że nie-Java biblioteki klienta doda tę funkcję jako z czasem. Sprawdź u autora klienta Kafka Python używanego (jest ich kilka), aby zobaczyć, czy i kiedy KIP-4 Administrator obsługa protokołu będzie w API

Zobacz https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

0
from kafka import KafkaProducer 

producer = KafkaProducer(bootstrap_servers=['localhost:9092']) 
topic = 'topic-name' 

producer.send(topic, final_list[0]).get(timeout=10) 
Powiązane problemy