2016-02-03 13 views
5

Jestem nowy w RxJS i zastanawiałem się, czy ktoś mógłby mi pomóc.Synchroniczny strumień odpowiedzi ze strumienia żądań z RxJS

Chcę utworzyć synchroniczny strumień odpowiedzi (najlepiej z odpowiednimi żądaniami) ze strumienia żądań (dane ładunku).

Po prostu chcę, aby żądania były wysyłane jeden po drugim, każdy czeka na odpowiedź od ostatniego.

próbowałem tego, ale wysyła wszystko naraz (jsbin):

var requestStream, responseStream; 
 
requestStream = Rx.Observable.from(['a','b','c','d','e']); 
 

 
responseStream = requestStream.flatMap(
 
    sendRequest, 
 
    (val, response)=>{ return {val, response}; } 
 
); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('result for '+val);},1000); 
 
    }); 
 
};

następujące prace, do pewnego stopnia, ale nie stosować strumienia za dane zamówienie (jsbin).

var data, responseStream; 
 
data = ['a','b','c','d','e']; 
 
responseStream = Rx.Observable.create(observer=>{ 
 
    var sendNext = function(){ 
 
    var val = data.shift(); 
 
    if (!val) { 
 
     observer.onCompleted(); 
 
     return; 
 
    } 
 
    sendRequest(val).then(response=>{ 
 
     observer.onNext({val, response}); 
 
     sendNext(); 
 
    }); 
 
    }; 
 
    sendNext(); 
 
}); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500); 
 
    }); 
 
};

Dziękujemy!

EDIT:

Właśnie w celu wyjaśnienia, to jest to, co chciałem osiągnąć:

„Wyślij, po otrzymaniu odpowiedzi na A, wysłać B, po otrzymaniu odpowiedzi na B, wysłać C, itd ...”

Korzystanie concatMap i odroczenia, jak sugeruje user3743222, wydaje się, aby to zrobić (jsbin):

responseStream = requestStream.concatMap(
    (val)=>{ 
    return Rx.Observable.defer(()=>{ 
     return sendRequest(val); 
    }); 
    }, 
    (val, response)=>{ return {val, response}; } 
); 

Odpowiedz

3

Spróbuj wymienić flatMap z concatMap w swoim pierwszym przykładzie kodu i daj mi znać, jeśli wynikowe zachowanie odpowiada temu, czego szukasz.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    sendRequest, 
    (val, response)=>{ return {val, response}; } 
); 

Zasadniczo concatMap ma podobną sygnaturę niż flatMap, różnica w zachowaniu istoty, która będzie czekać na prąd obserwowalne są spłaszczone, aby zakończyć przed przejściem do następnego. A więc tutaj:

  • Wartość zostanie przesłana do operatora concatMap.
  • operator concatMap będzie generować sendRequest zaobserwować, a co wartości z tego zaobserwować (wydaje się krotki (val, response)) są przekazywane za pomocą funkcji sterującej, a wynik przedmiot, który zostanie przekazany za
  • kiedy to sendRequest kończy się przetwarzanie kolejnej wartości requestStream.
  • W skrócie, wasze prośby będą przetwarzane jeden po drugim

Alternatywnie, może chcesz używać defer odroczyć wykonanie sendRequest.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    function(x){return Rx.Observable.defer(function(){return sendRequest(x);})}, 
    (val, response)=>{ return {val, response}; } 
); 
+0

Dziękuję za odpowiedź. Próbowałem Twojego rozwiązania, ale żądania są nadal wysyłane natychmiast. Dokumentacja sugeruje, że flatMap może powodować przeplatanie, podczas gdy concatMap nie. Wydaje się, że różnica polega na porządkowaniu. Używanie concatMap ma sens, ale nadal nie wywołuje pożądanego zachowania: Wyślij A, gdy otrzymasz odpowiedź dla A, wyślij B, gdy otrzymasz odpowiedź dla B, wyślij C itd. – jamesref

+0

Może źle zrozumiałem to, co chciałeś. Czy możesz spróbować w takim przypadku "odroczyć"? Zaktualizuję kod – user3743222

+0

Dziękujemy! Wygląda na to że działa. – jamesref

Powiązane problemy