2016-03-02 14 views
6

Wdrażam warstwę łącza danych przy użyciu producer/consumer pattern. Warstwa łącza danych ma własny wątek i automat stanowy do przesyłania protokołu łącza danych przez przewód (Ethernet, RS-232 ...). Interfejs do warstwy fizycznej jest reprezentowany jako System.IO.Stream. Inny wątek zapisuje komunikaty i odczytuje wiadomości z obiektu łącza danych.C# Oczekiwanie na wiele zdarzeń w Producer/Consumer

Przedmiotem łącza danych ma stan bezczynności, który musi czekać na jednym z czterech warunków:

  1. Bajt jest odbierany
  2. wiadomość jest dostępny z wątku sieci
  3. Keep-alive czasomierz wygasła
  4. Cała komunikacja została anulowana przez warstwy sieciowej

mam trudny czas FIGU Najlepiej to zrobić bez dzielenia komunikacji na wątek do odczytu/zapisu (co znacznie zwiększa złożoność). Oto w jaki sposób można uzyskać 3 z 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. 
stream.ReadTimeout = 10000; 
await stream.ReadAsync(buf, 0, 1, cts.Token); 

lub

BlockingCollection<byte[]> SendQueue = new ...; 
... 
// Check for a message from network layer. Timeout after 10 seconds. 
// Monitor cancellation token. 
SendQueue.TryTake(out msg, 10000, cts.Token); 

Co należy zrobić, aby zablokować wątek, czekając na wszystkich czterech warunków? Wszystkie zalecenia są mile widziane. Nie jestem ustawiony na żadnej architekturze ani strukturach danych.

EDYCJA: ******** Dzięki za pomoc dla wszystkich. Oto moje rozwiązanie ********

Po pierwsze, nie sądzę, że była asynchroniczna implementacja kolejki producenta/konsumenta. Więc zaimplementowałem coś podobnego do this stackoverflow post.

Potrzebowałem zewnętrznego i wewnętrznego źródła anulowania, aby zatrzymać wątek klienta i anulować zadania pośrednie, odpowiednio, similar to this article.

byte[] buf = new byte[1]; 
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) 
{ 
    CancellationToken internalToken = internalTokenSource.Token; 
    CancellationToken stopToken = stopTokenSource.Token; 
    using (CancellationTokenSource linkedCts = 
     CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) 
    { 
     CancellationToken ct = linkedCts.Token; 
     Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); 
     Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); 
     Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); 

     // Wait for at least one task to complete 
     await Task.WhenAny(readTask, msgTask, keepAliveTask); 

     // Next cancel the other tasks 
     internalTokenSource.Cancel(); 
     try { 
      await Task.WhenAll(readTask, msgTask, keepAliveTask); 
     } catch (OperationCanceledException e) { 
      if (e.CancellationToken == stopToken) 
       throw; 
     } 

     if (msgTask.IsCompleted) 
      // Send the network layer message 
     else if (readTask.IsCompleted) 
      // Process the byte from the physical layer 
     else 
      Contract.Assert(keepAliveTask.IsCompleted); 
      // Send a keep alive message 
    } 
} 
+3

["poczekaj na Task.WhenAny (...)'] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx) może pomóc. –

Odpowiedz

2

W tym przypadku chciałbym użyć tylko znaki anulowanie rezerwacji do odwołania. Powtarzający się limit czasu, podobnie jak czasomierz utrzymywania aktywności, jest lepiej reprezentowany jako czasomierz.

Tak więc, zamodelowałbym to jako trzy zadania do anulowania. Po pierwsze, token anulowanie:

Cała komunikacja została anulowana przez warstwy sieciowej

CancellationToken token = ...; 

Następnie trzy równoległe operacje:

Bajt jest odbierany

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

Keep-alive czasomierz wygasł

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

Komunikat jest dostępny z wątku sieci

Ten jest nieco trudniejsze. Twój obecny kod używa BlockingCollection<T>, który nie jest zgodny z Asynchronizacją. Zaleca się przełączenie na TPL Dataflow's BufferBlock<T> lub my own AsyncProducerConsumerQueue<T>, z których jeden może być używany jako zgodne z asynchronicznymi kolejkami producenta/konsumenta (co oznacza, że ​​producent może być zsynchronizowany lub asynchronizowany, a konsument może być zsynchronizowany lub asynchronizowany).

BufferBlock<byte[]> SendQueue = new ...; 
... 
var messageTask = SendQueue.ReceiveAsync(token); 

Następnie można użyć Task.WhenAny aby określić, które z tych zadań zakończone:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

Teraz można pobierać wyniki porównując completedTask do innych i await je ing:

if (completedTask == readByteTask) 
{ 
    // Throw an exception if there was a read error or cancellation. 
    await readByteTask; 
    var byte = buf[0]; 
    ... 
    // Continue reading 
    readByteTask = stream.ReadAsync(buf, 0, 1, token); 
} 
else if (completedTask == keepAliveTimerTask) 
{ 
    // Throw an exception if there was a cancellation. 
    await keepAliveTimerTask; 
    ... 
    // Restart keepalive timer. 
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 
} 
else if (completedTask == messageTask) 
{ 
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) 
    byte[] message = await messageTask; 
    ... 
    // Continue reading 
    messageTask = SendQueue.ReceiveAsync(token); 
} 
+0

Szkoda, że ​​nie widziałem tego wcześniej, gdy poszedłem i zaimplementowałem własny AsyncQueue, choć nie tak ogólny jak twój. Dlaczego czekasz na każde indywidualne zadanie? Czy nie powinno to już być zakończone? –

+0

@BrianHeilig: Konieczne jest pobranie wyników. Zadania te zostały zakończone, ale ich wyniki nie zostały zaobserwowane. Wyniki mogą być danymi, np. 'MessageTask', ale wyniki mogą być również wyjątkami. 'keepAliveTimerTask' może mieć wyjątek, jeśli jest anulowany, a' readByteTask' może mieć wyjątek, jeśli jest anulowany lub występuje błąd odczytu I/O. 'await' spowoduje (ponowne) podniesienie tych wyjątków, jeśli są obecne. –

1

Anulowanie odczytu powoduje, że nie można ustalić, czy dane zostały odczytane, czy nie. Anulowanie i czytanie nie są atomowe względem siebie. Takie podejście działa tylko po zamknięciu strumienia po anulowaniu.

Podejście do kolejki jest lepsze. Możesz utworzyć połączony CancellationTokenSource, który zostanie anulowany, kiedy tylko chcesz. Zamiast podania cts.Token przekazujesz token, który kontrolujesz.

Możesz następnie zasygnalizować tokenowi na podstawie czasu, innego tokena i dowolnego innego zdarzenia, które ci się podoba. Jeśli użyjesz wbudowanego limitu czasu, wewnętrznie zrobi to samo, aby połączyć token przychodzący z przekroczeniem limitu czasu.

+0

Przepraszam, trochę mi brakowało szczegółów. Cts to CancellationTokenSource, który jest własnością warstwy łącza danych. Anulowanie jest również kontrolowane przez warstwę łącza danych; wątek połączenia sieci zatrzymuje, co anuluje całą komunikację i czeka na zakończenie wątku. Celem jest eleganckie i szybkie zatrzymanie warstwy łącza danych. –

+1

I dlaczego ta odpowiedź nie zadziałałaby? Możesz przerwać 'TryTake' abort na * dowolny * warunek, który ci się podoba. – usr

+0

Myślę, że rozumiem. Sugerujesz, że zasygnalizuję token anulowania, gdy wątek sieciowy zostanie anulowany lub bajt jest dostępny, prawda? Sądzę, że potrzebowałbym innego wątku do synchronicznego odczytu ze strumienia, a następnie sygnalizacji tokena anulowania po zakończeniu odczytu. Potrzebuję zmiennej do określenia, co zostało anulowane (odczyt lub zatrzymanie). Będę musiał również anulować przeczytany wątek po odebraniu wiadomości, prawda? Czy może czegoś brakuje? –

3

Wybrałbym opcję drugą, oczekując na spełnienie któregokolwiek z 4 warunków. Zakładając, że masz 4 Zadania Jak już metod awaitable:

var task1 = WaitForByteReceivedAsync(); 
var task2 = WaitForMessageAvailableAsync(); 
var task3 = WaitForKeepAliveTimerAsync(); 
var task4 = WaitForCommunicationCancelledAsync(); 

// now gather them 
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ 
task1, task2, task3, task4 
}; 

// Wait for any of the things to complete 
var result = await Task.WhenAny(theTasks); 

Powyższy kod zostanie wznowione natychmiast po zakończeniu pierwszego zadania i ignorować pozostałe 3.

Uwaga:

W the documentation for WhenAny, to mówi:

Zwracany zadaniem będzie zawsze kończy się w stanie RanToCompletion z jego zestaw wyników do pierwszego zadania do wykonania. Jest to prawdą, nawet jeśli pierwsze zadanie do wykonania zakończyło się stanem anulowania lub błędu.

Więc upewnij się zrobić to ostateczne sprawdzenie przed ufając, co się stało:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns 
+1

Myślę, że miałeś na myśli: WhenAny() –

+1

Nie. Kiedy Any zwraca w momencie, gdy którekolwiek z zadań powraca. WhenAll czeka na każde zadanie, które należy wykonać, i zwraca samo w sobie zadanie (w przeciwieństwie do WaitAll() –

+1

To nie działa, ponieważ wynik odczytu bajtów może zostać odrzucony Zakładam, że dba o to, aby dane nie zostały utracone. – usr