2014-05-12 9 views
5

Mam problemy z pozornie niespójnym zachowaniem podczas anulowania różnych rodzajów Asynchronów.Niespójne zachowanie podczas anulowania różnych rodzajów Asyncs

Aby odtworzyć problem, załóżmy, że istnieje funkcja, która pobiera listę "zadań" (Async < _> lista), czeka na zakończenie i wydrukuje ich wyniki. Funkcja pobiera również token anulowania więc może być anulowane:

let processJobs jobs cancel = 
    Async.Start(async { 
    try 
     let! results = jobs |> Async.Parallel 
     printfn "%A" results 
    finally 
     printfn "stopped" 
    }, cancel) 

funkcja nazywa się tak:

let jobs = [job1(); job2(); job3(); job4(); job5()] 
use cancel = new CancellationTokenSource() 

processJobs jobs cancel.Token 

i nieco później zostanie anulowane:

Thread.Sleep(1000) 
printfn "cancelling..." 
cancel.Cancel() 

Kiedy źródło tokena anulowania jest anulowane, funkcja powinna uruchomić blok końcowy i wydrukować "zatrzymany".

Działa to dobrze dla zadań 1, 2 i 3, ale nie działa, gdy na liście znajduje się zadanie 4 lub zadanie5.

Job1 zaledwie Async.Sleeps:

let job1() = async { 
    do! Async.Sleep 1000000 
    return 10 
} 

Job2 zaczyna jakieś dzieciaki async i czeka na nich:

let job2() = async { 
    let! child1 = Async.StartChild(async { 
    do! Async.Sleep 1000000 
    return 10 
    }) 

    let! child2 = Async.StartChild(async { 
    do! Async.Sleep 1000000 
    return 10 
    }) 

    let! results = [child1; child2] |> Async.Parallel 
    return results |> Seq.sum 
} 

Zadanie3 czeka na jakiś brzydki uchwytem czekać tak ustaw przez jakiś jeszcze brzydszy wątek:

let job3() = async { 
    use doneevent = new ManualResetEvent(false) 

    let thread = Thread(fun() -> Thread.Sleep(1000000); doneevent.Set() |> ignore) 
    thread.Start() 

    do! Async.AwaitWaitHandle(doneevent :> WaitHandle) |> Async.Ignore 

    return 30 
} 

Zadanie4 postów do i czeka na odpowiedź od MailboxProcessor:

let job4() = async { 
    let worker = MailboxProcessor.Start(fun inbox -> async { 
    let! (msg:AsyncReplyChannel<int>) = inbox.Receive() 
    do! Async.Sleep 1000000 
    msg.Reply 40 
    }) 

    return! worker.PostAndAsyncReply (fun reply -> reply) // <- cannot cancel this 
} 

Job5 czeka na zadanie (lub TaskCompletionSource):

let job5() = async { 
    let tcs = TaskCompletionSource<int>() 

    Async.Start(async { 
    do! Async.Sleep 1000000 
    tcs.SetResult 50 
    }) 

    return! (Async.AwaitTask tcs.Task) // <- cannot cancel this 
} 

dlaczego Job1, 2 i 3 zostaną anulowane (druk "stop" zostanie wydrukowany), a Job4 i 5 sprawią, że funkcja zawiesi się "na zawsze"?

Do tej pory zawsze polegałem na F #, aby obsługiwać anulowanie za kulisami - o ile jestem w async-blokach i używam! S (niech !, wykonaj !, wróć!, ...) wszystko powinno być w porządku ... ale to nie wygląda tak przez cały czas.

Quote:

wf workflow # asynchronicznych, obiekt CancellationToken przepuszcza wokół automatycznie pod pokrywą. Oznacza to, że nie musimy robić nic specjalnego, aby wesprzeć anulowanie. Podczas pracy z przepływem asynchronicznym możemy nadać temu tokenowi anulowanie i wszystko będzie działać automatycznie .

Kompletny kod jest dostępny tutaj: http://codepad.org/euVO3xgP

EDIT

zauważyłem, że jest potokiem asynchronicznie przez Async.StartAsTask następnie Async.AwaitTask czyni go anulować w każdym przypadku.

tj dla Zadanie4 który oznacza zmianę linii:

return! worker.PostAndAsyncReply (fun reply -> reply) 

do:

return! cancelable <| worker.PostAndAsyncReply (fun reply -> reply) 

Z nieodwołalnego samopoczucie:

let cancelable (x:Async<_>) = async { 
    let! cancel = Async.CancellationToken 
    return! Async.StartAsTask(x, cancellationToken = cancel) |> Async.AwaitTask 
} 

Te same roboty dokonywania Job5 anulować.

Ale .. to tylko obejście problemu i nie mogę tego powiedzieć przy każdym wywołaniu nieznanego Async < _>.

Odpowiedz

1

Tylko Asynchronizacja. metody radzą sobie z użyciem domyślnego tokena anulowania.

W przykładzie MailboxProcessor Anuluj powinien udać się na metodzie start

let! ct= Async.CancellationToken 
use worker := MailboxProcessor.Start(theWork, ct) 

W przykładzie TaskCompletionSource, będziesz musiał zarejestrować wywołania zwrotnego, aby ją anulować.

let! ct = Async.CancellationToken 
use canceler = ct.Register(fun() -> tcs.TrySetCanceled() |> ignore) 
+0

Przekazanie tokena anulowania do MailboxProcessor.Start tylko anuluje "theWork". Ale worker.PostAndAsyncReply nadal blokuje i nigdy nie wraca. – stmax

Powiązane problemy