2013-07-21 15 views
7

Obecnie próbuję otoczyć moją głowę wokół współbieżności z RX .NET i wprowadzając w błąd przez coś. Chcę równolegle uruchomić cztery stosunkowo powolne zadania, więc założyłem, że NewThreadScheduler.Default może być drogą do wykonania, ponieważ jest to "Reprezentuje obiekt, który planuje każdą jednostkę pracy w osobnym wątku.".NewThreadScheduler.Default harmonogramuje wszystkie prace na tym samym wątku

Oto mój kod instalacyjny:

static void Test() 
    { 
     Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId); 

     var query = Enumerable.Range(1, 4); 
     var obsQuery = query.ToObservable(NewThreadScheduler.Default); 
     obsQuery.Subscribe(DoWork, Done); 

     Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

    static void DoWork(int i) 
    { 
     Thread.Sleep(500); 
     Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId); 
    } 

    static void Done() 
    { 
     Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

Przypuszczałem „X wątek Y” będzie wyjście inny identyfikator wątku za każdym razem, jednak rzeczywista moc jest:

Starting. Thread 1 
Last line. Thread 1 
1 Thread 3 
2 Thread 3 
3 Thread 3 
4 Thread 3 
Done. Thread 3 

Cała praca jest bycie jednym z tego samego nowego wątku w porządku sekwencyjnym, czego nie oczekiwałem.

Zakładam, że czegoś brakuje, ale nie wiem, co.

Odpowiedz

8

Istnieją dwie części do obserwowalnego zapytania, samego Query i Subscription. (Jest też różnica między operatorami ObserveOn i SubscribeOn.)

Twój Query jest

Enumerable 
    .Range(1, 4) 
    .ToObservable(NewThreadScheduler.Default); 

Stwarza to zaobserwować, że produkuje wartości na domyślny NewThreadScheduler dla tego systemu.

Twój Subskrypcja jest

obsQuery.Subscribe(DoWork, Done); 

ten biegnie DoWork dla każdej wartości wytwarzanej przez Query i Done gdy Query kończy się OnComplete rozmowy. Nie sądzę, że istnieją jakiekolwiek gwarancje dotyczące tego, do którego wątku będą uruchamiane funkcje w metodzie subskrybowania, w praktyce, jeśli wszystkie wartości zapytania są tworzone w tym samym wątku, który jest wątkiem, na którym będzie uruchamiana subskrypcja. Wygląda na to, że robią to również dlatego, że wszystkie subskrypcje są wykonywane w tym samym wątku, co najprawdopodobniej jest wykonywane, aby pozbyć się wielu typowych błędów związanych z wielowątkowością.

Więc masz dwa problemy, jeden z wyrębu, jeśli zmienisz Query do

Enumerable 
    .Range(1, 4) 
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId); 
    .ToObservable(NewThreadScheduler.Default); 

Zobaczysz każdą wartość wytwarzanego na nowym wątku.

Druga sprawa to jedna z intencji i projektu Rx. Jest to przeznaczone, że Query jest długim procesem, a Subscription jest krótką metodą, która zajmuje się wynikami. Jeśli chcesz uruchomić długotrwałą funkcję jako Rx Obserwowalna, najlepszą opcją jest użycie Observable.ToAsync.

+0

Dziękuję za to. Nie tylko wyjaśnisz, co robię źle, ale myślę też, że to, co mówisz odnośnie intencji Rx, ma wiele sensu. –

+0

Powyższa odpowiedź wydaje się być nieaktualna, metoda Do nie jest już dostępna w Enumerable. – VivekDev

+0

Aby metoda Do mogła działać, należy zainstalować pakiet System.Interactive nuget – VivekDev

Powiązane problemy