2014-06-25 23 views
8

Utworzenie klastra burzowego do obliczania trendów w czasie rzeczywistym i innych statystyk, jednak mam pewne problemy z wprowadzeniem funkcji "odzyskiwania" do tego projektu, umożliwiając przesunięcie, które został ostatnio przeczytany przez kafka-spout (kod źródłowy dla kafka-spout pochodzi z https://github.com/apache/incubator-storm/tree/master/external/storm-kafka), aby go zapamiętać. Zacznę kafka-spout w ten sposób:Wpisanie danych offsetu do zookeepera w kafkowskiej burzy

BrokerHosts zkHost = new ZkHosts("localhost:2181"); 
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test"); 
kafkaConfig.forceFromStart = false; 
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 
TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism); 

Domyślne ustawienia powinny być w ten sposób, ale myślę, że nie robi to w moim przypadku, za każdym razem zacznę projektu, PartitionManager próbuje szukać pliku z przesunięciami, wtedy nic nie zostanie znalezione:

2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null 
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset 

Następnie rozpoczyna czytanie od najnowszego możliwego przesunięcia. Co jest w porządku, jeśli mój projekt nigdy nie zawodzi, ale nie dokładnie to, co chciałem.

Ja też wyglądał nieco bardziej do klasy PartitionManager który wykorzystuje Zkstate klasę napisać przesunięć, z tego fragmentu kodu:

PartitionManeger

public void commit() { 
    long lastCompletedOffset = lastCompletedOffset(); 
    if (_committedTo != lastCompletedOffset) { 
     LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId); 
     Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder() 
       .put("topology", ImmutableMap.of("id", _topologyInstanceId, 
         "name", _stormConf.get(Config.TOPOLOGY_NAME))) 
       .put("offset", lastCompletedOffset) 
       .put("partition", _partition.partition) 
       .put("broker", ImmutableMap.of("host", _partition.host.host, 
         "port", _partition.host.port)) 
       .put("topic", _spoutConfig.topic).build(); 
     _state.writeJSON(committedPath(), data); 

     _committedTo = lastCompletedOffset; 
     LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId); 
    } else { 
     LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId); 
    } 
} 

ZkState

public void writeBytes(String path, byte[] bytes) { 
    try { 
     if (_curator.checkExists().forPath(path) == null) { 
      _curator.create() 
        .creatingParentsIfNeeded() 
        .withMode(CreateMode.PERSISTENT) 
        .forPath(path, bytes); 
     } else { 
      _curator.setData().forPath(path, bytes); 
     } 
    } catch (Exception e) { 
     throw new RuntimeException(e); 
    } 
} 

mogłem zobacz, że dla pierwszej wiadomości, metoda writeBytes dostaje się do bloku if Aby utworzyć ścieżkę, następnie dla drugiej wiadomości przechodzi do bloku else, który wydaje się być w porządku. Ale kiedy ponownie rozpoczynam projekt, pojawia się ta sama wiadomość, o której wspomniano powyżej. Nie można znaleźć partition information.

+0

Witaj Juto, napotkałem problemy ... czy naprawiłeś ten problem? dziękuję, czekam na ciebie teraz – kaitian521

+0

Witam @kaitian, opuściłem firmę, dla której zrobiłem ten projekt, dlatego nie mam już dostępu do kodu, nigdy nie miałem rozwiązania tego problemu. :( – Juto

+0

Dziękuję i tak – kaitian521

Odpowiedz

9

Miałem ten sam problem. Okazało się, że pracowałem w trybie lokalnym, który używa pamięciowego zookeepera, a nie Zookeepera, którego używa Kafka.

Aby upewnić się, że nie używa KafkaSpout burzy Heca dla ZkState który przechowuje offset, trzeba ustawić SpoutConfig.zkServers, SpoutConfig.zkPort i SpoutConfig.zkRoot oprócz ZkHosts. Na przykład

import org.apache.zookeeper.client.ConnectStringParser; 
import storm.kafka.SpoutConfig; 
import storm.kafka.ZkHosts; 
import storm.kafka.KeyValueSchemeAsMultiScheme; 

... 

    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr); 
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses(); 
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size()); 
    final Integer zkPort = serverInetAddresses.get(0).getPort(); 
    for (InetSocketAddress serverInetAddress : serverInetAddresses) { 
     serverAddresses.add(serverInetAddress.getHostName()); 
    } 

    final ZkHosts zkHosts = new ZkHosts(zkConnectStr); 
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath; 

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup); 
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme); 

    spoutConfig.zkServers = serverAddresses; 
    spoutConfig.zkPort = zkPort; 
    spoutConfig.zkRoot = kafkaZnode; 
+0

tylko po to, aby łatwiej było zrozumieć odpowiedź: gdy masz działającą topologię gotowy do wdrożenia na serwerze zdalnym, dodaj 3 ostatnie linie, aby połączyć się ze zdalnym zookeeperem, gdy jest używany w trybie lokalnym – piotrek

Powiązane problemy