W mojej aplikacji GHC Haskell
wykorzystującej stm, przewód sieciowy i kanał, mam pasmo dla każdego gniazda, które jest rozwidlone automatycznie przy użyciu runTCPServer
. Pasemka mogą komunikować się z innymi komponentami za pomocą transmitującego TChan.Jeden przewód przetwarzający, 2 źródła IO tego samego typu
ten prezentuje jak chciałbym skonfigurować przekazu „łańcuch”:
Więc, co mamy tu dwa źródła (każdy związany z przewodów pomocniczych, jakie), które wytwarzają Packet
obiektu który encoder
zaakceptuje i zmieni się w ByteString
, a następnie wyśle gniazdo. Miałem duże trudności z wydajnością (wydajność jest problemem) fuzją dwóch wejść.
Byłbym wdzięczny, gdyby ktoś wskazał mi właściwy kierunek.
Ponieważ byłoby to niegrzeczne, aby opublikować to pytanie bez próby, umieści to, co wcześniej próbowałem tutaj;
Napisałem/ukradkiem funkcję, która (blokowanie) generuje Źródło z TMChan (zamykany kanał);
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a --^The channel
-> (a -> STM (Maybe b)) --^The read function
-> (a -> STM()) --^The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done()) (HaveOutput pull close)
Podobnie funkcja przekształcania Chana w zlew;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a --^The channel
-> (a -> b -> STM()) --^The write function
-> (a -> STM()) --^The close/finalizer function
-> Sink b m()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
Następnie mergeSources jest prosty; rozwidlaj 2 wątki (które tak naprawdę nie chcę robić, ale co do cholery), które mogą umieścić ich nowe pozycje na liście, z której następnie tworzę źródło;
Podczas gdy udało mi się wprowadzić te funkcje, nie udało mi się uzyskać żadnego wykorzystania tych funkcji do kontroli typu;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet --^Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
widzę tę metodę jako wadliwy jest tak czy owak - istnieje wiele pośrednich listy i konwersje. To nie może być dobre dla wydajności. Szukam wskazówek.
PS. Z tego, co rozumiem, nie jest to duplikat; Fusing conduits with multiple inputs, ponieważ w mojej sytuacji oba źródła wytwarzają ten sam typ i nie obchodzi mnie, z którego źródła powstaje obiekt Packet
, o ile nie czekam na jedno, podczas gdy inny ma przedmioty gotowe do spożycia.
PPS. Przepraszam za użycie (i dlatego wymóg wiedzy) obiektywu w przykładowym kodzie.
Czy istnieje powód, dla którego nie używasz 'Data.Conduit.TMChan' z pakietu' stm-conduit'? Ma wszystkie funkcje, które definiujesz, w tym 'mergeSources'. –
W rzeczywistości istnieje - chciałbym, aby źródło, które łączy się z obu, zamknie, gdy tylko którekolwiek z tych źródeł się zamknie. Pakiet stm-conduit używa refcounts (i czeka, aż ostatnie źródło się zamknie, aby zamknąć powstałe źródło), co nie jest pożądanym zachowaniem. Zamykając się natychmiast po unieważnieniu któregoś z nich, daje mi to możliwość, że po zamknięciu globalnego TMChan, zamykam również każde gniazdo. – kvanberendonck
Bezczynna myśl: co się stanie, jeśli pobierzesz zasoby mergeSource z TMChan, wyrzucisz liczenie błędów i zastąpisz bit 'decRefCount refcount' kodem, aby zamknąć wszystkie źródła? – Iain