2013-05-26 19 views
64

W mojej aplikacji GHC Haskell wykorzystującej stm, przewód sieciowy i kanał, mam pasmo dla każdego gniazda, które jest rozwidlone automatycznie przy użyciu runTCPServer. Pasemka mogą komunikować się z innymi komponentami za pomocą transmitującego TChan.Jeden przewód przetwarzający, 2 źródła IO tego samego typu

ten prezentuje jak chciałbym skonfigurować przekazu „łańcuch”:

enter image description here

Więc, co mamy tu dwa źródła (każdy związany z przewodów pomocniczych, jakie), które wytwarzają Packet obiektu który encoder zaakceptuje i zmieni się w ByteString, a następnie wyśle ​​gniazdo. Miałem duże trudności z wydajnością (wydajność jest problemem) fuzją dwóch wejść.

Byłbym wdzięczny, gdyby ktoś wskazał mi właściwy kierunek.


Ponieważ byłoby to niegrzeczne, aby opublikować to pytanie bez próby, umieści to, co wcześniej próbowałem tutaj;

Napisałem/ukradkiem funkcję, która (blokowanie) generuje Źródło z TMChan (zamykany kanał);

-- | Takes a generic type of STM chan and, given read and close functionality, 
-- returns a conduit 'Source' which consumes the elements of the channel. 
chanSource 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> STM (Maybe b)) --^The read function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Source m b 
chanSource ch readCh closeCh = ConduitM pull 
    where close  = liftSTM $ closeCh ch 
      pull  = PipeM $ liftSTM $ readCh ch >>= translate 
      translate = return . maybe (Done()) (HaveOutput pull close) 

Podobnie funkcja przekształcania Chana w zlew;

-- | Takes a stream and, given write and close functionality, returns a sink 
-- which wil consume elements and broadcast them into the channel 
chanSink 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> b -> STM()) --^The write function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Sink b m() 
chanSink ch writeCh closeCh = ConduitM sink 
    where close = const . liftSTM $ closeCh ch 
      sink = NeedInput push close 
      write = liftSTM . writeCh ch 
      push x = PipeM $ write x >> return sink 

Następnie mergeSources jest prosty; rozwidlaj 2 wątki (które tak naprawdę nie chcę robić, ale co do cholery), które mogą umieścić ich nowe pozycje na liście, z której następnie tworzę źródło;

Podczas gdy udało mi się wprowadzić te funkcje, nie udało mi się uzyskać żadnego wykorzystania tych funkcji do kontroli typu;

-- | Helper which represents a conduit chain for each client connection 
serverApp :: Application SessionIO 
serverApp appdata = do 
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast 
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata 
    mergsrc $$ protocol $= encoder =$ appSink appdata 
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan 
      mergsrc = mergeSources [appSource appdata $= decoder, chansrc] 

-- | Structure which holds mutable information for clients 
data SessionState = SessionState 
    { _ssBroadcast  :: TMChan Packet --^Outbound packet broadcast channel 
    } 

makeLenses ''SessionState 

-- | A transformer encompassing both SessionReader and SessionState 
type Session m = ReaderT SessionReader (StateT SessionState m) 

-- | Macro providing Session applied to an IO monad 
type SessionIO = Session IO 

widzę tę metodę jako wadliwy jest tak czy owak - istnieje wiele pośrednich listy i konwersje. To nie może być dobre dla wydajności. Szukam wskazówek.


PS. Z tego, co rozumiem, nie jest to duplikat; Fusing conduits with multiple inputs, ponieważ w mojej sytuacji oba źródła wytwarzają ten sam typ i nie obchodzi mnie, z którego źródła powstaje obiekt Packet, o ile nie czekam na jedno, podczas gdy inny ma przedmioty gotowe do spożycia.

PPS. Przepraszam za użycie (i dlatego wymóg wiedzy) obiektywu w przykładowym kodzie.

+2

Czy istnieje powód, dla którego nie używasz 'Data.Conduit.TMChan' z pakietu' stm-conduit'? Ma wszystkie funkcje, które definiujesz, w tym 'mergeSources'. –

+0

W rzeczywistości istnieje - chciałbym, aby źródło, które łączy się z obu, zamknie, gdy tylko którekolwiek z tych źródeł się zamknie. Pakiet stm-conduit używa refcounts (i czeka, aż ostatnie źródło się zamknie, aby zamknąć powstałe źródło), co nie jest pożądanym zachowaniem. Zamykając się natychmiast po unieważnieniu któregoś z nich, daje mi to możliwość, że po zamknięciu globalnego TMChan, zamykam również każde gniazdo. – kvanberendonck

+3

Bezczynna myśl: co się stanie, jeśli pobierzesz zasoby mergeSource z TMChan, wyrzucisz liczenie błędów i zastąpisz bit 'decRefCount refcount' kodem, aby zamknąć wszystkie źródła? – Iain

Odpowiedz

1

ja nie wiem, czy to jest jakaś pomoc, ale starałem się wdrożyć sugestię Iain i wykonane wariant mergeSources' że przestanie tak szybko, jak każdy z kanałów ma:

mergeSources' :: (MonadIO m, MonadBaseControl IO m) 
       => [Source (ResourceT m) a] --^The sources to merge. 
       -> Int --^The bound of the intermediate channel. 
       -> ResourceT m (Source (ResourceT m) a) 
mergeSources' sx bound = do 
    c <- liftSTM $ newTBMChan bound 
    mapM_ (\s -> resourceForkIO $ 
        s $$ chanSink c writeTBMChan closeTBMChan) sx 
    return $ sourceTBMChan c 

(To proste dodawanie dostępny here).

Niektóre komentarze do swojej wersji mergeSources (wziąć je z przymrużeniem oka, to może być nie rozumiałem coś dobrze):

  • Korzystanie ...TMChan zamiast ...TBMChan wydaje się niebezpieczne. Jeśli scenarzyści są szybsi niż czytelnik, twoja sterta wybuchnie. Patrząc na twój diagram wydaje się, że może się to łatwo zdarzyć, jeśli twój partner TCP nie odczytuje danych wystarczająco szybko. Więc zdecydowanie użyłbym ...TBMChan z być może dużym lecz ograniczonym ograniczeniem.
  • Nie potrzebujesz ograniczenia MonadSTM m. Wszystko STM rzeczy jest owinięty w IO z

    liftSTM = liftIO . atomically 
    

    Może to pomóc nieznacznie podczas korzystania mergeSources' w serverApp.

  • tylko problem kosmetyczny, znalazłem

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 
    

    bardzo trudne do odczytania ze względu na jego stosowania liftA2 na (->) r monady. Powiedziałbym, że będzie dłuższy, ale znacznie łatwiejszy do odczytania.

Czy mógłbyś stworzyć samowystarczalny projekt, w którym będzie można grać z serverApp?

+0

Dzięki za poradę. Będę o tym pamiętać (wkrótce będę musiał ponownie przyjrzeć się problemowi). – kvanberendonck

Powiązane problemy