2014-09-03 16 views
7

Mam dwie obserwowalne (o nazwie A i B dla uproszczenia) i jeden subskrybent. Tak więc subskrybent subskrybuje A, a jeśli wystąpi błąd na A, wtedy B (który jest awaryjnym) wciąga. Teraz, gdy A trafi błąd B zostanie wywołany dobrze, jednak wywoła on onComplete() na subskrybencie, więc odpowiedź B nigdy nie dociera do subskrybenta, nawet jeśli wykonanie B zakończyło się powodzeniem.RxJava onErrorResumeNext()

Czy to normalne zachowanie? Myślałem, że onErrorResumeNext() powinien kontynuować strumień i powiadomić subskrybenta po zakończeniu, zgodnie z zapisem w dokumentacji (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).

To jest ogólna struktura, co robię (pominięto kilka „nudny” kod):

public Observable<ModelA> observeGetAPI(){ 
    return retrofitAPI.getObservableAPI1() 
      .flatMap(observableApi1Response -> { 
       ModelA model = new ModelA(); 

       model.setApi1Response(observableApi1Response); 

       return retrofitAPI.getObservableAPI2() 
         .map(observableApi2Response -> { 
          // Blah blah blah... 
          return model; 
         }) 
         .onErrorResumeNext(observeGetAPIFallback(model)) 
         .subscribeOn(Schedulers.newThread()) 
      }) 
      .onErrorReturn(throwable -> { 
       // Blah blah blah... 
       return model; 
      }) 
      .subscribeOn(Schedulers.newThread()); 
} 

private Observable<ModelA> observeGetAPIFallback(ModelA model){ 
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> { 
     // Blah blah blah... 
     return model; 
    }).onErrorReturn(throwable -> { 
     // Blah blah blah... 
     return model; 
    }) 
    .subscribeOn(Schedulers.immediate()); 
} 

Subscription subscription; 
subscription = observeGetAPI.subscribe(ModelA -> { 
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE... 
}, throwable ->{ 
    // WE NEVER GET HERE... onErrorResumeNext() 
}, 
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED } 
); 

pomysłów, co robię źle?

Dzięki!

EDIT: Oto szorstki kalendarium, co się dzieje:

---> HTTP GET REQUEST B 
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS) 

---> HTTP GET REQUEST A 
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!) 

---> HTTP GET FALLBACK A 
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time. 
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS) 

A oto link do prostego schematu zrobiłem, które reprezentują to, co chcę się zdarzyć: Diagram

+0

Twoja oś czasu pokazuje HTTP 200 dla odpowiedzi awarii. Czy istnieje inny sposób sygnalizowania błędu z getObservableAPI2()? Czy możesz też określić, które żądania interfejsu API odpowiadają wynikom osi czasu? Wygląda na to, że getObservableAPI1-> REQUEST B, getObservableAPI2-> REQUEST A, getObservableAPI3-> FALLBACK A, ale chcę się tylko upewnić. – kjones

+0

Tak, faktycznie, mimo że odpowiedź ma wartość 200, niektóre dane mogą mieć wartość null, więc zgłaszam błędy i błędy w tych scenariuszach. I tak, to jest relacja między osią czasu a żądaniami, będę edytować pytanie JAK NAJSZYBCIEJ, aby dopasować żądanie osi czasu jako twoje. – mradzinski

+0

Twoja logika wygląda na dźwięk. Powinieneś otrzymać odpowiedź awaryjną przed onComplete. Czy możesz usunąć wszystkie wywołania subscribeOn() i zobaczyć, co się stanie. Nie powinny one być konieczne, ponieważ Retrofit i tak wykonuje żądania we własnym puli wątków. – kjones

Odpowiedz

5

RX Połączenia używane poniżej powinny symulować to, co robisz z Retrofit.

fallbackObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting A Fallback"); 
         subscriber.onNext("A Fallback"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Fallback Error"); 
         return "Fallback Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.immediate()); 

stringObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting B"); 
         subscriber.onNext("B"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .flatMap(new Func1<String, Observable<String>>() { 
        @Override 
        public Observable<String> call(String s) { 
         logger.v("flatMapping B"); 
         return Observable 
           .create(new Observable.OnSubscribe<String>() { 
            @Override 
            public void call(Subscriber<? super String> subscriber) { 
             logger.v("emitting A"); 
             subscriber.onNext("A"); 
             subscriber.onCompleted(); 
            } 
           }) 
           .delay(1, TimeUnit.SECONDS) 
           .map(new Func1<String, String>() { 
            @Override 
            public String call(String s) { 
             logger.v("A completes but contains invalid data - throwing error"); 
             throw new NotImplementedException("YUCK!"); 
            } 
           }) 
           .onErrorResumeNext(fallbackObservable) 
           .subscribeOn(Schedulers.newThread()); 
        } 
       }) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Return Error"); 
         return "Return Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.newThread()); 

subscription = stringObservable.subscribe(
     new Action1<String>() { 
      @Override 
      public void call(String s) { 
       logger.v("onNext " + s); 
      } 
     }, 
     new Action1<Throwable>() { 
      @Override 
      public void call(Throwable throwable) { 
       logger.v("onError"); 
      } 
     }, 
     new Action0() { 
      @Override 
      public void call() { 
       logger.v("onCompleted"); 
      } 
     }); 

Wyjście ze sprawozdania z bali jest:

 
RxNewThreadScheduler-1 emitting B 
RxComputationThreadPool-1 flatMapping B 
RxNewThreadScheduler-2 emitting A 
RxComputationThreadPool-2 A completes but contains invalid data - throwing error 
RxComputationThreadPool-2 emitting A Fallback 
RxComputationThreadPool-1 onNext A Fallback 
RxComputationThreadPool-1 onCompleted 

Wydaje się to, czego szukasz, ale może nie jestem czegoś brakuje.