Getting Started


Please note that this tutorial is a work in progress. We highly recommend reading the Haddock documentation and the Well-Typed blog, which are the best sources of information at this time.

To follow this tutorial you will need a Haskell development environment; we recommend installing the latest version of the Haskell Platform if you’ve not done so already.

Once you’re up and running, you’ll want to get hold of the distributed-process library and a network transport backend of your choice. This guide uses the network-transport-tcp backend, but the simplelocalnet and inmemory backends are also available on GitHub, along with some other experimental options.

Create a node

Cloud Haskell’s lightweight processes reside on a ‘node’, which must be initialised with a network transport implementation and a remote table. The latter is required so that physically separate nodes can identify known objects in the system (such as types and functions) when receiving messages from other nodes. We’ll look at inter-node communication later, so for now it will suffice to pass the default remote table, which defines the built-in stuff Cloud Haskell needs at a minimum.

Let’s start with the imports:

import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process
import Control.Distributed.Process.Node

Our TCP network transport backend needs an IP address and port to get started with, and we’re good to go…

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  ....

And now we have a running node.
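The pattern match on Right above fails at runtime if the transport cannot be created (for example, when the port is already in use). The following is a minimal sketch of the same setup with the failure case handled and the node shut down cleanly. It assumes the three-argument createTransport used in this guide (newer releases of network-transport-tcp take different arguments), and it additionally uses closeTransport from the network-transport package.

```haskell
import Control.Distributed.Process (liftIO)
import Control.Distributed.Process.Node
  (closeLocalNode, initRemoteTable, newLocalNode, runProcess)
import Network.Transport (closeTransport)
import Network.Transport.TCP (createTransport, defaultTCPParameters)

main :: IO ()
main = do
  -- Assumption: same createTransport signature as in the guide above.
  result <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  case result of
    Left err -> putStrLn ("could not create transport: " ++ show err)
    Right t  -> do
      node <- newLocalNode t initRemoteTable
      -- runProcess blocks until the given Process action has finished.
      runProcess node (liftIO (putStrLn "node is up"))
      -- Release the node and the underlying transport when done.
      closeLocalNode node
      closeTransport t
```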

Send messages

We can start a new lightweight process with forkProcess, which takes a node, a Process action - because our concurrent code will run in the Process monad - and returns an address for the process in the form of a ProcessId. The process id can be used to send messages to the running process - here we will send one to ourselves!

-- in main
  _ <- forkProcess node $ do
    -- get our own process id
    self <- getSelfPid
    send self "hello"
    hello <- expect :: Process String
    liftIO $ putStrLn hello
  return ()

Lightweight processes are implemented as forkIO threads. In general we will try to forget about this implementation detail, but for now just note that we haven’t deadlocked ourselves by sending to and receiving from our own mailbox in this fashion. Sending a message is a completely asynchronous operation - even if the recipient doesn’t exist, no error will be raised and evaluating send will not block the caller.

Receiving messages works the other way around, blocking the caller until a message matching the expected type arrives in the process’s (conceptual) mailbox. If multiple messages of that type are in the queue, they are returned in FIFO order. If there are none, the caller blocks until a message arrives that can be decoded to the correct type.
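To make the type-directed behaviour of expect concrete, here is a small sketch (not taken from the library documentation) in which a process sends itself an Int followed by two Strings. Asking for a String first should skip over the Int, which stays in the mailbox until an Int is requested. The transport setup assumes the same createTransport signature used elsewhere in this guide.

```haskell
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  -- runProcess is like forkProcess, but waits for the process to finish.
  runProcess node $ do
    self <- getSelfPid
    send self (1 :: Int)
    send self "first"
    send self "second"
    -- Each expect returns the oldest queued message of the requested type.
    a <- expect :: Process String
    b <- expect :: Process String
    -- The Int was left in the mailbox while we asked for Strings.
    n <- expect :: Process Int
    liftIO $ print (a, b, n)
```

Using runProcess here also avoids the need for a threadDelay in main, since it only returns once the process has terminated.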

Let’s spawn another process on the same node and make the two talk to each other.

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)

replyBack :: (ProcessId, String) -> Process ()
replyBack (sender, msg) = send sender msg

logMessage :: String -> Process ()
logMessage msg = say $ "handling " ++ msg

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  -- Spawn a new process on a local node 
  forkProcess node $ do
    -- Spawn worker inside one more process on the local node 
    echoPid <- spawnLocal $ forever $ do
      -- Test the matches in order against each message in the queue
      receiveWait [match logMessage, match replyBack]

    -- `say` sends a message to the process registered as logger.
    -- By default, this process simply sends the string to stderr.
    say "send some messages!"
    send echoPid "hello"
    self <- getSelfPid
    send echoPid (self, "hello")
    -- like `expect` (waits for a message), but with timeout
    m <- expectTimeout 1000000
    case m of
      -- Die immediately - throws a ProcessExitException with the given reason.
      Nothing  -> die "nothing came back!"
      (Just s) -> say $ "got back " ++ s
    return ()

  -- A 1 second wait. Otherwise the main thread can terminate before
  -- our messages reach the logging process or get flushed to stdio
  liftIO $ threadDelay (1*1000000)
  return ()

Note that we’ve used a function from the receive family this time around. These functions work with the Match data type and provide a range of advanced dispatching options. The match construct lets you build a list of potential message handlers and have them evaluated, in order, against incoming messages. Our first match, logMessage, prints out whatever string it receives. Our second match, replyBack, indicates that, given a tuple t :: (ProcessId, String), we will send the String component back to the sender’s ProcessId.

Also note the use of a ‘timeout’ (given in microseconds), which is available for both the expect and receive variants. These variants return Nothing unless a message can be dequeued from the mailbox within the specified time interval.
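The receive family has a timeout variant as well, receiveTimeout, which takes the same list of matches. The sketch below is illustrative only: it combines receiveTimeout with matchIf, which accepts a message only when a predicate holds. The message sent here satisfies neither handler, so the call should give up after half a second and return Nothing. The handlers and the threshold of 100 are made up for the example, and the transport setup assumes the createTransport signature used in this guide.

```haskell
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Data.Maybe (fromMaybe)
import Network.Transport.TCP (createTransport, defaultTCPParameters)

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  runProcess node $ do
    self <- getSelfPid
    send self (7 :: Int)
    -- Wait at most 500000 microseconds for a message that one of the
    -- handlers accepts; anything else is left in the mailbox.
    r <- receiveTimeout 500000
           [ matchIf (> (100 :: Int)) (\n -> return ("big number " ++ show n))
           , match (\s -> return ("string " ++ s))
           ]
    liftIO $ putStrLn (fromMaybe "timed out" r)
```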

Serializable

Processes can send data if the type implements the Serializable typeclass, which is done indirectly by implementing Binary and deriving Typeable. Implementations are already provided for primitives and some commonly used data structures.
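As an illustration, a user-defined message type can obtain both instances with very little code. The sketch below uses a hypothetical Greeting type and relies on the Generic-based default implementation of Binary, so the instance body is empty. It only checks that a value survives encoding and decoding, which is the property a message needs in order to cross node boundaries; it does not involve distributed-process itself.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}

import Data.Binary (Binary, decode, encode)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- Hypothetical message type, used for illustration only.
data Greeting = Greeting { sender :: String, body :: String }
  deriving (Show, Eq, Generic, Typeable)

-- Empty instance: put and get come from the Generic defaults.
instance Binary Greeting

-- Serialise to a lazy ByteString and read the value back.
roundTrip :: Greeting -> Greeting
roundTrip = decode . encode

main :: IO ()
main = print (roundTrip (Greeting "alice" "hello"))
```

With Binary and Typeable in place, a value of this type can be passed to send and received with expect like any built-in type.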

Spawning Remote Processes

In order to spawn a process on a node we need something of type Closure (Process ()). In distributed-process, if f :: T1 -> T2 then

  $(mkClosure 'f) :: T1 -> Closure T2

That is, the first argument to the function we pass to mkClosure will act as the closure environment for that process; if you want multiple values in the closure environment, you must tuple them up.

In order to spawn a process remotely we will need to configure the remote table (see the documentation for more details). The easiest way to do this is to let the library generate the relevant code for us. For example (taken from the distributed-process-platform test suites):

sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s

$(remotable ['sampleTask])

We can now create a closure environment for sampleTask like so:

($(mkClosure 'sampleTask) (seconds 2, "foobar"))

The call to remotable generates a definition __remoteTable :: RemoteTable -> RemoteTable in our module. We can compose this with other remote tables in order to come up with a final, merged remote table for use in our program:

myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable

main :: IO ()
main = do
  localNode <- newLocalNode transport myRemoteTable
  -- etc
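The snippets above stop short of actually spawning a closure. The following is a minimal end-to-end sketch under a few assumptions: it uses a hypothetical greet function of type String -> Process () (spawn expects a Closure (Process ()), so sampleTask, which returns a String, would not fit directly), it spawns onto the local node's own NodeId rather than a genuinely remote one, and it relies on the createTransport signature used in this guide.

```haskell
{-# LANGUAGE TemplateHaskell #-}
module Main where

import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)

-- Hypothetical task: the String argument is the closure environment.
greet :: String -> Process ()
greet name = say $ "hello, " ++ name

$(remotable ['greet])

myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t myRemoteTable
  runProcess node $ do
    -- For simplicity the target is our own node; any NodeId whose node
    -- was started with the same remote table could be used instead.
    nid <- getSelfNode
    _ <- spawn nid ($(mkClosure 'greet) "world")
    -- Give the logger process time to print before the program exits.
    liftIO $ threadDelay 1000000
```

The receiving node looks the function up by name in its remote table, which is why every node taking part must be started with a table that includes the remotable definitions.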