2014-11-29 11 views
6

Mam strumień z częstymi wartościami i jeden z wolniejszymi. Chcę je łączyć, ale emitować wartość tylko wtedy, gdy emituje wolniejszą. Więc combineLatest nie działa. tak:CombineLatest emituje tylko wtedy, gdy zmienia się jeden ze strumieni

a1 
a2 
b1 
(a2,b1) 
a3 
a4 
a5 
b2 
(a5,b2) 

Obecnie robię to jak następuje, jest tam czystsze sposób?

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = 
    Observable({ o => 
    var last : A 
    fast.subscribe({a => last = a}) 
    slow.subscribe({b => o.onNext((last,b))}) 
    }) 

edit: Ten operator jest teraz w Rx i nazywa withLatestFrom.

+0

Przepraszamy. Moja poprzednia odpowiedź jest błędna. Usunąłem to. – zsxwing

+1

Zadałem właśnie to pytanie na trackerze problemów RxJavy: https://github.com/ReactiveX/RxJava/issues/405 Ale odpowiedzi nie były satysfakcjonujące ... –

Odpowiedz

4

Czego szukasz to kombinator, który nazwałam "CombinePrev", który nie istnieje w API, ale okazuje się bardzo potrzebny w wielu sytuacjach. sample Operator jest blisko, ale nie łączy dwóch strumieni. I've also missed "combinePrev" in RxJS. Okazuje się, że wdrażanie „combinePrev” („withLatest”) jest prosta i po prostu zależy na mapie i przełącznikiem:

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = { 
    val hotSlow = slow.publish.refCount 
    fast.map({a => hotSlow.map({b => (a,b)})}).switch 
} 

Oto jsfiddle Przykładem tego samego operatora, realizowanego w RxJS.

Podczas gdy operator nie jest w Rx, można użyć niejawny klasy, dzięki czemu można używać slow.withLatest(fast):

implicit class RXwithLatest[B](slow: Observable[B]) { 
    def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */ 
} 

Uwaga: slow musi być hot. Jeśli slow jest zimne Obserwowalne, withLatest nie działa.

+0

Miło widzieć, że nie jestem jedyny. Mam nadzieję, że to zostanie dodane do RxJava. Jednak myślę, że w twojej obecnej odpowiedzi 'B' powinno być' slow', a 'A' powinno być' fast'? – dtech

+0

Poprawiono nazwy scali i zmiennych, ale poza tym działa idealnie. Dzięki! – dtech

+0

Tak, B powinien być powolny i szybki, przykro, że nie doprowadziliśmy go do kontekstu. I tak powinno to być dodane do RxJava jako operatora @Beta, sprawdź ten problem z github: https://github.com/ReactiveX/RxJava/issues/405 –

Powiązane problemy