2011-07-19 16 views
8

Próbuję zbudować prostą aplikację do czatu z wykorzystaniem AMQP, Websockets i Ruby. Rozumiem, że może to nie być najlepszy przypadek użycia, aby zrozumieć AMQP, ale chciałbym zrozumieć, gdzie idę źle.AMQP dynamicznie subskrybując kolejki

Oto mój kod AMQP-server

require 'rubygems' 
require 'amqp' 
require 'mongo' 
require 'em-websocket' 
require 'json' 

class MessageParser 
    # message format => "room:harry_potter, nickname:siddharth, room:members" 
    def self.parse(message) 
    parsed_message = JSON.parse(message) 

    response = {} 
    if parsed_message['status'] == 'status' 
     response[:status] = 'STATUS' 
     response[:username] = parsed_message['username'] 
     response[:roomname] = parsed_message['roomname'] 
    elsif parsed_message['status'] == 'message' 
     response[:status] = 'MESSAGE' 
     response[:message] = parsed_message['message'] 
     response[:roomname] = parsed_message['roomname'].split().join('_') 
    end 

    response 
    end 
end 

class MongoManager 
    def self.establish_connection(database) 
    @db ||= Mongo::Connection.new('localhost', 27017).db(database) 
    @db.collection('rooms') 

    @db 
    end 
end 


@sockets = [] 
EventMachine.run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    channel = AMQP::Channel.new(connection) 

    puts "Connected to AMQP broker. #{AMQP::VERSION} " 

    mongo = MongoManager.establish_connection("trackertalk_development") 

    EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| 
    socket_detail = {:socket => ws} 
    ws.onopen do 
     @sockets << socket_detail 

    end 

    ws.onmessage do |message| 

     status = MessageParser.parse(message)   
     exchange = channel.fanout(status[:roomname].split().join('_')) 

     if status[:status] == 'STATUS'    
     queue = channel.queue(status[:username], :durable => true) 

     unless queue.subscribed? 
     puts "--------- SUBSCRIBED --------------" 
     queue.bind(exchange).subscribe do |payload| 
      puts "PAYLOAD : #{payload}" 
      ws.send(payload) 
      end 
     else 
      puts "----ALREADY SUBSCRIBED" 
     end     

     # only after 0.8.0rc14 
     #queue = channel.queue(status[:username], :durable => true)  
     #AMQP::Consumer.new(channel, queue)   

     elsif status[:status] == 'MESSAGE' 
     puts "********************* Message- published ******************************" 
     exchange.publish(status[:message) 
     end     
    end 

    ws.onclose do 
     @sockets.delete ws 
    end 
    end  
end 

używam statusu, aby wskazać, czy przychodząca wiadomość jest wiadomością o trwającej rozmowy lub wiadomości stanu wymagającego mnie obsłużyć prace jak zapisanie się na kolejki .

Problem twarz jest, że kiedy wysłać wiadomość jak socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

The exchange.publish' is called but it still doesn't get pushed via the ws.send` do przeglądarki.

Czy jest coś fundamentalnie złego w moim rozumieniu EventMachine i AMQP?

Oto pastie dla tego samego kodu http://pastie.org/private/xosgb8tw1w5vuroa4w7a

Mój kod wydaje się działać zgodnie z oczekiwaniami, kiedy usunąć durable => true z queue = channel.queue(status[:username], :durable => true)

Poniżej znajduje się fragment moich Rails zobaczyć, który identyfikuje użytkownika użytkownika i nazwa pokoju i wysyła go jako część wiadomości za pośrednictwem Websockets.

Chociaż kod wydaje się działać po usunięciu durable => true, nie rozumiem, dlaczego ma to wpływ na dostarczaną wiadomość. Zignoruj ​​część mongo, ponieważ nie gra ona jeszcze żadnej części.

Chciałbym również wiedzieć, czy moje podejście do AMQP i jego wykorzystanie jest poprawna

<script> 
    $(document).ready(function(){ 
     var username = '<%= @user.email %>'; 
     var roomname = 'Bazingaa'; 

     socket = new WebSocket('ws://127.0.0.1:8080/'); 

     socket.onopen = function(msg){ 
      console.log('connected'); 
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); 
     } 

     socket.onmessage = function(msg){ 
      $('#chat-log').append(msg.data); 

     } 

    }); 

</script> 
<div class='block'> 
    <div class='content'> 
    <h2 class='title'><%= @room.name %></h2> 
    <div class='inner'> 
     <div id="chat-log"> 
     </div> 

     <div id="chat-console"> 
     <textarea rows="5" cols="40"></textarea> 
     </div> 
    </div> 
    </div> 
</div> 

<style> 
    #chat-log{ 
     color:#000; 
     font-weight:bold; 
     margin-top:1em; 
     width:900px; 
     overflow:auto; 
     height:300px; 
    } 
    #chat-console{ 
     bottom:10px; 
    } 

    textarea{ 
     width:100%; 
     height:60px; 
    } 
</style> 
+0

Ktoś może mi powiedzieć, jak zorganizować mój kod w środowisku produkcyjnym, jeśli mam uruchomić mój kod amqp jako demon. Każdy przykładowy kod, który pomoże mi zorganizować mój kod, będzie bardzo pomocny. – Sid

Odpowiedz

1

Myślę, że Twój problem może polegać na zawieszaniu się kolejki w brokerze między wywołaniami ws.onmessage. Gdy klient ponownie łączy się z kolejką, a powiązanie już istnieje, funkcja ws.send() nie jest wywoływana.

Domyślnie podczas tworzenia kolejki, jej i wszelkich powiązań, które ma, zawiesza się do czasu ponownego uruchomienia brokera lub wyraźnie informuje brokera, aby go usunąć.

Istnieją dwa sposoby, aby to zmienić:

  • Dodanie trwałą flagę podczas tworzenia kolejki, która spowoduje, że kolejka trzymać się nawet jeśli pośrednik restartuje
  • Dodanie auto_delete Flaga, która spowoduje, że pośrednik automatycznie usunie jednostkę po krótkim czasie braku kontaktu z nią.

Jeśli masz kontrolę nad brokerem, używasz brokera rabbitmq, prostym sposobem na introspekcję tego, co się dzieje w brokerze, jest zainstalowanie management plugin, który zapewnia interfejs sieciowy do wymiany, powiązań i kolejek w brokerze.

0

Na pierwszy rzut oka te bity AMQP wydaje się być OK, ale nie chcę, aby skonfigurować wszystkie zależności. Jeśli podasz minimalny przykład tylko częścią AMQP, sprawdzę to.

+0

W moim kodzie Railsowym identyfikuję identyfikator e-mail użytkownika i nazwę pokoju i przekazuję go do kodu serwera AMQP za pośrednictwem Websockets. Chociaż kod wydaje się działać, gdy usuwam durable => true, nie rozumiem, dlaczego ma to wpływ na dostarczaną wiadomość. Zignoruj ​​część mongo, ponieważ nie gra ona jeszcze żadnej części. Chciałbym również wiedzieć, czy moje podejście do AMQP i jego użycie jest poprawne – Sid