2013-05-08 15 views
13
  • 2 strumienie:Łączy dwie (lub n) strumieni

    Biorąc czytelny streamsstream1 i stream2, co to jest idiomatyczne (zwięzły) sposobem uzyskać strumień zawierający stream1 i stream2 łączone?

    Nie mogę wykonać stream1.pipe(outStream); stream2.pipe(outStream), ponieważ wtedy zawartość strumienia jest pomieszana.

  • n strumienie:

    Biorąc pod EventEmitter emitującego nieokreślona liczba strumieni, na przykład

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    co to jest idiomatyczne (zwięzły) sposobem uzyskać strumień ze wszystkich strumieni sklejone?

Odpowiedz

11

Pakiet combined-stream łączy strumienie. Przykład z README:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

Uważam, że musisz dodać wszystkie strumienie naraz. Jeśli kolejka jest pusta, automatycznie kończy się combinedStream. Zobacz issue #5.

Biblioteka stream-stream jest alternatywą, która ma wyraźne oznaczenie .end, ale jest znacznie mniej popularna i prawdopodobnie nie jest tak dobrze przetestowana. Używa API streams2 węzła 0.10 (patrz this discussion).

3

Możesz być w stanie uczynić go bardziej zwięzły, ale tutaj jest jeden, który działa:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

Mamy śledzić stan z streams (kolejkę strumieni; push do tyłu i shift od z przodu), isStreaming i streamsEnded. Kiedy dostaniemy nowy strumień, popychamy go, a gdy skończy się strumień, przestajemy go słuchać i przesuwać. Po zakończeniu strumienia strumieni ustawiamy streamsEnded.

Na każdym z tych zdarzeń sprawdzamy stan, w którym się znajdujemy. Jeśli jesteśmy już streamowani (przesyłamy strumień), nie robimy nic. Jeśli kolejka jest pusta i ustawiona jest streamsEnded, emitujemy zdarzenie end. Jeśli coś jest w kolejce, wznawiamy je i słuchamy jego zdarzeń.

* Należy pamiętać, że pause i resume mają charakter doradczy, więc niektóre strumienie mogą nie działać poprawnie i wymagać buforowania. To ćwiczenie należy do czytelnika.

Uczyniwszy to wszystko, chciałbym zrobić sprawę n=2 przez konstruowania EventEmitter, tworząc ConcatStream z nim, i emitując dwa stream wydarzenia następnie zdarzenia end. Jestem pewien, że można to zrobić bardziej zwięźle, ale równie dobrze możemy wykorzystać to, co mamy.

+0

Dzięki Aaron! Miałem nadzieję, że będzie istniejąca biblioteka, więc mogę rozwiązać ją w trzech linijkach. Jeśli nie, myślę, że mogę wyodrębnić twoje rozwiązanie do paczki. Czy mogę użyć Twojego kodu na licencji MIT? –

+0

Ah, odnaleziono bibliotekę strumieni strumieniowych. Zobacz moją odpowiedź. –

+0

@JoLiss Również najpierw szukałem czegoś, ale nie znalazłem tej opcji. Z pewnością możesz użyć mojego kodu w bibliotece, jeśli nadal chcesz. –

1

streamee.js to zestaw transformatorów strumieniowych i kompozytorów opartych na węźle 1.0+ strumieni i obejmują sposób oddzielając:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

Dzięki, sprawdzę to. To Węzeł 0.10 Zakładam? –

+0

Tak Węzeł 0.10, ale można zawijać strumienie w starym stylu do strumieni 0.10+, jak napisano w README – atamborrino

2

https://github.com/joepie91/node-combined-stream2 jest Streams2 zgodny wymiana spadek w modułu łączy strumienia (który jest opisany powyżej.) Automatycznie owija Streams1 strumienie.

kod Przykład skojarzonym stream2:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

może być wykonane z wanilii nodejs

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

prosty reduce operacja powinna być dobrze w nodejs!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

Pozdrawiam;)

+0

Czy nie powinieneś zwracać czegoś z redukcji? Wygląda na to, że 'join' będzie niezdefiniowany. Naprawiono –

+1

. dzięki;) @Mark_M – Ivo

+0

OSTRZEŻENIE: Spowoduje to równoległe przesyłanie wszystkich strumieni do strumienia PassThrough, bez żadnych szacunków dotyczących kolejności danych, co bardziej niż prawdopodobne uszkodzenie danych. –