2013-06-13 17 views
7

Robię rzeczy ze STM i korzystałem między innymi ze struktury danych TBQueue z wielkim sukcesem. Użytecznym elementem Używam go do czytania wymaga od niego na podstawie warunku w TVar, w zasadzie tak:STM z częściową atomowością dla niektórych telewizorów

shouldRead <- readTVar shouldReadVar 
if shouldRead 
    then do 
    a <- readTBQueue queue 
    doSomethingWith a 
    else doSomethingElse 

Jeśli założymy, że queue jest pusty i shouldReadVar zawiera True przed wykonaniem tego bloku, to spowoduje readTBQueue wywołanie retry, a blok zostanie ponownie wykonany, gdy shouldReadVar zawiera False lub queue zawiera element, cokolwiek dzieje się jako pierwsze.


teraz jestem w potrzebie synchronicznej struktury danych kanał, podobny do struktury opisanej w this article (proszę przeczytać, jeśli chcesz zrozumieć to pytanie), oprócz tego, że musi być czytelny z warunkiem wstępnym jak w poprzednim przykładzie i ewentualnie komponuj z innymi rzeczami.

Nazwijmy tę strukturę danych SyncChan z określonymi na niej operacjami writeSyncChan i readSyncChan.

I tu jest możliwy przypadek użycia: Ten (pseudo) kod (który nie będzie działać, bo mieszać pojęć STM/IO):

shouldRead <- readTVar shouldReadVar 
if shouldRead 
    then do 
    a <- readSyncChan syncChan 
    doSomethingWith a 
    else doSomethingElse 

Zakładając, że żaden inny wątek jest aktualnie blokuje na writeSyncChan rozmowy i shouldReadChan zawiera True, chcę bloku do "retry", aż shouldReadChan zawiera False, lub różnych bloków wątku na writeSyncChan. Innymi słowy: kiedy jeden wątek retry s na writeSyncChan i inne bloki wątków osiągnie readSyncChan lub odwrotnie, chcę, aby wartość została przesłana wzdłuż kanału. We wszystkich innych przypadkach obie strony powinny znajdować się w stanie retry, a tym samym reagować na zmianę w shouldReadVar, aby odczyt lub zapis mógł zostać anulowany.

Metoda naiwna opisana w artykule połączonym powyżej przy użyciu dwóch (T) MVar s jest oczywiście niemożliwa. Ponieważ struktura danych jest synchroniczna, niemożliwe jest jej użycie w ramach dwóch bloków atomically, ponieważ nie można zmienić jednego TMVar i czekać na kolejną zmianę TMVar w kontekście atomowym.

Zamiast tego szukam pewnego rodzaju częściowej atomowości, w której mogę "popełnić" pewną część transakcji i wycofać ją tylko wtedy, gdy pewne zmienne ulegną zmianie, ale inne nie. Jeśli mam zmienne "msg" i "ack", takie jak pierwszy przykład w powyższym artykule, chcę mieć możliwość zapisu do zmiennej "msg", a następnie oczekiwania na podanie wartości "ack" lub inne zmienne transakcyjne do zmiany. Jeśli inne zmienne transakcyjne ulegną zmianie, cały blok atomowy powinien zostać ponownie sprawdzony, a jeśli pojawi się wartość "ack", transakcja powinna być kontynuowana, tak jak poprzednio. Po stronie czytelniczej coś podobnego powinno się wydarzyć, z wyjątkiem tego, że oczywiście czytałem z "msg" i pisząc do "ack".

Czy można to zrobić przy użyciu GHC STM, czy też trzeba wykonać ręczną obsługę MVar/rollback?

Odpowiedz

3

To jest to, co chcesz:

import Control.Concurrent 
import Control.Concurrent.STM 
import Control.Monad 

data SyncChan a = SyncChan (TMVar a) (TMVar()) 

newSyncChan :: IO (SyncChan a) 
newSyncChan = do 
    msg <- newEmptyTMVarIO 
    ack <- newEmptyTMVarIO 
    return (SyncChan msg ack) 

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a) 
readIf (SyncChan msg ack) shouldReadVar = do 
    b <- readTVar shouldReadVar 
    if b 
     then do 
      a <- takeTMVar msg 
      putTMVar ack() 
      return (Just a) 
     else return Nothing 

write :: SyncChan a -> a -> IO() 
write (SyncChan msg ack) a = do 
    atomically $ putTMVar msg a 
    atomically $ takeTMVar ack 

main = do 
    sc <- newSyncChan 
    tv <- newTVarIO True 
    forkIO $ forever $ forM_ [False, True] $ \b -> do 
     threadDelay 2000000 
     atomically $ writeTVar tv b 
    forkIO $ forM_ [0..] $ \i -> do 
     putStrLn "Writing..." 
     write sc i 
     putStrLn "Write Complete" 
     threadDelay 300000 
    forever $ do 
     putStrLn "Reading..." 
     a <- atomically $ readIf sc tv 
     print a 
     putStrLn "Read Complete" 

Daje to zachowanie miał na myśli. Podczas gdy TVar jest True, końce wejściowe i wyjściowe będą zsynchronizowane ze sobą. Kiedy TVar przełącza się na False, koniec odczytu swobodnie przerywa i zwraca Nothing.

+0

Zakładasz, że istnieje struktura danych o nazwie 'SyncChan' z pewną semantyką. Jednak nie ma takiej struktury danych; problem pojawia się podczas próby jego wdrożenia. Zasadniczo wziąłeś kod z mojego drugiego bloku kodu w pytaniu i wyodrębniłeś gałąź do wartości 'Maybe'. Rzeczywisty problem polega na implementacji 'readSyncChan' i' writeSyncChan'! – dflemstr

+0

@dflemstr Naprawiłem to i napisałem całą implementację, w tym przykładowy kod użycia. –

+0

dzięki za poświęcenie czasu na napisanie całego tego kodu. Jednak tutaj nie można pisać warunkowo (z "shouldWriteVar", że tak powiem). To nie działa po prostu dodanie do pierwszego 'atomowego' bloku w funkcji' write', ponieważ jeśli wartość została zapisana ahd wątek czeka na "ack", nie ma sposobu, aby odpowiedzieć na zmianę w 'shouldWriteVar'! Czy jest to najczystszy sposób, aby ponownie sprawdzić 'shouldWriteVar' tutaj, czy jest jakaś inna opcja, która pozwala uniknąć dziwnej sytuacji impasu, której nie rozważałem? – dflemstr

Powiązane problemy