2009-12-14 17 views
5

Obecnie mam problem z utratą wiadomości. Ten błąd występuje rzadko, ale zdarza się tak często, że może być denerwujący. Oto kontekst wydania:Czy można utracić wiadomości za pomocą MSMQ MessageQueue.Peek z przekroczonym limitem czasu?

  • Włączyłem dziennik wiadomości w goldmine_service_queue, MSMQ na serwerze Windows 2003.
  • Mogę udowodnić, że wiadomość jest wstawiana do goldmine_service_queue, ponieważ wiadomość pojawia się w dzienniku wiadomości. Ta informacja podaje czas informacji o tym, kiedy wiadomość zniknęła.
  • Funkcje rejestrowania używają http://logging.apache.org/log4net/index.html
  • Dzienniki nie wyświetlają błędów.
  • Funkcja pracownika (pokazana poniżej) wykonuje się w wątku usługi systemu Windows. Odpowiada za podgląd wiadomości (elementów pracy) z kolejki i ich przetwarzanie.
  • Z dzienników podejrzewam, że mój problem może dotyczyć zachowania MessageQueue.Peek i przekroczenia limitu czasu.

Czy możliwe jest wystąpienie limitu czasu i komunikatu w tym samym czasie? Czy istnieje lepszy sposób na obsłużenie sprawdzania zatrzymania usługi, aby uniknąć tego błędu?

 private void workerFunction() 
    { 

     logger.Info("Connecting to queue: " + Settings.Default.goldmine_service_queue); 
     MessageQueue q = new MessageQueue(Settings.Default.goldmine_service_queue); 
     q.Formatter = new ActiveXMessageFormatter(); 

     while (serviceStarted) 
     { 

      Message currentMessage = null; 

      try 
      { 
       currentMessage = q.Peek(new TimeSpan(0,0,30)); 
      } 
      catch (System.Messaging.MessageQueueException mqEx) 
      { 
       if (mqEx.ToString().Contains("Timeout for the requested operation has expired")) 
       { 
        logger.Info("Check for service stop request"); 
       } 
       else 
       { 
        logger.Error("Exception while peeking into MSMQ: " + mqEx.ToString()); 
       } 
      } 
      catch (Exception e) 
      { 
       logger.Error("Exception while peeking into MSMQ: " + e.ToString()); 
      } 

      if (currentMessage != null) 
      { 

       logger.Info(currentMessage.Body.ToString()); 
       try 
       { 
        ProcessMessage(currentMessage); 
       } 
       catch (Exception processMessageException) 
       { 
        logger.Error("Error in process message: " + processMessageException.ToString()); 
       } 

       //Remove message from queue. 
       logger.Info("Message removed from queue."); 
       q.Receive(); 
       //logPerformance(ref transCount, ref startTime); 
      } 


     }//end while 

     Thread.CurrentThread.Abort(); 

    } 

Odpowiedz

5

Nie sądzę, wszelkie komunikaty powinny być pominięte w oparciu o szybki przegląd, ale pracują w bardzo dziwny sposób z dużą ilością zakres warunkach wyścigowych.

Po prostu nie otrzymuj wiadomości i prześlij ją na numer ProcessMessage (jeśli ProcessMessage nie powiedzie się, mimo to będziesz czytał). Jeśli potrzebujesz obsługi wielu odbiorników, wykonaj odbiór w transakcji MSMQ, aby komunikat był niedostępny dla innych odbiorców, ale nie został usunięty z kolejki, dopóki transakcja nie zostanie zatwierdzona.

Ponadto, zamiast odpytywania kolejki, może nie odbierać asynchronicznego i pozwolić puli wątków na zakończenie (gdzie należy zadzwonić pod numer EndReceive). Oszczędza to wiązanie wątku i nie ma potrzeby wyłączania specjalnych przypadków (zamknij kolejkę komunikatów, a następnie wywołaj MessageQueue.ClearConnectionCache();).

Przerwanie wątku jest naprawdę złym sposobem wyjścia, wystarczy wrócić z funkcji uruchamiania wątku.

+0

Dzięki za odpowiedź: - w odniesieniu do warunków wyścigu: Nie jestem pewien, czy widzę warunki wyścigu, o których wspomniałeś. Według http://msdn.microsoft.com/en-us/library/t5te2tk0.aspx, MessageQueue.Peek (Timeout) jest wywołaniem blokującym. - Jestem zaniepokojony użyciem wzorca asynchronicznego, ponieważ ProcessMessage wykonuje niektóre operacje COM. Nie wiem, czy te operacje są bezpieczne dla wątków. Jestem w porządku z próbą użycia funkcji odbierania() zamiast wartości szczytowej, z tą różnicą, że nie wiem, jak utworzyć możliwość zatrzymania pętli while z powodu zatrzymania usługi. –

+0

Re: COM w 'ProcessMessage': nie powinno stanowić problemu, jeśli działa tylko jeden wątek' ProcessMessage' (który można osiągnąć w przypadku asynchronizmu, uruchamiając nowy asynchroniczny odbiór po zakończeniu funkcji "ProcessMessage".) W asynchronizmie jeśli zamkniesz kolejkę (i opróżnisz pamięć podręczną --- na podstawie wyszukiwania, które wydaje się być potrzebne, aby naprawdę zamknąć połączenie) i wszystkie oczekujące otrzymane zostaną anulowane.Nie ma pętli do anulowania.Nie ma pętli do anulowania – Richard

1

Zamierzam zmienić tę nazwę na currentMessage = q.Peek(new TimeSpan(0,0,30)); na currentMessage = q.Receive();, aby rozwiązać problem. Używam MSMQ do przekazywania komunikatów w tym samym kontekście, ale używam tylko peek (i oczekuję wyjątku timeout), aby określić, czy kolejka jest pusta. Połączenie z numerem Receive jest blokowane, więc odpowiednio zaplanuj.

Edytuj: także dla wyjątków - możesz sprawdzić, czy wyjątek jest wyjątkiem limitu czasu, porównując kod błędu.

mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout 

gdzie mqe jest wyjątkiem kolejki komunikatów. Możesz polubić to lepiej niż porównanie ciągów.

+0

Używam 'Message rsp = colaStatus.ReceiveByCorrelationId (correlationId, new TimeSpan (0, timeOut, 0)); 'Jestem zdezorientowany, jeśli 5 minut jest bardzo wysoki poziom timeout. IMHO, myślę, że lepiej jest niż 1 minutę. – Kiquenet

3

Wystarczy kilka uwag, aby wyjaśnić, jak działa tutaj MSMQ.

"Mogę udowodnić, że wiadomość jest wstawiana do goldmine_service_queue, ponieważ wiadomość pojawia się w dzienniku wiadomości.

Wiadomość trafia do kolejki dziennika, gdy oryginalna wiadomość zostanie usunięta z goldmine_service_queue. Można więc powiedzieć, że komunikat został pomyślnie dostarczony do kolejki i pomyślnie usunięty z kolejki.

"Zdecydowanie podejrzewam, że mój problem może dotyczyć zachowania MessageQueue.Peek i przekroczenia limitu czasu."

Peek nie robi nic, aby usunąć wiadomość z kolejki. Tylko "q.Receive();" robi to. W twoim kodzie nie ma wyraźnego połączenia między wyświetlanym komunikatem a tym, który jest odbierany. "q.Receive();" po prostu mówi "odbierać wiadomość od góry kolejki". W środowisku wielowątkowym można oczekiwać, że komunikaty będą niekonsekwentnie odczytywane - niektóre mogą być przeglądane i przetwarzane wiele razy. Powinieneś uzyskać identyfikator wiadomości Peeked i używać ReceiveByID, abyś mógł otrzymać tylko przeczytaną wiadomość.

Powiązane problemy