2015-08-31 13 views
9

Mam listę plików, które chciałbym przesłać do backendu z urządzenia z systemem Android. Z powodu ograniczeń pamięci chciałbym wykonać drugie wywołanie API tylko po pierwszym zakończeniu, trzecim po drugim zakończeniu i tak dalej.Wykonywanie N sekwencyjnych wywołań api za pomocą RxJava i Retrofit

pisałem coś

private Observable<Integer> uploadFiles(List<File> files) { 
     return Observable.create(subscriber -> { 
      for (int i = 0, size = files.size(); i < size; i++) { 
       UploadModel uploadModel = new UploadModel(files.get(0)); 
       int uploadResult = retrofitApi.uploadSynchronously(uploadModel); 
       subscriber.onNext(uploadResult); 
      } 
      subscriber.onCompleted(); 
     }).subscribeOn(Schedulers.newThread()); 
    } 

Ale czuję się jak to może być wbrew duchowi Rx, a powiedzenie jest, jeśli używasz Observable.create, jesteś prawdopodobnie robi to źle. .. Czy to rozsądne podejście? Czy jest lepszy sposób na osiągnięcie tego dzięki integracji RxJava Retrofit?

+0

Dlaczego nie zdefiniować w interfejsie API 'uploadSynchronically' na ** not ** return Observable? Innym sposobem jest użycie do blokowania na każdym obserwowalnym. – marwinXXII

+0

Tak właśnie zrobiłem - retrofitApi.uploadSynchronously to blokujące połączenie modernizacyjne, które zwraca int, a nie observing. A jeśli twoje pytanie brzmi, dlaczego nie zrobiłem prostej pętli bez Rx, cóż, byłoby to trudne w obsłudze błędów i aktualizacji postępów UI elegancko, plus jest kilka dodatkowych kroków przed i po tym obserwowalnym w łańcuchu. –

+0

Właściwie to nie, wydaje mi się, że podstawowa pętla jest tu lepsza niż użycie 'map'. – marwinXXII

Odpowiedz

4

naiwnie by to zrobić (nie działa, chociaż, patrz niżej):

return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel)); 

Teraz problemem jest to, że nie ma sposobu, aby powiedzieć modernizację używać tylko jeden wątek dla tych połączeń.

reduce, jednak przekazuje wynik jednego wywołania funkcji do następnego, wraz z następną wysyłaną wartością z oryginalnego obserwowalnego. To zadziała, ale funkcja przekazana do reduce musi być synchroniczna. Niedobrze.

Innym podejściem byłoby zmodyfikować zaobserwować rekurencyjnie:

void getNextFile(int i) { 
    return retrofit.upload(i). 
     onNext(result -> getNextFile(i + 1)); 
} 

mniej. Ale nie jestem pewien, jak to wyczyścić, aby był bardziej czytelny.

Najczystsze Myślę byłoby coś takiego:

Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file))); 
+0

Observable.from jest tym, z czym początkowo miałem do czynienia (ale z obserwowalnym połączeniem retrofitu) i w ten sposób wszystko zostało wywołane równolegle. Czy to blokuje łańcuch przy każdym wywołaniu i emituje wynik do onNext()? –

+0

Nie mogę się dowiedzieć, jak wywołać następne połączenie z poprzedniego, inne niż ponowne tworzenie obserwowalnych za każdym razem. Ostatnia sugestia powinna zadziałać, ponieważ 'map' jest wywoływana tylko w jednym wątku. – njzk2

+0

Twoje ostateczne rozwiązanie działa dokładnie tak, jak powinno, i wygląda o wiele mniej podejrzanie niż moje pierwsze, więc dziękuję! Po prostu potrzebowałem popchnąć mapę do nowego wątku na początku łańcucha, ale wtedy wszystko działało na tym. Dzięki! –

0

tubylców RxJava będzie emitować wszystkie przedmioty na Observable.from(...) jakby równolegle. To najlepszy sposób, by myśleć o tym jako o emisji równoległej. Jednak niektóre przypadki wymagają rzeczywistej konsekwentnej realizacji całego łańcucha. Doszedłem do następującego rozwiązania, prawdopodobnie nie najlepszego, ale działającego.

import rx.Observable; 
import rx.Subscriber; 

import java.util.Iterator; 
import java.util.function.Function; 

public class Rx { 
    public static void ignore(Object arg) { 
    } 

    public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) { 
     return Observable.create(collectorSubscriber -> 
       Observable.<Void>create(producerSubscriber -> 
         producerSubscriber.setProducer(ignoredCount -> { 
          if (!iterator.hasNext()) { 
           producerSubscriber.onCompleted(); 
           return; 
          } 

          E model = iterator.next(); 
          action.apply(model) 
            .subscribe(
              Rx::ignore, 
              producerSubscriber::onError, 
              () -> producerSubscriber.onNext(null)); 
         })) 
         .subscribe(new Subscriber<Void>() { 
          @Override 
          public void onStart() { 
           request(1); 
          } 

          @Override 
          public void onCompleted() { 
           collectorSubscriber.onNext(null); 
           collectorSubscriber.onCompleted(); 
          } 

          @Override 
          public void onError(Throwable e) { 
           collectorSubscriber.onError(e); 
          } 

          @Override 
          public void onNext(Void aVoid) { 
           request(1); 
          } 
         })); 
    } 
} 

Przykład użycia może być:

Iterator<? extends Model> iterator = models.iterator(); 

    Rx.sequential(iterator, model -> someFunctionReturnsObservable(model)) 
      .subscribe(...); 

metoda ta gwarantuje łańcuchu Wykonania z

Observable<Dummy> someFunctionReturnsObservable(Model model)

0

Obecnie preferowanym sposobem tworzenia obserwable jest z fromAsync:

Observable.fromAsync(new Action1<AsyncEmitter<Object>>() 
    { 
     @Override 
     public void call(final AsyncEmitter<Object> emitter) 
     { 
      emitter.onNext(object); 
      emitter.onCompleted(); 

      emitter.setCancellation(new AsyncEmitter.Cancellable() 
      { 
       @Override 
       public void cancel() throws Exception 
       { 
        // on unSubscribe() callback 
       } 
      }); 
     } 
    }, AsyncEmitter.BackpressureMode.BUFFER); 
Powiązane problemy