2009-05-12 19 views
8

Mam problem ze sprzężonymi monitorami.Wait i Monitor.Pulse w wielowątkowym serwerze TCP. Aby zademonstrować moje problemy, tutaj jest mój kod serwera:Stan wyścigu wyścigu/tętna na serwerze wielowątkowym

public class Server 
{ 
    TcpListener listener; 
    Object sync; 
    IHandler handler; 
    bool running; 

    public Server(IHandler handler, int port) 
    { 
     this.handler = handler; 
     IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 
     listener = new TcpListener(address, port); 
     sync = new Object(); 
     running = false; 
    } 

    public void Start() 
    { 
     Thread thread = new Thread(ThreadStart); 
     thread.Start(); 
    } 

    public void Stop() 
    { 
     lock (sync) 
     { 
      listener.Stop(); 
      running = false; 
      Monitor.Pulse(sync); 
     } 
    } 

    void ThreadStart() 
    { 
     if (!running) 
     { 
      listener.Start(); 
      running = true; 
      lock (sync) 
      { 
       while (running) 
       { 
        try 
        { 
         listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
         Monitor.Wait(sync); // Release lock and wait for a pulse 
        } 
        catch (Exception e) 
        { 
         Console.WriteLine(e.Message); 
        } 
       } 
      } 
     } 
    } 

    void Accept(IAsyncResult result) 
    { 
     // Let the server continue listening 
     lock (sync) 
     { 
      Monitor.Pulse(sync); 
     } 

     if (running) 
     { 
      TcpListener listener = (TcpListener)result.AsyncState; 
      using (TcpClient client = listener.EndAcceptTcpClient(result)) 
      { 
       handler.Handle(client.GetStream()); 
      } 
     } 
    } 
} 

I tu jest mój kod klienta:

class Client 
{ 
    class EchoHandler : IHandler 
    { 
     public void Handle(Stream stream) 
     { 
      System.Console.Out.Write("Echo Handler: "); 
      StringBuilder sb = new StringBuilder(); 
      byte[] buffer = new byte[1024]; 
      int count = 0; 
      while ((count = stream.Read(buffer, 0, 1024)) > 0) 
      { 
       sb.Append(Encoding.ASCII.GetString(buffer, 0, count)); 
      } 
      System.Console.Out.WriteLine(sb.ToString()); 
      System.Console.Out.Flush(); 
     } 
    } 

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 

    public static int Main() 
    { 
     Server server1 = new Server(new EchoHandler(), 1000); 
     Server server2 = new Server(new EchoHandler(), 1001); 

     server1.Start(); 
     server2.Start(); 

     Console.WriteLine("Press return to test..."); 
     Console.ReadLine(); 

     // Note interleaved ports 
     SendMsg("Test1", 1000); 
     SendMsg("Test2", 1001); 
     SendMsg("Test3", 1000); 
     SendMsg("Test4", 1001); 
     SendMsg("Test5", 1000); 
     SendMsg("Test6", 1001); 
     SendMsg("Test7", 1000); 

     Console.WriteLine("Press return to terminate..."); 
     Console.ReadLine(); 

     server1.Stop(); 
     server2.Stop(); 

     return 0; 
    } 

    public static void SendMsg(String msg, int port) 
    { 
     IPEndPoint endPoint = new IPEndPoint(localhost, port); 

     byte[] buffer = Encoding.ASCII.GetBytes(msg); 
     using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) 
     { 
      s.Connect(endPoint); 
      s.Send(buffer); 
     } 
    } 
} 

Klient wysyła siedem wiadomości, ale serwer drukuje tylko cztery:

 
Press return to test... 

Press return to terminate... 
Echo Handler: Test1 
Echo Handler: Test3 
Echo Handler: Test2 
Echo Handler: Test4 

Podejrzewam, że monitor jest zdezorientowany, umożliwiając wystąpienie Pulse (w metodzie serwera Accept) przed wystąpieniem Wait (i w metodzie ThreadStart), mimo że ThreadStart powinien nadal mieć blokadę obiektu sync, dopóki nie wywoła Monitor.Wait(), a następnie metoda Accept może uzyskać blokadę i wysłać jej Pulse. Jeśli skomentować te dwie linie w Stop() metody serwera:

//listener.Stop(); 
//running = false; 

Pozostałe komunikaty pojawiają się, gdy Stop() metoda serwera nazywa (tj wstawania sync obiekt serwera powoduje wysyłką pozostałe wiadomości przychodzących). Wydaje mi się, że może się to zdarzyć tylko w przypadku wyścigu między metodami ThreadStart i Accept, ale blokada wokół obiektu sync powinna temu zapobiec.

Wszelkie pomysły?

Wielkie dzięki, Simon.

ps. Zauważ, że zdaję sobie sprawę, że dane wyjściowe pojawiają się poza kolejnością itp., Szczególnie pytam o stan wyścigu między blokadami a monitorem. Pozdrawiam, SH.

Odpowiedz

5

Problem polega na tym, że używasz sygnału Puls/Wait jako sygnału. Odpowiedni sygnał, taki jak AutoResetEvent, ma stan taki, że pozostaje zasygnalizowany, dopóki wątek nie wywoła funkcji WaitOne(). Wezwanie Pulse bez oczekujących na nie wątków stanie się odgłosami.

Jest to połączone z faktem, że zamek może zostać zajęty wiele razy przez ten sam wątek. Ponieważ używasz programowania Async, oddzwanianie Accept może być wywołane przez ten sam wątek, który zrobił BeginAcceptTcpClient.

Pozwolę sobie zilustrować. Skomentowałem drugi serwer i zmieniłem kod na twoim serwerze.

void ThreadStart() 
{ 
    if (!running) 
    { 
     listener.Start(); 
     running = true; 
     lock (sync) 
     { 
      while (running) 
      { 
       try 
       { 
        Console.WriteLine("BeginAccept [{0}]", 
         Thread.CurrentThread.ManagedThreadId); 
        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
        Console.WriteLine("Wait [{0}]", 
         Thread.CurrentThread.ManagedThreadId); 
        Monitor.Wait(sync); // Release lock and wait for a pulse 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine(e.Message); 
       } 
      } 
     } 
    } 
} 

void Accept(IAsyncResult result) 
{ 
    // Let the server continue listening 
    lock (sync) 
    { 
     Console.WriteLine("Pulse [{0}]", 
      Thread.CurrentThread.ManagedThreadId); 
     Monitor.Pulse(sync); 
    } 
    if (running) 
    { 
     TcpListener localListener = (TcpListener)result.AsyncState; 
     using (TcpClient client = localListener.EndAcceptTcpClient(result)) 
     { 
      handler.Handle(client.GetStream()); 
     } 
    } 
} 

Dane wyjściowe z mojego uruchomienia pokazane poniżej. Jeśli sam uruchomisz ten kod, wartości będą się różnić, ale ogólnie będą takie same.

Press return to test... 
BeginAccept [3] 
Wait [3] 

Press return to terminate... 
Pulse [5] 
BeginAccept [3] 
Pulse [3] 
Echo Handler: Test1 
Echo Handler: Test3 
Wait [3] 

Jak widać istnieją dwa Pulse nazywa, jeden z osobnym wątku (Pulse [5]), która budzi się pierwszy Czekaj. Wątek 3 wykonuje inny BeginAccept, ale mając oczekujące połączenia przychodzące, wątek decyduje się natychmiast wywołać oddzwanianie Accept. Ponieważ Accept jest wywoływany przez ten sam wątek, Lock (sync) nie blokuje natychmiast tylko Pulse [3] w pustej kolejce wątków.

Dwie procedury obsługi są wywoływane i obsługują dwie wiadomości.

Wszystko jest w porządku, a ThreadStart uruchamia się ponownie i przechodzi do oczekiwania na czas nieokreślony.

Teraz podstawową kwestią jest to, że próbujesz użyć monitora jako sygnału. Ponieważ nie pamięta stanu, w którym traci drugi puls.

Ale jest na to łatwe rozwiązanie. Użyj AutoResetEvents, który jest właściwym sygnałem i zapamięta jego stan.

public Server(IHandler handler, int port) 
{ 
    this.handler = handler; 
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; 
    listener = new TcpListener(address, port); 
    running = false; 
    _event = new AutoResetEvent(false); 
} 

public void Start() 
{ 
    Thread thread = new Thread(ThreadStart); 
    thread.Start(); 
} 

public void Stop() 
{ 
    listener.Stop(); 
    running = false; 
    _event.Set(); 
} 

void ThreadStart() 
{ 
    if (!running) 
    { 
     listener.Start(); 
     running = true; 
     while (running) 
     { 
      try 
      { 
       listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener); 
       _event.WaitOne(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 
    } 
} 

void Accept(IAsyncResult result) 
{ 
    // Let the server continue listening 
    _event.Set(); 
    if (running) 
    { 
     TcpListener localListener = (TcpListener) result.AsyncState; 
     using (TcpClient client = localListener.EndAcceptTcpClient(result)) 
     { 
      handler.Handle(client.GetStream()); 
     } 
    } 
} 
+0

Dzięki Mats. Zakładam, że BeginAcceptTcpClient zawsze uruchamiał osobny wątek i dlatego mogłem użyć obiektu synchronizacji jako sekcji krytycznej. Byłeś na miejscu i sygnały są drogą do zrobienia. Dzięki jeszcze raz. SH –