2014-12-29 17 views
5

Próbuję odczytać grupę do 50 elementów z potoku i przetworzyć je w akcji we/wy jednocześnie. (W tym przypadku próbuję wstawić dane do bazy danych i chcę wykonać całą partię w ramach jednej transakcji, ponieważ jest ona znacznie wydajniejsza). Tutaj jest uproszczoną wersją tego, co mam do tej pory:Jak wykrywać koniec wejścia rurami

type ExampleType = Int 

doSomething :: [ExampleType] -> IO() 
doSomething = undefined 

inGroupsOf50 :: Monad m => Producer ExampleType m() -> m() 
inGroupsOf50 input = 
    runEffect $ input >-> loop 
     where loop = do entries <- replicateM 50 await 
        lift $ doSomething entries --Insert a bunch all in one transaction 
        loop 

Problem jest tak daleko, jak mogę powiedzieć, chyba że liczba elementów do wstawienia dzieje się podzielić przez 50, zamierzam przegapić trochę. To, czego naprawdę chcę zamiast replicateM 50 await, to coś, co daje mi do 50 elementów lub mniej, jeśli dane wejściowe się kończą, ale nie jestem w stanie wymyślić, jak to napisać.

Myślałem, że pipes-parse może być właściwą biblioteką do obejrzenia. draw wygląda na obiecującą sygnaturę ... ale jak dotąd wszystkie kawałki nie pasują do siebie w mojej głowie. Mam producer, piszę consumer i naprawdę nie rozumiem, jak to odnosi się do koncepcji parser.

Odpowiedz

11

Jeszcze więcej niż pipes-parse prawdopodobnie chcesz spojrzeć na pipes-group. W szczególności zbadajmy funkcję

-- this type is slightly specialized 
chunksOf 
    :: Monad m => 
    Int -> 
    Lens' (Producer a m x) (FreeT (Producer a m) m x) 

Lens' bit jest chyba straszny ale szybko można wyeliminować: stwierdza, że ​​możemy przekształcić Producer a m x do FreeT (Producer a m) m x [0]

import Control.Lens (view) 

chunkIt :: Monad m => Int -> Producer a m x -> FreeT (Producer a m) m x 
chunkIt n = view (chunksOf n) 

Więc teraz musimy dowiedzieć się, co zrobić z tym bitem FreeT. W szczególności, będziemy chcieli kopać w pakiecie free i wyciągnąć funkcję iterT

iterT 
    :: (Functor f, Monad m) => 
    (f (m a) -> m a) -> 
    (FreeT f m a -> m a) 

Funkcja ta, iterT, niech nas „zużywają” się FreeT jeden „krok” na raz. Aby to zrozumieć, będziemy najpierw specjalizują typ iterT zastąpić f z Producer a m

runChunk :: Monad m => 
      (Producer a m (m x)  -> m x) -> 
      (FreeT (Producer a m) m x -> m x) 
runChunk = iterT 

W szczególności runChunk może „run” a FreeT pełne Producer jest tak długo, jak to powiedzieć, jak przekonwertować Producer a m (m x) w akcję-m. Ten może zacząć wyglądać bardziej znajomo. Kiedy definiujemy pierwszy argument z runChunk, musimy po prostu wykonać Producer, który w tym przypadku nie będzie miał więcej niż wybraną liczbę elementów.

Ale co się dzieje z efektywną wartością zwrotu m x? Jest to "kontynuacja", np. wszystkie fragmenty, które przychodzą po aktualnym, który piszemy. Tak więc, na przykład, załóżmy, że mamy Producer z Char S i chcielibyśmy wydrukować i LINEBREAK po 3 znaki

main :: IO() 
main = flip runChunk (chunkIt 3 input) $ \p -> _ 

_ dziura w tym momencie ma typ IO() z p w kontekście w typ p :: Producer Char IO (IO()). Możemy spożywać tę rurę z for, zebraliśmy jej typ zwrotu (który jest kontynuacją, znowu), emitujemy znak nowej linii, a następnie uruchamiamy kontynuację.

input :: Monad m => Producer Char m() 
input = each "abcdefghijklmnopqrstuvwxyz" 

main :: IO() 
main = flip runChunk (chunkIt 3 input) $ \p -> do 
    cont <- runEffect $ for p (lift . putChar) 
    putChar '\n' 
    cont 

I to zachowuje się dokładnie tak, jak to pożądane

λ> main 
abc 
def 
ghi 
jkl 
mno 
pqr 
stu 
vwx 
yz 

Aby być jasne, a ja zrobiłem trochę ekspozycji, to jest dość prosty kod raz zobaczyć, jak wszystkie elementy pasują do siebie. Oto cała lista:

input :: Monad m => Producer Char m() 
input = each "abcdefghijklmnopqrstuvwxyz" 

main :: IO() 
main = flip iterT (input ^. chunksOf 3) $ \p -> do 
    cont <- runEffect $ for p $ \c -> do 
    lift (putChar c) 
    putChar '\n' 
    cont 

[0] Również trochę więcej, ale na razie wystarczy.