mam jakiś kod za pomocą RX, wywoływana z wielu wątków, które wykonuje:Unikanie nakładających OnNext wzywa w Rx przy użyciu SubscribeOn (Scheduler.TaskPool)
subject.OnNext(value); // where subject is Subject<T>
Chcę wartości mają być przetwarzane w tle, tak moja subskrypcja jest
subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
// use value
});
i naprawdę nie obchodzi, które wątki obsługiwać wartości pochodzących z obserwowalnym, o ile praca jest umieścić w TaskPool i nie blokuje bieżącego wątku. Jednak korzystanie z "wartości" wewnątrz mojego delegata OnNext nie jest bezpieczne dla wątków. W chwili obecnej, jeśli wiele wartości przechodzi przez obserwowalne, uzyskuję nakładające się połączenia do mojej procedury obsługi OnNext.
Mogę tylko dodać blokadę do mojego delegata OnNext, ale to nie wydaje się być sposobem na robienie rzeczy. Jaki jest najlepszy sposób, aby upewnić się, że mam tylko jedno połączenie z moją obsługą OnNext w czasie, gdy mam wiele wątków wywołujących subject.OnNext(value);
?
Wywołanie funkcji 'Synchronizuj' powinno nastąpić przed wywołaniem' ObserveOn', w przeciwnym razie naruszasz umowę współbieżności dla 'ObserveOn'. Ale w rzeczywistości, jeśli przypadkiem jest dzielenie się tematem wśród kilku wątków, najlepszym rozwiązaniem jest zsynchronizowanie tematu zamiast synchronizowania subskrybentów z tematem: 'var syncSubject = Subject.Synchronize (syncSubject);' Teraz ręcznie 'syncSubject' out do wątków producenta i mogą wywoływać 'syncSubject.OnNext()' bez powodowania problemów z subskrybentami oryginalnego tematu. – Brandon
@Brandon var syncSubject = Subject.Synchronize (syncSubject); ... używasz zmiennej przed jej przypisaniem, czy możesz to wyjaśnić? – Beachwalker
@ Beachwalker to literówka.Powinien to być 'subject' przekazany jako argument do synchronizacji, jak pokazano w odpowiedzi – Brandon