2015-09-22 11 views
7

Załóżmy, że mam następujący kod RxJava (który uzyskuje dostęp do DB, ale dokładny przypadek użycia nie ma znaczenia):Czy anulowanie subskrypcji jest bezpieczne w aplikacji RxJava?

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) { 
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() { 
     @Override 
     public void call(Subscriber<? super List<DbPlaceDto>> subscriber) { 
      try { 
       Cursor c = getPlacseDb(stringIds); 

       List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>(); 
       while (c.moveToNext()) { 
        dbPlaceDtoList.add(getDbPlaceDto(c)); 
       } 
       c.close(); 

       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(dbPlaceDtoList); 
        subscriber.onCompleted(); 
       } 
      } catch (Exception e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 
     } 
    }); 
} 

Biorąc pod uwagę ten kod, mam następujące pytania:

  1. jeśli ktoś anulowanie subskrypcji z obserwowalnej zwróconej z tej metody (po poprzedniej subskrypcji), czy ta operacja jest bezpieczna dla wątków? Czy moje kontrole "isUnsubscribed()" są poprawne w tym sensie, niezależnie od harmonogramu?

  2. Czy jest jakiś bardziej przejrzysty sposób z mniejszą liczbą znaków, aby sprawdzić stan wypowiedzenia niż to, czego używam tutaj? Nie mogłem znaleźć niczego w ramach. Myślałem, że SafeSubscriber rozwiązuje problem nieprzesyłania zdarzeń, gdy subskrybent jest zrezygnowany, ale najwyraźniej tak nie jest.

Odpowiedz

8

jest bezpieczny wątku operacja?

Tak. Otrzymujesz rx.Subscriber, który (ostatecznie) sprawdza, czy zmienna boolowska jest ustawiona na true, gdy subskrypcja subskrybenta jest anulowana.

przejrzysty sposób mniej standardowy kod do sprawdzania stanów bez subskrypcji

SyncOnSubscribe i AsyncOnSubscribe (dostępny jako @Experimental api jako wydania 1.0.15) została utworzona w tym przypadku zastosowania. Działają one jako bezpieczna alternatywa dla wywoływania Observable.create. Oto (wymyślony) przykład przypadku synchronicznego.

public static class FooState { 
    public Integer next() { 
     return 1; 
    } 
    public void shutdown() { 

    } 
    public FooState nextState() { 
     return new FooState(); 
    } 
} 
public static void main(String[] args) { 
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
      (state, o) -> { 
       o.onNext(state.next()); 
       return state.nextState(); 
      }, 
      state -> state.shutdown()); 
    Observable<Integer> obs = Observable.create(sos); 
} 

Uwaga że następna funkcja SyncOnSubscribe nie wolno nazywać observer.onNext więcej niż raz na iteracji ani nie może zadzwonić do tego obserwatora równocześnie. Oto kilka linków do SyncOnSubscribeimplementation i tests na czubku gałęzi 1.x. Jego podstawowym zastosowaniem jest uproszczenie pisania obserwowalnych, które iterują lub parsują dane synchronicznie i dalejNastępnie w dół, ale robiąc to w ramach obsługującej ciśnienie zwrotne i sprawdzające, czy jest to subskrypcja. Zasadniczo utworzono by funkcję next, która byłaby wywoływana za każdym razem, gdy operatorzy niższego rzędu będą potrzebowali nowego elementu danych w trybie nowszym. Twoja następna funkcja może zadzwonićNastępnie 0 lub 1 raz.

Urządzenie AsyncOnSubscribe zostało zaprojektowane do ładnej zabawy z przeciwciśnieniem w przypadku obserwowalnych źródeł, które działają asynchronicznie (np. Wywołania poza skrzynką). Argumenty do następnej funkcji obejmują liczbę żądań, a pod warunkiem, że obserwowalne powinny zapewnić możliwe do zaobserwowania, że ​​spełnia dane do tej żądanej kwoty. Przykładem tego zachowania będą paginowane zapytania z zewnętrznego źródła danych.

Wcześniej bezpieczną praktyką było przekształcenie numeru OnSubscribe na Iterable i użycie Observable.from(Iterable). Ta implementacja otrzymuje iterator i sprawdza dla ciebie subscriber.isUnsubscribed().

+0

Dzięki, faktycznie odpowiedziałeś na inne moje pytanie dotyczące tworzenia "niestandardowych" obserwowalnych z odpowiednim wsparciem przeciwciśnienia! Sprawdziłem SyncSubscriber i wygląda naprawdę dobrze.W wielu przypadkach przekonwertowałbym operację na Iterable nieco niezręcznie semantycznie, ale wciąż dobrze jest wiedzieć, że możemy w ten sposób uzyskać proste wsparcie przeciwprężne. Widzę jednak, że ta klasa jest nadal oznaczona jako @Experimental, kiedy myślisz, że można ją z grubsza uznać za gotową do produkcji? –

+0

Dobrze słyszeć! Następnym krokiem jest wprowadzenie go do wydania (które powinno wkrótce nastąpić w wersji 1.0.15). Potem zwykle promuje się do statusu @ Beta lub bezpośrednio do statusu publicznego, gdy zyskujemy zaufanie. – Aaron

Powiązane problemy