2010-12-03 13 views
10

Eksperymentuję z ZeroMQ i próbuję uzyskać coś działającego. Moją pierwszą myślą było skonfigurowanie REP/REQ za pomocą transportu inproc, aby sprawdzić, czy mogę wysyłać wiadomości między dwoma wątkami. Większość poniższego kodu pochodzi z przykładów clzmq, ale nie wydaje się działać.Używanie ZeroMQ z C# z inproc transportem

Zarówno serwer, jak i klient są związani z transportem, ale gdy klient próbuje wykonać numer Send, blokuje go i po prostu tam siedzi. Nie mam doświadczenia z ZeroMQ, więc nie jestem pewien, gdzie szukać najpierw, każda pomoc byłaby bardzo doceniana. Oto przestępstwa (ofensywa) Kod:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     [Test] 
     public void Should() 
     { 
      var clientThread = new Thread(StartClient); 
      clientThread.Start(); 

      var serverThread = new Thread(StartServer); 
      serverThread.Start(); 

      clientThread.Join(); 
      serverThread.Join(); 

      Console.WriteLine("Done with life"); 
     } 

     private void StartServer() 
     { 


      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 
      } 

      Console.WriteLine("Done with server"); 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
         1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
         latency.ToString("f2")); 
       } 
      } 

      Console.WriteLine("Done with client"); 
     } 

    } 
} 

Edit:

mam ten działa za pomocą poniższej odpowiedzi, ale także wymagane mnie do zmiany Bind do Connect, co ma sens, gdy myślisz o tym, ponieważ mamy serwer wiążący się z lokalnym transportem i klientem łączącym się ze zdalnym transportem. Oto zaktualizowany kod:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     private static Context ctx; 

     [Test] 
     public void Should() 
     { 
      using (ctx = new Context(1)) 
      { 
       var clientThread = new Thread(StartClient); 
       clientThread.Start(); 

       var serverThread = new Thread(StartServer); 
       serverThread.Start(); 

       clientThread.Join(); 
       serverThread.Join(); 

       Console.WriteLine("Done with life"); 
      } 
     } 

     private void StartServer() 
     { 
      try 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 

       Console.WriteLine("Done with server"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      try 
      { 
       // Initialise 0MQ infrastructure 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Connect(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
            1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
             latency.ToString("f2")); 
       } 

       Console.WriteLine("Done with client"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

    } 
} 

Odpowiedz

14

Wierzę, że oba wątki muszą używać tego samego kontekstu. Przewodnik Zeromq nie zaleca używania więcej niż jednego kontekstu w procesie. Utwórz kontekst, udostępnij ten kontekst między oboma wątkami. To powinno działać.

Od http://zguide.zeromq.org/chapter:all

należy utworzyć obiekt „kontekstu” dla procesu, a które przechodzą do wszystkich wątków. Kontekst zbiera stan ØMQ. Aby utworzyć połączenie w całym inproc: transport, zarówno wątek serwera, jak i klienta musi współdzielić ten sam obiekt kontekstu.

+0

To było bardzo przydatne, dzięki! – jonnii

2

Tylko jeden koniec może przywiązać drugi musi połączyć, możesz mieć wiele połączeń.