2011-06-30 10 views
7

Jest kilka artykułów na ten temat i mam to działa ... ale chcę wiedzieć, jak ustawić maksymalną liczbę wątków zadań dla moich subskrypcji obserwowanych jednocześnie.Przetwarzanie kolejki asynchronicznej z rozszerzeniami reaktywnymi

Mam następujący parallelize oszczędność asynchronicznej wpisów dziennika:

private BlockingCollection<ILogEntry> logEntryQueue; 

i

logEntryQueue = new BlockingCollection<ILogEntry>(); 
logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry); 

Aby zaplanować moje oszczędności ... ale jak mogę określić max wątki do terminarza w celu używać od razu?

+0

Jeśli tworzysz harmonogramy wyłącznie z limitów dla maksymalnych równoczesnych wątków, rozważ rozważenie TPL Dataflow. Został zbudowany specjalnie do tworzenia potoków, w których każdy blok w potoku ma różne granice współbieżności. Przynajmniej było to dla mnie bardziej zrozumiałe i łatwe w utrzymaniu, gdy prototypowałem obie metody. – yzorg

Odpowiedz

8

Nie jest to funkcja obserwowalnego, ale funkcja programu planującego. Observable definiuje , co oznacza, że ​​ i program planujący definiują , gdzie.

Musisz przekazać niestandardowy harmonogram. Prostym sposobem na to byłoby podklasa TaskScheduler i nadpisanie właściwości "MaximumConcurrencyLevel".

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

faktycznie znalazłem próbkę to na MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Edycja: Pytałeś o tym, jak przejść od TaskScheduler do IScheduler. Kolejny deweloper po prostu dał mi, że trochę informacji:

var ischedulerForRx = new TaskPoolScheduler 
(
    new TaskFactory 
    (
     //This is your custom scheduler 
     new LimitedConcurrencyLevelTaskScheduler(1) 
    ) 
); 
+1

Niesamowite, dziękuję ... ale jak mogę uzyskać Reaktor IS dla nowego TaskSchedulera? Nie chcę ponownie wdrażać całej funkcjonalności. – Jeff

2

Jeśli tworzysz swoją „pracę” jako IObservable<T> z odroczonym terminem realizacji (tj. Chcą robić nic, dopóki subskrybowanych), można użyć przeciążenie Merge który akceptuje liczba maksymalnych równoczesnych subskrypcji:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize(); 

queue 
    .Select(item => StartWork(item)) 
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes 
    .Subscribe(); 

// To enqueue: 
synchronizedQueue.OnNext(new QueueItem()); 
+0

W jaki sposób obsługuje inne wątki dodające do kolejki podczas przetwarzania kolejki? Czy nie cierpiałoby to na modyfikację kolekcji podczas wyliczania? – Jeff

+0

Operator 'Merge' jest bezpieczny dla wątków, ale zaktualizowałem swoją odpowiedź, aby na wszelki wypadek użyć wątku bezpiecznego. –

+0

Zgaduję, że to tylko moje zamieszanie ... nie chodzi o bezpieczeństwo wątków, ale raczej o to, że kolejka jest modyfikowana (inne elementy są zapisywane), ponieważ jest wyliczana ... Czy to nie jest problem? ? – Jeff

Powiązane problemy