2014-07-03 16 views
8

Próbujemy użyć Apache Storm do przetwarzania dużej ilości (fałszywych) wiadomości. Przykładem Wiadomość:Apache Storm java.nio.channels.ClosedChannelException: null

"{"clientName":"Sergey Bakulin","sum":12925,"group":"property","suspicious":false,"clientId":2,"dt":1404387303764,"coord":{"lat":55.767842588357645,"lon":37.46920361823332}}". 

Używamy Apache Kafkę jako źródło wiadomości dla naszego klastra burzy. Naszym celem jest umożliwienie przetwarzania co najmniej 50k msg/s/węzeł. W przypadku, gdy używamy więcej niż jeden węzeł stale zatrzymany z błędem (log fragment pochodzi od pracownika - * log.):

2014-07-03 15:14:47 b.s.m.n.Client [INFO] failed to send requests to ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701: java.nio.channels.ClosedChannelException: null 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:381) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:349) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.6.3.Final.jar:na] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51] 
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] 
2014-07-03 15:14:47 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701 

Nasza obecna burza config:

########### These MUST be filled in for a storm configuration 
storm.zookeeper.servers: 
    - "172.31.*.*" 

storm.local.dir: "/home/*/storm/data" 
nimbus.host: "127.0.0.1" 
supervisor.slots.ports: 
    - 6701 
    - 6702 

ui.port: 8090 

worker.childopts: "-Xmx6g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1%ID% -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun$ 

supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true" 
supervisor.worker.start.timeout.secs: 10 
supervisor.worker.timeout.secs: 10 
supervisor.monitor.frequency.secs: 3 
supervisor.heartbeat.frequency.secs: 5 
supervisor.enable: true 

storm.messaging.netty.server_worker_threads: 2 
storm.messaging.netty.client_worker_threads: 2 
storm.messaging.netty.buffer_size: 5242880 
storm.messaging.netty.max_retries: 25 
storm.messaging.netty.max_wait_ms: 1000 

Nasza topologia burza:

Properties conf = Util.readProperties(ClientTopology.class, "storm.properties"); 

prepareRedisDB(conf); 

TopologyBuilder builder = new TopologyBuilder(); 

builder.setSpout("kafka_trans_spout", getKafkaSpout(conf, conf.getProperty("kafka_trans_topic")), 3); 
builder.setSpout("kafka_socevent_spout", getKafkaSpout(conf, conf.getProperty("kafka_socevent_topic")), 3); 

builder.setBolt("json_to_tuple_trans_bolt", new JSONToTupleBolt(Transaction.class), 6) 
     .shuffleGrouping("kafka_trans_spout"); 
builder.setBolt("json_to_tuple_socevent_bolt", new JSONToTupleBolt(SocialEvent.class), 3) 
     .shuffleGrouping("kafka_socevent_spout"); 

builder.setBolt("alert_bolt", new AlertBolt(conf), 3) 
     .fieldsGrouping("json_to_tuple_trans_bolt", new Fields("cl_id")) 
     .fieldsGrouping("json_to_tuple_socevent_bolt", new Fields("cl_id")); 
builder.setBolt("offer_bolt", new NearestOfferBolt(conf), 3) 
     .shuffleGrouping("json_to_tuple_trans_bolt"); 

run(builder, args, 6); 

private static KafkaSpout getKafkaSpout(Properties conf, String topic) { 
    SpoutConfig spoutConfig = new SpoutConfig(
      new ZkHosts(conf.getProperty("zk_host"), "/brokers"), 
      topic, 
      "/brokers", 
      conf.getProperty("kafka_consumer_group_id")); 
    List<String> zkServers = new ArrayList<String>(); 
    zkServers.add(conf.getProperty("zk_host")); 
    spoutConfig.zkServers = zkServers; 
    spoutConfig.zkPort = Integer.valueOf(conf.getProperty("zk_port")); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    spoutConfig.forceFromStart = true; 
    spoutConfig.fetchSizeBytes = 5*1024*1024; 
    spoutConfig.bufferSizeBytes = 5*1024*1024; 
    storm.kafka.KafkaSpout kafkaSpout = new storm.kafka.KafkaSpout(spoutConfig); 
    return kafkaSpout; 
} 

używamy maszyn c3.2xlarge AWS, Apache Burza 0.9.2 inkubację, Apache Kafka 2.9.2-0.8.1.1.

+0

możesz sprawdzić, czy rzeczywiście masz usługę odsłuchową 172.31.23.123:6701 try netstat -antp | grep 6701 na tej maszynie – Pixou

+0

Czy otrzymałeś na to rozwiązanie? Teraz otrzymuję ten sam błąd. – gjain

+0

Wyjątek wygląda na port inspektora, który nie jest dostępny ze świata zewnętrznego. Sprawdź ten link: https://gist.github.com/amontalenti/8ff0c31a7b95a6dea3d2 Czy próbowałeś telnetu do tego portu hosta? – Shams

Odpowiedz

1

test Ping i Telnet: upewnić, każda maszyna, która prowadzi burzę ma połączenia do wszystkich innych maszyn z ping (wszyscy pracownicy, chmura i Heca). spróbuj pingować przez IP, nazwę hosta i FQDN, a jeśli to nie zadziała, edytuj pliki hosts (/ etc/hosts), aby to zrobiły.

również, telnet maszyny, aby sprawdzić otwarte porty w storm.yaml (6701, 6702). Zookeeper (2181).

w moim badanego środowiska, ustawienia storm.yaml współpracuje z następującymi ustawieniami Netty:

storm.messaging.netty.buffer_size: 5242880 
storm.messaging.netty.client_worker_threads: 1 
storm.messaging.netty.max_retries: 100 
storm.messaging.netty.max_wait_ms: 1000 
storm.messaging.netty.min_wait_ms: 100 
storm.messaging.netty.server_worker_threads: 1 
storm.messaging.transport: backtype.storm.messaging.netty.Context 
0

Spróbuj dodać obciążenia, a następnie uruchomić topologię, zdarza się kilka razy ze mną, co było tematem nowe i obciążenie było nieobecne.