8

Chcę zaimplementować priorytet o numerze ActionBlock<T>. Tak, że mogę warunkowo dać pierwszeństwo niektórym TInput przedmiotom przy użyciu Predicate<T>.
Czytam Parallel Extensions Extras Samples i Guide to Implementing Custom TPL Dataflow Blocks.
Ale nadal nie wiem, w jaki sposób mogę zaimplementować ten scenariusz.
---------------------------- EDYCJA ------------------- --------
Istnieje kilka zadań, z których 5 można uruchomić jednocześnie. Kiedy użytkownik wciska przycisk, niektóre (w zależności od funkcji predykatu) zadania powinny działać z najwyższym priorytetem.
W rzeczywistości piszę ten kodDostosowywanie ActionBlock <T>

TaskScheduler taskSchedulerHighPriority; 
ActionBlock<CustomObject> actionBlockLow; 
ActionBlock<CustomObject> actionBlockHigh; 
... 
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5); 
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0); 
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1); 
... 
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow }); 
...  
if (predicate(customObject)) 
    actionBlockHigh.Post(customObject); 
else 
    actionBlockLow.Post(customObject); 

Ale wydaje się priorytetem nie bierze w ogóle realizowane.
---------------------------- EDYCJA ------------------
uważam fakt, że podczas korzystania z tego wiersza kodu:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow }); 

Przyczyna aplikacja obserwowania priorytetów zadań poprawnie, ale tylko jedno zadanie można wykonać w czasie, w międzyczasie używając pierwszy blok kodu, który jest pokazany na przepływać, ponieważ aplikacja uruchamia 5 zadań jednocześnie, ale w niewłaściwym porządku pierwszeństwa.

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow }); 

Aktualizacja:
Zbiorniki do svick, i powinna określać MaxMessagesPerTask dla taskSchedulerLow.

+2

Co oznacza narzucanie priorytetu? Czy to coś niezwiązanego w ogóle z "T"? Czy też priorytetem jest właściwość własna/wyprowadzona z 'T'? – casperOne

+0

Możesz utworzyć niestandardowy blok bufora, który używa ConcurrentPriorityQueue lub możesz utworzyć niestandardowy asynchroniczny blok transfromacji. Obie opcje są nietrywialne. Zgadzam się również z @casperOne, co oznacza priorytet w twoim przypadku? –

Odpowiedz

7

Twoje pytanie nie zawiera wielu szczegółów, więc poniżej znajduje się tylko przypuszczenie, czego możesz potrzebować.

Myślę, że najprostszym sposobem na zrobienie tego jest posiadanie dwóch ActionBlock s, działających na różnych priorytetach na QueuedTaskScheduler from ParallelExtensionsExtras. Łączymy do tego o wysokim priorytecie za pomocą predykatu, a następnie do priorytetu niskiego priorytetu. Ponadto, aby upewnić się, że nie zostały spełnione oczekiwania o wysokim priorytecie, ustaw MaxMessagesPerTask bloku o niskim priorytecie.

W kodzie, to wyglądać mniej więcej tak:

static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate) 
{ 
    var buffer = new BufferBlock<T>(); 

    var scheduler = new QueuedTaskScheduler(1); 

    var highPriorityScheduler = scheduler.ActivateNewQueue(0); 
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1); 

    var highPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = highPriorityScheduler 
     }); 
    var lowPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = lowPriorityScheduler, 
      MaxMessagesPerTask = 1 
     }); 

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate); 
    buffer.LinkTo(lowPriorityBlock); 

    return buffer; 
} 

To tylko szkic, co można zrobić, na przykład, Completion zwróconej bloku nie zachowuje się poprawnie.

+0

proszę czytać edytowane tag – Rzassar

+1

W swoim kodzie, nie określasz 'MaxMessagesPerTask' dla twojego bloku o niskim priorytecie. Tak jak powiedziałem, robienie tego jest bardzo ważne. – svick