2015-02-06 14 views
11

problemChmura Haskell zawsze wisi przy wysyłaniu wiadomości do ManagedProcess

Hello! Piszę w Cloud Haskell prosty program Server - Worker. Problem polega na tym, że kiedy próbuję utworzyć ManagedProcess, po etapie wyszukiwania serwera, mój przykład zawiesza się na zawsze nawet podczas korzystania z callTimeout (który powinien zostać przerwany po 100 ms). Kod jest bardzo prosty, ale nie mogę znaleźć w nim nic złego.

Postawiłem też pytanie na liście mailingowej, ale o ile znam społeczność SO, mogę tu szybciej znaleźć odpowiedź. Jeśli dostanę odpowiedź z listy mailingowej, również opublikuję tutaj.

kod źródłowy

Worker.hs:

{-# LANGUAGE DeriveDataTypeable  #-} 
{-# LANGUAGE ExistentialQuantification #-} 
{-# LANGUAGE DeriveGeneric    #-} 
{-# LANGUAGE TemplateHaskell   #-} 

module Main where 

import Network.Transport  (EndPointAddress(EndPointAddress)) 
import Control.Distributed.Process hiding (call) 
import Control.Distributed.Process.Platform hiding (__remoteTable) 
import Control.Distributed.Process.Platform.Async 
import Control.Distributed.Process.Platform.ManagedProcess 
import Control.Distributed.Process.Platform.Time 
import Control.Distributed.Process.Platform.Timer (sleep) 
import Control.Distributed.Process.Closure (mkClosure, remotable) 
import Network.Transport.TCP (createTransport, defaultTCPParameters) 
import Control.Distributed.Process.Node hiding (call) 
import Control.Concurrent (threadDelay) 
import GHC.Generics (Generic) 
import Data.Binary (Binary) 
import Data.Typeable (Typeable) 
import Data.ByteString.Char8 (pack) 
import System.Environment (getArgs) 

import qualified Server as Server 

main = do 
    [host, port, serverAddr] <- getArgs 

    Right transport <- createTransport host port defaultTCPParameters 
    node <- newLocalNode transport initRemoteTable 

    let addr = EndPointAddress (pack serverAddr) 
     srvID = NodeId addr 

    _ <- forkProcess node $ do 
    sid <- discoverServer srvID 
    liftIO $ putStrLn "x" 
    liftIO $ print sid 
    r <- callTimeout sid (Server.Add 5 6) 100 :: Process (Maybe Double) 
    liftIO $ putStrLn "x" 
    liftIO $ threadDelay (10 * 1000 * 1000) 


    threadDelay (10 * 1000 * 1000) 
    return() 


discoverServer srvID = do 
    whereisRemoteAsync srvID "serverPID" 
    reply <- expectTimeout 100 :: Process (Maybe WhereIsReply) 
    case reply of 
    Just (WhereIsReply _ msid) -> case msid of 
     Just sid -> return sid 
     Nothing -> discoverServer srvID 
    Nothing     -> discoverServer srvID 

Server.hs:

{-# LANGUAGE DeriveDataTypeable  #-} 
{-# LANGUAGE ExistentialQuantification #-} 
{-# LANGUAGE DeriveGeneric    #-} 
{-# LANGUAGE TemplateHaskell   #-} 

module Server where 

import Control.Distributed.Process hiding (call) 
import Control.Distributed.Process.Platform hiding (__remoteTable) 
import Control.Distributed.Process.Platform.Async 
import Control.Distributed.Process.Platform.ManagedProcess 
import Control.Distributed.Process.Platform.Time 
import Control.Distributed.Process.Platform.Timer (sleep) 
import Control.Distributed.Process.Closure (mkClosure, remotable) 
import Network.Transport.TCP (createTransport, defaultTCPParameters) 
import Control.Distributed.Process.Node hiding (call) 
import Control.Concurrent (threadDelay) 
import GHC.Generics (Generic) 
import Data.Binary (Binary) 
import Data.Typeable (Typeable) 


data Add = Add Double Double 
    deriving (Typeable, Generic) 
instance Binary Add 

launchServer :: Process ProcessId 
launchServer = spawnLocal $ serve() (statelessInit Infinity) server >> return() where 
    server = statelessProcess { apiHandlers   = [ handleCall_ (\(Add x y) -> liftIO (putStrLn "!") >> return (x + y)) ] 
          , unhandledMessagePolicy = Drop 
          } 


main = do 
    Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters 
    node <- newLocalNode transport initRemoteTable 
    _ <- forkProcess node $ do 
    self <- getSelfPid 
    register "serverPID" self 

    liftIO $ putStrLn "x" 
    mid <- launchServer 
    liftIO $ putStrLn "y" 
    r <- call mid (Add 5 6) :: Process Double 
    liftIO $ print r 
    liftIO $ putStrLn "z" 
    liftIO $ threadDelay (10 * 1000 * 1000) 
    liftIO $ putStrLn "z2" 

    threadDelay (10 * 1000 * 1000) 
    return() 

Możemy uruchomić je następująco:

runhaskell Server.hs 
runhaskell Worker.hs 127.0.0.2 8080 127.0.0.1:8080:0 

Efekty

Kiedy uruchomić programy, mamy następujące wyniki:

z serwera:

x 
y 
! 
11.0 -- this one shows that inside the same process we were able to use the "call" function 
z 
-- waiting - all the output above were tests from inside the server now it waits for external messages 

od Pracownika:

x 
pid://127.0.0.1:8080:0:10 -- this is the process id of the server optained with whereisRemoteAsync 
-- waiting forever on the "callTimeout sid (Server.Add 5 6) 100" code! 

Jak sidenote - I "dowiedziałem się, że podczas wysyłania wiadomości z send (od Control.Distributed.Process) Uzyskanie ich dzięki pracom expect. Ale wysyłanie ich z call (od Control.Distributed.Process.Platform) i próbuje recive im ManagedProcess api teleskopowe - wisi call zawsze (nawet przy użyciu callTimeout!)

+0

Czy możesz pisać kod dla pracujących 'expect' wersji' Server.hs'? – Cirdec

Odpowiedz

2

Twój klient robi wyjątek, którego nie są w stanie obserwować łatwo bo ciebie uruchamiają klienta w forkProcess. Jeśli chcesz to zrobić, to dobrze, ale musisz monitorować lub połączyć się z tym procesem. W takim przypadku proste użycie programu runProcess byłoby znacznie prostsze. Jeśli to zrobisz, pojawi się ten wyjątek:

Worker.hs: trying to call fromInteger for a TimeInterval. Cannot guess units 

callTimeout nie bierze liczbą całkowitą, to zajmuje TimeInterval, które zbudowane są z funkcjami w module czasu. To jest pseudo-Num - tak naprawdę nie obsługuje onInteger. uważam, że to błąd lub przynajmniej zła forma (w Haskell), ale w każdym razie sposób, aby naprawić swój kod jest po prostu

r <- callTimeout sid (Server.Add 5 6) (milliSeconds 100) :: Process (Maybe Double) 

Aby rozwiązać ten problem, ze klient dzwoni do serwera, musisz się zarejestrować pid procesu serwera, który spawnowałeś, a nie główny proces, z którego go odradzasz - npzmieniają

self <- getSelfPid 
register "serverPID" self 

liftIO $ putStrLn "x" 
mid <- launchServer 
liftIO $ putStrLn "y" 

do

mid <- launchServer 
register "serverPID" mid 
liftIO $ putStrLn "y" 
+0

Masz rację! Działa - nie całkowicie, ale działa limit czasu. Dziękuję za podpowiedź z 'runProcess'! Jedyne, czego nie dostaję, to dlaczego nie oblicza niczego (zawsze otrzymuję 'Nothing' z wywołania timeout i kiedy próbujemy wykonać' r <- call sid (Server.Add 5 6) :: Proces Double "proces trwa wiecznie, nie otrzymując żadnej odpowiedzi. Czy byłbyś tak miły i powiedz mi, dlaczego i jak mogę sprawdzić takie zachowania w przyszłości? –

+0

Niestety, nie zauważyłem, że nie mieliśmy nawet wydruków w Problem polega na tym, że rejestrowałeś pid głównego wątku tak, jakby był serwerem, robisz funkcję spawnLocal serwera i zwracasz ją jako "środkową" - więc zarejestrujesz "serverPID" na środku to naprawisz, a potem zrobię zobacz poprawny wynik w kliencie – Jeremy

+0

Jeremy o rację! hej, taki głupi błąd, dziękuję bardzo! Jedno ostatnie pytanie do Ciebie - czy wiesz, dlaczego to nie działa w systemie Windows? Mój przyjaciel próbował wykonać to, ale został odrzucony z 'getAddrInfo: nie istnieje (błąd 10093)', który według Win Dokumentacja Dows oznacza: '10093 - Pomyślne wykonanie WSASartup jeszcze nie wykonane." podczas wykonywania 'createTransport" 127.0.0.1 "'? Próbował kilku adresów z tym samym błędem. –

Powiązane problemy