2012-02-06 10 views
10

mam jakiś kod za pomocą RX, wywoływana z wielu wątków, które wykonuje:Unikanie nakładających OnNext wzywa w Rx przy użyciu SubscribeOn (Scheduler.TaskPool)

subject.OnNext(value); // where subject is Subject<T> 

Chcę wartości mają być przetwarzane w tle, tak moja subskrypcja jest

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value => 
{ 
    // use value 
}); 

i naprawdę nie obchodzi, które wątki obsługiwać wartości pochodzących z obserwowalnym, o ile praca jest umieścić w TaskPool i nie blokuje bieżącego wątku. Jednak korzystanie z "wartości" wewnątrz mojego delegata OnNext nie jest bezpieczne dla wątków. W chwili obecnej, jeśli wiele wartości przechodzi przez obserwowalne, uzyskuję nakładające się połączenia do mojej procedury obsługi OnNext.

Mogę tylko dodać blokadę do mojego delegata OnNext, ale to nie wydaje się być sposobem na robienie rzeczy. Jaki jest najlepszy sposób, aby upewnić się, że mam tylko jedno połączenie z moją obsługą OnNext w czasie, gdy mam wiele wątków wywołujących subject.OnNext(value);?

Odpowiedz

10

Od Using Subjects na MSDN

Domyślnie badani nie wykonują żadnej synchronizacji całej wątkach. [...] Jeśli jednak chcesz zsynchronizować połączenia wychodzące z obserwatorami za pomocą terminarza, możesz użyć metody Synchronizuj, aby to zrobić.

Powinieneś, jak powiedział Brandon w komentarzach, zsynchronizować temat i przekazać go swoim wątkom producenta. na przykład

var syncSubject = Subject.Synchronize(subject); 

// syncSubject.OnNext(value) can be used from multiple threads 

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value => 
{ 
    // use value 
}); 
+2

Wywołanie funkcji 'Synchronizuj' powinno nastąpić przed wywołaniem' ObserveOn', w przeciwnym razie naruszasz umowę współbieżności dla 'ObserveOn'. Ale w rzeczywistości, jeśli przypadkiem jest dzielenie się tematem wśród kilku wątków, najlepszym rozwiązaniem jest zsynchronizowanie tematu zamiast synchronizowania subskrybentów z tematem: 'var syncSubject = Subject.Synchronize (syncSubject);' Teraz ręcznie 'syncSubject' out do wątków producenta i mogą wywoływać 'syncSubject.OnNext()' bez powodowania problemów z subskrybentami oryginalnego tematu. – Brandon

+0

@Brandon var syncSubject = Subject.Synchronize (syncSubject); ... używasz zmiennej przed jej przypisaniem, czy możesz to wyjaśnić? – Beachwalker

+0

@ Beachwalker to literówka.Powinien to być 'subject' przekazany jako argument do synchronizacji, jak pokazano w odpowiedzi – Brandon

-3

Możesz spróbować zaimplementować własny serwer IObserver, który zastosuje blokadę w swojej metodzie OnNext. Po prostu prosty dekorator: zastosuj blokadę, zadzwoń do wewnętrznego OnNext, usuń blokadę.

Następnie można zaimplementować metodę rozszerzenia na IObservable, podobnie jak .AsThreadSafe().

+1

Nie najlepsze rozwiązanie, ale nie trzeba mnie obniżać z mojego punktu widzenia, ponieważ jest to prawidłowe rozwiązanie. Więc dajcie mi +1, aby to zrównoważyć nieco ze spadkiem tutaj. – Beachwalker

5

Myślę, że szukasz metody rozszerzenia .Synchronize(). Aby uzyskać poprawę wydajności w najnowszej wersji (koniec 2011 r.), Zespół Rx złagodził założenia dotyczące sekwencyjnego charakteru obserwowalnych producentów sekwencji. Wygląda jednak na to, że przełamujesz te założenia (nie jest to zła rzecz), ale aby przywrócić grę Rx tak, jak oczekiwaliby użytkownicy, powinieneś Zsynchronizować sekwencję, aby upewnić się, że jest sekwencyjna ponownie.

1

Tutaj nieco więcej wyjaśnień, dlaczego używać Synchronize (drugi akapit). Z drugiej strony Synchronizacja może wziąć udział w impasie, jeśli aktywnie używasz blokowania w kodzie, przynajmniej byłem świadkiem takiej sytuacji.

Powiązane problemy