2017-02-17 12 views
13

Robimy odczytać plik XML (przy użyciu xml-stream) z około 500k elementów i wkładać je do MongoDB tak:Jak buforować wstawki MongoDB podczas rozłączania w pliku node.js?

xml.on(`endElement: product`, writeDataToDb.bind(this, "product")); 

Insert w writeDataToDb(type, obj) wygląda następująco:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { }); 

Teraz gdy Mongo połączenie zostanie rozłączone, strumień xml będzie nadal czytany, a konsola zostanie zalana komunikatami o błędach (nie można wstawić, rozłączyć, uszkodzić EPIPE ...).

W docs mówi:

Podczas zamykania procesu mongod, kierowca zatrzymuje operacje przetwarzania i utrzymuje je buforowanie powodu bufferMaxEntries będących -1 domyślnie oznaczającego bufor wszystkie operacje.

Co właściwie robi ten bufor?

Zauważamy kiedy wstawiamy dane i zamykamy serwer mongo, rzeczy się buforują, to przywracamy serwer mongo, macierzysty sterownik z powodzeniem ponownie łączy się i węzeł wznawia wstawianie danych, ale buforowane dokumenty (podczas mongo w trybie offline) nie wstawiaj ponownie.

Więc kwestionuję ten bufor i jego użycie.

Cel:

Szukamy najlepszego sposobu, aby utrzymać wkładki w buforze aż Mongo wraca (w 15000milliseconds według wtimeout) i niech następnie włóż buforowane dokumentów lub wykorzystanie xml.pause(); i xml.resume() który próbowaliśmy bez powodzenia.

Zasadniczo potrzebujemy niewielkiej pomocy w zakresie obsługi rozłączeń bez utraty danych lub przerwania.

+0

nie może replikować to zarówno przykład w docs i testów z użyciem 'xml-stream' włóż buforowane obiekty kiedyś mon Serwer go wraca. Może napiszesz więcej kodu/podasz więcej informacji o swojej konfiguracji? – cviejo

+0

@cviejo Nie mogę udostępnić moich skryptów, ponieważ są związane z firmą, ale czy mógłbyś przesłać mi skrypt, który próbowałeś powielić? Gist/pastebin byłoby w porządku. – DanFromGermany

Odpowiedz

1

Nie wiem konkretnie o sterowniku Mongodb i tym buforze wpisów. Może tylko gromadzi dane w określonych scenariuszach.

Więc odpowiem na to pytanie z bardziej ogólnym podejściem, które może współpracować z dowolną bazą danych.

Podsumowując, masz dwa problemy:

  1. Nie jesteś wychodzenie z nieudanych prób
  2. strumień
  3. XML wysłać dane zbyt szybko

obsłużyć pierwszy problem, należy wdrożyć algorytm ponownej próby, który zapewni wykonanie wielu prób przed poddaniem się.

Aby obsłużyć drugi problem, należy zaimplementować przeciwciśnienie w strumieniu xml. Można to zrobić za pomocą metody pause, metody resume i bufora wejściowego.

var Promise = require('bluebird'); 
var fs = require('fs'); 
var Xml = require('xml-stream'); 

var fileStream = fs.createReadStream('myFile.xml'); 
var xml = new Xml(fileStream); 

// simple exponential retry algorithm based on promises 
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) { 
    var delay = initialDelay; 
    var retry = 0; 
    var closure = function() { 
     return task().catch(function(error) { 
      retry++; 
      if (retry > maxRetry) { 
       throw error 
      } 
      var promise = Promise.delay(delay).then(closure); 
      delay = Math.min(delay * 2, maxDelay); 
      return promise; 
     }) 
    }; 
    return closure(); 
} 

var maxPressure = 100; 
var currentPressure = 0; 
var suspended = false; 
var stopped = false; 
var buffer = []; 

// handle back pressure by storing incoming tasks in the buffer 
// pause the xml stream as soon as we have enough tasks to work on 
// resume it when the buffer is empty 
function writeXmlDataWithBackPressure(product) { 
    // closure used to try to start a task 
    var tryStartTask = function() { 
     // if we have enough tasks running, pause the xml stream 
     if (!stopped && !suspended && currentPressure >= maxPressure) { 
      xml.pause(); 
      suspended = true; 
      console.log("stream paused"); 
     } 
     // if we have room to run tasks 
     if (currentPressure < maxPressure) { 
      // if we have a buffered task, start it 
      // if not, resume the xml stream 
      if (buffer.length > 0) { 
       buffer.shift()(); 
      } else if (!stopped) { 
       try { 
        xml.resume(); 
        suspended = false; 
        console.log("stream resumed"); 
       } catch (e) { 
        // the only way to know if you've reached the end of the stream 
        // xml.on('end') can be triggered BEFORE all handlers are called 
        // probably a bug of xml-stream 
        stopped = true; 
        console.log("stream end"); 
       } 
      } 
     } 
    }; 

    // push the task to the buffer 
    buffer.push(function() { 
     currentPressure++; 
     // use exponential retry to ensure we will try this operation 100 times before giving up 
     exponentialRetry(function() { 
      return writeDataToDb(product) 
     }, 100, 2000, 100).finally(function() { 
      currentPressure--; 
      // a task has just finished, let's try to run a new one 
      tryStartTask(); 
     }); 
    }); 

    // we've just buffered a task, let's try to run it 
    tryStartTask(); 
} 

// write the product to database here :) 
function writeDataToDb(product) { 
    // the following code is here to create random delays and random failures (just for testing) 
    var timeToWrite = Math.random() * 100; 
    var failure = Math.random() > 0.5; 
    return Promise.delay(timeToWrite).then(function() { 
     if (failure) { 
      throw new Error(); 
     } 
     return null; 
    }) 
} 

xml.on('endElement: product', writeXmlDataWithBackPressure); 

Zagraj z tym, dodaj trochę console.log, aby zrozumieć, jak się zachowuje. Mam nadzieję, że to pomoże ci rozwiązać twój problem :)

+0

To jest zasadniczo dobra implementacja, ale miałem nadzieję, że będę w stanie wykorzystać wewnętrzny bufor zapisu/zapisu mongo - zajrzyj do [tej strony] (https://mongodb.github.io/node-mongodb- native/driver-articles/anintroductionto1_4_and_2_6.html) i słowo kluczowe 'bufferMaxEntries'. – DanFromGermany

2

Wkładanie elementów 500K z insertOne() to bardzo zły pomysł. Zamiast tego należy użyć opcji bulk operations, która umożliwia wstawianie wielu dokumentów w jednym żądaniu. (tu na przykład 10000, więc to może być wykonane w 50 pojedynczych wniosków) Aby uniknąć problemu buforowania, można ręcznie obsługiwać go:

  1. Wyłącz buforowanie z bufferMaxEntries: 0
  2. Set ponownego połączenia właściwości: reconnectTries: 30, reconnectInterval: 1000
  3. Utwórz zbiorczą operację i podaj ją za pomocą 10000 elementów.
  4. Zatrzymaj czytnik xml. Spróbuj wstawić 10000 przedmiotów. Jeśli to się nie powiedzie, ponownie co 3000ms aż uda
  5. można napotkać powtarzające zagadnienia tożsamości, jeśli operacja masowy jest przerwany w trakcie realizacji, więc je zignorować (kod błędu: 11000)

Oto przykładowy skrypt:

var fs = require('fs') 
var Xml = require('xml-stream') 

var MongoClient = require('mongodb').MongoClient 
var url = 'mongodb://localhost:27017/test' 

MongoClient.connect(url, { 
    reconnectTries: 30, 
    reconnectInterval: 1000, 
    bufferMaxEntries: 0 
}, function (err, db) { 
    if (err != null) { 
    console.log('connect error: ' + err) 
    } else { 
    var collection = db.collection('product') 
    var bulk = collection.initializeUnorderedBulkOp() 
    var totalSize = 500001 
    var size = 0 

    var fileStream = fs.createReadStream('data.xml') 
    var xml = new Xml(fileStream) 
    xml.on('endElement: product', function (product) { 
     bulk.insert(product) 
     size++ 
     // if we have enough product, save them using bulk insert 
     if (size % 10000 == 0) { 
     xml.pause() 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      bulk = collection.initializeUnorderedBulkOp() 
      console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try') 
      xml.resume() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      counter = 0 
      var retryInsert = setInterval(function() { 
       counter++ 
       bulk.execute(function (err, result) { 
       if (err == null) { 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else if (err.code === 11000) { // ignore duplicate ID error 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else { 
        console.log('failed after first try: ' + counter, 'error: ' + err) 
       } 
       }) 
      }, 3000) // retry every 3000ms until success 
      } 
     }) 
     } else if (size === totalSize) { 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      db.close() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      } 
     }) 
     } 
    }) 
    } 
}) 

próbka wyjściowa dziennika:

doc 0 : 10000 saved on first try 
doc 10000 : 20000 saved on first try 
doc 20000 : 30000 saved on first try 
[...] 
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown 
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0 
doc 130000 : 140000 saved after 4 tries 
doc 140000 : 150000 saved on first try 
[...] 
+0

Twoja odpowiedź nie zawiera informacji o buforze zapisu Mongo i nie ma rozwiązania, jak wstawić wszystkie dokumenty, nawet podczas zmiany pozycji w zestawie replik lub rozłączyć. Informacje na temat wkładki luzem są interesujące i zajrzę do tego, dzięki! – DanFromGermany

+0

@DanFromGermany tak, ponieważ dla mnie wygląda na to, że próbujesz rozwiązać niewłaściwy problem: prawdziwy problem polega na tym, że twoje aplikacje zostają odłączone od bazy danych. Mniej połączeń z bazą danych ułatwiłoby automatyczne ponowne łączenie, dlatego nie trzeba buforować zapisu. – felix

+0

Moje aplikacje ** nie ** odłączają się od bazy danych. Chcę napisać aplikacje, które ** w przypadku ** rozłączeń * lub * głównych przełączników w zestawie replik potwierdzają, aby ponownie połączyć i zapisać wszystkie dane. – DanFromGermany