2015-05-19 14 views
5

Próbuję przeprowadzić migrację z konfiguracji XML Spring AMQP do konfiguracji Java opartej na adnotacjach, ponieważ jest "prostsza". Nie jestem pewien, co robię źle: konfiguracja XML działa dobrze, ale konfiguracja Java z @Configurable zgłasza wyjątek "Caused by: java.net.SocketException: Connection reset". Innymi słowy, nie udaje mi się odtworzyć konfiguracji z przestrzeni nazw XML <rabbit:> w klasie Java z adnotacją @Configurable.

XML config (działa idealnie):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Resolve ${...} placeholders from every *.properties file on the classpath. -->
    <context:property-placeholder location="classpath:*.properties"/>

    <!-- Broker connection; every setting, including the virtual host, comes from properties. -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${rabbitmq.hostname}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual_host}"
                               cache-mode="${rabbitmq.cache_mode}"
                               channel-cache-size="${rabbitmq.channel_cache_size}"/>

    <!-- Thread pool handed to the listener container below. -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="3"/>
        <property name="maxPoolSize" value="5"/>
        <property name="queueCapacity" value="15"/>
    </bean>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue, topic exchange and the binding between them. -->
    <rabbit:queue name="${rabbitmq.queue_name}"/>
    <rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}">
        <rabbit:bindings>
            <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/>

    <!-- NOTE(review): the listener's queue name is hard-coded here, while the queue
         declaration above uses ${rabbitmq.queue_name}; confirm the two match. -->
    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor">
        <rabbit:listener ref="listener" queues="notification.main"/>
    </rabbit:listener-container>
</beans>

Konfiguracja Java (powoduje błąd):

@Configurable 
@PropertySource("classpath:rabbitmq.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.hostname}") 
private String hostname; 

@Value("${rabbitmq.port}") 
private String port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.virtual_host}") 
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}") 
//private String cacheMode; 

@Value("${rabbitmq.channel_cache_size}") 
private String channelCacheSize; 

@Value("${rabbitmq.topic_exchange_name}") 
private String topicExchangeName; 

@Value("${rabbitmq.topic_exchange_pattern}") 
private String topicExchangePattern; 

@Value("${rabbitmq.queue_name}") 
private String queueName; 

@Autowired 
private ConnectionFactory cachingConnectionFactory; 

/**
 * Builds the connection factory registered as the {@code cachingConnectionFactory} bean.
 *
 * The factory is created from the injected {@code hostname} and {@code port} fields
 * (the port string is converted to an integer), then given the user name, password
 * and channel cache size.
 *
 * NOTE(review): the injected {@code virtualHost} field is never applied to the factory
 * in this method, whereas the XML configuration passes {@code virtual-host} to its
 * connection factory - confirm whether that difference is intended.
 *
 * @return the configured connection factory
 */
@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

    connectionFactory.setUsername(username); 
    connectionFactory.setPassword(password); 

    // Cache mode is left unset here; the matching @Value field is commented out as well.
    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode)); 
    connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

    return connectionFactory; 
} 

/**
 * Creates the thread pool registered as the {@code taskExecutor} bean, with a core pool
 * size of 3, a maximum pool size of 5 and a queue capacity of 15.
 *
 * @return the configured executor
 */
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setQueueCapacity(15);
    executor.setMaxPoolSize(5);
    executor.setCorePoolSize(3);
    return executor;
}

/**
 * Creates the AMQP template on top of the autowired connection factory.
 *
 * @return a {@code RabbitTemplate} bound to {@code cachingConnectionFactory}
 */
@Bean
public AmqpTemplate AmqpTemplate() {
    return new RabbitTemplate(cachingConnectionFactory);
}


/**
 * Creates the AMQP admin on top of the autowired connection factory.
 *
 * @return a {@code RabbitAdmin} bound to {@code cachingConnectionFactory}
 */
@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(cachingConnectionFactory);
}

/**
 * Creates the queue whose name comes from the {@code rabbitmq.queue_name} property.
 *
 * @return the queue definition
 */
@Bean
public Queue queue() {
    final Queue declaredQueue = new Queue(queueName);
    return declaredQueue;
}

/**
 * Creates the topic exchange whose name comes from the
 * {@code rabbitmq.topic_exchange_name} property.
 *
 * @return the exchange definition
 */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(topicExchangeName);
}

/**
 * Binds the queue to the topic exchange using the routing pattern from the
 * {@code rabbitmq.topic_exchange_pattern} property.
 *
 * @param topicExchange exchange to bind to
 * @param queue queue being bound
 * @return the binding definition
 */
@Bean
public Binding dataBinding(TopicExchange topicExchange, Queue queue) {
    return BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(topicExchangePattern);
}

/**
 * Creates the message listener bean.
 *
 * @return a new {@code DefaultMessageListener}
 */
@Bean
public DefaultMessageListener defaultMessageListener() {
    final DefaultMessageListener listener = new DefaultMessageListener();
    return listener;
}

/**
 * Creates the listener container that delivers messages from the configured queue
 * to the given listener, using the autowired connection factory.
 *
 * @param defaultMessageListener listener that receives the messages
 * @return the configured, auto-starting container
 */
@Bean
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) {
    final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setAutoStartup(true);
    listenerContainer.setConnectionFactory(cachingConnectionFactory);
    listenerContainer.setQueueNames(queueName);
    listenerContainer.setMessageListener(defaultMessageListener);
    // The taskExecutor bean is not wired into the container here:
    //listenerContainer.setTaskExecutor(taskExecutor);
    return listenerContainer;
}

/**
 * Registers the placeholder configurer as a static bean.
 *
 * @return a new {@code PropertySourcesPlaceholderConfigurer}
 */
@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
    final PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
    return configurer;
}

Log błędu przy konfiguracji Java:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container. 
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s). 
org.springframework.amqp.AmqpIOException: java.io.IOException 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004) 
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208) 
    ... 12 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 
    ... 16 more 
Caused by: java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) 
    ... 1 more 

Prześledziłem kod źródłowy Springa i winowajcą jest metoda RabbitAdmin#getQueueProperties. W konfiguracji XML wykonuje się ona poprawnie, ale zaraz po uruchomieniu z konfiguracją Java zgłasza powyższy wyjątek. Co robię inaczej? Obie konfiguracje wyglądają dla mnie tak samo.

package org.springframework.amqp.rabbit.core; 

/**
 * Excerpt of Spring AMQP's {@code RabbitAdmin}; members other than
 * {@code getQueueProperties} are elided.
 */
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean { 
//... 
    /**
     * Looks up a queue with a passive declare, run through {@code rabbitTemplate.execute}.
     *
     * @param queueName name of the queue; must contain text
     * @return the queue name, message count and consumer count stored under
     *         {@code QUEUE_NAME}, {@code QUEUE_MESSAGE_COUNT} and
     *         {@code QUEUE_CONSUMER_COUNT}, or {@code null} when the passive declare fails
     */
    @Override 
    public Properties getQueueProperties(final String queueName) { 
     Assert.hasText(queueName, "'queueName' cannot be null or empty"); 
     return this.rabbitTemplate.execute(new ChannelCallback<Properties>() { 
      @Override 
      public Properties doInRabbit(Channel channel) throws Exception { 
       try { 
        DeclareOk declareOk = channel.queueDeclarePassive(queueName); 
        Properties props = new Properties(); 
        props.put(QUEUE_NAME, declareOk.getQueue()); 
        props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount()); 
        props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount()); 
        return props; 
       } 
       // Any exception inside the callback is reported as "queue does not exist".
       // This catch only covers code inside the callback; the try block sits inside
       // doInRabbit, not around rabbitTemplate.execute.
       catch (Exception e) { 
        if (logger.isDebugEnabled()) { 
         logger.debug("Queue '" + queueName + "' does not exist"); 
        } 
        return null; 
       } 
      } 
     }); 
    } 
} 

Obie konfiguracje używają tego samego pliku rabbitmq.properties w ścieżce klas. Sprawdziłem nawet atrybuty klas RabbitAdmin i RabbitTemplate w środowisku wykonawczym dla obu konfiguracji i wyglądają dokładnie tak samo ...

Odpowiedz

1

Nie używam domyślnego (głównego) hosta wirtualnego "/". Miałem własną wartość dla virtual_host. Mimo że wstrzyknąłem tę właściwość przez SpEL do mojej konfiguracji Java, nie ustawiłem jej jawnie w connectionFactory.

connectionFactory.setVirtualHost(virtualHost); 

Podziękowania dla @Gary'ego Russella za pomoc w rozwiązaniu problemu.

/**
 * Builds the connection factory registered as the {@code cachingConnectionFactory} bean.
 *
 * Unlike the failing version in the question, the injected virtual host is applied
 * explicitly.
 *
 * @return the configured connection factory
 */
@Bean(name = "cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
    final int brokerPort = Integer.parseInt(port);
    final CachingConnectionFactory factory = new CachingConnectionFactory(hostname, brokerPort);

    factory.setVirtualHost(virtualHost);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setChannelCacheSize(Integer.parseInt(channelCacheSize));

    return factory;
}
4

Powinieneś używać @Configuration, a nie @Configurable.

EDIT:

Wygląda na to, że serwer RabbitMQ zamyka połączenie:

Caused by: java.net.SocketException: Connection reset 

Zajrzyj do dziennika serwera; jeśli to nie pomoże, opublikuj gdzieś pełny dziennik DEBUG dla org.springframework (prawdopodobnie jest zbyt duży, aby zamieścić go tutaj).

EDIT2:

Masz problem z uwierzytelnianiem ...

{handshake_error,opening,0, 
      {amqp_error,access_refused, 
         "access to vhost '/' refused for user 'gggdw'", 
         'connection.open'}} 

... sprawdź swoją nazwę użytkownika i hasło (oraz vhost).

+0

słuszna uwaga, ale wciąż otrzymuję ten sam błąd – Selwyn

+0

Russell http://pastebin.com/p0KzJv3g Pełny ślad stosu niewiele mi mówi. Prześledziłem kod Spring AMQP i to metoda RabbitAdmin#getQueueProperties zgłasza wyjątek w konfiguracji Java, a wykonuje się poprawnie w konfiguracji XML – Selwyn

+1

Niektóre problemy z poświadczeniami. –

Powiązane problemy