2016-08-13 15 views
5

Mam wzór producenta/konsumenta w mojej aplikacji zaimplementowano używany przepływ danych TPL. Mam dużą sieć przepływu danych z około 40 bloków w nim. W siatce są dwie główne części funkcjonalne: część producenta i część konsumenta. Producent powinien nieustannie zapewniać dużo pracy dla konsumenta, podczas gdy konsument może powoli podejmować pracę. Chcę wstrzymać producenta, gdy konsument jest zajęty określoną ilością elementów pracy. W przeciwnym razie aplikacja zużywa dużo pamięci/procesora i zachowuje się niestabilnie.Jak szybko zatrzymać producenta, gdy konsument jest przytłoczony?

Zrobiłem aplikację demo, który demonstruje problem:

mesh

using System; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace DataflowTest 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var options = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false 
      }; 

      var boundedOptions = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       EnsureOrdered = false, 
       BoundedCapacity = 5 
      }; 

      var bufferBlock = new BufferBlock<int>(boundedOptions); 
      var producerBlock = new TransformBlock<int, int>(x => x + 1, options); 
      var broadcastBlock = new BroadcastBlock<int>(x => x, options); 

      var consumerBlock = new ActionBlock<int>(async x => 
      { 
       var delay = 1000; 
       if (x > 10) delay = 5000; 

       await Task.Delay(delay); 

       Console.WriteLine(x); 
      }, boundedOptions); 

      producerBlock.LinkTo(bufferBlock); 
      bufferBlock.LinkTo(broadcastBlock); 
      broadcastBlock.LinkTo(producerBlock); 
      broadcastBlock.LinkTo(consumerBlock); 

      bufferBlock.Post(1); 

      consumerBlock.Completion.Wait();    
     }   
    } 
} 

odciski app coś takiego:

2 
1 
3 
4 
5 
69055 
69053 
69054 
69057 
438028 
438040 
142303 
438079 

Oznacza to, że producent utrzymuje przędzenia i pchania wiadomości do konsumenta . Chcę, aby zatrzymał się i poczekał, aż konsument zakończy bieżącą porcję pracy, a następnie producent powinien kontynuować dostarczanie wiadomości dla konsumenta.

Moje pytanie dotyczy cytatu podobnego do innych question, ale odpowiedź nie została udzielona prawidłowo. Próbowałem tego rozwiązania i nie działa tutaj pozwalając producentowi zalać konsumenta wiadomościami. Również ustawienie BoundedCapacity również nie działa.

Jedyne rozwiązanie, które do tej pory wydaje mi się, to utworzenie własnego bloku, który będzie monitorował kolejkę bloku docelowego i działa zgodnie z kolejką bloku docelowego. Ale mam nadzieję, że to rodzaj przesady w tej kwestii.

+0

Czy rozważałeś zamiast tego używać 'Rx'? Spójrz na tę odpowiedź: http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –

+0

Mam nadzieję, że nie będzie takiej potrzeby, ponieważ poświęca się dużo czasu na przepływ danych i pasuje moje potrzeby dobre. – kseen

+0

W swoim demo producent może sam produkować wszystkie wiadomości, bez otrzymywania od siebie wiadomości przez blok transmisji. Czy twój prawdziwy kod też tak jest, czy też ten producent → cykl producenta jest potrzebny? – svick

Odpowiedz

4

Jeśli zachodzi potrzeba zachowania producenta → bufor → cykl transmisji, należy zamienić blok transmisji na inny blok, który nadal nadaje odebrane wiadomości, ale czeka, gdy jeden z jego celów jest pełny.

Dopóki wiesz celów tego bloku podczas tworzenia go, można zbudować przy użyciu ActionBlock (kod skopiowany z another answer of mine):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets) 
{ 
    var block = new ActionBlock<T>(
     async item => 
     { 
      foreach (var target in targets) 
      { 
       await target.SendAsync(item); 
      } 
     }, new ExecutionDataflowBlockOptions 
     { 
      BoundedCapacity = options.BoundedCapacity, 
      CancellationToken = options.CancellationToken 
     }); 

    block.Completion.ContinueWith(task => 
    { 
     foreach (var target in targets) 
     { 
      if (task.Exception != null) 
       target.Fault(task.Exception); 
      else 
       target.Complete(); 
     } 
    }); 

    return block; 
} 

Stosując tę ​​można zadeklarować blok rozgłaszania :

var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock); 

(Potrzebny będzie również usunięcie LinkTo linie, które łączą ze broadcastBlock).

Problem z oryginalnym kodem, którego nie naprawia to zakończenie, ale jest to trudny problem w TPL Dataflow z cyklami w ogóle.

+0

Co się stanie, jeśli moja sieć będzie ciągła? Podobnie jak w przyszłości nie ma żadnego uzupełnienia, powinno działać nadal, gdy aplikacja działa. – kseen

+0

Właśnie próbowałem tego 'GuaranteedBroadcastBlock' w mojej aplikacji demo i działa jak urok! Idealny! Dziękuję bardzo. – kseen

+0

To jest najlepszy scenariusz: nie potrzebujesz ukończenia, więc wszystko jest w porządku, to nie działa. – svick

0

Wygląda na to, że twój producent generuje sekwencję, więc nie ma potrzeby, aby cały producent → bufor → cykl emisyjny. Zamiast tego, wszystkie trzy bloki mogą być zastąpione przez async pętli, który generuje następną pozycję, a następnie wysyła go do konsumenta z zastosowaniem await SendAsync():

Task.Run(async() => 
{ 
    int i = 1; 
    while (true) 
    { 
     await consumerBlock.SendAsync(i); 
     i++; 
    } 
    consumerBlock.Complete(); 
}); 

W ten sposób, gdy konsument osiągnie swój potencjał, await SendAsync() zapewni producenta czeka, aż konsument zużyje przedmiot.

Jeśli chcesz zamknąć takiego producenta w bloku przepływu danych, aby można było np. połącz go z konsumentem, you can.

+0

Moim prawdziwym "producentem" jest zestaw bloków, które ładują stronę komentarzy (która zawiera link do następnej strony komentarzy), analizują zawartość aktualnej strony komentarzy, wysyłają komentarze do konsumenta i rozpoczynają ten cykl ponownie przekazując adres następna strona z komentarzem do pierwszego bloku w tym cyklu producenta. Tak więc, niestety, nie jest to tylko sekwencja. Jest to jak połączona sekwencja, w której każdy element w sekwencji ma adres do następnego elementu, a ostatni element w sekwencji nie ma adresu następnego elementu. Przepraszamy za to pytanie jest tak proste. – kseen

+0

Właśnie stworzyłem diagram, który lepiej odzwierciedla rzeczywistą sytuację. Tutaj: http://imgur.com/iEklfeG – kseen

Powiązane problemy