Mam wzór producenta/konsumenta w mojej aplikacji zaimplementowano używany przepływ danych TPL. Mam dużą sieć przepływu danych z około 40 bloków w nim. W siatce są dwie główne części funkcjonalne: część producenta i część konsumenta. Producent powinien nieustannie zapewniać dużo pracy dla konsumenta, podczas gdy konsument może powoli podejmować pracę. Chcę wstrzymać producenta, gdy konsument jest zajęty określoną ilością elementów pracy. W przeciwnym razie aplikacja zużywa dużo pamięci/procesora i zachowuje się niestabilnie.Jak szybko zatrzymać producenta, gdy konsument jest przytłoczony?
Zrobiłem aplikację demo, który demonstruje problem:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new ActionBlock<int>(async x =>
{
var delay = 1000;
if (x > 10) delay = 5000;
await Task.Delay(delay);
Console.WriteLine(x);
}, boundedOptions);
producerBlock.LinkTo(bufferBlock);
bufferBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(producerBlock);
broadcastBlock.LinkTo(consumerBlock);
bufferBlock.Post(1);
consumerBlock.Completion.Wait();
}
}
}
odciski app coś takiego:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
Oznacza to, że producent utrzymuje przędzenia i pchania wiadomości do konsumenta . Chcę, aby zatrzymał się i poczekał, aż konsument zakończy bieżącą porcję pracy, a następnie producent powinien kontynuować dostarczanie wiadomości dla konsumenta.
Moje pytanie dotyczy cytatu podobnego do innych question, ale odpowiedź nie została udzielona prawidłowo. Próbowałem tego rozwiązania i nie działa tutaj pozwalając producentowi zalać konsumenta wiadomościami. Również ustawienie BoundedCapacity
również nie działa.
Jedyne rozwiązanie, które do tej pory wydaje mi się, to utworzenie własnego bloku, który będzie monitorował kolejkę bloku docelowego i działa zgodnie z kolejką bloku docelowego. Ale mam nadzieję, że to rodzaj przesady w tej kwestii.
Czy rozważałeś zamiast tego używać 'Rx'? Spójrz na tę odpowiedź: http://stackoverflow.com/questions/2542764/tpl-vs-reactive-framework –
Mam nadzieję, że nie będzie takiej potrzeby, ponieważ poświęca się dużo czasu na przepływ danych i pasuje moje potrzeby dobre. – kseen
W swoim demo producent może sam produkować wszystkie wiadomości, bez otrzymywania od siebie wiadomości przez blok transmisji. Czy twój prawdziwy kod też tak jest, czy też ten producent → cykl producenta jest potrzebny? – svick