In this tutorial, we will look at some advanced ways of programming Cloud Haskell using the managed process API.

Unexpected Messages

The process definition’s UnhandledMessagePolicy provides a way for processes to respond to unexpected inputs. This proves surprisingly important, since it is always possible for messages to unexpectedly arrive in a process’ mailbox, either those which do not match the server’s expected types or which fail one or more match conditions against the message body.

As we will see shortly, there are various ways to ensure that only certain messages (i.e., types) are sent to a process. However, in the presence of monitoring and other system management facilities, and since the node controller is responsible - both conceptually and by implementation - for dispatching messages to each process’ mailbox, it is impractical to make real guarantees about a process’ total input domain. Such policies are best enforced with session types, which are part of the Cloud Haskell roadmap, but unconnected to managed processes.

During development, the handy Log option will write an info message to the SystemLog with information about unexpected inputs (including type info), whilst in production, the obvious choice is between the silent Drop and its explosive sibling, Terminate. Since in Cloud Haskell’s open messaging architecture, it is impossible to guarantee against unexpected messages (even in the presence of advanced protocol enforcement tools such as session types), whichever option is chosen, the server must have some policy for dealing with unexpected messages.

Warning: Watch out for unhandled deliveries, especially when using the Drop policy. In particular, unhandled message types are a common cause of application failure, and when servers discard messages without notifying their clients, deadlocks will quickly ensue!

Hiding Implementation Details

Whilst there is nothing to stop clients from sending messages directly to a managed process, there are ways to avoid this (in most cases) by hiding our ProcessId, either behind a newtype or some other opaque data structure. The author of the server is then able to force clients through API calls that can enforce the required types and ensure that the correct client-server protocol is used.

In its simplest guise, this technique simply employs the compiler to ensure that our clients only communicate with us in well-known ways. Let’s take a look at this in action, revisiting the well-trodden math server example from our previous tutorials:

module MathServer
  ( -- opaque server handle (the constructor is not exported)
    MathServer
    -- client facing API
  , add
    -- starting/spawning the server process
  , launchMathServer
  ) where

import .... -- elided

newtype MathServer = MathServer { mathServerPid :: ProcessId }
  deriving (Typeable)

-- other types/details elided

add :: MathServer -> Double -> Double -> Process Double
add MathServer{..} x y = call mathServerPid (Add x y)

launchMathServer :: Process MathServer
launchMathServer = launch >>= return . MathServer
  where
    launch =
      let server = statelessProcess {
              apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ]
            , unhandledMessagePolicy = Drop
            }
      in spawnLocal $ start () (statelessInit Infinity) server >> return ()

What we’ve changed here is the handle clients use to communicate with the process, hiding the ProcessId behind a newtype and forcing client code to use the MathServer handle to call our API functions. Since the MathServer newtype wraps a ProcessId, it is Serializable and can be sent to remote clients if needed.

Warning: Note that we still cannot assume that no info messages will arrive in our mailbox, since it is impossible to guarantee our ProcessId will remain private due to the presence of the management and tracing/debugging APIs in distributed-process. Servers that use the distributed-process monitoring APIs must also be prepared to deal with monitor signals (such as the ubiquitous ProcessMonitorNotification) arriving as info messages, since these are always dispatched directly to our mailbox via the node controller.

Another reason to use a server handle like this, instead of a raw ProcessId, is to ensure type compatibility between client and server, in cases where the server has been written to generically deal with various types whilst the client needs to reify its calls/casts over a specific type. To demonstrate this approach, we’ll consider the Registry module, which provides an enhanced process registry that provides name registration services and also behaves like a per-process, global key-value store.

Each Registry server deals with specific types of keys and values. Allowing clients to send and receive instructions pertaining to a registry server without knowing the exact types the server was spawned to handle is a recipe for disaster: if the request types don’t match those the server expects, the server will ignore them and the client is very likely to block indefinitely. We can alleviate this problem using phantom type parameters, storing only the real ProcessId we need to communicate with the server, whilst utilising the compiler to ensure the correct types are assumed at both ends.

data Registry k v = Registry { registryPid :: ProcessId }
  deriving (Typeable, Generic, Show, Eq)
instance (Keyable k, Serializable v) => Binary (Registry k v) where

In order to start our registry, we need to know the specific k and v types, but we do not need real values of these, so we use scoped type variables to reify them when creating the Registry handle:

start :: forall k v. (Keyable k, Serializable v) => Process (Registry k v)
start = return . Registry =<< spawnLocal (run (undefined :: Registry k v))

run :: forall k v. (Keyable k, Serializable v) => Registry k v -> Process ()
run _ =
  MP.pserve () (const $ return $ InitOk initState Infinity) serverDefinition
  -- etc....

Having wrapped the ProcessId in a newtype that ensures the types with which the server was initialised are respected by clients, we use the same approach as earlier to force clients of our API to interact with the server not only using the requisite call/cast protocol, but also providing the correct types in the form of a valid handle.

addProperty :: (Keyable k, Serializable v)
            => Registry k v -> k -> v -> Process RegisterKeyReply
addProperty reg k v = ....

So long as we only expose Registry newtype construction via our start API, clients cannot forge a registry handle and both client and server can rely on the compiler to have enforced the correct types for all our interactions.
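
The handle technique can be tried out without a running node. What follows is a minimal sketch using only base, not the Registry implementation: `FakePid`, `Handle`, `startFake` and `describe` are hypothetical names, and an `Int` wrapper stands in for ProcessId.

```haskell
-- Stand-in for ProcessId (an assumption, so the sketch runs without a node).
newtype FakePid = FakePid Int
  deriving (Eq, Show)

-- k and v never appear on the right-hand side: they are phantom parameters
-- that exist only for the type checker.
newtype Handle k v = Handle { handlePid :: FakePid }
  deriving (Eq, Show)

-- The only intended way to obtain a handle; in a real module the Handle
-- constructor would not be exported.
startFake :: Int -> Handle k v
startFake = Handle . FakePid

-- Client-facing call: the key and value types are fixed by the handle.
-- It returns a description of the request instead of sending anything.
describe :: (Show k, Show v) => Handle k v -> k -> v -> String
describe h k v =
  show (handlePid h) ++ " <- (" ++ show k ++ ", " ++ show v ++ ")"
```

Given `h = startFake 1 :: Handle String Int`, the call `describe h "a" 2` type checks, whereas `describe h (1 :: Int) "a"` is rejected by the compiler because the key and value types no longer agree with the handle.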

Info: Forcing users to interact with your process via an opaque handle is a good habit to get into, as is hiding the ProcessId where possible. Use phantom types along with these server handles to ensure clients do not send unexpected data to the server.

Of course, you might actually need the server’s ProcessId sometimes, perhaps for monitoring, name registration or similar schemes that operate explicitly on a ProcessId. It is also common to need support for sending info messages. Some APIs are built on “plain old messaging” via send and therefore completely hiding your ProcessId becomes the right way to expose an API to your clients, but the wrong way to expose your process to other APIs it is utilising.

In these situations, the Resolvable and Routable typeclasses are your friend. By providing a Resolvable instance, you can expose your ProcessId to peers that really need it, whilst documenting (via the design decision to only expose the ProcessId via a typeclass) the need to use the handle in client code.

instance Resolvable (Registry k v) where
  resolve = return . Just . registryPid

The Routable typeclass provides a means to dispatch messages without having to know the implementation details behind the scenes. This provides us with a means for APIs that need to send messages directly to our process to do so via the opaque handle, without us exposing the ProcessId to them directly. (Of course, such APIs have to be written with Routable in mind!)

There is a default (and fairly efficient) instance of Routable for all Resolvable instances, so it is usually enough to implement the latter. An explicit implementation for our Registry would look like this:

instance Routable (Registry k v) where
  sendTo       reg msg = send (registryPid reg) msg
  unsafeSendTo reg msg = unsafeSend (registryPid reg) msg

Similar typeclasses are provided for the many occasions when you need to link to or kill a process without knowing its ProcessId:

class Linkable a where
  -- | Create a /link/ with the supplied object.
  linkTo :: a -> Process ()

class Killable a where
  killProc :: a -> String -> Process ()
  exitProc :: (Serializable m) => a -> m -> Process ()

Again, there are default instances of both typeclasses for all Resolvable types, so it is enough to provide just that instance for your handles.
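
To illustrate how a handle that only knows how to resolve itself can pick up further operations for free, here is a small model using only base. It is a sketch under stated assumptions, and the library's actual mechanism for its default instances may differ: `FakePid` stands in for ProcessId, `IO` stands in for the Process monad, `FakeHandle` is a hypothetical handle, and `linkTo` merely returns a description of what it would do.

```haskell
-- Stand-in for ProcessId; IO stands in for the Process monad (both assumptions).
newtype FakePid = FakePid Int
  deriving (Eq, Show)

class Resolvable a where
  resolve :: a -> IO (Maybe FakePid)

-- linkTo has a default written purely in terms of resolve, so an instance
-- with an empty body is enough for any Resolvable handle.
class Resolvable a => Linkable a where
  linkTo :: a -> IO String
  linkTo h = maybe "unresolvable" (\p -> "link " ++ show p) <$> resolve h

-- Hypothetical opaque handle.
newtype FakeHandle = FakeHandle FakePid

instance Resolvable FakeHandle where
  resolve (FakeHandle p) = return (Just p)

-- No methods given: the default linkTo is used.
instance Linkable FakeHandle
```

The design point is that client code only ever names the handle; the process identifier is looked up inside the class method.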

Using Typed Channels

Typed Channels can be used in two ways via the managed process API, either as inputs to the server or as a reply channel for RPC style interactions that offer an alternative to using call.

Reply Channels

When using the call API, the server can reply with a datum that doesn’t match the type(s) the client expects. This will cause the client to either deadlock or timeout, depending on which variant of call was used. This isn’t usually a problem, since the server author also writes the client facing API(s) and can therefore carefully check that the correct types are being returned. That’s still potentially error prone however, and using a SendPort as a reply channel can make it easier to spot potential type discrepancies.

The machinery behind reply channels is very simple: We create a new channel for the reply and pass the SendPort to the server along with our input message. The server is responsible for sending its reply to the given SendPort and the corresponding ReceivePort is returned so the caller can wait on it. Of course, if no corresponding handler is present in the server definition, there may be no reply (and depending on the server’s unhandledMessagePolicy, we may crash the server).

Warning: Using typed reply channels does not guarantee against type mismatches! The server might not recognise the message type or the type of the reply channel, in which case the message will be considered an unhandled input and dealt with accordingly.

Typed channels are better suited to handling deferred client-server RPC calls than plain inter-process messaging too. The only non-blocking call API is based on Async and its only failure mode is an AsyncFailed result containing a corresponding ExitReason. The callTimeout API is equally limited, since once its delay is exceeded (and the call times out), you cannot subsequently retry listening for the message - the client is on its own at this point, and has to deal with potentially stray (and un-ordered!) replies using the low level flushPendingCalls API. By using a typed channel for replies, we can avoid both these issues: after the RPC is initiated, the client can defer obtaining a reply from the ReceivePort until it’s ready, time out waiting for the reply and try again at a later time, and even wait on the results of multiple RPC calls (to one or more servers) at the same time by merging the ports.

If we wish to block and wait for a reply immediately (just as we would with call), two blocking operations are provided to simplify the task, one of which returns an ExitReason on failure, whilst the other crashes (with the given ExitReason of course!). The implementation is precisely what you’d expect a blocking call to do, right up to monitoring the server for potential exit signals (so as not to deadlock the client if the server dies before replying) - all of which is handled by awaitResponse in the platform’s Primitives module.

syncSafeCallChan server msg = do
  rp <- callChan server msg
  awaitResponse server [ matchChan rp (return . Right) ]

This might sound like a vast improvement on the usual combination of a client API that uses call and a corresponding handleCall in the process definition, with the programmer left to ensure the types always match up. In reality, however, there is a trade-off to be made. Using the handleCall APIs means that our server side code can use the fluent server API for state changes, immediate replies and so on. None of these features will work with the corresponding family of handleRpcChan functions. Whether or not the difference is merely aesthetic, we leave as a question for the reader to determine. The following example demonstrates the use of reply channels:

-- two versions of the same handler, one for calls, one for typed (reply) channels

data State
data Input = Input
data Output = Output

-- Typeable and Binary instances omitted for brevity

-- client code

callDemo :: ProcessId -> Process Output
callDemo server = call server Input

chanDemo :: ProcessId -> Process Output
chanDemo server = syncCallChan server Input

-- server code (process definition omitted for brevity)

callHandler :: Dispatcher State
callHandler = handleCall $ \state Input -> reply Output state

chanHandler :: Dispatcher State
chanHandler = handleRpcChan $ \state port Input -> replyChan port Output >> continue state

Info: Using typed channels for replies is both flexible and efficient. The trade-off is that you must remember to send a reply from the server explicitly, whereas the call API forces you to decide how to respond (to the client) via the ProcessReply type which server-side call handlers have to evaluate to.

Input (Control) Channels

Control channels provide an alternative input plane for managed process servers, with a number of benefits above and beyond both the standard call and cast APIs and the use of reply channels. These include efficiency - typed channels are very lightweight constructs in general! - and type safety, as well as giving the server the ability to prioritise information sent on control channels over other traffic.

Using typed channels as inputs to your managed process is the most efficient way to enable client-server communication, particularly for intra-node traffic, due to their internal use of STM (and in particular, its use during selective receives). Control channels can provide an alternative to prioritised process definitions, since their use of channels ensures that, providing the control channel handler(s) occur in the process definition’s apiHandlers list before the other dispatchers, any messages received on those channels will be prioritised over other traffic. This is the most efficient kind of prioritisation - not much use if you need to prioritise info messages of course, but very useful if control messages need to be given priority over other inputs.

Warning: Control channels are not compatible with prioritised process definitions! The type system does not prevent them from being declared though, since they are represented by a Dispatcher and therefore deemed valid entries of the apiHandlers field. Upon startup, a prioritised process definition that contains control channel dispatchers in its apiHandlers will immediately exit with the reason ExitOther "IllegalControlChannel" though.

In order to use a typed channel as an input plane, it is necessary to leak the SendPort to your clients somehow. One way would be to send it on demand, but the simplest approach is actually to initialise a handle with all the relevant send ports and return this to the spawning process via a private channel, MVar or STM (or similar). Because a SendPort is Serializable, forwarding it (or the handle it is contained within) is no problem either.

Since typed channels are a one way street, there’s no direct API support for RPC calls when using them to send data to a server. The work-around for this remains simple, type-safe and elegant though: we encode a reply channel into our command/request datum so the server knows where (and with what type) to reply. This does increase the amount of boilerplate code the client-facing API has to endure, but it’s a small price to pay for the efficiency and additional type safety provided.
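
The shape of this work-around can be modelled with nothing but base. The sketch below is not Cloud Haskell code: a `Chan` stands in for the server's input channel and an `MVar` stands in for the SendPort/ReceivePort pair carried inside each request, and all of the names (`Request`, `serverLoop`, `startServer`, `echoReq`, `statsReq`) are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Each request carries the slot its reply must be written to, so the reply
-- type is fixed by the request constructor.
data Request
  = Echo String (MVar String)
  | Stats (MVar Int)

-- Server loop: the Int state counts the echo requests served so far.
serverLoop :: Chan Request -> Int -> IO ()
serverLoop inbox count = do
  req <- readChan inbox
  case req of
    Echo s replyTo -> putMVar replyTo s >> serverLoop inbox (count + 1)
    Stats replyTo  -> putMVar replyTo count >> serverLoop inbox count

-- Spawn the server thread and hand back its (one way) input channel.
startServer :: IO (Chan Request)
startServer = do
  inbox <- newChan
  _ <- forkIO (serverLoop inbox 0)
  return inbox

-- Client API: create a reply slot, embed it in the request, wait on it.
echoReq :: Chan Request -> String -> IO String
echoReq inbox s = do
  box <- newEmptyMVar
  writeChan inbox (Echo s box)
  takeMVar box

statsReq :: Chan Request -> IO Int
statsReq inbox = do
  box <- newEmptyMVar
  writeChan inbox (Stats box)
  takeMVar box
```

The boilerplate lives entirely in the client-facing functions, which is the price mentioned above; in exchange, a reply of the wrong type cannot be written into the slot.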

First, we’ll look at an example of a single control channel being used with the chanServe API. This handles the messy details of passing the control channel back to the calling process, at least to some extent. For this example, we’ll examine the Mailbox module, since this combines a fire-and-forget control channel with an opaque server handle.

-- our handle is fairly simple
data Mailbox = Mailbox { pid   :: !ProcessId
                       , cchan :: !(ControlPort ControlMessage)
                       } deriving (Typeable, Generic, Eq)
instance Binary Mailbox where

instance Linkable Mailbox where
  linkTo = link . pid

instance Resolvable Mailbox where
  resolve = return . Just . pid

-- lots of details elided....

-- Starting the mailbox involves both spawning, and passing back the process id,
-- plus we need to get our hands on a control port for the control channel!

doStartMailbox :: Maybe SupervisorPid
               -> ProcessId
               -> BufferType
               -> Limit
               -> Process Mailbox
doStartMailbox mSp p b l = do
  bchan <- liftIO $ newBroadcastTChanIO
  rchan <- liftIO $ atomically $ dupTChan bchan
  spawnLocal (maybeLink mSp >> runMailbox bchan p b l) >>= \pid -> do
    cc <- liftIO $ atomically $ readTChan rchan
    return $ Mailbox pid cc  -- return our opaque handle!
  where
    maybeLink Nothing   = return ()
    maybeLink (Just p') = link p'

runMailbox :: TChan (ControlPort ControlMessage)
           -> ProcessId
           -> BufferType
           -> Limit
           -> Process ()
runMailbox tc pid buffT maxSz = do
  link pid
  tc' <- liftIO $ atomically $ dupTChan tc
  MP.chanServe (pid, buffT, maxSz) (mboxInit tc') (processDefinition pid tc)

mboxInit :: TChan (ControlPort ControlMessage)
         -> InitHandler (ProcessId, BufferType, Limit) State
mboxInit tc (pid, buffT, maxSz) = do
  cc <- liftIO $ atomically $ readTChan tc
  return $ InitOk (State Seq.empty $ defaultState buffT maxSz pid cc) Infinity

processDefinition :: ProcessId
                  -> TChan (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition State)
processDefinition pid tc cc = do
  liftIO $ atomically $ writeTChan tc $ channelControlPort cc
  return $ defaultProcess { apiHandlers = [
                               handleControlChan     cc handleControlMessages
                             , Restricted.handleCall handleGetStats
                             ]
                          , infoHandlers = [ handleInfo handlePost
                                           , handleRaw  handleRawInputs ]
                          , unhandledMessagePolicy = DeadLetter pid
                          } :: Process (ProcessDefinition State)

Since the rest of the mailbox initialisation code is quite complex, we’ll leave it there for now. The important details to take away are the use of chanServe and its requirement for a thunk that initialises the ProcessDefinition, so it can perform IO - a pre-requisite to sharing the control channels with the spawning process, which must use STM or something similar in order to share data with the newly spawned server’s initialisation code. In our case, we want to pass the control port from the thunk passed to chanServe back to both the spawning process and the init function (which is normally de-coupled from the initialising thunk), which makes this a good example of how to utilise a broadcast TChan (or TQueue) to share control plane structures during initialisation.
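
The broadcast behaviour relied on here can be seen in isolation with a few lines of STM. This is a minimal sketch using the stm package that ships with GHC; `shareWithTwo` is a hypothetical helper, and the two readers stand in for the spawning process and the init function respectively.

```haskell
import Control.Concurrent.STM
  (atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)

-- Publish one value to two independent readers of a broadcast channel.
shareWithTwo :: a -> IO (a, a)
shareWithTwo x = do
  bchan <- newBroadcastTChanIO            -- write-only end
  r1    <- atomically (dupTChan bchan)    -- reader for the spawning side
  r2    <- atomically (dupTChan bchan)    -- reader for the init function
  atomically (writeTChan bchan x)         -- written once, seen by both readers
  a <- atomically (readTChan r1)
  b <- atomically (readTChan r2)
  return (a, b)
```

Each duplicate only sees values written after it was created, which is why both readers are duplicated before the write takes place.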

Now we’ll cook up another (contrived) example that uses multiple typed control channels, demonstrating how to create control channels explicitly, how to obtain a ControlPort for each one, one way of passing these back to the process spawning the server (so as to fill in the opaque server handle) and how to utilise these in your client code, complete with the use of typed reply channels. This code will not use chanServe, since that API only supports a single control channel - the original purpose behind the control channel concept - and instead, we’ll create the process loop ourselves, using the exported low level recvLoop function.

type NumRequests = Int

data EchoServer = EchoServer { echoRequests :: ControlPort EchoRequest
                             , statRequests :: ControlPort StatsRequest
                             , serverPid    :: ProcessId
                             }
  deriving (Typeable, Generic)
instance Binary EchoServer where
instance NFData EchoServer where

instance Resolvable EchoServer where
  resolve = return . Just . serverPid

instance Linkable EchoServer where
  linkTo = link . serverPid

-- The server takes a String and returns it verbatim

data EchoRequest = EchoReq !String !(SendPort String)
  deriving (Typeable, Generic)
instance Binary EchoRequest where
instance NFData EchoRequest where

data StatsRequest = StatsReq !(SendPort Int)
  deriving (Typeable, Generic)
instance Binary StatsRequest where
instance NFData StatsRequest where

-- client code

echo :: EchoServer -> String -> Process String
echo h s = do
  (sp, rp) <- newChan
  let req = EchoReq s sp
  sendControlMessage (echoRequests h) req
  receiveWait [ matchChan rp return ]

stats :: EchoServer -> Process NumRequests
stats h = do
  (sp, rp) <- newChan
  let req = StatsReq sp
  sendControlMessage (statRequests h) req
  receiveWait [ matchChan rp return ]

demo :: Process ()
demo = do
  server <- spawnEchoServer
  foobar <- echo server "foobar"
  foobar `shouldBe` equalTo "foobar"

  baz <- echo server "baz"
  baz `shouldBe` equalTo "baz"

  count <- stats server
  count `shouldBe` equalTo (2 :: NumRequests)

-- server code

spawnEchoServer :: Process EchoServer
spawnEchoServer = do
  (sp, rp) <- newChan
  pid <- spawnLocal $ runEchoServer sp
  (echoPort, statsPort) <- receiveChan rp
  return $ EchoServer echoPort statsPort pid

runEchoServer :: SendPort (ControlPort EchoRequest, ControlPort StatsRequest)
              -> Process ()
runEchoServer portsChan = do
  echoChan <- newControlChan
  statChan <- newControlChan
  -- channelControlPort is a pure function (see its use in processDefinition above)
  let echoPort = channelControlPort echoChan
      statPort = channelControlPort statChan
  sendChan portsChan (echoPort, statPort)
  runProcess (recvLoop $ echoServerDefinition echoChan statChan ) echoServerInit

echoServerInit :: InitHandler () NumRequests
echoServerInit _ = return $ InitOk (0 :: Int) Infinity

echoServerDefinition :: ControlChannel EchoRequest
                     -> ControlChannel StatsRequest
                     -> ProcessDefinition NumRequests
echoServerDefinition echoChan statChan =
  defaultProcess {
      apiHandlers = [ handleControlChan echoChan handleEcho
                    , handleControlChan statChan handleStats
                    ]
    }

handleEcho :: NumRequests -> EchoRequest -> Process (ProcessAction NumRequests)
handleEcho count (EchoReq req replyTo) = do
  replyChan replyTo req  -- echo back the string
  continue $ count + 1

handleStats :: NumRequests -> StatsRequest -> Process (ProcessAction NumRequests)
handleStats count (StatsReq replyTo) = do
  replyChan replyTo count
  continue count

Although not very useful, this is a working example. Note that the client must deal with a ControlPort and not the complete ControlChannel itself. Also note that the server is completely responsible for replying (explicitly) to the client using the send ports supplied in the request data.

Info: Combining control channels with opaque handles is another great way to enforce additional type safety, since the channels must be initialised by the server code before it can create handlers for them and the client code that passes data to them (via the ControlPort) is bound to exactly the same type(s)! Furthermore, adding reply channels (in the form of a SendPort) to the request types ensures that the replies will be handled correctly as well! As a result, there can be no ambiguity about the types involved for either side of the client-server relationship and therefore no unhandled messages due to runtime type mismatches - the compiler will catch that sort of thing for us!