Chcę utworzyć usługę WCF, która używa powiązania MSMQ, ponieważ mam dużą liczbę powiadomień, które usługa ma przetwarzać. Ważne jest, aby klienci nie byli zatrzymywani przez usługę i aby powiadomienia były przetwarzane w kolejności, w jakiej są wywoływane, a zatem implementacja kolejki.Jak wymusić sekwencję kolejki komunikatów za pomocą wielu wystąpień usług WCF
Inną kwestią jest odporność. Wiem, że mogłem połączyć się z samym MSMQ, aby kolejka była bardziej niezawodna, ale chcę mieć możliwość uruchomienia instancji mojej usługi na różnych serwerach, więc jeśli serwer zawiesza powiadomienia, nie buduje się w kolejce, ale inny serwer wykonuje przetwarzanie .
Eksperymentowałem z powiązaniem MSMQ i odkryłem, że można mieć wiele instancji usługi nasłuchującej w tej samej kolejce, a pozostawione same sobie kończą robiąc coś w rodzaju "round-robin" z rozkładem obciążenia w dostępnych usługach. . To jest świetne, ale w efekcie tracę sekwencję kolejkowania, ponieważ różne instancje potrzebują innego czasu na przetworzenie żądania.
Używam prostej aplikacji konsolowej do eksperymentowania, czyli epickiego zrzutu kodu poniżej. Kiedy jest prowadzony uzyskać wyjście tak:
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
Co chcę się zdarzyć:
host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.
bym nie pomyślał, że jak S2 nie został zakończony, to może jeszcze uda i zwraca komunikat to było przetwarzanie do kolejki. Dlatego S1 nie powinno mieć możliwości wyciągnięcia kolejnej wiadomości z kolejki. Moja kolejka nas transakcyjnych i próbowałem ustawić TransactionScopeRequired = true
na usługi, ale bez skutku.
Czy to możliwe? Czy podchodzę do tego w niewłaściwy sposób? Czy istnieje inny sposób budowania usługi przełączania awaryjnego bez jakiegoś centralnego mechanizmu synchronizacji?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}
Będziesz mieć twardy czas, aby to zrobić z WCF za MSMQ wiązania (patrz http://stackoverflow.com/questions/729612/ordered-delivery-with-netmsmqbinding). Jednak można to zrobić za pomocą transakcyjnego MSMQ i kodu pojednawczego (bez WCF). –