2013-07-05 14 views
52

Teaser: Chłopaki, to pytanie nie dotyczy tego, jak zaimplementować politykę ponawiania prób. Chodzi o poprawne zakończenie bloku przepływu danych TPL.Wdrażanie poprawnego wykonania bloku do ponownej próby

To pytanie jest głównie kontynuacją mojego poprzedniego pytania Retry policy within ITargetBlock. Odpowiedzią na to pytanie było inteligentne rozwiązanie @ svick, które wykorzystuje TransformBlock (źródło) i TransformManyBlock (cel). Pozostaje tylko ukończyć ten blok w prawidłowej drodze:: poczekaj na zakończenie wszystkich ponownych prób, a następnie ukończ blok docelowy. Oto, co skończyło się (to tylko fragment, nie płacą zbyt wiele uwagi na zakaz THREADSAFE retries zestawie):

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    while (target.InputCount > 0 || retries.Any()) 
     await Task.Delay(100); 

    target.Complete(); 
}); 

Chodzi o to, aby wykonać jakąś odpytywania i sprawdzić, czy istnieją nadal wiadomości oczekujące na przetworzenie i nie ma wiadomości, które wymagają ponowienia. Ale w tym rozwiązaniu nie podoba mi się pomysł sondowania.

Tak, można hermetyzować logikę dodawania/usuwania ponownych prób w oddzielnej klasie, a nawet np. wykonać pewne działanie, gdy zbiór ponownych prób staje się pusty, ale jak radzić sobie z warunkiem target.InputCount > 0? Nie ma takiego wywołania zwrotnego, które zostanie wywołane, gdy nie ma oczekujących komunikatów dla bloku, więc wydaje się, że weryfikacja target.ItemCount w pętli z małym opóźnieniem jest jedyną opcją.

Czy ktoś zna mądrzejszy sposób, aby to osiągnąć?

+1

Wygląda na to, że ITargetBlock obsługuje powiadomienia oparte na pilocie za pośrednictwem obserwatora zwrócone przez metodę AsObserver Extension. Zobacz http://msdn.microsoft.com/en-us/library/hh160359.aspx i http://msdn.microsoft.com/en-us/library/ee850490.aspx. – JamieSee

+0

Wygląda na to, że próbujesz używać wyjątków w normalnym przepływie programu, co jest złym zwyczajem. Wyszukiwarka Google lub szukać pod następującym temacie na SO: http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control Cała logika ponawiania powinien być w bloku try, a nie w bloku wyjątków. Nie jest to odpowiedź na twoje pytanie, ale coś, o czym myślałem, że powinieneś wiedzieć. – Nullius

+4

@Nullius, logika ponownych prób opiera się na * wyjątkach * - ponów w przypadku przejściowego błędu. Nie sądzę, że logika ponownej próby w bloku 'try' jest dobrym pomysłem, ponieważ nie znasz typu błędu i czy ten rodzaj błędu jest przejściowy, czy też nie. – Alex

Odpowiedz

1

Łącząc hwcverwe odpowiedź i JamieSee komentarz może być idealnym rozwiązaniem.

Najpierw trzeba utworzyć więcej niż jedno wydarzenie:

var signal = new ManualResetEvent(false); 
var completedEvent = new ManualResetEvent(false); 

Następnie trzeba stworzyć obserwatora i subskrybować TransformManyBlock, więc użytkownik jest powiadamiany, gdy istotne zdarzenie:

var observer = new RetryingBlockObserver<TOutput>(completedEvent); 
var observable = target.AsObservable(); 
observable.Subscribe(observer); 

zauważalny może być całkiem proste:

private class RetryingBlockObserver<T> : IObserver<T> { 
     private ManualResetEvent completedEvent; 

     public RetryingBlockObserver(ManualResetEvent completedEvent) {     
      this.completedEvent = completedEvent; 
     } 

     public void OnCompleted() { 
      completedEvent.Set(); 
     } 

     public void OnError(Exception error) { 
      //TODO 
     } 

     public void OnNext(T value) { 
      //TODO 
     } 
    } 

A ty ca n czekać na obu sygnału zakończenia (lub wyczerpania wszystkich elementów źródłowych) lub obu

source.Completion.ContinueWith(async _ => { 

      WaitHandle.WaitAll(completedEvent, signal); 
      // Or WaitHandle.WaitAny, depending on your needs! 

      target.Complete(); 
     }); 

Można sprawdzić wartość wynik WaitAll zrozumieć wydarzenie, które zostało ustawione, i odpowiednio reagować. Można także dodawać inne zdarzenia do kodu, przekazując je obserwatorowi, aby mógł je ustawić w razie potrzeby.Możesz rozróżnić swoje zachowanie i reagować inaczej, gdy błąd zostanie zgłoszony, na przykład:

2

Może ManualResetEvent może załatwić sprawę.

Dodaj własność publiczną TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); 
public ManualResetEvent Signal { get { return _signal; } } 

A tu proszę:

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 

      // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
      if(!retries.Any()) Signal.Set(); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 

       // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
       if(!retries.Any()) Signal.Set(); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    //Blocks the current thread until the current WaitHandle receives a signal. 
    target.Signal.WaitOne(); 

    target.Complete(); 
}); 

Nie jestem pewien gdzie target.InputCount jest ustawiony. Więc w miejscu zmiany target.InputCount można dodać następujący kod:

if(InputCount == 0) Signal.Set(); 
+0

Chodzi o to, że 'target.InputCount' to * czarne pole * - jest to własność tylko do odczytu' TransformManyBlock' od TPL Dataflow. – Alex

Powiązane problemy