2012-12-10 12 views
7

Chcę utworzyć usługę WCF, która używa powiązania MSMQ, ponieważ mam dużą liczbę powiadomień, które usługa ma przetwarzać. Ważne jest, aby klienci nie byli zatrzymywani przez usługę i aby powiadomienia były przetwarzane w kolejności, w jakiej są wywoływane, a zatem implementacja kolejki.Jak wymusić sekwencję kolejki komunikatów za pomocą wielu wystąpień usług WCF

Inną kwestią jest odporność. Wiem, że mogłem połączyć się z samym MSMQ, aby kolejka była bardziej niezawodna, ale chcę mieć możliwość uruchomienia instancji mojej usługi na różnych serwerach, więc jeśli serwer zawiesza powiadomienia, nie buduje się w kolejce, ale inny serwer wykonuje przetwarzanie .

Eksperymentowałem z powiązaniem MSMQ i odkryłem, że można mieć wiele instancji usługi nasłuchującej w tej samej kolejce, a pozostawione same sobie kończą robiąc coś w rodzaju "round-robin" z rozkładem obciążenia w dostępnych usługach. . To jest świetne, ale w efekcie tracę sekwencję kolejkowania, ponieważ różne instancje potrzebują innego czasu na przetworzenie żądania.

Używam prostej aplikacji konsolowej do eksperymentowania, czyli epickiego zrzutu kodu poniżej. Kiedy jest prowadzony uzyskać wyjście tak:

host1 open 
host2 open 
S1: 01 
S1: 03 
S1: 05 
S2: 02 
S1: 06 
S1: 08 
S1: 09 
S2: 04 
S1: 10 
host1 closed 
S2: 07 
host2 closed 

Co chcę się zdarzyć:

host1 open 
host2 open 
S1: 01 
<pause while S2 completes> 
S2: 02 
S1: 03 
<pause while S2 completes> 
S2: 04 
S1: 05 
S1: 06 
etc. 

bym nie pomyślał, że jak S2 nie został zakończony, to może jeszcze uda i zwraca komunikat to było przetwarzanie do kolejki. Dlatego S1 nie powinno mieć możliwości wyciągnięcia kolejnej wiadomości z kolejki. Moja kolejka nas transakcyjnych i próbowałem ustawić TransactionScopeRequired = true na usługi, ale bez skutku.

Czy to możliwe? Czy podchodzę do tego w niewłaściwy sposób? Czy istnieje inny sposób budowania usługi przełączania awaryjnego bez jakiegoś centralnego mechanizmu synchronizacji?

class WcfMsmqProgram 
{ 
    private const string QueueName = "testq1"; 

    static void Main() 
    { 
     // Create a transactional queue 
     string qPath = ".\\private$\\" + QueueName; 
     if (!MessageQueue.Exists(qPath)) 
      MessageQueue.Create(qPath, true); 
     else 
      new MessageQueue(qPath).Purge(); 

     // S1 processes as fast as it can 
     IService s1 = new ServiceImpl("S1"); 
     // S2 is slow 
     IService s2 = new ServiceImpl("S2", 2000); 

     // MSMQ binding 
     NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None); 

     // Host S1 
     ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private")); 
     ConfigureService(host1, binding); 
     host1.Open(); 
     Console.WriteLine("host1 open"); 

     // Host S2 
     ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private")); 
     ConfigureService(host2, binding); 
     host2.Open(); 
     Console.WriteLine("host2 open"); 

     // Create a client 
     ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName)); 
     IService client = factory.CreateChannel(); 

     // Periodically call the service with a new number 
     int counter = 1; 
     using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500)) 
     { 
      // Enter to stop 
      Console.ReadLine(); 
     } 

     host1.Close(); 
     Console.WriteLine("host1 closed"); 
     host2.Close(); 
     Console.WriteLine("host2 closed"); 

     // Wait for exit 
     Console.ReadLine(); 
    } 

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding) 
    { 
     var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName); 
    } 

    [ServiceContract] 
    interface IService 
    { 
     [OperationContract(IsOneWay = true)] 
     void EchoNumber(int number); 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 
    class ServiceImpl : IService 
    { 
     public ServiceImpl(string name, int sleep = 0) 
     { 
      this.name = name; 
      this.sleep = sleep; 
     } 

     private string name; 
     private int sleep; 

     public void EchoNumber(int number) 
     { 
      Thread.Sleep(this.sleep); 
      Console.WriteLine("{0}: {1:00}", this.name, number); 
     } 
    } 
} 
+0

Będziesz mieć twardy czas, aby to zrobić z WCF za MSMQ wiązania (patrz http://stackoverflow.com/questions/729612/ordered-delivery-with-netmsmqbinding). Jednak można to zrobić za pomocą transakcyjnego MSMQ i kodu pojednawczego (bez WCF). –

Odpowiedz

10

batwad,

Próbujesz ręcznie utworzyć magistralę usług. Dlaczego nie spróbujesz użyć istniejącego?

NServiceBus, MassTransit, ServiceStack

przynajmniej 2 z tych prac z MSMQ.

Ponadto, jeśli absolutnie potrzebujesz zamówienia, może to być z innego powodu - chcesz mieć możliwość wysłania wiadomości i nie chcesz, aby wiadomości zależne były przetwarzane przed pierwszą wiadomością. Szukasz wzoru Saga. NServiceBus i MassTransit pozwolą ci łatwo zarządzać Sagami, pozwolą ci po prostu uruchomić początkowy komunikat, a następnie wywołać pozostałe wiadomości w zależności od warunków. Pozwoli Ci to zaimplementować wtłaczanie rozproszonej aplikacji.

Można wtedy nawet skalować do tysięcy klientów, serwerów w kolejce i procesorów komunikatów bez konieczności pisania pojedynczego wiersza kodu i żadnych problemów.

Staraliśmy się wdrożyć naszą własną magistralę serwisową ponad msmq tutaj, zrezygnowaliśmy, ponieważ inny problem ciągle się skradał. Poszliśmy z NServiceBus, ale MassTransit to także doskonały produkt (to w 100% open source, NServiceBus nie jest). ServiceStack jest niesamowity w tworzeniu interfejsów API i korzystania z kolejek komunikatów - jestem pewien, że możesz go użyć do utworzenia usług, które będą działały jako front-endy Queue w kilka minut.

Czy wspomniałem, że w przypadku NSB i MT oba wymagają tylko 10 wierszy kodu do pełnego wdrożenia kolejek, nadawców i obsługi?

----- DODANO -----

UDI Dahan (jeden z głównych contributers z NServiceBus) mówi o tym w: "In-Order Messaging a Myth" by Udi Dahan "Message Ordering: Is it Cost Effective?" with Udi Dahan

Chris Patterson (jeden z głównych contributers masowego tranzytu) pytania "Using Sagas to ensure proper sequential message order" question

StackOverflow/odpowiedzi: "Preserve message order when consuming MSMQ messages in a WCF application"

----- PYTANIE -----

Muszę powiedzieć, że jestem zaskoczony, dlaczego musisz zagwarantować kolejność wiadomości - czy byłbyś w tej samej sytuacji, jeśli używasz protokołu HTTP/SOAP protokół? Zgaduję, że nie, to dlaczego jest problem w MSMQ?

Powodzenia, mam nadzieję, że to pomaga,

+0

Podnoszę powiadomienia, które są od siebie zależne. Na przykład pociąg nie może opuścić stacji, zanim tam nie dotrze. Chcę się upewnić, że powiadomienie OnDepart jest obsługiwane po operacji OnArrive, w przeciwnym razie obsługa OnDepart może nie działać (np.: nie mógł ustalić, ile czasu pociąg spędził na stacji, jeśli nie obsługiwał jeszcze odpowiedniego OnArrive). – batwad

+0

Nie powinieneś nigdy wstawać OnArrive przed OnDepart, co oznacza, że ​​nie powinieneś go wysyłać. Spójrz na wzór Saga, dokładnie to masz na myśli - musisz zarządzać stanem poza MSMQ. –

+0

OK, myślę, że w końcu to dostałem ... Wysyłasz zdarzenie OnDepart, ale obawiasz się, że kiedy zdarzenie OnArrive zostanie wywołane później w zdarzeniu OnDepart, mogło nie być jeszcze przetworzone ... Tak, widzę problem - ale niestety nie mogę dać ci złotego rozwiązania, innego niż być może: użyj jakiegoś unikalnego identyfikatora, jeśli nie możesz zagwarantować, że MSMQ wysyła wiadomość przed innym (sieć początkowa pociągu zawiesza się, dlatego OnDepart utknął tam, i ostatecznie pociąg przyjeżdża), a następnie musisz kodować wokół niego. Jeśli otrzymasz zdarzenie OnArrive, a zdarzenia OnDepart –

1

Zapewnienie dostarczania wiadomości w kolejności jest jednym z de facto przyklejonych problemów z przesyłaniem dużej ilości wiadomości.

W idealnym świecie Twoje miejsca docelowe wiadomości powinny obsługiwać przesyłanie wiadomości poza kolejnością. Można to osiągnąć, upewniając się, że źródło wiadomości zawiera jakieś informacje dotyczące sekwencjonowania. Znowu idealnie przybiera to formę pewnego rodzaju stempla wsadowego x-of-n (wiadomość 1 z 10, 2 z 10 itd.). Miejsce docelowe wiadomości jest następnie wymagane do złożenia danych w kolejności po ich dostarczeniu.

Jednak w świecie rzeczywistym często nie ma możliwości zmiany systemów niższego rzędu w celu obsługi wiadomości przychodzących z rzędu. W tym przypadku masz dwie możliwości:

  1. Go całkowicie pojedynczy gwintowane - faktycznie można zwykle znaleźć jakąś „grupowania id”, która oznacza, że ​​można iść jednowątkowy w for-each-group sensie, co oznacza, nadal mają współbieżność między różnymi grupami wiadomości.
  2. Zaimplementuj opakowanie typu re-sequencer wokół każdego z systemów konsumenckich, które chcesz otrzymywać wiadomości w kolejności.

Żadne rozwiązanie nie jest bardzo ładne, ale jest to jedyny sposób, w jaki myślę, że możesz mieć współbieżność i dostarczanie wiadomości w kolejności.

+0

Interesujące, ale jeśli jedna ServerA odbierze komunikat 1-of-10 off z kolejki, a ServerB odbierze wiadomość 2-of-10 off z kolejki, żadna nie otrzyma kompletnej partii do resekwencji. – batwad

+0

Tak więc w tej konfiguracji potrzebujesz jednej kolejki na grupę. To nie jest bardzo praktyczne. –

Powiązane problemy