2015-09-24 10 views
5

Uważam, że to pytanie nie jest duplikatem numeru Server sent event with Jersey: EventOutput is not closed after client drops, ale prawdopodobnie jest powiązane z Jersey Server-Sent Events - write to broken connection does not throw exception.Nadawanie za pomocą usługi Jersey SSE: Wykrywanie zamkniętego połączenia

W chapter 15.4.2 dokumentacji Jersey, SseBroadcaster opisano:

Jednak SseBroadcaster wewnętrznie identyfikuje i obsługuje również rozłącza klienta. Gdy klient zamyka połączenie, nadawca wykrywa to i usuwa nieaktualne połączenie z wewnętrznego zbioru zarejestrowanych punktów EventOutput, a także uwalnia wszystkie zasoby po stronie serwera związane z nieaktualnym połączeniem.

Nie mogę tego potwierdzić. W poniższej teście widzę metodę, która nigdy nie była wywoływana: nie, gdy EventInput jest zamknięta, a nie, gdy nadawana jest inna wiadomość.

public class NotificationsResourceTest extends JerseyTest { 
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class); 

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster(); 

    public static class CountingSseBroadcaster extends SseBroadcaster { 
     final AtomicInteger connectionCounter = new AtomicInteger(0); 

     public EventOutput createAndAttachEventOutput() { 
      EventOutput output = new EventOutput(); 
      if (add(output)) { 
       int cons = connectionCounter.incrementAndGet(); 
       log.debug("Active connection count: "+ cons); 
      } 
      return output; 
     } 

     @Override 
     public void onClose(final ChunkedOutput<OutboundEvent> output) { 
      int cons = connectionCounter.decrementAndGet(); 
      log.debug("A connection has been closed. Active connection count: "+ cons); 
     } 

     @Override 
     public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) { 
      log.trace("An exception has been detected", exception); 
     } 

     public int getConnectionCount() { 
      return connectionCounter.get(); 
     } 
    } 

    @Path("notifications") 
    public static class NotificationsResource { 

     @GET 
     @Produces(SseFeature.SERVER_SENT_EVENTS) 
     public EventOutput subscribe() { 
      log.debug("New stream subscription"); 

      EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
      return eventOutput; 
     } 
    } 

    @Override 
    protected Application configure() { 
     ResourceConfig config = new ResourceConfig(NotificationsResource.class); 
     config.register(SseFeature.class); 

     return config; 
    } 


    @Test 
    public void test() throws Exception { 
     // check that there are no connections 
     assertEquals(0, broadcaster.getConnectionCount()); 

     // connect subscriber 
     log.info("Connecting subscriber"); 
     EventInput eventInput = target("notifications").request().get(EventInput.class); 
     assertFalse(eventInput.isClosed()); 

     // now there are connections 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // push data 
     log.info("Broadcasting data"); 
     String payload = UUID.randomUUID().toString(); 
     OutboundEvent chunk = new OutboundEvent.Builder() 
       .mediaType(MediaType.TEXT_PLAIN_TYPE) 
       .name("message") 
       .data(payload) 
       .build(); 
     broadcaster.broadcast(chunk); 

     // read data 
     log.info("Reading data"); 
     InboundEvent inboundEvent = eventInput.read(); 
     assertNotNull(inboundEvent); 
     assertEquals(payload, inboundEvent.readData()); 

     // close subscription 
     log.info("Closing subscription"); 
     eventInput.close(); 
     assertTrue(eventInput.isClosed()); 

     // at this point, the subscriber has disconnected itself, 
     // but jersey doesnt realise that 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // wait, give TCP a chance to close the connection 
     log.debug("Sleeping for some time"); 
     Thread.sleep(10000); 

     // push data again, this should really flush out the not-connected client 
     log.info("Broadcasting data again"); 
     broadcaster.broadcast(chunk); 
     Thread.sleep(100); 

     // there is no subscriber anymore 
     assertEquals(0, broadcaster.getConnectionCount()); // FAILS! 
    } 
} 

Być może JerseyTest nie jest dobrym sposobem na sprawdzenie tego. W mniej ... klinicznej konfiguracji, w której używany jest JavaScript EventSource, widzę, że wywoływana jest onClose(), ale dopiero po nadaniu komunikatu na uprzednio zamkniętym połączeniu.

Co robię źle?

Dlaczego SseBroadcaster nie wykryje zamknięcia połączenia przez klienta?

Follow-up

Znalazłem JERSEY-2833 który został odrzucony z Works jako przeznaczone:

Zgodnie z dokumentacją Jersey w SSE rozdziału (https://jersey.java.net/documentation/latest/sse.html) w 15.4.1 to wspomniał, że Jersey nie zamyka wyraźnie połączenia, jest to odpowiedzialność metody zasobów lub klienta.

Co to znaczy dokładnie? Czy zasób powinien wymuszać limit czasu i zabijać wszystkie połączenia aktywne i zamknięte przez klienta?

Odpowiedz

0

wierzę, może lepiej ustawić limit czasu na swoim zasobie i zabić tylko tego połączenia, na przykład:

@Path("notifications") 
public static class NotificationsResource { 

    @GET 
    @Produces(SseFeature.SERVER_SENT_EVENTS) 
    public EventOutput subscribe() { 
     log.debug("New stream subscription"); 

     EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
     new Timer().schedule(new TimerTask() 
     { 
      @Override public void run() 
      { 
       eventOutput.close() 
      } 
     }, 10000); // 10 second timeout 
     return eventOutput; 
    } 
} 
+0

Dzięki za wejście. Jestem pewien, że to zadziała, i ma swoje zalety, ale to nie jest to, o co prosiłem. Moje bieżące obejście polega na wysyłaniu komentarza SSE co x sekund, co w końcu spowoduje przepełnienie uszkodzonego połączenia. Pierwotne pytanie pozostaje jednak - dlaczego Jersey nie wykryje zamkniętego połączenia, a samo sprawozdanie samo w sobie? – Hank

0

Im zastanawiasz się, czy przez instacji można zmieniły zachowanie.

@Override 
    public void onClose(final ChunkedOutput<OutboundEvent> output) { 
     int cons = connectionCounter.decrementAndGet(); 
     log.debug("A connection has been closed. Active connection count: "+ cons); 
    } 

W tym przypadku nie zamykaj ChunkedOutput, aby nie zwolnić połączenia. Czy to może być problem?

+0

Nie, to nie ma z tym nic wspólnego. W rzeczywistości jest odwrotnie, wywoływane jest 'onClose()', gdy 'OutboundEvent' jest zamykany przez Nadawcę. – Hank

0

W dokumentacji konstruktora org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster(), to mówi:

Tworzy nową instancję. Jeśli ten konstruktor jest wywoływany przez podklasę, przyjmuje on, że powodem istnienia podklasy jest implementacja metod onClose(org.glassfish.jersey.server.ChunkedOutput) i onException(org.glassfish.jersey.server.ChunkedOutput, Exception), więc dodaje ona nowo utworzoną instancję jako detektora.Aby tego uniknąć, podklasy mogą wywoływać SseBroadcaster(Class) przekazując swoją klasę jako argument.

Więc nie powinno pozostawić domyślny konstruktor i spróbuj realizacji powołując konstruktora SUPER swojej klasie:

public CountingSseBroadcaster(){ 
    super(CountingSseBroadcaster.class); 
} 
Powiązane problemy