2016-01-25 11 views
11

Mam bardzo prosty timeInterval obserwowalny i chcę rozpocząć/zakończyć transmisję bez odłączania abonentów (który powinien siedzieć i czekać bez względu na obserwowalny status). Jest to możliwe, a jeśli tak, to w jaki sposób?Jak uruchomić i zatrzymać interwał obserwowalny w RXJS?

var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .take(10); 

    var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
    }); 

Komentarz ogólny: większość przykładów, które widziałem, pokazuje, jak definiować obserwowalne i subskrybentów. jak mam wpływ na zachowanie istniejących obiektów?

Odpowiedz

14

Zależy od źródła sygnału zatrzymania/wznowienia. Najprostszy sposób, jaki mogę wymyślić, to pausable operator, który, jak mówi dokumentacja, działa lepiej z gorącymi obserwatorami. Tak więc w poniższym przykładowym kodzie usunąłem take(10) (Twój nieprzemyślany sygnał teraz przechodzi przez temat pauser) i dodano share, aby uczynić obserwowalny na gorącym.

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share() 
    .pausable(pauser); 

var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
}); 

    // To begin the flow 
pauser.onNext(true); // or source.resume(); 

// To pause the flow at any point 
pauser.onNext(false); // or source.pause(); 

Oto more sophisticated example który będzie wstrzymać źródła co 10 przedmiotów

// Helper functions 
function emits (who, who_) {return function (x) { 
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n"); 
};} 

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share(); 
var pausableSource = source 
    .pausable(pauser); 

source 
    .scan(function (acc, _){return acc+1}, 0) 
    .map(function(counter){return !!(parseInt(counter/10) % 2)}) 
    .do(emits(ta_validation, 'scan')) 
    .subscribe(pauser); 

var subscription = pausableSource.subscribe(
    function (x) { 
    $("#ta_result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#ta_result").append('Error: ' + err); 
    }, 
    function() { 
    $("#ta_result").append('Completed'); 
}); 

Trzeba teraz twoja odpowiedź na drugie pytanie. Połącz obserwowalne, które otrzymałeś, z odpowiednimi operatorami RxJS, aby zrealizować swój przypadek użycia. Oto co tutaj zrobiłem.

+1

dzięki za odpowiedź. ive dodano przycisk do pauzy/odtwarzania i zauważam, że strumień jest resetowany przy każdym wznowieniu, więc nie robi dokładnie tego, czego potrzebuję, który jest zamrożeniem/wznowieniem bez restartu. –

+0

Czy znasz różnicę między obserwowanym ciepłem a zimnem. Zajrzyj tutaj: http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444. Jeśli Twój strumień jest resetowany, prawdopodobnie oznacza to, że używasz zimnego obserwowalnego, tj. Obserwowalnego, który jest "resetowany" za każdym razem, gdy miał nowego subskrybenta. Jedynym sposobem, aby się upewnić, jest zobaczenie kodu. W moim przykładzie dodałem '.share()', aby upewnić się, że strumień był gorący, zanim przekazałem go do operatora 'pausable'. W przeciwnym razie miałbym te same problemy, co teraz. – user3743222

Powiązane problemy