2012-09-06 13 views
8

Mam aplikację, która reaguje na wiadomości wysyłane przez klientów. Jedna wiadomość to reload_credentials, że aplikacja otrzymuje w momencie rejestracji nowego klienta. Ta wiadomość następnie połączy się z bazą danych PostgreSQL, wykona zapytanie dla wszystkich poświadczeń, a następnie zapisze je w zwykłym haśle Ruby (client_id => client_token).Jak powinienem obsługiwać ten przypadek użycia przy użyciu EventMachine?

Niektóre inne wiadomości, które może otrzymać aplikacja, to: start, stop, pause, które służą do śledzenia niektórych czasów sesji. Chodzi mi o to, że ja sobie wyobrazić aplikacja działa w następujący sposób:

  • klient wysyła wiadomość
  • wiadomość zostanie kolejce
  • kolejka jest przetwarzane

Jednakże, na przykład, don nie chce blokować reaktora. Ponadto, wyobraźmy sobie, że mam wiadomość reload_credentials, która jest następna w kolejce. Nie chcę, aby jakiekolwiek inne wiadomości z kolejki były przetwarzane, dopóki referencje nie zostaną przeładowane z bazy danych. Ponadto podczas przetwarzania pewnej wiadomości (takiej jak oczekiwanie na zakończenie zapytania dotyczącego danych logowania) chcę zezwolić na kolejną wiadomość.

Czy mógłbyś poprowadzić mnie do rozwiązania tego problemu? Myślę, że będę musiał użyć em-synchrony, ale nie jestem pewien.

Odpowiedz

7

Użyj jednego z sterowników Postgreql EM lub EM.defer, aby nie blokować reaktora.

Po wyświetleniu komunikatu "reload_credentials" po prostu odwróć flagę, która spowoduje, że wszystkie kolejne wiadomości będą skolumnowane. Po zakończeniu "reload_credentials", przetwarzaj wszystkie wiadomości z kolejki. Po pustej kolejce odwróć flagę, która spowoduje przetworzenie komunikatów po ich otrzymaniu.

sterowniki EM dla PostgreSQL są wymienione tutaj: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server 
    def post_init 
    @queue    = [] 
    @loading_credentials = false 
    end 

    def recieve_message(type, data) 
    return @queue << [type, data] if @loading_credentials || [email protected]? 
    return process_msg(type, data) unless :reload_credentials == type 
    @loading_credentials = true 
    reload_credentials do 
     @loading_credentials = false 
     process_queue 
    end 
    end 

    def reload_credentials(&when_done) 
    EM.defer(proc { query_and_load_credentials }, when_done) 
    end 


    def process_queue 
    while (type, data = @queue.shift) 
     process_msg(type, data) 
    end 
    end 

    # lots of other methods 
end 

EM.start_server(HOST, PORT, Server) 

Jeśli chcesz wszystkie połączenia w kolejce wiadomości ilekroć jakiś związek otrzymuje wiadomość „reload_connections” będziesz musiał koordynować poprzez eigenclass.

+0

Jednak wiadomość reload_credentials mogła zostać odebrana wiele razy. Czy nie powinno być 2 nitek? Jeden, który utrzymuje kolejkę i przetwarza? – Geo

+0

Tak, jeśli reload_credentials zostanie odebrany podczas przetwarzania kolejnego reload_credentials, zostanie umieszczony w kolejce tak jak inne wiadomości. – simulacre

+0

Wiele komunikatów reload_credentials powinno być traktowanych jak pierwsze. Umieszczając reload_credentials w bloku EM.defer, uruchamiasz go w innym wątku. Tak długo, jak kod "przetwarzania" nie będzie blokowany, będziesz otrzymywać wiadomości. Korzystaj z bibliotek kompatybilnych z EM, aby nie blokować. Alternatywnie można użyć EM.defer do przetwarzania. – simulacre

4

Poniżej przypuszczam, coś bieżącej realizacji:

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 


    q = EM::Queue.new 

    workers = Array.new(10) { Worker.new q } 

Problem wyżej, jeśli cię rozumiem poprawnie, jest to, że nie chcą pracowników pracujących na nowych miejsc pracy (Praca że dotarły wcześniej w harmonogramie producenta), niż jakiekolwiek zadania reload_credentials. Poniższe powinny obsługiwać to (dodatkowe słowa ostrzeżenia na końcu).

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 

    class LockingDispatcher 
     def initialize channel, queue 
     @channel = channel 
     @queue = queue 

     @backlog = [] 
     @channel.subscribe method(:dispatch_with_locking) 

     @locked = false 
     end 

     def dispatch_with_locking item 
     if locked? 
      @backlog << item 
     else 
      # You probably want to move the specialization here out into a method or 
      # block that's passed into the constructor, to make the lockingdispatcher 
      # more of a generic processor 
      case item.type 
      when :reload_credentials 
      lock 
      deferrable = CredentialReloader.new(item).start 
      deferrable.callback { unlock } 
      deferrable.errback { unlock } 
      else 
      dispatch_without_locking item 
      end 
     end 
     end 

     def dispatch_without_locking item 
     @queue << item 
     end 

     def locked? 
     @locked 
     end 

     def lock 
     @locked = true 
     end 

     def unlock 
     @locked = false 
     bl = @backlog.dup 
     @backlog.clear 
     bl.each { |item| dispatch_with_locking item } 
     end 

    end 

    channel = EM::Channel.new 
    queue = EM::Queue.new 

    dispatcher = LockingDispatcher.new channel, queue 

    workers = Array.new(10) { Worker.new queue } 

Więc wejście do pierwszego systemu przychodzi na q, ale w tym nowym systemie chodzi w sprawie channel. Model queue jest nadal używany do dystrybucji pracy wśród pracowników, ale queue nie jest wypełniany podczas operacji odświeżania poświadczeń. Niestety, ponieważ nie zajęło mi to więcej czasu, nie uogólniałem tak, aby nie było sprzężone z typem przedmiotu i kodem do wysyłania CredentialsReloader. Zostawię to tobie.

Należy zauważyć, że chociaż usługi te są zgodne z moją pierwotną prośbą, to ogólnie rzecz biorąc lepiej jest odpocząć tego rodzaju wymogu. Istnieje kilka zaległych problemów, które zasadniczo nie można wyeliminować bez zmian w tym wymogu:

  • System nie czeka na wykonanie zadania do wykonania przed rozpoczęciem poświadczeń pracy
  • System będzie obsługiwać wybuchy mandatów pracy bardzo źle - inne elementy, które mogą być przetwarzalne, nie będą.
  • W przypadku błędu w kodzie danych, zaległości mogą wypełnić pamięć RAM i spowodować awarię. Prosty limit czasu może wystarczyć, aby uniknąć katastrofalnych skutków, i kod jest przerywany, a kolejne komunikaty są wystarczająco przetwarzalne, aby uniknąć dalszych zakleszczeń.

Wygląda na to, że w systemie istnieje pojęcie użytkownika. Jeśli myślisz o swoich wymaganiach, najprawdopodobniej będziesz musiał tylko zalegać pozycje odnoszące się do identyfikatora użytkownika, którego dane uwierzytelniające znajdują się w stanie odświeżenia. Jest to inny problem, który wymaga innego rodzaju wysyłki. Wypróbuj mieszankę zablokowanych zaległości dla tych użytkowników, z wezwaniem na zakończenie poświadczenia, aby odprowadzić zaległości do pracowników lub podobne rozwiązania.

Powodzenia!

Powiązane problemy