Utworzenie klastra burzowego do obliczania trendów w czasie rzeczywistym i innych statystyk, jednak mam pewne problemy z wprowadzeniem funkcji "odzyskiwania" do tego projektu, umożliwiając przesunięcie, które został ostatnio przeczytany przez kafka-spout
(kod źródłowy dla kafka-spout
pochodzi z https://github.com/apache/incubator-storm/tree/master/external/storm-kafka), aby go zapamiętać. Zacznę kafka-spout
w ten sposób:Wpisanie danych offsetu do zookeepera w kafkowskiej burzy
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
Domyślne ustawienia powinny być w ten sposób, ale myślę, że nie robi to w moim przypadku, za każdym razem zacznę projektu, PartitionManager
próbuje szukać pliku z przesunięciami, wtedy nic nie zostanie znalezione:
2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset
Następnie rozpoczyna czytanie od najnowszego możliwego przesunięcia. Co jest w porządku, jeśli mój projekt nigdy nie zawodzi, ale nie dokładnie to, co chciałem.
Ja też wyglądał nieco bardziej do klasy PartitionManager
który wykorzystuje Zkstate
klasę napisać przesunięć, z tego fragmentu kodu:
PartitionManeger
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
if (_committedTo != lastCompletedOffset) {
LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
_committedTo = lastCompletedOffset;
LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
} else {
LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}
}
ZkState
public void writeBytes(String path, byte[] bytes) {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, bytes);
} else {
_curator.setData().forPath(path, bytes);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
mogłem zobacz, że dla pierwszej wiadomości, metoda writeBytes
dostaje się do bloku if
Aby utworzyć ścieżkę, następnie dla drugiej wiadomości przechodzi do bloku else
, który wydaje się być w porządku. Ale kiedy ponownie rozpoczynam projekt, pojawia się ta sama wiadomość, o której wspomniano powyżej. Nie można znaleźć partition information
.
Witaj Juto, napotkałem problemy ... czy naprawiłeś ten problem? dziękuję, czekam na ciebie teraz – kaitian521
Witam @kaitian, opuściłem firmę, dla której zrobiłem ten projekt, dlatego nie mam już dostępu do kodu, nigdy nie miałem rozwiązania tego problemu. :( – Juto
Dziękuję i tak – kaitian521