Zastanawiam się, czy jest to legalne zadzwonić unsubscribe
od wewnątrz onNext
obsługi tak:RxJava: wywołanie wypisz od wewnątrz onNext
List<Integer> gatheredItems = new ArrayList<>();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
public void onNext(Integer item) {
gatheredItems.add(item);
if (item == 3) {
unsubscribe();
}
}
public void onCompleted() {
// noop
}
public void onError(Throwable sourceError) {
// noop
}
};
Observable<Integer> source = Observable.range(0,100);
source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
Powyższy kod poprawnie wyprowadza że zaledwie cztery elementy zostały zebrane: [0, 1, 2, 3]
. Ale jeśli ktoś zmieni źródło obserwowalne, które ma być buforowane:
Observable<Integer> source = Observable.range(0,100).cache();
Następnie zebrano wszystkie sto elementów. Nie mam kontroli nad obserwowalnym źródłem (bez względu na to, czy jest ono zbuforowane czy nie), więc jak zdecydowanie zrezygnować z subskrypcji w ramach onNext
?
BTW: Czy anulowanie subskrypcji w ramach onNext
jest niewłaściwe?
(My rzeczywisty przypadek użycia jest to, że w onNext
ja faktycznie pisania do strumienia wyjściowego, a gdy IOException
występuje nic więcej można zapisać do wyjścia, więc trzeba jakoś zatrzymać dalsze przetwarzanie.)
Ten kod nie działa zgodnie z oczekiwaniami, drukuje setki razy "Generowanie", a następnie sto razy "Zapisywanie". Chciałbym wykluczyć "Oszczędzanie" tylko cztery razy. –
Uruchomiłem mój przykład, zanim opublikowałem go, aby wypisał 3 wiersze. Czy próbowałeś tego dosłownie, czy próbowałeś wstawić go do swojego przepływu? – akarnokd
Użyłem twojego przykładu dosłownie, po prostu dodano główny i import. Uruchomiłem go na javarx 1.0.12 na dwóch różnych komputerach i zawsze wypisuje "Saving" setki razy. –