2015-08-04 14 views
8

Próbowałem użyć strumienia czytelnego i transformowanego do przetworzenia bardzo dużego pliku. Problem, na który się natknąłem, polega na tym, że jeśli na końcu nie ustawię zapisywalnego strumienia, program zdaje się kończyć, zanim wynik zostanie zwrócony.Strumienie Node.js czytelne do przekształcenia

Przykład: rstream.pipe(split()).pipe(tstream)

My tstream ma nadajnik, który emituje gdy licznik trafienia próg. Kiedy ten próg jest ustawiony na niską liczbę, otrzymuję wynik, ale kiedy jest wysoki, to nie zwraca niczego. Jeśli poprowadzę go do programu piszącego, zawsze zwraca wynik. Czy brakuje mi czegoś oczywistego?

Kod:

// Dependencies 
var fs = require('fs'); 
var rstream = fs.createReadStream('file'); 
var wstream = fs.createWriteStream('output'); 
var split = require('split'); // used for separating stream by new line 
var QTransformStream = require('./transform'); 

var qtransformstream = new QTransformStream(); 
qtransformstream.on('completed', function(result) { 
    console.log('Result: ' + result); 
}); 
exports.getQ = function getQ(filename, callback) { 

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result 
    // rstream.pipe(split()).pipe(qtransformstream); 

    // this always works 
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream); 

}; 

Oto kod na Qtransformstream

// Dependencies 
var Transform = require('stream').Transform, 
    util = require('util'); 
// Constructor, takes in the Quser as an input 
var TransformStream = function(Quser) { 
    // Create this as a Transform Stream 
    Transform.call(this, { 
     objectMode: true 
    }); 
    // Default the Qbase to 32 as an assumption 
    this.Qbase = 32; 
    if (Quser) { 
     this.Quser = Quser; 
    } else { 
     this.Quser = 20; 
    } 
    this.Qpass = this.Quser + this.Qbase; 
    this.Counter = 0; 
    // Variables used as intermediates 
    this.Qmin = 120; 
    this.Qmax = 0; 
}; 
// Extend the transform object 
util.inherits(TransformStream, Transform); 
// The Transformation to get the Qbase and Qpass 
TransformStream.prototype._transform = function(chunk, encoding, callback) { 
    var Qmin = this.Qmin; 
    var Qmax = this.Qmax; 
    var Qbase = this.Qbase; 
    var Quser = this.Quser; 
    this.Counter++; 
    // Stop the stream after 100 reads and emit the data 
    if (this.Counter === 100) { 
     this.emit('completed', this.Qbase, this.Quser); 
    } 
    // do some calcs on this.Qbase 

    this.push('something not important'); 
    callback(); 
}; 
// export the object 
module.exports = TransformStream; 
+0

Czy możesz opublikować kod implementacji 'QTransformStream'? – mscdex

+0

Ile wierszy masz w pliku wejściowym i jaka jest maksymalna wartość licznika w tym przypadku. Jeśli wartość licznika jest większa niż liczba linii, zdarzenie "zakończone" nie będzie emitowane. Musisz także nacisnąć 'null', aby zakończyć strumień. Nie wiem, co masz w "coś nieważnego", ale w pewnym momencie powinno być "null". – hassansin

+0

Istnieje def mniej linii niż licznik, około 7000 linii. To działa, gdy podłączę to do strumienia zapisu. Czy strumień transformacji musi mieć funkcję push (zerową) do działania? – ace040686

Odpowiedz

6

EDIT:

Również nie wiem jak wysoko Twój licznik idzie ale jeśli wypełnij bufor, który przestanie przekazywać dane do strumienia transformacji, w takim przypadku completed nigdy nie zostanie trafiony, ponieważ nie masz r dostać się do limitu licznika. Spróbuj zmienić swój highwatermark.

EDIT 2: A Little Better Objaśnienie

Jak dobrze wiedzieć transform streamjest strumień duplex które w zasadzie oznacza, że ​​może przyjmować dane ze źródła i może wysyłać dane do miejsca przeznaczenia. Jest to zwykle nazywane czytaniem i pisaniem. Model transform stream dziedziczy zarówno z wersji read stream, jak i write stream zaimplementowanej przez Node.js. Jest jedno zastrzeżenie, ale transform streamnie musi implementować funkcji _read ani _write. W tym sensie można myśleć o nim jako o mniej znanym passthrough stream.

Jeśli myślisz o tym, że transform stream implementuje write stream, musisz również pomyśleć o tym, że strumień zapisu ma zawsze miejsce docelowe do zrzutu zawartości. Problem , który masz polega na tym, że gdy tworzysz transform stream, nie możesz określić miejsca, w którym chcesz wysłać swoją zawartość. Jedynym sposobem całkowitego przekazania danych przez strumień transformacji jest przekierowanie go do strumienia zapisu, w przeciwnym razie twoje strumienie zostaną zarchiwizowane i nie będą mogły przyjąć więcej danych, ponieważ nie ma miejsca na dane.

To jest powód, dla którego podczas pipowania do strumienia zapisu zawsze działa. Strumień zapisu zmniejsza kopię zapasową danych, przesyłając dane do miejsca docelowego, dzięki czemu wszystkie dane będą przesyłane strumieniowo, a wydarzenie zostanie zakończone.

Powód, dla którego twój kod działa bez strumienia zapisu, gdy rozmiar próbki jest mały, polega na tym, że nie wypełniasz swojego strumienia, więc strumień transformacji może zaakceptować wystarczającą ilość danych, aby pozwolić na trafienie całego zdarzenia/progu . Gdy próg zwiększa ilość danych, które twój strumień może zaakceptować bez wysyłania go do innego miejsca (strumień zapisu) pozostaje taki sam. Powoduje to utworzenie kopii zapasowej strumienia i nie może już akceptować danych, co oznacza, że ​​ukończone wydarzenie nigdy nie zostanie wyemitowane.

Powiedziałbym, że jeśli zwiększysz swój highwatermark dla strumienia transformacji, będziesz mógł zwiększyć swój próg i nadal będzie działał kod. Ta metoda jest jednak niepoprawna. Rura Twój strumień do strumienia zapisu, który będzie wysyłał dane do dev/null drogę creat że strumień zapisu jest:

var writer = fs.createWriteStream('/dev/null'); 

Sekcja w docs node.js na buffering wyjaśnić ten błąd jest uruchomiony na.

+0

Strumienie w węźle nie są tak proste, jak wyglądają. Chciałbym zobaczyć dobre szczegółowe wyjaśnienie tych subtelności. – thorn

+0

Próbowałem zrobić lepsze wyjaśnienie, daj mi znać, jeśli jest ich część, które nie są jasne. – RadleyMith

1

Nie przerywasz _transform, a proces przebiega daleko. Spróbuj:

this.emit('completed', ...); 
this.end(); 

Dlatego 'program wydaje się zakończyć zanim wynik zostanie zwrócony'

I nie wyprowadzać bezużyteczne dane:

var wstream = fs.createWriteStream('/dev/null'); 

Powodzenia)

1

Proponuję raczej użyć strumienia Writable niż transformacji. Następnie zmień nazwę na _transform na _write, a Twój kod pochłonie strumień, jeśli się do niego dodzwonisz. Strumień transformacji, jak już zaznaczył @Bradgnar, potrzebuje konsumenta lub będzie przesyłać więcej danych do swojego bufora.