Próbowałem użyć strumienia czytelnego i transformowanego do przetworzenia bardzo dużego pliku. Problem, na który się natknąłem, polega na tym, że jeśli na końcu nie ustawię zapisywalnego strumienia, program zdaje się kończyć, zanim wynik zostanie zwrócony.Strumienie Node.js czytelne do przekształcenia
Przykład: rstream.pipe(split()).pipe(tstream)
My tstream
ma nadajnik, który emituje gdy licznik trafienia próg. Kiedy ten próg jest ustawiony na niską liczbę, otrzymuję wynik, ale kiedy jest wysoki, to nie zwraca niczego. Jeśli poprowadzę go do programu piszącego, zawsze zwraca wynik. Czy brakuje mi czegoś oczywistego?
Kod:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
Oto kod na Qtransformstream
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
Czy możesz opublikować kod implementacji 'QTransformStream'? – mscdex
Ile wierszy masz w pliku wejściowym i jaka jest maksymalna wartość licznika w tym przypadku. Jeśli wartość licznika jest większa niż liczba linii, zdarzenie "zakończone" nie będzie emitowane. Musisz także nacisnąć 'null', aby zakończyć strumień. Nie wiem, co masz w "coś nieważnego", ale w pewnym momencie powinno być "null". – hassansin
Istnieje def mniej linii niż licznik, około 7000 linii. To działa, gdy podłączę to do strumienia zapisu. Czy strumień transformacji musi mieć funkcję push (zerową) do działania? – ace040686