Please note that this tutorial is a work in progress. We highly recommend reading the haddock documentation and the Well-Typed blog, which offer the best sources of information at this time.
In order to go through this tutorial you will need a Haskell development environment and we recommend installing the latest version of the Haskell Platform if you’ve not done so already.
Once you’re up and running, you’ll want to get hold of the distributed-process library and a choice of network transport backend. This guide will use the network-transport-tcp backend, but the simplelocalnet and inmemory backends are also available on GitHub, along with some other experimental options.
Cloud Haskell’s lightweight processes reside on a ‘node’, which must be initialised with a network transport implementation and a remote table. The latter is required so that physically separate nodes can identify known objects in the system (such as types and functions) when receiving messages from other nodes. We’ll look at inter-node communication later; for now it will suffice to pass the default remote table, which defines the built-in types and functions Cloud Haskell needs at a minimum.
Let’s start with imports first:
```haskell
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process
import Control.Distributed.Process.Node
```
Our TCP network transport backend needs an IP address and port to get started with, and we’re good to go…
```haskell
main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  ....
```
And now we have a running node.
We can start a new lightweight process with `forkProcess`, which takes a node and a `Process` action - because our concurrent code will run in the `Process` monad - and returns an address for the process in the form of a `ProcessId`. The process id can be used to send messages to the running process; here we will send one to ourselves!
```haskell
  -- in main
  _ <- forkProcess node $ do
    -- get our own process id
    self <- getSelfPid
    send self "hello"
    hello <- expect :: Process String
    liftIO $ putStrLn hello
  return ()
```
Lightweight processes are implemented as `forkIO` threads. In general we will try to forget about this implementation detail, but for now just note that we haven’t deadlocked ourselves by sending to and receiving from our own mailbox in this fashion. Sending a message is a completely asynchronous operation: even if the recipient doesn’t exist, no error will be raised, and evaluating `send` will not block the caller.
Receiving messages works the other way around, blocking the caller until a message matching the expected type arrives in the process’s (conceptual) mailbox. If multiple messages of that type are in the queue, they will be returned in FIFO order; otherwise the caller is blocked until a message arrives that can be decoded to the correct type.
Let’s spawn another process on the same node and make the two talk to each other.
```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)

replyBack :: (ProcessId, String) -> Process ()
replyBack (sender, msg) = send sender msg

logMessage :: String -> Process ()
logMessage msg = say $ "handling " ++ msg

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  -- Spawn a new process on the local node
  _ <- forkProcess node $ do
    -- Spawn the worker inside one more process on the local node
    echoPid <- spawnLocal $ forever $ do
      -- Test the matches in order against each message in the queue
      receiveWait [match logMessage, match replyBack]

    -- `say` sends a message to the process registered as the logger.
    -- By default, this process simply sends the string to stderr.
    say "send some messages!"
    send echoPid "hello"
    self <- getSelfPid
    send echoPid (self, "hello")

    -- like `expect` (waits for a message), but with a timeout
    m <- expectTimeout 1000000
    case m of
      -- Die immediately - throws a ProcessExitException with the given reason.
      Nothing -> die "nothing came back!"
      Just s  -> say $ "got back " ++ s
    return ()

  -- A 1 second wait. Otherwise the main thread can terminate before
  -- our messages reach the logging process or get flushed to stdio
  liftIO $ threadDelay (1*1000000)
  return ()
```
Note that we’ve used the `receive` class of functions this time around. These functions work with the `Match` data type and provide a range of advanced dispatch options. The `match` construct allows you to build a list of potential message handlers and have them evaluated against incoming messages. Our first match prints out whatever string it receives; our second match indicates that, given a tuple `t :: (ProcessId, String)`, we will send the `String` component back to the sender’s `ProcessId`.
Also note the use of a ‘timeout’ (given in microseconds), which is available for both of the `receive` variants. This returns `Nothing` unless a message can be dequeued from the mailbox within the specified time interval.
Processes can send data if the type implements the `Serializable` typeclass, which is done indirectly by implementing `Binary` and deriving `Typeable`. Implementations are already provided for primitives and some commonly used data structures.
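For a custom message type, deriving `Generic` lets GHC fill in the `Binary` instance for us. The `Ping` type below is our own illustration (it is not part of the library), but the deriving pattern is the standard one:

```haskell
{-# LANGUAGE DeriveGeneric #-}

import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Control.Distributed.Process (ProcessId)

-- A hypothetical message type, for illustration only
data Ping = Ping ProcessId
  deriving (Typeable, Generic)

-- The encode/decode methods are derived generically
instance Binary Ping
```

With `Binary` and `Typeable` in place, `Ping` satisfies the `Serializable` constraint and can be passed to `send` and received with `expect` or `match` like any built-in type.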
In order to spawn a process on a node we need something of type `Closure (Process ())`. In distributed-process, if `f :: T1 -> T2` then

```haskell
$(mkClosure 'f) :: T1 -> Closure T2
```

That is, the first argument of the function we pass to `mkClosure` will act as the closure environment for that process; if you want multiple values in the closure environment, you must tuple them up.
In order to spawn a process remotely we will need to configure the remote table (see the documentation for more details), and the easiest way to do this is to let the library generate the relevant code for us. For example (taken from the distributed-process-platform test suites):
```haskell
sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s

$(remotable ['sampleTask])
```
We can now create a closure environment for `sampleTask` like so:
```haskell
($(mkClosure 'sampleTask) (seconds 2, "foobar"))
```
The call to `remotable` generates a remote table and a definition `__remoteTable :: RemoteTable -> RemoteTable` in our module for us. We can compose this with other remote tables in order to come up with a final, merged remote table for use in our program:
```haskell
myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable

main :: IO ()
main = do
  localNode <- newLocalNode transport myRemoteTable
  -- etc
```
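To close the loop, here is a sketch of actually spawning a closure on another node with `spawn`. The `printTask` function and the `peer :: NodeId` argument are our own illustrations; in a real program the `NodeId` would come from a discovery mechanism such as the simplelocalnet backend:

```haskell
-- A hypothetical remote-callable task, for illustration only
printTask :: String -> Process ()
printTask s = say $ "remote worker says: " ++ s

$(remotable ['printTask])

-- spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnOnPeer :: NodeId -> Process ()
spawnOnPeer peer = do
  pid <- spawn peer ($(mkClosure 'printTask) "hello from afar")
  say $ "spawned worker " ++ show pid
```

For the peer to resolve the closure, the `__remoteTable` generated by this module's `remotable` call must be composed into the remote table that the peer's node was initialised with, just as `myRemoteTable` composes it above.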