2015-11-22 20 views
5

Mam trzy obserwable które łączę z combineLastest:RxJava Obserwowalne otrzymywać powiadomienia o pierwszej emisji

Observable<String> o1 = Observable.just("1"); 
    Observable<String> o2 = Observable.just("2"); 
    Observable<String> o3 = Observable.just("3"); 

    Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() { 
     @Override 
     public Object call(String s, String s2, String s3) { 
      return null; 
     } 
    }); 

Chcę być informowany o pierwszej emisji jednego z obserwable bez pomijając późniejsze emisje, które myślę, first operator zrobi. Czy istnieje wygodny operator do tego typu (przykład):

o1.doOnFirst(new Func1<String, Void>() { 
     @Override 
     public Void call(String s) { 
      return null; 
     } 
    }) 

Odpowiedz

5

myślę, że można mieć praktyczne doOnFirst z prostym take jeśli jesteś obsługi strumień:

public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) { 
    return source.take(1).doOnNext(action).concatWith(source); 
} 

W ten sposób akcja jest wiązana tylko z pierwszym elementem.

To może być zmieniony do obsługi obserwable które nie poparte strumieni dodając skip pominąć już podjęte elementy:

public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) { 
    return source.take(1).doOnNext(action).concatWith(source.skip(1)); 
} 
+0

Wymaganie dwóch subskrypcji do źródła jest potencjalnie nieefektywne (nie wiemy, jakie obciążenie może dotyczyć danego obserwowalnego obiektu). Sugerowałbym skorzystanie z odpowiedzi, która korzystała z jednego abonamentu do ogólnego użytku. W przypadku gorącego źródła można również przegapić emisję. –

3

Istnieje kilka rozwiązań, które mogę wymyślić. Pierwszy to brzydki, ale prosty hack doOnNext. Wystarczy dodać pole logiczne do Action1 wskazujące, czy pierwszy element został odebrany. Po otrzymaniu rób to, co chcesz, i przerzuć boolean. Na przykład:

Observable.just("1").doOnNext(new Action1<String>() { 

     boolean first = true; 

     @Override 
     public void call(String t) { 
      if (first) { 
       // Do soemthing 
       first = false; 
      } 
     } 
    }); 

Drugim jest subskrybować dwukrotnie na obserwowalne chcesz monitorować za pomocą publish lub share(), z jednym z tych publikacji przeżywa first (w zależności od tego, czy chcesz, aby ręcznie połączyć się z opublikowanym obserwowalne). Skończysz z dwoma oddzielnymi obserwabli, które emitują te same elementy, tylko pierwszy z nich zatrzyma się po pierwszej emisji:

ConnectableObservable<String> o1 = Observable.just("1").publish(); 

o1.first().subscribe(System.out::println); //Subscirbed only to the first item 
o1.subscribe(System.out::println); //Subscirbed to all items 

o1.connect(); //Connect both subscribers 
+0

Twoja pierwsza sugestia zadziała tylko raz. Lepiej zawijać z opóźnieniem, aby utrzymać stan boolowski, aby uzyskać zachowanie w każdej subskrypcji. –

2

Korzystanie rxjava-extras:

observable 
    .compose(Transformers.doOnFirst(System.out::println)) 

To urządzenie przetestowane i na podstawie okładki po prostu używają licznika na subskrypcję w operatorze. Pamiętaj, że subskrypcja jest ważna, ponieważ istnieje wiele przypadków użycia, w których obserwowalna instancja jest używana więcej niż raz i chcemy, aby operator doOnFirst stosował się za każdym razem.

Kod źródłowy to here.

+1

O rany, jeszcze więcej RxJava! ;) Dzięki. – FWeigl

+0

Więcej RxGoodies! Hurra – Vaiden

Powiązane problemy