2015-05-04 12 views
6

Być może kwestia, to jak moduł node-kafka Używam wdrożył rzeczy, ale być może nie, więc jedziemy ...wydarzenia node.js EventEmitter nie dzielenie pętli zdarzenia

Korzystanie z biblioteki węzła Kafa, Mam problem z subskrypcją zdarzeń consumer.on('message'). Biblioteka używa standardowego modułu events, więc myślę, że to pytanie może być dość ogólne.

Moja rzeczywista struktura kodu jest duża i skomplikowana, więc oto pseudo przykład podstawowego układu, który podkreśla mój problem. (Uwaga: Ten fragment kodu jest niesprawdzone, więc może mam błędy tutaj, ale składnia nie jest pytanie tutaj w każdym razie)

var messageCount = 0; 
var queryCount = 0; 

// Getting messages via some event Emitter 
consumer.on('message', function(message) { 
    message++; 
    console.log('Message #' + message); 

    // Making a database call for each message 
    mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) { 
     queryCount++; 
     console.log('Query #' + queryCount); 
    }); 
}) 

Co widzę tutaj jest przy uruchomieniu serwera, istnieje 100000 lub tak niedokończonej wiadomości, które kafka chce mi przekazać, i robi to poprzez emiter zdarzeń. Więc zaczynam dostawać wiadomości. Aby uzyskać i zalogować wszystkie wiadomości zajmuje około 15 sekund.

to, czego można oczekiwać, aby zobaczyć na wyjściu zakładając zapytania mysql jest dość szybki:

Message #1 
Message #2 
Message #3 
... 
Message #500 
Query #1 
Message #501 
Message #502 
Query #2 
... and so on in some intermingled fashion 

Spodziewam się tego, ponieważ mój pierwszy wynik mysql powinien być gotowy bardzo szybko i spodziewałbym wynik (s), aby włączyć swoją pętlę zdarzeń, aby odpowiedź została przetworzona. To, co otrzymuję, to:

Message #1 
Message #2 
... 
Message #100000 
Query #1 
Query #2 
... 
Query #100000 

Otrzymuję wszystkie wiadomości, zanim możliwe będzie przetworzenie odpowiedzi mysql. Moje pytanie brzmi: dlaczego? Dlaczego nie mogę uzyskać pojedynczego wyniku bazy danych, dopóki wszystkie zdarzenia wiadomości nie zostaną zakończone?

Kolejna uwaga: Ustawiłem punkt przerwania pod .emit('message') w węźle kafka i mysql.query() w moim kodzie i wybieram je na podstawie turowej. Wygląda więc na to, że wszystkie 100 000 emiterów nie gromadzi się z góry przed wejściem do subskrybenta wydarzenia. Moja pierwsza hipoteza dotyczyła tego problemu.

pomysłów i wiedzy byłoby bardzo mile widziane :)

+0

Co stanie się, jeśli zwiększysz liczbę przechowywanych wiadomości do znacznie większej liczby?Czy to możliwe, że twój mysql jest po prostu wolny? – Avery

+0

@ Avery Zastanawiałem się, że, ale kiedy powtórzę to tylko jedną wiadomość do przetworzenia, nie mogę nawet spostrzec opóźnienia odpowiedzi mysql. To wszystko działa również lokalnie. Rzeczywista kwerenda mysql jest bardzo prosta (wystarczy WYBIERZ dla ~ 8 pól z jednego wiersza tabeli i ta tabela ma teraz tylko około 60 wierszy). –

+0

Jeśli ten przykład jest rzeczywiście reprezentatywny dla twojego kodu, to również jestem zagubiony . Czy możesz faktycznie stworzyć ten wynik na tym przykładzie? Nie mam dostępnej instancji MySQL do przetestowania. – Avery

Odpowiedz

2

Kierowca node-kafka używa dość liberalny rozmiar bufora (1M), co oznacza, że ​​będzie zdobyć jak najwięcej wiadomości z Kafki, która zmieści się w buforze. Jeśli serwer jest zaległy i zależy od rozmiaru wiadomości, może to oznaczać (dziesiątki) tysięcy wiadomości przychodzących z jednym żądaniem.

Ponieważ EventEmitter jest synchroniczny (nie używa pętli zdarzeń węzła), oznacza to, że sterownik wyśle ​​(dziesiątki) tysięcy zdarzeń do swoich odbiorców, a ponieważ jest synchroniczny, nie ulegnie Węzeł zdarzeń, dopóki wszystkie wiadomości nie zostaną dostarczone.

Nie sądzę, że można obejść zalew dostaw zdarzeń, ale nie sądzę, że konkretnie dostarczanie zdarzeń jest problematyczne. Bardziej prawdopodobnym problemem jest uruchomienie operacji asynchronicznej (w tym przypadku kwerendy MySQL) dla każdego zdarzenia, które może zalać bazę danych za pomocą zapytań.

Możliwe obejście polegałoby na użyciu kolejki zamiast wykonywania zapytań bezpośrednio z procedur obsługi zdarzeń. Na przykład za pomocą async.queue można ograniczyć liczbę współbieżnych (asynchronicznych) zadań. Część "pracownik" kolejki wykonywałaby kwerendę MySQL, a w programach obsługi zdarzeń po prostu przesyłasz wiadomość do kolejki.

+0

Dzięki @robertklep. Wydam próbkę async.queue. Przekazuję własną kolejkę, aby było tylko jedno zapytanie mysql i zapamiętywanie wyników dla oczekujących żądań do użycia, ale podejrzewam, że dobrze utrzymany/przetestowany moduł będzie lepszy :) –

Powiązane problemy