2016-11-10 11 views
6

Mamy brokera ActiveMQ, który jest podłączony do bardzo różnych klientów przy użyciu JMS, AMQP i MQTT. Z jakiegoś powodu nie odkryliśmy jeszcze, że określony zestaw klientów MQTT często (nie zawsze) subskrybuje trwale. Jest to środowisko testowe, w którym klienci są często dodawani i usuwani, czasem przez ciągnięcie wtyczki lub ponowne uruchamianie wbudowanego urządzenia, aby nie mogli poprawnie zrezygnować z subskrypcji. Efektem (IIUC) jest to, że broker gromadzi "trwałą subskrypcję offline" dla urządzeń, których nigdy już nie zobaczy (widzę to pod http://my_broker:8161/admin/subscribers.jsp), utrzymując wiadomości na te tematy na zawsze, aż w końcu zepsuje się pod własnym śladem pamięci .Jak zmusić mojego brokera ActiveMQ do przerwania trwałych abonentów w trybie offline

Problem polega na tym, że subskrybenci subskrybują na stałe, i musimy się dowiedzieć, dlaczego tak jest. Jednak zdecydowano również, że klienci robiący to (nieświadomie) nie powinni doprowadzić do zatrzymania brokera, więc musimy rozwiązać ten problem w sposób niezależny.

znalazłem there are settings for a timeout for offline durable subscriptions i umieścić te w naszej konfiguracji brokera (ostatnie dwie linie):

<broker 
    xmlns="http://activemq.apache.org/schema/core" 
    brokerName="my_broker" 
    dataDirectory="${activemq.data}" 
    useJmx="true" 
    advisorySupport="false" 
    persistent="false" 
    offlineDurableSubscriberTimeout="1800000" 
    offlineDurableSubscriberTaskSchedule="60000"> 

Jeśli dobrze rozumiem, powyższe należy sprawdzić każdą minutę i odwoływania klientów nie widział na pół godzina. Jednak, w przeciwieństwie do dokumentów, nie wydaje się to działać: Konsument, którego subskrybowałem, a następnie wyciągnąłem wtyczkę kilka dni temu jest nadal widoczny na liście abonentów o trwałym dostępie offline, ślad pamięci brokera stale rośnie, a jeśli ręcznie usuwaj subskrybentów w interfejsie internetowym brokera Widzę, że ślad pamięci spada.

Więc oto moje pytania:

  1. Co decyduje czy abonament MQTT do tematu na brokera ActiveMQ jest trwały?
  2. Co robię źle w ustawianiu limitu czasu dla trwałych subskrypcji w trybie offline w ustawieniach ActiveMQ?
+0

Próbowałeś na odwrót, poprzez publikowanie wiadomości z krótkim okresie TTL (czas życia) i konfigurując krótki ** expireMessagesPeriod **? Zgodnie z dokumentami, przy takiej konfiguracji system musi usuwać wszystkie takie wiadomości po upływie okresu TTL, nieistotny dla długo utraconych stałych abonentów (którzy nie zrezygnowali z subskrypcji).To powinno nam również pomóc w uwolnieniu zasobów pamięci, ponieważ rzeczywiste zużycie pamięci polega na przechowywaniu "wiadomości", a nie na przechowywaniu samych obiektów subskrybentów. – blackpen

Odpowiedz

2

Wyodrębniłem odpowiedni kod (doCleanup()), który usuwa czas wygaśnięcia trwałych subskrypcji.

W przypadku sukcesu, wykonuje:

LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 

W przypadku awarii, wykonuje:

LOG.error("Failed to remove inactive durable subscriber", e); 

Poszukaj powyżej linii dziennika w pliku dziennika i dopasować go ze szczegółami, które obserwowane za pomocą przeglądarka admin/subscribers.jsp. Jeśli nie wydrukuje żadnej z linii, subskrypcje mogą pozostać z jakiegoś powodu active lub możesz natknąć się na błąd.

Czy możesz spróbować usunąć podkreślenie (_) w nazwie brokera, jeśli możesz? Podręcznik mówi o problemach z podkreśleniami w nazwach brokerów.

Kod:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 
    super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 
    if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 
     this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 
     this.cleanupTask = new TimerTask() { 
     @Override 
     public void run() { 
      doCleanup(); 
     } 
     }; 
     this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 
    } 
} 

public void doCleanup() { 
    long now = System.currentTimeMillis(); 
    for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 
     DurableTopicSubscription sub = entry.getValue(); 
     if (!sub.isActive()) { 
     long offline = sub.getOfflineTimestamp(); 
     if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 
      LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 
      try { 
       RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 
       info.setClientId(entry.getKey().getClientId()); 
       info.setSubscriptionName(entry.getKey().getSubscriptionName()); 
       ConnectionContext context = new ConnectionContext(); 
       context.setBroker(broker); 
       context.setClientId(entry.getKey().getClientId()); 
       removeSubscription(context, info); 
      } catch (Exception e) { 
       LOG.error("Failed to remove inactive durable subscriber", e); 
      } 
     } 
     } 
    } 
} 

// The toString method for DurableTopicSubscription class 
@Override 
public synchronized String toString() { 
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 
} 
+0

Dzięki za odpowiedź. Nasza profesjonalna opcja wsparcia pojawiła się z dwoma faktami: _1) _ Naprawdę istnieje błąd w ActiveMQ, w którym nie można zrzucić trwałych subskrybentów offline. _2) _ Aby subskrybować nietrwałe z MQTT, musisz połączyć się z 'cleanSession' ustawionym na' true' iz QoS <1. – sbi

Powiązane problemy