2012-06-27 13 views
8

Chcę kolejkować zależne zadania w kilku przepływach, które należy przetworzyć w kolejności (w każdym przepływie). Przepływy mogą być przetwarzane równolegle.Zadania zależne od kolejkowania, które mają być przetwarzane przez pulę wątków

Mówiąc konkretnie, potrzebuję dwóch kolejek i chcę, aby zadania w każdej kolejce były przetwarzane w kolejności. Oto próbka Pseudokod do zilustrowania pożądanego zachowania:

Queue1_WorkItem wi1a=...; 

enqueue wi1a; 

... time passes ... 

Queue1_WorkItem wi1b=...; 

enqueue wi1b; // This must be processed after processing of item wi1a is complete 

... time passes ... 

Queue2_WorkItem wi2a=...; 

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b 

... time passes ... 

Queue1_WorkItem wi1c=...; 

enqueue wi1c; // This must be processed after processing of item wi1b is complete 

tutaj jest schemat ze strzałkami ilustrujący zależności między elementów roboczych:

enter image description here

pytanie brzmi, jak mogę to zrobić za pomocą C# 4.0/.NET 4.0? Obecnie mam dwa wątki robocze, po jednym na kolejkę i używam BlockingCollection<> dla każdej kolejki. Chciałbym zamiast tego wykorzystać pulę wątków .NET i jednocześnie wątki robocze przetwarzać elementy jednocześnie (przez przepływy), ale szeregowo w ramach przepływu. Innymi słowy, chciałbym móc wskazać, że na przykład wi1b zależy od zakończenia wi1a, bez konieczności śledzenia zakończenia i pamiętania wi1a, kiedy nadejdzie wi1b. Innymi słowy, chcę tylko powiedzieć: "Chcę wysłać element pracy dla kolejki1, który ma być przetwarzany seryjnie z innymi elementami, które już przesłałem do kolejki1, ale być może równolegle z elementami roboczymi przesłanymi do innych kolejek".

Mam nadzieję, że ten opis miał sens. Jeśli nie, prosimy o zadawanie pytań w komentarzach. Odpowiednio zaktualizuję to pytanie.

Dzięki za przeczytanie.

Aktualizacja:

Podsumowując „wadliwe” rozwiązania do tej pory, są tu rozwiązania z sekcji odpowiada, że ​​nie mogę używać i powód (y), dlaczego nie można z nich korzystać:

TPL zadania wymagają określenia zadania poprzedzającego dla ContinueWith(). Nie chcę utrzymywać wiedzy na temat zadań poprzedzających każdą kolejkę podczas przesyłania nowego zadania.

TDF ActionBlocks wyglądał obiecująco, ale wydawałoby się, że przedmioty wysłane do ActionBlock są przetwarzane równolegle. Potrzebuję, aby elementy dla określonej kolejki były przetwarzane seryjnie.

Aktualizacja 2:

RE: ActionBlocks

Wydaje się, że ustawienie opcji MaxDegreeOfParallelism jednego uniemożliwia równoległe przetwarzanie elementów roboczych złożonych z pojedynczym ActionBlock. Dlatego wydaje się, że posiadanie numeru ActionBlock na kolejkę rozwiązuje mój problem z jedyną wadą polegającą na tym, że wymaga to instalacji i wdrożenia biblioteki TDF od firmy Microsoft i miałem nadzieję na czyste rozwiązanie .NET 4.0. Jak dotąd jest to zaakceptowana przez kandydata odpowiedź, chyba że ktoś może znaleźć sposób, aby to zrobić z czystym rozwiązaniem .NET 4.0, które nie ulega degeneracji do wątku roboczego w kolejce (którego już używam).

+0

Czy spojrzałeś na Zadanie/Kontynuacja? –

+0

Mam i zauważyłem, że ContinueWith wymaga znajomości poprzedniego zadania. Nie chciałbym śledzić zadań poprzedzających, o których mowa w pierwotnym pytaniu, częściowo dlatego, że musiałbym to zrobić w kolejce. Zamiast tego chciałbym "podpalić i zapomnieć" od punktu zgłoszenia i radzić sobie z błędami i propagacją błędów w predykatach przetwarzania zadań. Innymi słowy, chcę minimalny stan w momencie przesyłania - element pracy i kolejkę do której powinien zostać przesłany. –

Odpowiedz

4

Rozumiem, że masz wiele kolejek i nie chcesz wiązać wątków. Możesz mieć ActionBlock dla każdej kolejki. ActionBlock automatyzuje większość potrzebnych rzeczy: przetwarza elementy pracy serialnie i uruchamia tylko zadanie, gdy praca oczekuje. Jeśli żadna praca nie jest oczekująca, żadne zadanie/wątek nie jest blokowane.

+0

Niestety, na podstawie dokumentacji pod podanym linkiem, to rozwiązanie wymaga .NET 4.5, który nie został jeszcze wydany. Szukam rozwiązania, które wykorzystuje .NET 4.0, jak określono w znacznikach dołączonych do tego pytania. –

+0

TPL Dataflow jest łatwo dostępny (http://msdn.microsoft.com/en-us/devlabs/gg585582.aspx). – usr

+0

Zainstalowałem przepływ danych TPL i wypróbowałem przykładowy kod pod adresem http://msdn.microsoft.com/en-us/library/hh462696(v=vs.110).aspx. Wydaje się, że elementy wysłane do bloku działań są przetwarzane równolegle, a nie seryjnie, jak wspomniano w Twojej odpowiedzi. Nie tego chcę - elementy na kolejkę muszą być przetwarzane seryjnie. –

3

Najlepszym sposobem jest użycie Task Parallel Library (TPL) i Continuations. Kontynuacja nie tylko pozwala utworzyć przepływ zadań, ale także obsługuje wyjątki. Jest to great introduction dla licencji TPL. Ale żeby dać ci jakiś pomysł ...

można rozpocząć zadania OC korzystając

Task task = Task.Factory.StartNew(() => 
{ 
    // Do some work here... 
}); 

teraz, aby rozpocząć drugie zadanie podczas będącego krokiem wykończenia zadania (w wyniku błędu lub pomyślnie) można użyć metody ContinueWith

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task")); 
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation...")); 

Więc jak wkrótce jako task1 kończy się, zawiedzie lub zostanie anulowane task2 "pożarowe" i zacznie działać. Zauważ, że jeśli task1 zakończyłoby się przed osiągnięciem drugiej linii kodu task2, byłoby zaplanowane do natychmiastowego wykonania. Argument antTask przekazany do drugiej wartości lambda jest odniesieniem do zadania poprzedzającego. Zobacz this link dla bardziej szczegółowych przykładów ...

Można także przekazać kontynuacje wyniki od poprzednika zadania

Task.Factory.StartNew<int>(() => 1) 
    .ContinueWith(antTask => antTask.Result * 4) 
    .ContinueWith(antTask => antTask.Result * 4) 
    .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64. 

Note. Przeczytaj informacje na temat obsługi wyjątków w pierwszym odsyłaczu, ponieważ może to spowodować, że nowicjusz w błąd na TPL.

Ostatnią rzeczą, na którą należy zwrócić szczególną uwagę, są zadania dla dzieci. Zadania podrzędne to te, które są tworzone jako AttachedToParent. W takim przypadku kontynuacja nie będzie działać do momentu ukończenia wszystkich zadań podrzędnych:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; 
Task.Factory.StartNew(() => 
{ 
    Task.Factory.StartNew(() => { SomeMethod() }, atp); 
    Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); 
}).ContinueWith(cont => { Console.WriteLine("Finished!") }); 

Mam nadzieję, że to pomoże.

Edycja: Czy miałeś okazję obejrzeć ConcurrentCollections, w szczególności BlockngCollection<T>. Więc w twoim przypadku można użyć coś jak

public class TaskQueue : IDisposable 
{ 
    BlockingCollection<Action> taskX = new BlockingCollection<Action>(); 

    public TaskQueue(int taskCount) 
    { 
     // Create and start new Task for each consumer. 
     for (int i = 0; i < taskCount; i++) 
      Task.Factory.StartNew(Consumer); 
    } 

    public void Dispose() { taskX.CompleteAdding(); } 

    public void EnqueueTask (Action action) { taskX.Add(Action); } 

    void Consumer() 
    { 
     // This seq. that we are enumerating will BLOCK when no elements 
     // are avalible and will end when CompleteAdding is called. 
     foreach (Action action in taskX.GetConsumingEnumerable()) 
      action(); // Perform your task. 
    } 
} 
+0

Wada polega na tym, że nie chcę przechowywać żadnej pamięci o zadaniu 1 w momencie, gdy zadanie2 zostało przesłane. Chcę tylko powiedzieć, że zadanie2 powinno zostać przesłane w kolejce1 po ostatnim przesłanym przeze mnie przedmiocie, cokolwiek to było. Dlatego też nie wiem ani nie chcę śledzić zadania poprzedzającego (zadanie1) w momencie przedłożenia następnego zadania (zadanie2). –

+0

Zobacz sekcję zadań podrzędnych, którą właśnie dodałem. Może to pomóc w osiągnięciu tego, czego potrzebujesz. Wszystko co najlepsze ... – MoonKnight

+0

Czy 'SomeMethod()' i 'SomeOtherMethod()' są wykonywane szeregowo lub równolegle? Na moje potrzeby muszą być wykonywane seryjnie. Ponadto nie wszystkie zadania są dostępne w tym samym czasie. Elementy robocze są dostępne i muszę je zaplanować, więc nie mogę wykonać funkcji 'StartNew()' z wieloma elementami. –

0

Wygląda projekcie masz już jest dobry i działa. Twoje wątki robocze (jedna na kolejkę) są długotrwałe, więc jeśli chcesz zamiast tego używać zadania, określ TaskCreationOptions.LongRunning, aby uzyskać dedykowany wątek roboczy.

Ale tak naprawdę nie ma potrzeby korzystania z ThreadPool tutaj. Nie oferuje wielu korzyści w przypadku długotrwałej pracy.

+1

Gwinty działają długo, ale same zadania są krótkie. Ponadto, jeśli skaluje to do wielu kolejek, ulegnie awarii, ponieważ mogę mieć nadsubskrypcję, ponieważ wszystkie kolejki są przetwarzane jednocześnie. W przypadku puli wątków nie muszę się tym martwić, ponieważ zadania w kolejkach równoległych mogą być przetwarzane równolegle przez pulę wątków przy minimalnym przełączaniu kontekstów. –

1

Możliwe jest rozwiązanie .NET 4.0 oparte na licencji TPL, a jednocześnie ukrywanie faktu, że musi gdzieś przechowywać zadanie nadrzędne. Na przykład:

class QueuePool 
{ 
    private readonly Task[] _queues; 

    public QueuePool(int queueCount) 
    { _queues = new Task[queueCount]; } 

    public void Enqueue(int queueIndex, Action action) 
    { 
     lock (_queues) 
     { 
      var parent = _queue[queueIndex]; 
      if (parent == null) 
       _queues[queueIndex] = Task.Factory.StartNew(action); 
      else 
       _queues[queueIndex] = parent.ContinueWith(_ => action()); 
     } 
    } 
} 

Jest to jedna blokada dla wszystkich kolejek, aby zilustrować ten pomysł. W kodzie produkcyjnym użyłbym jednak blokady na kolejkę, aby zmniejszyć rywalizację.

+0

To nie jest dobre rozwiązanie, ponieważ ContinueWith rozpocznie akcję wewnątrz blokady, jeśli zadanie nadrzędne jest już zakończone. –

Powiązane problemy