2016-03-02 10 views
6

Próbuję użyć RxJS do prostej, krótkiej ankiety. Musi składać żądanie co godzinę delay sekund do lokalizacji path na serwerze, kończąc, gdy jeden z dwóch warunków zostanie osiągnięty: albo wywołanie zwrotne isComplete(data) zwraca wartość true, albo spróbowało serwera więcej niż maxTries. Oto kod podstawowy:RxJS 5.0 Mechanizm "rób tak, jak"

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

Jednak doWhile nie istnieje w RxJS 5,0, a więc stan, w którym może on próbować serwera tylko maxTries prace, dzięki odbioru() rozmowy, ale stan isComplete nie praca. Jak mogę uczynić to tak, aby obserwowalne wartości next() do momentu, aż isComplete zwróci true, w którym to momencie next() będzie wartością i complete().

Należy zauważyć, że takeWhile() nie działa dla mnie tutaj. Nie zwraca ostatniej wartości, która jest rzeczywiście najważniejsza, ponieważ wtedy wiemy, że to zrobione.

Dzięki!

+0

Ewentualnie duplikat: http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

To nie jest duplikat w tym sensie, że pytanie wymaga zastąpienia "doWhile". –

Odpowiedz

0

Możemy stworzyć funkcję użyteczności, aby utworzyć drugie Obserwowalne, które emituje każdy przedmiot, który emituje wewnętrzny Observable; Jednak będziemy wywołać funkcję onCompleted raz nasz warunek jest spełniony:

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

A oto szybkie fragment stosując naszą nową metodę użytkowy:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

Ta odpowiedź jest oparta off: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

Dziwnie, to rozwiązanie wydaje się działać o połowę krótszym. Czasami ostateczna partia danych trafia do subskrybentów, czasem po prostu nie. Każdy pomysł, dlaczego? Przejście na switchMap również tego nie naprawiło. –

0

Można to osiągnąć, używając operatorów retry i first.

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>