2016-05-27 20 views
10

Teraz moja odłączenie od serwera WebLogic JMS wygląda następującoOdłączanie od WebLogic JMS

import java.util.Hashtable; 
import javax.jms.*; 
import javax.naming.*; 
import javax.transaction.*; 
import java.util.Vector; 
import javax.rmi.PortableRemoteObject; 
import clojure.java.api.Clojure; 
import clojure.lang.IFn; 
import org.apache.log4j.Logger; 
import weblogic.jndi.*; 

public class WebLogicListener implements MessageListener, ExceptionListener{ 
    public InitialContext ctx; 
    public TopicConnectionFactory conFactory; 
    public TopicConnection tCon; 
    public TopicSession tSession; 
    public TopicSubscriber tSub; 
    public Boolean development; 
    public Topic topic; 
    /*clojure function objects*/ 
    public IFn publish; 
    public IFn close; 
    public IFn incrementMetric; 
    public IFn logMessage; 
    public IFn resync; 

    public Object channel; 
    public ExceptionListener exception; 
    public String topicName; 
    public String subName; 
    public String username; 
    public String password; 
    public String clientId; 
    public String factoryJNDI; 
    public String topicJNDI; 
    public Vector nms; 
    public Hashtable<Object,Object> env; 
    public boolean running = false; 

    public WebLogicListener (String topicName, String host, String username, String password, String factoryJNDI, 
          String topicJNDI, String clientId, String subName, String ns, String fnName, 
          boolean development, Vector nms){ 
    this.username = username; 
    this.password = password; 
    this.clientId = clientId; 
    this.topicName = topicName; 
    this.subName = subName; 
    this.development = development; 
    this.topicJNDI = topicJNDI; 
    this.factoryJNDI = factoryJNDI; 
    this.nms = nms; 
    /*Clojure interop handlers*/ 
    IFn chan = Clojure.var("clojure.core.async", "chan"); 
    resync = Clojure.var("cenx.baldr.api", "resync!"); 
    publish = Clojure.var(ns, fnName); 
    incrementMetric = Clojure.var(ns, "log-metric"); 
    logMessage = Clojure.var (ns, "log-message"); 
    close = Clojure.var("clojure.core.async","close!"); 
    /*populate envrionment*/ 
    env = new Hashtable<Object,Object>(); 
    env.put(Context.PROVIDER_URL, host); 
    env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); 
    env.put(Context.SECURITY_PRINCIPAL, username); 
    env.put(Context.SECURITY_CREDENTIALS, password); 
    env.put("weblogic.jndi.createIntermediateContexts", "true"); 
    /*open communication channel for clojure daemon*/ 
    channel = chan.invoke(); 
    } 

    private void initListener() throws JMSException, NamingException{ 
    try{ 
     if (!running && !development){ 
     ctx = new InitialContext(env); 
     topic = (Topic) ctx.lookup(topicJNDI); 
     conFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(factoryJNDI), TopicConnectionFactory.class); 
     tCon = (TopicConnection) conFactory.createTopicConnection(); 
     tCon.setExceptionListener(this); 
     tCon.setClientID(clientId); 
     tSession = (TopicSession) tCon.createTopicSession(false, 1); 
     tSub = tSession.createDurableSubscriber(topic, subName); 
     tSub.setMessageListener(this); 
     tCon.start(); 
     running = true; 
     }else{ 
     if (running){ 
      logMessage.invoke("error", String.format("Listener is already running")); 
     } 
     if (development){ 
      logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     } 
    } catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start listener \n %s", e.toString())); 
    } 
    } 

    public void startListener(){ 
    if (!development && env != null){ 
     try { 
     initListener(); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start Listener \n %s", e.toString())); 
     } 
    } else { 
     if (development){ 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     if (env == null){ 
     logMessage.invoke("error", "Environment variable is null"); 
     } 
    } 
    } 

    ///Closes the JMS connection and the channel 
    public void stopListener(){ 
    if (!development){ 
     try{ 
     tSub.close(); 
     tSession.close(); 
     tCon.close(); 
     incrementMetric.invoke("JMS-disconnect-count"); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Error while stopping the listener \n %s", e.toString())); 
     }finally{ 
     running = false; 
     } 
    } else { 
     logMessage.invoke("info", "Listener not started, running in development mode"); 
    } 
    } 

    public Object getChannel(){ 
    return channel; 
    } 

    //re-initializes the channel in case of error 
    public void initializeChannel(){ 
    if (channel == null){ 
     IFn chan = Clojure.var("clojure.core.async", "chan"); 
     channel = chan.invoke(); 
    } else { 
     logMessage.invoke("info", "Channel is already initialized"); 
    } 
    } 
    //accessors for debugging 

    public void closeSubscription(){ 
    try{ 
     tSub.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic subscription"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeSession(){ 
    try{ 
     tSession.unsubscribe(subName); 
     tSession.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic session"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeConnection(){ 
    try{ 
     tCon.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic connection"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeContext(){ 
    try { 
     ctx.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close context"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public Boolean isRunning(){ 
    return running; 
    } 

    public Context getContext(){ 
    return ctx; 
    } 

    public TopicConnectionFactory getFactory(){ 
    return conFactory; 
    } 

    public TopicConnection getTopicConnection(){ 
    return tCon; 
    } 

    public TopicSession getTopicSession(){ 
    return tSession; 
    } 

    public Boolean getDevelopmentMode(){ 
    return development; 
    } 

    public TopicSubscriber getTopicSubscriber(){ 
    return tSub; 
    } 

    public Topic getTopic(){ 
    return topic; 
    } 

    /*Interface methods*/ 

    public void onMessage(Message message){ 
    publish.invoke(channel, message); 
    } 
    /*attempt a resync after an exception connection*/ 
    private void resync(){ 
    resync.invoke(nms); 
    } 

    private void attemptReconnect() throws Exception{ 
    if (!development){ 
     //clean up any portions of the connection that managed to establish 
     stopListener(); 
     //incase of stopListener exceptioning out set running to false 
     running = false; 
     do{ 
     try{ 
      initListener(); 
      if (running){ 
      resync(); 
      } 
     }catch(Exception e){ 
      logMessage.invoke("error", 
          String.format("Unable to establish connection to JMS server \n %s", e.toString())); 
     }finally{ 
      Thread.sleep(30000); 
     } 
     } while (!running); 
    } else { 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
    } 
    } 

    public void onException(JMSException e){ 
    logMessage.invoke("error", 
         String.format("A JMS Exception has occurred, attempting to re-establish topic connection \n %s", e.toString())); 
    try{ 
     incrementMetric.invoke("JMS-disconnect-count"); 
     attemptReconnect(); 
    }catch(Exception g){ 
     logMessage.invoke("error", 
         String.format("Unable to start Listener \n %s", g.toString())); 
    } 
    } 

    /* Test functions */ 
    public void testException() throws JMSException{ 
    onException(new JMSException("testing exception function")); 
    } 

    public void testChannel (String message){ 
    if (development){ 
     publish.invoke(channel, message); 
    } 
    } 
} 

Kiedy tworzę połączenie używam netstat, aby sprawdzić, czy serwer jest podłączony

netstat - an | grep 8001 tcp 0 0 Adres IP: 59.730
adres IP: 8001 USTANOWIONA

Potem zadzwonić do mojego .stopListener oprócz metody .closeContext i wrócić ponownie sprawdzić moje połączenie z netstat i uzyskać taki sam wynik:

netstat -an | grep 8001 tcp 0 0 Adres IP: 59.730
adres IP: 8001 USTANOWIONA

Dlaczego zamknięcie sesji abonenta, a połączenie nie zniszczyć połączenia z serwerem JMS. Dokumentacja, którą znalazłem, nie dała mi żadnego wyjaśnienia, dlaczego nie mogę całkowicie zniszczyć połączenia.

+0

Jaka jest wartość flagi "rozwój"? –

+0

jest ustawiony na false. Gdyby było to prawdą, połączenie nigdy by nie zostało ustalone. Podczas rozłączania widzę komunikaty dziennika z moich ostatnich bloków. – jrahme

+1

Można sprawdzić, czy połączenie nie zostało ustanowione przez inny komponent przed utworzeniem połączenia/sesji jms. Nie zapomnij również zamknąć kontekstu jndi. –

Odpowiedz

0

Nie jestem pewien, czy podchodzisz do tego poprawnie. Widzę, że masz wyjątek nasłuchujący na połączeniu.

Na weblogic, detektor będzie wielokrotnie wywoływany dla każdego zdarzenia błędu, więc nie powinieneś próbowaćReconnect na każdym połączeniu. Zostanie on wywołany raz dla każdego zarejestrowanego użytkownika i raz dla każdego monitorowanego połączenia. Należy rozłączyć się tylko wtedy, gdy wyjątek reprezentuje wartość ServerConnectionLost.

Również w procedurze obsługi błędów wystarczy zamknąć połączenie. Jeśli zrobiłeś connection.close(), to również zamknąłoby sesję i detektory. Nie musisz ich zamykać w odwrotnej kolejności, jak Ty.

I jeszcze jedno. Nie powinieneś mieć kodu "development" lub "debug" lub "test" w swoim kodzie produkcyjnym.

Ta część, która mówi: "if (! Rozwój & & env! = Null) {" ... Nie powinieneś tego robić.

Wróćmy teraz do pytania, dlaczego rzeczywiste połączenie nie jest zamknięte. Widzę robisz

try{ 
    tSub.close(); 
    tSession.close(); 
    tCon.close(); 
    incrementMetric.invoke("JMS-disconnect-count"); 
} catch... 

Jeśli tSub.close() lub tSession.close() były do ​​błędu się, czy połączenie nie dostanie zamknięte. Owiń każdy w niezależną próbę/catch.