2016-02-29 11 views
6

Rozważmy następujący:Dlaczego RefCount nie działa po rozłączeniu wszystkich początkowych subskrybentów?

[Fact] 
public void foo() 
{ 
    var result = new Subject<bool>(); 
    var startCount = 0; 
    var completionCount = 0; 
    var obs = Observable 
     .Defer(() => 
      { 
       ++startCount; 
       return result.FirstAsync(); 
      }) 
     .Do(_ => ++completionCount) 
     .Publish() 
     .RefCount(); 

    // pretend there are lots of subscribers at once 
    var s1 = obs.Subscribe(); 
    var s2 = obs.Subscribe(); 
    var s3 = obs.Subscribe(); 

    // even so, we only expect to be started once 
    Assert.Equal(1, startCount); 
    Assert.Equal(0, completionCount); 

    // and we won't complete until the result ticks through 
    result.OnNext(true); 
    Assert.Equal(1, startCount); 
    Assert.Equal(1, completionCount); 

    s1.Dispose(); 
    s2.Dispose(); 
    s3.Dispose(); 

    // now try exactly the same thing again 
    s1 = obs.Subscribe(); 
    s2 = obs.Subscribe(); 
    s3 = obs.Subscribe(); 

    // startCount is 4 here instead of the expected 2! 
    Assert.Equal(2, startCount); 
    Assert.Equal(1, completionCount); 

    result.OnNext(true); 
    Assert.Equal(2, startCount); 
    Assert.Equal(2, completionCount); 

    s1.Dispose(); 
    s2.Dispose(); 
    s3.Dispose(); 
} 

Moje rozumienie Publish + RefCount jest to, że podłączenie do źródła utrzymuje się tak długo, jak istnieje co najmniej jeden abonent. Po rozłączeniu ostatniego subskrybenta każdy przyszły subskrybent ponownie zainicjuje połączenie ze źródłem.

Jak widać w moim teście, wszystko działa idealnie za pierwszym razem. Ale drugi raz, odroczony obserwowalny wewnątrz potoku jest wykonywany raz dla każdego nowego abonenta.

Widzę przez debugger, że dla pierwszej grupy subskrybentów, obs._count (który liczy abonentów) wzrasta dla każdego połączenia do Subscribe. Ale dla drugiej grupy abonentów pozostaje ona zerowa.

Dlaczego tak się dzieje i co mogę zrobić, aby naprawić mój rurociąg?

Odpowiedz

1

Jest tak, ponieważ podstawowy obserwowalny wynik zakończył się. Tak więc każdy nowy subskrybent otrzymuje właśnie wywołanie zwrotne OnCompleted.

Jeśli ObservableDefer tworzył nową sekwencję za każdym razem lub taką, która nie została zakończona, zobaczysz pożądane zachowanie.

np.

return result.FirstAsync().Concat(Observable.Never<bool>()); 

Trzeba będzie usunąć Assert.Equal(1, completionCount);

+0

To brzmi wiarygodnie, ale mam problemy z wygenerowaniem sekwencji, która działałaby zgodnie z oczekiwaniami. Myślałem, że 'return result.Take (1);' zamiast 'return result.FirstAsync();' pracował, ale otrzymuję taki sam wynik. Dość ciekawy., – Enigmativity

+0

'wynik' ma _nie_ ukończony. Każde pojedyncze wywołanie funkcji 'result.FirstAsync' zakończy się po zaznaczeniu nowej wartości. –

+0

Powinienem był powiedzieć, że wynik zakończył sięFirstAsync. Zgadzam się, zachowanie jest dziwne. Wygląda na to, że zostało to uchwycone, aby przyszli subskrybenci mogli odzyskać ukończone obserwacje.Możesz to zobaczyć, podpinając drugi zestaw subskrybentów zdarzeń OnCompleted, uruchamiając natychmiast, zamiast czekać na wysłanie kolejnego .OnNext. Dlatego każdy nowy subskrybent zwiększa liczbę. – user630190

3

odpowiedzi z @ user631090 jest blisko, ale błędne, więc myślałem, że sobie odpowiedzieć.

To dlatego, że Publish natychmiast ukończy nowych subskrybentów, jeśli strumień, który opublikował sam zakończył. niby można zobaczyć, że na wykresie here:

enter image description here

Ale byłoby miło, gdyby schemat zawarte abonenta po podstawowa strumień kończy.

Aby dodać do zamieszania, Defer jest nadal wezwany dla nowych subskrybentów. Ale jego wartość zwracana jest po prostu ignorowana przez Publish z powodu początkowego zakończenia strumienia.

Nie jestem jeszcze w stanie wymyślić sposobu na wdrożenie mojego przypadku użycia. Pomyślałem, że może używając Multicast zamiast Publish, w razie potrzeby tworząc nowy temat. Ale nie udało mi się tego jeszcze osiągnąć. I wydaje się raczej bolesne dla tego, co uważam za powszechny przypadek użycia.

+2

Kent, czy mógłbybyś wyjaśnić swój plan użycia (w innym poście)? Może społeczność może ci bardziej pomóc. (potencjalnie zmniejszając ilość ruchomych części: temat + odroczenie + pierwsze + publikowanie + przeliczanie i podając problem (nie błąd) może nam pozwolić na pomoc. –

+0

Sure Lee. Właśnie wysłałem to pytanie jako kontynuację : http://stackoverflow.com/questions/35762063/why-is-refcount-not-working-after-all-initial-subscribers-disconnect-redux –

Powiązane problemy