
I'm using the kafka-node module (https://github.com/SOHU-Co/kafka-node), and the consumer always receives the old messages.

Every time I restart the consumer it gets all the old messages again. I'm using round robin (load balancing).

Any idea how I can tell the server that I have already consumed a message, so that it does not send it to me again when I restart the client?

Is there some error in my code or in the server configuration?

Any ideas?

Producer code:

var kafka = require('kafka-node'); 
var HighLevelProducer = kafka.HighLevelProducer; 
var Client = kafka.Client; 
var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid); 
var argv = require('optimist').argv; 
var topic = argv.topic || 'test_12345'; 
var producer = new HighLevelProducer(client); 
var time = process.hrtime(); 

var message, diff,i=0; 
producer.on('ready', function() { 
    setInterval(function(){ 
     var date = new Date(); 
     var pad = function (n) { return n < 10 ? '0' + n : n; };
     var dateString = date.getFullYear() + "-" + pad(date.getMonth() + 1) + "-" + pad(date.getDate()) +
      " " + pad(date.getHours()) + ":" + pad(date.getMinutes()) + ":" + pad(date.getSeconds()); 
     message = JSON.stringify({'message' : 'hello - '+dateString}); 
     console.log(message); 
     send(message); 
    },1000); 
}); 

function send(message) { 
    producer.send([ 
     {topic: topic, messages: [message] } 
    ], function (err, data) { 
     console.log(data); 
     if (err) console.log(err); 
    }); 
} 

Consumer (worker) code:

var kafka = require('kafka-node'); 
var HighLevelConsumer = kafka.HighLevelConsumer; 
var Offset = kafka.Offset; 
var Client = kafka.Client; 
var argv = require('optimist').argv; 
var topic = argv.topic || 'test_12345'; 
var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181','consumer'+process.pid); 
var payloads = [ { topic: topic }]; 
var options = { 
    groupId: 'kafka-node-group', 
// Auto commit config 
    autoCommit: true, 
    autoCommitMsgCount: 100, 
    autoCommitIntervalMs: 5000, 
// Fetch message config 
    fetchMaxWaitMs: 100, 
    fetchMinBytes: 1, 
    fetchMaxBytes: 1024 * 10, 
    fromOffset: false, 
    fromBeginning: false 
}; 
var consumer = new HighLevelConsumer(client, payloads, options); 
var offset = new Offset(client); 

consumer.on('message', function (message) { 
    console.log(this.id, message); 
}); 
consumer.on('error', function (err) { 
    console.log('error', err); 
}); 
consumer.on('offsetOutOfRange', function (topic) { 
    console.log("------------- offsetOutOfRange ------------"); 
    topic.maxNum = 2; 
    offset.fetch([topic], function (err, offsets) { 
     var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); 
     consumer.setOffset(topic.topic, topic.partition, min); 
    }); 
}); 

ZooKeeper zoo.cfg (5 servers)

# The number of milliseconds of each tick 
tickTime=2000 
# The number of ticks that the initial 
# synchronization phase can take 
initLimit=10 
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement 
syncLimit=5 
# the directory where the snapshot is stored. 
# do not use /tmp for storage, /tmp here is just 
# example sakes. 
dataDir=/etc/zookeeper/data 
# the port at which the clients will connect 
clientPort=2181 
# the maximum number of client connections. 
# increase this if you need to handle more clients 
#maxClientCnxns=60 
# 
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.  
# 
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance 
# 
# The number of snapshots to retain in dataDir 
autopurge.snapRetainCount=5 
# Purge task interval in hours 
# Set to "0" to disable auto purge feature 
autopurge.purgeInterval=24 
server.1=xxx.xxx.xxx.xxx:2888:3888 
server.2=xxx.xxx.xxx.xxx:2888:3888 
server.3=xxx.xxx.xxx.xxx:2888:3888 
server.4=xxx.xxx.xxx.xxx:2888:3888 
server.5=xxx.xxx.xxx.xxx:2888:3888 
leaderServes = false 

Kafka server.properties (5 servers)

# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements. See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License. You may obtain a copy of the License at 
# 
# http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
# see kafka.server.KafkaConfig for additional details and defaults 

############################# Server Basics ############################# 

# The id of the broker. This must be set to a unique integer for each broker. 
broker.id=5 

############################# Socket Server Settings ############################# 

# The port the socket server listens on 
port=9092 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
#host.name=localhost 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
#advertised.host.name=<hostname routable by clients> 

# The port to publish to ZooKeeper for clients to use. If this is not set, 
# it will publish the same port that the broker binds to. 
#advertised.port=<port accessible by clients> 

# The number of threads handling network requests 
num.network.threads=4 

# The number of threads doing disk I/O 
num.io.threads=8 

# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=1048576 

# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=1048576 

# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 


############################# Log Basics ############################# 

# A comma separated list of directories under which to store log files 
log.dirs=/etc/kafka/kafka-logs 

# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=10 

############################# Log Flush Policy ############################# 

# Messages are immediately written to the filesystem but by default we only fsync() to sync 
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here: 
# 1. Durability: Unflushed data may be lost if you are not using replication. 
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. 
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. 
# The settings below allow one to configure the flush policy to flush data after a period of time or 
# every N messages (or both). This can be done globally and overridden on a per-topic basis. 

# The number of messages to accept before forcing a flush of data to disk 
#log.flush.interval.messages=10000 

# The maximum amount of time a message can sit in a log before we force a flush 
#log.flush.interval.ms=1000 

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=536870912 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=60000 

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. 
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. 
log.cleaner.enable=false 

############################# Zookeeper ############################# 

# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 
zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 

# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=1000000  

# metrics reporter properties 
#kafka.metrics.polling.interval.secs=5 
#kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter 
#kafka.csv.metrics.dir=/etc/kafka/kafka_metrics 
# Disable csv reporting by default. 
#kafka.csv.metrics.reporter.enabled=false  

replica.lag.max.messages=10000000 

default.replication.factor=5 
controlled.shutdown.enable=true 

Best regards

Have you tested other clients to see whether this is a Node.js problem or a Kafka config issue? – Maziyar

@Maziyar I have the same problem; I tested with a Java client (i.e. without kafka-node) and the problem does not occur. –

Answers

1

You should try tweaking the following properties.

This setting is in hours, so by default messages on a topic are available for 24 * 7 hours:

#Broker Configs 
# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

In the consumer config, set auto.commit.enable to true; this lets the consumer commit the offsets of already fetched messages to ZooKeeper. Also change auto.offset.reset to "largest" so it does not read messages from the smallest available offset.
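
In kafka-node terms (the client used in the question), those Java-style properties roughly correspond to the autoCommit and fromOffset options already shown above. The sketch below is an assumption about an equivalent setup, not a verified fix; the client id string is a placeholder and the ZooKeeper address is abbreviated, and it adds a manual consumer.commit() call as an alternative to relying only on auto-commit:

var kafka = require('kafka-node');
// Abbreviated ZooKeeper connection string and a placeholder client id.
var client = new kafka.Client('xxx.xxx.xxx.xxx:2181', 'my-stable-client-id');
var options = {
    groupId: 'kafka-node-group',   // keep the same group across restarts
    autoCommit: true,              // commit fetched offsets to ZooKeeper (auto.commit.enable=true)
    autoCommitIntervalMs: 5000,
    fromOffset: false              // resume from the committed offset rather than a fixed one
};
var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'test_12345' }], options);
consumer.on('message', function (message) {
    console.log(message);
    // Optional: commit explicitly after processing instead of waiting for autoCommit.
    consumer.commit(function (err, data) {
        if (err) console.log('commit error', err);
    });
});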

Try this and check whether you still have the problem. You can monitor the offset updates for a given consumer from the ZooKeeper command line; look under /consumers and /brokers. The following returns the offset:

get /consumers/my_test_group/offsets/my_topic/0 

Hope this helps.

Thank you for the answer, but I think the problem is in the Node.js module... – fadaytak

@fadaytak I couldn't reach you, but did you solve this problem? I have the same issue whenever a rebalance happens in my broker cluster... –

1

Your code works fine for me. I tested it with kafka-node v0.2.20.

Focus on these:

  • check the logs (e.g. replication errors),
  • try a single ZooKeeper instance,
  • try setting the leaderServes option to true,
  • check the /consumers/kafka-node-group/offsets/test_12345/0 path with zkCli.sh (a Node-based alternative is sketched below this list).
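
If you would rather check that committed offset from Node than from zkCli.sh, something along these lines may work. Offset.fetchCommits is documented in the kafka-node README, but whether it exists in your installed version is an assumption worth verifying first:

var kafka = require('kafka-node');
var client = new kafka.Client('xxx.xxx.xxx.xxx:2181', 'offset-checker');
var offset = new kafka.Offset(client);

// Read the offset committed by the 'kafka-node-group' group for partition 0 of the topic.
offset.fetchCommits('kafka-node-group', [
    { topic: 'test_12345', partition: 0 }
], function (err, data) {
    if (err) return console.log(err);
    console.log(data); // roughly { test_12345: { '0': <committed offset, or -1 if none> } }
});
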
2

I had the same problem. I noticed that it happens when the consumer uses a topic with more than one partition.

If you specify the partition number in the consumer payload, it fetches from only that partition and does not receive the old messages again.

Try changing:

var payloads = [ { topic: topic }]; 

to:

var payloads = [ { topic: topic, partition : 0 }]; 
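
One caveat with pinning partition 0: the broker config above has num.partitions=10, so a single-partition payload will only ever see one tenth of the topic. If you take this route but still need every partition, one payload entry per partition is a possible workaround; whether HighLevelConsumer honours explicit partitions like this is the answerer's claim, so treat the sketch below as something to test rather than a confirmed fix:

// One payload per partition of a 10-partition topic (num.partitions=10).
var payloads = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].map(function (p) {
    return { topic: topic, partition: p };
});
var consumer = new HighLevelConsumer(client, payloads, options);
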
0

This is a bug in the client module, fixed in PR #314.

I updated to kafka-node 0.3.2, which includes PR #314, but I still have this problem. – ProgrammerGuy

4

I'm not sure... but it looks like the problem is that you change the consumer group every time you restart it (by using the process pid), and each new consumer group has to receive the messages from the beginning...
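
For what it's worth, in the posted code the process pid only goes into the ZooKeeper client id ('consumer' + process.pid), while groupId stays 'kafka-node-group'. Still, if changing identity on restart is the culprit, a sketch with both identifiers kept stable across restarts would look roughly like this (the literal ids are placeholders, not a confirmed fix):

var kafka = require('kafka-node');
// Fixed client id and group id, so a restarted process rejoins the same consumer group.
var client = new kafka.Client('xxx.xxx.xxx.xxx:2181', 'my-fixed-client-id');
var options = { groupId: 'kafka-node-group', autoCommit: true, fromOffset: false };
var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'test_12345' }], options);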

0

Try changing:

var consumer = new HighLevelConsumer(client, payloads, options); 

to:

var Consumer = kafka.Consumer; 
var consumer = new Consumer(client, payloads, options); 