2015-02-23 18 views
6

Chcę używać RxJava do ładowania danych z usługi internetowej (poprzez Retrofit). Mam również pamięć podręczną bazy danych z poprzednimi wynikami.Android + RxJava - ładowanie danych z db AND web service

Załóżmy mam już obserwable dla każdego z nich:

Observable<List<MyModel>> networkObservable = retrofitService.getModels(); 
Observable<List<MyModel>> dbObservable = database.getModels(); 

chcę połączyć te 2 obserwable do jednego:

public class MyModelHelper { 
    public static Observable<List<MyModel>> getModels() { 
     // TODO: Help! 
    } 
} 

Zachowanie Chcę tylko dla subskrybentów otrzymywać bazy wyniki, gdy tylko będą dostępne, a następnie wyniki restService, gdy zostaną dostarczone (zakładając, że pobranie z bazy danych jest szybsze niż wykonanie połączenia sieciowego).

The najlepiej mogę wymyślić na własną rękę jest:

public class MyModelHelper { 
    public static Observable<List<MyModel>> getModels() { 
     List<MyModel> emptyList = new LinkedList<>(); 

     // 'startWith' because combineLatest wont call back until all source observables emit something 
     Observable.combineLatest(dbObservable.startWith(emptyList), 
      networkObservable.startWith(emptyList), 
      new Func2<List<MyModel>, List<MyModel>, List<MyModel>>() { 
       @Override 
       public List<MyModel> call(List<MyModel> first, List<MyModel> second) { 
        return merge(first, second); 
      } 
     }); 
    } 
} 

To wydaje się nieco hacky do mnie i czuję do takiego wspólnego scenariusza musi istnieć lepsze rozwiązanie.

Byłoby również miło, gdyby zaobserwowano błąd w sieci obserwowalny, że wyniki db nadal będą przekazywane. Mogę zadzwonić pod numer onErrorResumeNext() i zwrócić sam dbObservable, ale chciałbym, aby subskrybenci otrzymali powiadomienie o wystąpieniu błędu.

Wszelkie sugestie?

Odpowiedz

13

Skorzystaj bezpośrednio z Observable.merge. Łączy on kilka obserwowalnych strumieni do jednego, więc jeśli baza danych emituje szybciej, otrzymasz ją jako pierwszą.

public static Observable<List<MyModel>> getModels() { 
    return Observable.merge(dbObservable, networkObservable); 
} 
+1

To wydawało się niemal żenująco oczywiste! Użyłem 'mergeDelayError()', aby upewnić się, że obie obserwowalne mają szansę na powrót. Wielkie dzięki! – DanielGrech

+5

Oprócz tego, 'mergeDelayError' pomoże ci uzyskać pożądane zachowanie błędu. Jeśli jeden z strumieni ma błąd, nie wpłynie on na drugi, dopóki nie zostanie zakończony, po czym możesz obsłużyć błąd za pomocą dowolnego operatora błędu, który chcesz. – lopar

+0

@lopar bardzo dobry punkt. Warto również rzucić okiem na marmurowy diagram dla 'merge' i' mergeDelayError', aby zrozumieć go poprawnie http://reactivex.io/documentation/operators/merge.html – tomrozb

Powiązane problemy