Próbuję użyć RxJS do napisania skryptu do przetworzenia kilkuset plików dziennika, z których każdy ma około 1 GB. Szkielet skryptu wyglądaJak ograniczyć współbieżność płaskiej mapy?
Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
return Rx.Node.fromReadStream(logFilePath)
.filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)
Kod działa, ale zauważ, że etap filtrowania wszystkich plików dziennika zaczną równocześnie. Jednak z perspektywy wydajności systemu plików IO lepiej jest przetworzyć jeden plik po drugim (lub przynajmniej ograniczyć współbieżność do kilku plików zamiast otwierać wszystkie setki plików w tym samym czasie). W związku z tym, jak mogę go wdrożyć w "funkcjonalny sposób reaktywny"?
Myślałem o harmonogramie, ale nie mogłem wymyślić, jak może mu w tym pomóc.
Mam to samo pytanie, ale z Rx.NET. Czy to możliwe? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN