2014-04-27 11 views
10

Gram z kompletnymi kontraktami Java 8. Mam następujący kod:Jak anulować kompletną przyszłość Java 8?

CountDownLatch waitLatch = new CountDownLatch(1); 

CompletableFuture<?> future = CompletableFuture.runAsync(() -> { 
    try { 
     System.out.println("Wait"); 
     waitLatch.await(); //cancel should interrupt 
     System.out.println("Done"); 
    } catch (InterruptedException e) { 
     System.out.println("Interrupted"); 
     throw new RuntimeException(e); 
    } 
}); 

sleep(10); //give it some time to start (ugly, but works) 
future.cancel(true); 
System.out.println("Cancel called"); 

assertTrue(future.isCancelled()); 

assertTrue(future.isDone()); 
sleep(100); //give it some time to finish 

Użycie runAsync Planuję wykonanie kodu, który czeka na zatrzask. Następnie anuluję przyszłość, oczekując, że przerwany wyjątek zostanie wrzucony do środka. Wygląda jednak na to, że wątek pozostaje zablokowany w oczekiwaniu na połączenie, a wyjątek InterruptedException nigdy nie zostanie zgłoszony, nawet jeśli przyszłość zostanie anulowana (asercje mijają). Odpowiedni kod przy użyciu ExecutorService działa zgodnie z oczekiwaniami. Czy jest to błąd w programie CompletableFuture lub w moim przykładzie?

+0

Czy możesz odtworzyć problem za pomocą 'Executors.newFixedThreadPool' kontra' Executors.newWorkStealingPool'? Sprawiłoby to, że pytanie byłoby bardziej przejrzyste, gdyby porównać dwie różne implementacje wykonawców niż porównywać kontrakty terminowe z kontraktami futures. – nosid

+0

JavaDoc mówi, że anulowanie (true) jest anulowane z wyjątkiem CancellationException, ale nie łapiesz tego. – edharned

+0

@nosid Masz rację, newWorkStealingPool podobno nie obsługuje anulowania ani – Lukas

Odpowiedz

10

Podobno jest celowy. Javadoc dla metody CompletableFuture::cancel stanów:

[parametry:] mayInterruptIfRunning - wartość ta ma nie efekt w tej implementacji, ponieważ przerwania nie są używane do kontroli przetwarzania.

Co ciekawe, metoda ForkJoinTask::cancel korzysta prawie taką samą treść dla parametru mayInterruptIfRunning.

Mam przypuszczenia na ten temat:

  • przerwy jest przeznaczony do stosowania z działań blokujących, jak snu, czekać lub I/O operacji,
  • ale ani CompletableFuture ani ForkJoinTask są przeznaczone do blokowania.

Zamiast blokowania, A CompletableFuture należy utworzyć nowy CompletionStage i zadania cpu związany jest warunkiem wstępnym dla modelu widelec-join. Tak więc, używając jednej z nich, przerywając pracę, każdy z nich pokonałby ich cel. Z drugiej strony może to zwiększyć złożoność, która nie jest wymagana, jeśli jest używana zgodnie z przeznaczeniem.

+0

Dlaczego uważasz, że _CompletableFuture_ i _ForkJoinTask_ nie są przeznaczone do blokowania operacji? –

+0

Cały punkt CompletableFuture i innych elementów reaktywnych to: nie marnuj nici, zmuszając ich do oczekiwania na wynik długiej operacji. Zamiast tego należy podać wywołanie zwrotne, które będzie wywoływane, gdy wynik znajdzie się tutaj. Podejście reaktywne wymaga znacznie mniej wątków. –

0

Wyjątek CancellationException jest częścią wewnętrznej procedury anulowania ForkJoin. Wyjątek wyjdzie, gdy odzyskasz wynik w przyszłości:

try { future.get(); } 
     catch (Exception e){ 
      System.out.println(e.toString());    
     } 

Zajęło trochę czasu, aby zobaczyć to w debugerze. JavaDoc nie jest jasne, co się dzieje lub czego należy się spodziewać.

1

Po wywołaniu CompletableFuture#cancel zatrzymujesz tylko dolną część łańcucha. Część upstream, i. mi. coś, co ostatecznie wywoła complete(...) lub completeExceptionally(...), nie otrzyma żadnego sygnału, że wynik nie jest już potrzebny.

Co to są rzeczy "upstream" i "downstream"?

Rozważmy następujący kod:

CompletableFuture 
     .supplyAsync(() -> "hello")    //1 
     .thenApply(s -> s + " world!")   //2 
     .thenAccept(s -> System.out.println(s)); //3 

Tutaj dane płynie z góry do dołu - od tworzony przez dostawcę poprzez modyfikowane przez funkcję, aby konsumowane przez println. Część powyżej określonego etapu nazywa się "upstream", a część poniżej jest "downstream". E. g. kroki 1 i 2 są poprzedzone etapem 3.

Oto, co dzieje się za kulisami. To nie jest precyzyjne, a raczej wygodny model umysłu, który się dzieje.

  1. Dostawca (krok 1) jest wykonywany (wewnątrz wspólnej maszyny JVM ForkJoinPool).
  2. Wynik dostawcy jest następnie przekazywany przez complete(...) do następnego CompletableFuture w dół.
  3. Po otrzymaniu wyniku, CompletableFuture wywołuje następny krok - funkcję (krok 2), która przyjmuje poprzedni wynik kroku i zwraca coś, co zostanie przekazane dalej, do.
  4. Po otrzymaniu wyniku kroku 2, krok 3 CompletableFuture wywołuje konsumenta, System.out.println(s). Po konsument jest gotowy, gdy dalszy CompletableFuture otrzyma to wartość, (Void) null

Jak widzimy, każdy CompletableFuture w tym łańcuchu musi wiedzieć, którzy są tam za oczekiwanie na wartość mają być przekazane do ich'S complete(...) (lub completeExceptionally(...)). Ale CompletableFuture nie musi wiedzieć nic o tym, że jest w górze (lub w górę - może być ich kilka).

Zatem, nazywając cancel() po kroku 3 nie przerwać kroki 1 i 2, ponieważ nie ma żadnego związku z kroku 3 do kroku 2.

Przypuszcza się, że jeśli używasz CompletableFuture następnie swoje kroki są małe na tyle, aby nie zaszkodziło wykonanie kilku dodatkowych kroków.

Jeśli chcesz anulowanie być propagowane w górę, masz dwie opcje:

  • wdrożyć ten sam - stworzenie dedykowanego CompletableFuture (nazwij go jak cancelled), który jest sprawdzany po każdym kroku (coś jak step.applyToEither(cancelled, Function.identity()))
  • Zastosowanie reaktywnego stos jak RxJava 2 ProjectReactor/Flux lub Akka Strumienie