2012-12-03 15 views
6

jestem obliczania przyszłość ma limitu czasu w oczekiwaniu na wydarzenia szeregowego wydarzy:Jak zatrzymać czas oczekiwania przyszłego

Future<Response> future = executor.submit(new CommunicationTask(this, request)); 
response = new Response("timeout"); 
try { 
    response = future.get(timeoutMilliseconds, TimeUnit.MILLISECONDS); 
} catch (InterruptedException | TimeoutException e) { 
    future.cancel(true); 
    log.info("Execution time out." + e); 
} catch (ExecutionException e) { 
    future.cancel(true); 
    log.error("Encountered problem communicating with device: " + e); 
} 

Klasa CommunicationTask wdrożyła interfejs Observer wysłuchać na zmianę z port szeregowy.

Problem polega na tym, że odczyt z portu szeregowego jest stosunkowo powolny, a nawet gdy zdarza się zdarzenie szeregowe, czas się kończy i zostaje zgłoszony TimeoutException. Co mogę zrobić, aby zatrzymać zegar przekroczenia limitu czasu w mojej przyszłości, gdy wydarzy się zdarzenie seryjne?

próbowałem go z AtomicReference ale to niczego nie zmienia:

public class CommunicationTask implements Callable<Response>, Observer { 
    private AtomicReference atomicResponse = new AtomicReference(new Response("timeout")); 
    private CountDownLatch latch = new CountDownLatch(1); 
    private SerialPort port; 

    CommunicationTask(SerialCommunicator communicator, Request request) { 
    this.communicator = communicator; 
    this.message = request.serialize(); 
    this.port = communicator.getPort(); 
    } 

    @Override 
    public Response call() throws Exception { 
    return query(message); 
    } 

    public Response query(String message) { 
    communicator.getListener().addObserver(this); 
    message = message + "\r\n"; 
    try { 
     port.writeString(message); 
    } catch (Exception e) { 
     log.warn("Could not write to port: " + e); 
     communicator.disconnect(); 
    } 
    try { 
     latch.await(); 
    } catch (InterruptedException e) { 
     log.info("Execution time out."); 
    } 
    communicator.getListener().deleteObserver(this); 
    return (Response)atomicResponse.get(); 
    } 

    @Override 
    public void update(Observable o, Object arg) { 
    atomicResponse.set((Response)arg); 
    latch.countDown(); 
    } 
} 

Co mogę zrobić, aby rozwiązać ten problem?

EDYTOWANIE:

Ok Miałem jeden błąd. Odliczałem zatrzask przed ustawieniem atomicResponse w mojej funkcji update. Teraz wydaje się działać, ale wciąż pozostaje pytanie, czy to podejście jest właściwą drogą?

+0

można dodać 'metody isCommunicationStarted' do listy CommunicationTask - kiedy wychwycisz 'TimeoutException', sprawdź' isCommunicationStarted' - jeśli zwróci false => cancel, w przeciwnym razie spróbuj ponownie 'future.get()' (być może z nowym czasem oczekiwania). – assylias

+0

Czy podzielisz swoją klasę zadań na dwie: najpierw zaczekasz, aż wydarzenie rozpocznie się z przekroczeniem limitu czasu, a na początku zdarzenia zada zadanie drugiego typu. 2. zadanie odpowiednio przetworzy wydarzenie. –

+0

Nie jest całkiem jasne, co chcesz osiągnąć. Ustawienie limitu czasu oznacza, że ​​zdarzenia, które dotarły po przekroczeniu limitu czasu, mogą zostać zignorowane. Jeśli tak, to dlaczego rozpatrywany przypadek narożny ma znaczenie? – axtavt

Odpowiedz

0

Mam nadzieję, że to pomoże. Nie będę komentował tego w nadziei, że wszystko jest jasne z kodu.

class CommunicationTask implements Callable<String>, Observer { 
    volatile boolean ignoreTimeoutException; 

    public CommunicationTask(SerialCommunicator communicator, Request request) { 
    } 

    public String call() throws Exception { 
     Thread.sleep(1000); 
     return "done"; 
    } 

    public void update(Observable o, Object arg) { 
     ignoreTimeoutException = true; 
    } 
} 

class FutureCommunicationTask extends FutureTask<String> { 
    private CommunicationTask ct; 

    public FutureCommunicationTask(CommunicationTask ct) { 
     super(ct); 
     this.ct = ct; 
    } 

    public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     try { 
      return super.get(timeout, unit); 
     } catch (TimeoutException e) { 
      if (ct.ignoreTimeoutException) { 
       return get(); // no timeout wait 
      } 
      throw e; 
     } 
    } 
} 

public class Test { 

    public static void main(String[] args) throws Exception { 
     CommunicationTask ct = new CommunicationTask(null, null); 
     FutureTask<String> fct = new FutureCommunicationTask(ct); 
     ExecutorService ex = Executors.newSingleThreadExecutor(); 
     ex.execute(fct); 
//  uncomment this line and timeout will be cancelled 
     ct.update(null, null); 
     String res = fct.get(1, TimeUnit.MILLISECONDS); 
     System.out.println(res); 
    } 
} 
1

masz zbadane Google guawy przyszłego słuchacza „”, jest ona oparta na Async przyszłości nadzieję następujący fragment kodu pomaga ....

import java.util.concurrent.Callable; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

import com.google.common.util.concurrent.FutureCallback; 
import com.google.common.util.concurrent.Futures; 
import com.google.common.util.concurrent.ListenableFuture; 
import com.google.common.util.concurrent.ListeningExecutorService; 
import com.google.common.util.concurrent.MoreExecutors; 

public class SyncFutureExample { 
    public static void main(String[] args) { 
     ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); 
     ListenableFuture<String> lf = service.submit(new CommuncationTask()); 

     //no need for future.get() or future.get(10,time minutes) 


     //add callbacks(= async future listeners) .... 
     Futures.addCallback(lf, new FutureCallback<String>() { 
       public void onSuccess(String input) { 
       System.out.println(input + " >>> success");//gets a callback once task is success 
       } 
       public void onFailure(Throwable thrown) { 
        System.out.println(thrown + " >>> failure");//gets a callback if task is failed 
       } 
      }); 
     service.shutdown(); 
    } 

} 

class CommuncationTask implements Callable<String>{ 

    public String call() throws Exception { 
     TimeUnit.SECONDS.sleep(15);// some dummy serious task ............. 
     return "TaskDone"; 
    } 


} 
Powiązane problemy