2015-05-03 18 views
8

Mam aplikację klienta serwera (Java EE & Android), komunikacja za pośrednictwem stron internetowych. Działa komunikacja, a także sam protokół do wysyłania obiektów jako json, który będzie poprawnie zawijany, przekształcany do postaci szeregowej, wysyłany, deserializowany, rozpakowywany i przebudowywany. Obie aplikacje używają innego projektu biblioteki ze wszystkimi możliwymi klasami żądania i odpowiedzi.synchronizacja żądanie/odpowiedź w kontekście nio

Teraz mój problem: Biblioteka powinna również wdrożyć strategię dla non-blocking komunikację, ale przejrzystą realizację żądanie-odpowiedź. Prawdopodobnie nie jestem pierwszy z tym problemem, więc myślę, że mogą tam być jakieś fajne implementacje :).

Czego chcę:

// server should sleep 5000ms and then return 3*3 
Future<Integer> f1 = server.put(
    new SleepAndReturnSquareRequest(5000, 3), 
    new FutureCallback<Integer>{ 
    public void onSuccess(Integer square) { 
     runOnUiThread(new Runnable{ 
     // Android Toast show square 
     }); 
    } 

    // impl onFailure 
    } 
); 

Future<Date> f2 = server.put(
    new TimeRequest(), 
    new FutureCallback<Date>{ 
    public void onSuccess(Date date) { 
     // called before other onSuccess 
    } 

    // impl onFailure 
    } 
); 

// e.g. when the activity in android changes I'll cancel all futures. That means no more callbacks and (later) if possible client sends cancellations to the server for long requests. 

Kod powinien wysłać SleepAndReturnRequest a następnie TimeRequest oczywiście nieblokującą. Pierwsze żądanie trwa 5 sekund, drugie żądanie prawie zero milisekund. Oczekuję, że implementacja wywoła drugie wywołanie zwrotne natychmiast po otrzymaniu odpowiedzi, podczas gdy pierwsze wywołanie zwrotne zostanie wywołane po około 5 sekundach. Implementacja jest odpowiedzialna za dopasowanie żądanie-odpowiedź po stronie żądającej.

Co próbowałem i myślałem o:

guawa Google'a listenable future byłby dobrym podejściem do „strony odpowiadającego” myślę, bo to po prostu zadanie działa na każdym wątku, wysyłanie wynik z powrotem na końcu. To powinno być łatwiejsze.

Dla "strony żądającej" potrzebuję implementacji, która doda unikalny identyfikator do wiadomości, aby móc dopasować odpowiedź. Mam nadzieję, że możesz mi powiedzieć, że niektóre paczki wykonują tę pracę.

Dziękuję za pomoc.

// edit:

Myślę, że moje pytanie zostało źle zrozumiane lub nie jest wystarczająco precyzyjna. Pomyśl o tym, jak zaimplementować GET lub POST za pośrednictwem websocket. Każde żądanie GET/POST ma jedną odpowiedź, a następnie połączenie jest zamykane. Klienci łączą się z określonym portem, serwer pobiera wątek z puli wątków, przetwarza żądanie i odpowiada. Uważam, że dopasowanie żądania do odpowiedzi odbywa się w warstwie transportowej nr 4.

Ponieważ chcę użyć WebSockets, muszę wdrożyć że dopasowanie w warstwie oprogramowania 7.

Oto kilka kroków, jestem wykonawczych. K to unikalny typ klucza, V to ogólny rodzaj treści wiadomości. Może to być ciąg, strumień bajtów, cokolwiek.

public class Synchronizer<K, V> implements UniqueMessageListener<K, V> { 

    private final ConcurrentMap<K, FutureCallback<V>> callbackMap = new ConcurrentHashMap<>(); 

    private final ListeningExecutorService executor; 

    private final UniqueMessageFactory<K, V> factory; 
    private final UniqueMessageSender<K, V> sender; 
    private UniqueMessageReceiver<K, V> receiver = null; 

    public Synchronizer(
      ListeningExecutorService executor, 
      UniqueMessageFactory<K, V> factory, 
      UniqueMessageSender<K, V> sender 
    ) { 
     this.executor = executor; 
     this.factory = factory; 
     this.sender = sender; 
    } 

    public void register(UniqueMessageReceiver<K, V> receiver) { 

     unregister(); 

     this.receiver = receiver; 
     receiver.addListener(this); 
    } 

    public void unregister() { 
     if(receiver != null) { 
      receiver.removeListener(this); 
      receiver = null; 
     } 
    } 

    public Future<V> put(Message<V> message, final FutureCallback<V> callback) { 
     final UniqueMessage<K, V> uniqueMessage = factory.create(message); 

     final Future<Boolean> sendFuture = sender.send(uniqueMessage); 

     final ListenableFuture<Boolean> listenableSendFuture = 
       JdkFutureAdapters.listenInPoolThread(sendFuture, executor); 


     listenableSendFuture.addListener(
       new Runnable() { 
        @Override 
        public void run() { 
         try { 
          if(listenableSendFuture.get() == true) { 
           callbackMap.put(
             uniqueMessage.getId(), 
             callback 
           ); 
          } else { 
           // maybe try it later again? 
          } 
         } catch(Exception e) { 
          // ... 
         } 
        } 
       }, 
       executor 
     ); 

     // implement cancel 
     return new SynchronizeFuture<>(
       listenableSendFuture, 
       callback 
     ); 
    } 

    @Override 
    public void onReceive(UniqueMessage<K, V> message) { 
     K id = message.getId(); 
     FutureCallback<V> callback; 

     callback = callbackMap.remove(id); 

     if(callback != null) { 
      callback.onSuccess(message.getContent()); 
     } 
    } 
} 

Jest wiele do przetestowania dla mnie, ale myślę, że to zadziała.

+0

Czy istnieje coś lub grizzly Netty pracować dla mnie? – Aitch

+0

Potrzebujesz czegoś do wygenerowania identyfikatora transactionId po stronie klienta, to jest twoje pytanie? –

+0

@Mr_Thorynque no.Chodzi o to, jak zaimplementować synchronizację żądań i odpowiedzi za pośrednictwem stron internetowych. Używanie unikalnego identyfikatora transakcji jest dość oczywiste. Przeczytaj mój kod powyżej. – Aitch

Odpowiedz

-1

można spróbować ProtoBuf-RPC-Pro - ale to nie jest dla WebSocket chociaż myślę, że jest to projekt na GitHub wspieranie go :)