Wdrażam warstwę łącza danych przy użyciu producer/consumer pattern. Warstwa łącza danych ma własny wątek i automat stanowy do przesyłania protokołu łącza danych przez przewód (Ethernet, RS-232 ...). Interfejs do warstwy fizycznej jest reprezentowany jako System.IO.Stream. Inny wątek zapisuje komunikaty i odczytuje wiadomości z obiektu łącza danych.C# Oczekiwanie na wiele zdarzeń w Producer/Consumer
Przedmiotem łącza danych ma stan bezczynności, który musi czekać na jednym z czterech warunków:
- Bajt jest odbierany
- wiadomość jest dostępny z wątku sieci
- Keep-alive czasomierz wygasła
- Cała komunikacja została anulowana przez warstwy sieciowej
mam trudny czas FIGU Najlepiej to zrobić bez dzielenia komunikacji na wątek do odczytu/zapisu (co znacznie zwiększa złożoność). Oto w jaki sposób można uzyskać 3 z 4:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
lub
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
Co należy zrobić, aby zablokować wątek, czekając na wszystkich czterech warunków? Wszystkie zalecenia są mile widziane. Nie jestem ustawiony na żadnej architekturze ani strukturach danych.
EDYCJA: ******** Dzięki za pomoc dla wszystkich. Oto moje rozwiązanie ********
Po pierwsze, nie sądzę, że była asynchroniczna implementacja kolejki producenta/konsumenta. Więc zaimplementowałem coś podobnego do this stackoverflow post.
Potrzebowałem zewnętrznego i wewnętrznego źródła anulowania, aby zatrzymać wątek klienta i anulować zadania pośrednie, odpowiednio, similar to this article.
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
["poczekaj na Task.WhenAny (...)'] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx) może pomóc. –