2014-09-30 16 views
5

Próbuję użyć RxJS do napisania skryptu do przetworzenia kilkuset plików dziennika, z których każdy ma około 1 GB. Szkielet skryptu wyglądaJak ograniczyć współbieżność płaskiej mapy?

Rx.Observable.from(arrayOfLogFilePath) 
.flatMap(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Kod działa, ale zauważ, że etap filtrowania wszystkich plików dziennika zaczną równocześnie. Jednak z perspektywy wydajności systemu plików IO lepiej jest przetworzyć jeden plik po drugim (lub przynajmniej ograniczyć współbieżność do kilku plików zamiast otwierać wszystkie setki plików w tym samym czasie). W związku z tym, jak mogę go wdrożyć w "funkcjonalny sposób reaktywny"?

Myślałem o harmonogramie, ale nie mogłem wymyślić, jak może mu w tym pomóc.

+0

Mam to samo pytanie, ale z Rx.NET. Czy to możliwe? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN

Odpowiedz

12

Możesz użyć .merge(maxConcurrent), aby ograniczyć współbieżność. Ponieważ .merge(maxConcurrent) spłaszcza metaobserwowalne (możliwe do zaobserwowania) w obserwowalne, należy zamienić .flatMap na .map, tak aby wyjście było metaobserwowalne ("unflat"), a następnie wywoływane jest .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath) 
.map(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Ten kod nie był testowany (ponieważ nie mam dostępu do posiadanego środowiska programistycznego), ale jak to zrobić. RxJS nie ma wielu operatorów o parametrach współbieżności, ale prawie zawsze możesz zrobić to, czego potrzebujesz, używając .merge(maxConcurrent).

+1

To jest dokładnie to, co próbuję dostać do pracy. Mam listę 500 adresów URL do załadowania i nie chcę uruchamiać wszystkich żądań w tym samym czasie. Użyłem mapy (5), ale nie działa ... Wszystkie wnioski są wysyłane w tym samym czasie. – Roaders

+0

@Roaders czy to rozwiązanie działa? Próbuję tego samego. Ale wszystkie żądania są uruchamiane w tym samym czasie. Wszedłem w Google i nie znalazłem nic. – Diego

+0

Jeśli na przykład wykonujesz asynchroniczne wywołanie http, musisz zawinąć je w Rx.defer(), aby Rx mógł zdecydować, kiedy wywołanie zostanie wykonane (i ponowić je, jeśli na przykład nie powiedzie się) – Roaders

0

Właśnie rozwiązałem podobny problem z RxJs 5, więc mam nadzieję, że rozwiązanie pomoże innym z podobnym problemem.

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more), 
 
// retry two times, push error on stream if retry fails. 
 

 
//const Rx = require('rxjs-es6/Rx'); 
 

 
// -- Global variabel just to show that it works. -- 
 
let parallelRequests = 0; 
 
// -------------------------------------------------- 
 

 
function simulateRequest(req) { 
 
    console.log("Request " + req); 
 
    // --- To log retries --- 
 
    var retry = 0; 
 
    // ---------------------- 
 

 
    // Can't retry a promise, need to restart before the promise is made. 
 
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => { 
 

 
     var random = Math.floor(Math.random() * 2000); 
 
     // -- To show that it works -- 
 
     if (retry) { 
 
      console.log("Retrying request " + req + " ,retry " + retry); 
 
     } else { 
 

 
      parallelRequests++; 
 
     } 
 
     // --------------------------- 
 
     setTimeout(() => { 
 
      if (random < 900) { 
 
       retry++; 
 
       return reject(req + " !!!FAILED!!!"); 
 
      } 
 

 
      return resolve(req); 
 
     }, random); 
 
    })).retry(2).catch(e => Rx.Observable.of(e)); 
 
} 
 

 
Rx.Observable.range(1, 10) 
 
    .flatMap(e => simulateRequest(e), null, 2) 
 
    // -- To show that it works -- 
 
    .do(() => { 
 
     console.log("ParallelRequests " + parallelRequests); 
 
     parallelRequests--; 
 
    }) 
 
    // --------------------------- 
 
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>