2013-05-01 5 views
5

Mam problem z serwerem wielowątkowym, który buduję jako ćwiczenie akademickie, a konkretnie z nawiązaniem połączenia, aby zamknąć z wdziękiem.Z wdzięcznością kończąc wątek oczekujący na kolejkę blokowania

Każde połączenie jest zarządzane przez klasę sesji. Ta klasa utrzymuje 2 wątki dla połączenia, DownstreamThread i UpstreamThread.

Funkcja UpstreamThread blokuje w gnieździe klienta i koduje wszystkie przychodzące ciągi znaków w wiadomościach, które należy przekazać do innej warstwy, z którymi należy się uporać. DownstreamThread blokuje BlockingQueue, do którego wstawiane są wiadomości dla klienta. Gdy w kolejce znajduje się komunikat, wątek w dół odpowiada na wiadomość z kolejki, przekształca ją w ciąg i wysyła do klienta. W ostatecznym systemie warstwa aplikacji będzie działać na wiadomości przychodzące i wypycha wychodzące wiadomości na serwer, aby wysłać je do odpowiedniego klienta, ale na razie mam tylko prostą aplikację, która śpi na sekundę w przychodzącej wiadomości, zanim ją odtworzy. jako wiadomość wychodzącą z dołączonym znacznikiem czasu.

Problem, który mam, polega na tym, aby zamknąć wszystko z gracją, gdy klient się rozłączy. Pierwszy problem, z którym się kłócę, to normalne rozłączenie, w którym klient informuje serwer, że kończy połączenie z poleceniem QUIT. Podstawowy pseudokod jest następujący:

while (!quitting) { 
    inputString = socket.readLine() // blocks 
    if (inputString != "QUIT") { 
     // forward the message upstream 
     server.acceptMessage (inputString); 
    } else { 
     // Do cleanup 
     quitting = true; 
     socket.close(); 
    } 
} 

Główna pętla wątku wyższego rzędu analizuje ciąg wejściowy. Jeśli jest to QUIT, wątek ustawia flagę, która mówi, że klient zakończył komunikację i opuszcza pętlę. Prowadzi to do dobrego zamknięcia wątku wyjściowego.

Pętla główna wątku następnego strumienia czeka na komunikaty w polu BlockingQueue tak długo, jak długo nie jest ustawiona flaga zamknięcia połączenia. Kiedy jest, wątek w dół powinien również zakończyć się. Jednak nie, po prostu czeka tam. Jego psuedocode wygląda następująco:

while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 
} 

Kiedy testowałem to zauważyłem, że gdy klient rzucić, górna nitka zamknięty, ale wątek za nie.

Po odrobinie drapania głowy, zdałem sobie sprawę, że wątek w dalszym ciągu blokuje się na BlockingQueue czekając na wiadomość przychodzącą, która nigdy nie nadejdzie. Wątek poprzedzający nie przesyła dalej komunikatu QUIT dalej w górę łańcucha.

Jak można z wdziękiem zamknąć wątek wyjściowy? Pierwszą ideą, która przychodziła na myśl było ustawienie limitu czasu na wywołanie funkcji take(). Nie przepadam za tym pomysłem, ponieważ niezależnie od wybranej przez ciebie wartości, nie będzie to w pełni satysfakcjonujące. Albo jest zbyt długi i wątek zombie siedzi tam przez długi czas przed zamknięciem, albo jest zbyt krótki i połączenia, które pozostały bezczynne przez kilka minut, ale nadal są ważne, zostaną zabite. Myślałem o wysłaniu komunikatu QUIT do łańcucha, ale to wymaga pełnej rundy podróży do serwera, następnie aplikacji, a następnie z powrotem do serwera i wreszcie do sesji. To też nie wygląda na eleganckie rozwiązanie.

Spojrzałem na dokumentację dla Thread.stop(), ale to było najwyraźniej przestarzałe, ponieważ i tak nigdy nie działało poprawnie, więc wygląda na to, że nie jest to również opcja. Innym pomysłem, jaki miałem, było zmuszenie wyjątku do uruchomienia w wątku w dół i pozwolić mu posprzątać w jego ostatecznym bloku, ale to uderza mnie jako okropny i kludgey pomysł.

Uważam, że oba wątki powinny być w stanie sprawnie zamknąć się na własną rękę, ale podejrzewam też, że jeśli jeden wątek się skończy, musi również sygnalizować drugi wątek, aby zakończył się bardziej proaktywnie niż ustawianie flagi dla innego wątku wątek do sprawdzenia.Ponieważ nadal nie mam dużego doświadczenia z Javą, w tej chwili nie mam pomysłów. Jeśli ktokolwiek ma jakąkolwiek radę, byłby bardzo doceniony.

Dla uzupełnienia, podałem prawdziwy kod dla klasy Session poniżej, chociaż uważam, że fragmenty kodu pseudokodowego powyżej obejmują odpowiednie części problemu. Pełna klasa to około 250 linii.

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

/** 
* Session class 
* 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
* 
*/ 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    /** 
    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    */ 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     @Override 
     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 
       } 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 
       this.terminate(); 
      } 
     } 

     /** 
     * Add a message to the downstream queue 
     * 
     * @param message 
     * @return 
     * @throws InterruptedException 
     */ 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 
      } 

      return this; 
     } 

     /** 
     * Send the given message to the client 
     * 
     * @param message 
     * @throws IOException 
     */ 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 
       osw.flush(); 
      } 

      return this; 
     } 

     /** 
     * Perform session cleanup 
     * 
     * @return 
     */ 
     private DownstreamThread terminate() { 
      try { 
       this.streamWriter.close(); 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } 
      this.streamWriter = null; 

      return this; 
     } 

     /** 
     * Get an output stream writer, initialize it if it's not active 
     * 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     */ 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 
      } 

      return this.streamWriter; 
     } 

     /** 
     * 
     * @param outer 
     */ 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * This thread handles waiting for client input and sending it upstream 
    */ 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     @Override 
     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 
        } 
       } 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.terminate(); 
       outer.server.deleteSession (outer.getId()); 
      } 
     } 

     /** 
     * Class constructor 
     * 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * 
     * @param outer 
     */ 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * Start the session threads 
    */ 
    public void run() //throws InterruptedException 
    { 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 

     upThread.start(); 
     downThread.start(); 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 
    } 

    /** 
    * Terminate the client connection 
    */ 
    private void terminate() { 
     try { 
      this.clientSocket.close(); 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
     } 
    } 

    /** 
    * Get this Session's ID 
    * 
    * @return The ID of this session 
    */ 
    public Integer getId() { 
     return this.sessionId; 
    } 

    /** 
    * Session constructor 
    * 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    */ 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 
    } 
} 

Odpowiedz

3

Zamiast wywoływać Thread.stop korzystanie Thread.interrupt. To spowoduje, że metoda take wyrzuci InterruptedException, której możesz użyć, aby wiedzieć, że powinieneś się wyłączyć.

+1

I nie myśleć o tym, ale nie zmusza coś wyjątek jako część normalnego zamykania trochę kludge? – GordonM

+0

Nie, tak to zaprojektowano do działania. W przeciwnym razie każda metoda, która mogłaby zostać przerwana, musiałaby zwrócić specjalną przerwaną wartość. Więcej informacji można znaleźć na stronie http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html. – Brigham

+0

Nie wiem zbyt wiele o Javie, ale co się stanie, gdy zadzwonisz do "Thread.interrupt" i nie będziemy blokować metody "take"? Myślę, że przerwanie nie zostanie podniesione i musimy sprawdzić "Thread.isInterrupted", jak sądzę. – Zuljin

0

Czy można po prostu utworzyć "fałszywy" komunikat o zakończeniu, zamiast ustawić outer.sessionEnding na true, gdy pojawi się "QUIT". Umieszczenie tego fałszywego komunikatu w kolejce obudzi DownstreamThread i możesz go zakończyć. W takim przypadku można nawet wyeliminować tę zmienną sessionEnding.

W pseudo kod ten może wyglądać następująco:

while (true) { 
    outputMessage = messageQueue.take(); // blocks 
    if (QUIT == outputMessage) 
     break 
    sendMessageToClient (outputMessage); 
} 
Powiązane problemy