2016-03-10 15 views
13

Środowisko: NodeJS, Express, DynamoDB (ale może być dowolny bazie naprawdę)Jak utworzyć czytelny strumień z asynchronicznym źródłem danych w NodeJs?

Scenariusz: trzeba czytać dużą liczbę rekordów i powrócić do użytkownika w postaci pliku do pobrania. Oznacza to, że nie mogę buforować wszystkich treści naraz, a następnie wysłać je w odpowiedzi od Express. Może też zajść konieczność wykonania kwerendy wiele razy, ponieważ wszystkie dane mogą nie zostać zwrócone w jednym zapytaniu.

Proponowane rozwiązanie: Użyj czytelny strumień, który może być wyprowadzony do strumienia odpowiedzi w Express.

Zacząłem od utworzenia obiektu dziedziczącego ze strumienia. Odtworzono i zaimplementowano metodę _read(), która powoduje wypychanie wyników zapytania. Problem polega na tym, że zapytanie bazy danych wywołane w _read() jest asynchroniczne, ale metoda stream.read() jest metodą synchronizacji.

Gdy strumień jest przesyłany strumieniowo do odpowiedzi serwera, odczyt jest wywoływany kilka razy, zanim zapytanie db nawet ma szansę wykonać. Tak więc kwerenda jest wywoływana wiele razy, a nawet gdy pierwsza instancja zapytania kończy się i wykonuje push (null), pozostałe kwerendy kończą się i pojawia się komunikat "push() after EOF".

  1. Czy istnieje sposób, aby to zrobić poprawnie z _read()?
  2. Czy powinienem zapomnieć o _read() i po prostu wykonać zapytanie, a push() spowoduje utworzenie konstruktora?
  3. Czy należy wykonać zapytanie i emitować zdarzenia danych zamiast funkcji push()?

Dziękuję

function DynamoDbResultStream(query, options){ 
    if(!(this instanceof DynamoDbResultStream)){ 
     return new DynamoDbResultStream(query, options); 
    } 

    Readable.call(this, options); 

    this.dbQuery = query; 
    this.done = false; 
} 
util.inherits(DynamoDbResultStream, Readable); 

DynamoDbResultStream.prototype._read = function(){ 
    var self = this; 
    if(!this.done){ 
     dynamoDB.query(this.dbQuery, function(err, data) { 
      if (!err) { 
       try{ 
        for(i=0;i<data.Items.length;i++){ 
         self.push(data.Items[i]); 
        } 
       }catch(err){ 
        console.log(err); 
       } 
       if (data.LastEvaluatedKey) { 
        //Next read() should invoke the query with a new start key 
        self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey; 
       }else{ 
        self.done=true; 
        self.push(null); 
       } 
      }else{ 
       console.log(err); 
       self.emit('error',err); 
      } 
     }); 
    }else{ 
     self.push(null); 
    } 
}; 

EDIT: Po wysłaniu na to pytanie, Znalazłem ten post z odpowiedzią, która pokazuje, jak to zrobić bez użycia dziedziczenia: How to call an asynchronous function inside a node.js readable stream

A wprowadzono tam komentarz, że wewnątrz _read() powinno być tylko jedno naciśnięcie(). I każde polecenie push() zwykle generuje inwokację read().

+0

można podać przykład kodu piszesz? – mikefrey

+0

Dodałem kod, który mam do tej pory – swbandit

+0

Prawdopodobnie powiązane: http://stackoverflow.com/questions/20058614/stream-from-a-mongodb-cursor-to-express-response-in-node-js – Tomalak

Odpowiedz

2

Bądź świadomy różnymi rodzajami strumienia: https://nodejs.org/api/stream.html#stream_two_modes

const Readable = require('stream').Readable; 

// starts in paused mode 
const readable = new Readable(); 

let i = 0; 
fetchMyAsyncData() { 
    setTimeout(() => { 
    // still remains in paused mode 
    readable.push(++i); 

    if (i === 5) { 
     return readable.emit('end'); 
    } 

    fetchMyAsyncData(); 
    }, 500);  
} 

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods." 
app.get('/mystreamingresponse', (req, res) => { 

    // remains in paused mode 
    readable.on('readable',() => res.write(readable.read())); 

    fetchMyAsyncData(); 

    // closes the response stream once all external data arrived 
    readable.on('end',() => res.end()); 
}) 
+0

Próba uruchomienia tego powoduje, że _read nie jest zaimplementowany – atlanteh

Powiązane problemy