2015-06-24 15 views
10

Zastanawiam się, czy jest to legalne zadzwonić unsubscribe od wewnątrz onNext obsługi tak:RxJava: wywołanie wypisz od wewnątrz onNext

List<Integer> gatheredItems = new ArrayList<>(); 

Subscriber<Integer> subscriber = new Subscriber<Integer>() { 
    public void onNext(Integer item) { 
     gatheredItems.add(item); 
     if (item == 3) { 
      unsubscribe(); 
     } 
    } 
    public void onCompleted() { 
     // noop 
    } 
    public void onError(Throwable sourceError) { 
     // noop 
    } 
}; 

Observable<Integer> source = Observable.range(0,100); 

source.subscribe(subscriber); 
sleep(1000); 
System.out.println(gatheredItems); 

Powyższy kod poprawnie wyprowadza że zaledwie cztery elementy zostały zebrane: [0, 1, 2, 3]. Ale jeśli ktoś zmieni źródło obserwowalne, które ma być buforowane:

Observable<Integer> source = Observable.range(0,100).cache(); 

Następnie zebrano wszystkie sto elementów. Nie mam kontroli nad obserwowalnym źródłem (bez względu na to, czy jest ono zbuforowane czy nie), więc jak zdecydowanie zrezygnować z subskrypcji w ramach onNext?

BTW: Czy anulowanie subskrypcji w ramach onNext jest niewłaściwe?

(My rzeczywisty przypadek użycia jest to, że w onNext ja faktycznie pisania do strumienia wyjściowego, a gdy IOException występuje nic więcej można zapisać do wyjścia, więc trzeba jakoś zatrzymać dalsze przetwarzanie.)

Odpowiedz

3

cache() buforuje wszystko, gdy przybył pierwszy subskrybent i nie daje żadnych opcji, aby zatrzymać go z dołu. Trzeba zatrzymać strumień przed pamięci podręcznej(), aby uniknąć zbyt dużej retencji:

Observable<Integer> source = Observable.range(1, 100); 

PublishSubject<Integer> stop = PublishSubject.create(); 

source 
.doOnNext(v -> System.out.println("Generating " + v)) 
.takeUntil(stop) 
.cache() 
.doOnNext(new Action1<Integer>() { 
    int calls; 
    @Override 
    public void call(Integer t) { 
     System.out.println("Saving " + t); 
     if (++calls == 3) { 
      stop.onNext(1); 
     } 
    } 
}) 
.subscribe(); 

Edit: powyższy przykład nie działa poniżej 1.0.13 więc tutaj jest wersja, która powinna:

SerialSubscription ssub = new SerialSubscription(); 

ConnectableObservable<Integer> co = source 
     .doOnNext(v -> System.out.println("Generating " + v)) 
     .replay(); 

co.doOnNext(v -> { 
    System.out.println("Saving " + v); 
    if (v == 3) { 
     ssub.unsubscribe(); 
    } 
}) 
.subscribe(); 

co.connect(v -> ssub.set(v)); 
+0

Ten kod nie działa zgodnie z oczekiwaniami, drukuje setki razy "Generowanie", a następnie sto razy "Zapisywanie". Chciałbym wykluczyć "Oszczędzanie" tylko cztery razy. –

+0

Uruchomiłem mój przykład, zanim opublikowałem go, aby wypisał 3 wiersze. Czy próbowałeś tego dosłownie, czy próbowałeś wstawić go do swojego przepływu? – akarnokd

+0

Użyłem twojego przykładu dosłownie, po prostu dodano główny i import. Uruchomiłem go na javarx 1.0.12 na dwóch różnych komputerach i zawsze wypisuje "Saving" setki razy. –