We have already met the send
primitive, used to deliver messages from one
process to another. Here’s a review of what we’ve learned about send
thus far:
Asynchronous sending buys us several benefits. Improved concurrency is
possible, because processes need not block or wait for acknowledgements,
nor does error handling need to be implemented each time a message is sent.
Consider a stream of messages sent from one process to another. If the
stream consists of messages a, b, c
and we have seen c
, then we know for
certain that we will have already seen a, b
(in that order), so long as the
messages were sent to us by the same peer process.
When two concurrent processes exchange messages, Cloud Haskell guarantees that messages will be delivered in FIFO order, if at all. No such guarantee holds across multiple senders: if processes A and B are both communicating (concurrently) with process C, the ordering guarantee applies to each pair of interactions, i.e., between A and C and between B and C, but not between A and B with regard to the messages they send to C.
Because the mailbox contains messages of varying types, when we expect
a message, we eschew the ordering because we’re searching for a message
whose contents can be decoded to a specific type. Of course, we may want
to process messages in the precise order in which they arrived. To achieve
this, we must defer the type checking that would normally cause a traversal
of the mailbox and extract the raw message ourselves. This can be achieved
using receive
and matchAny
, as we will demonstrate later.
Processes dequeue messages (from their mailbox) using the expect
and receive
family of primitives. Both families have timeout variants, allowing the
expression to evaluate to Nothing
if no matching input is found within the given interval.
The expect
primitive blocks until a message matching the expected type
(of the expression) is found in the process’ mailbox. If a match is found by
scanning the mailbox, it is dequeued and returned, otherwise the caller
(i.e., the calling thread/process) is blocked until a message of the expected
type is delivered to the mailbox. Let’s take a look at this in action:
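```haskell
import Control.Distributed.Process

-- A sketch of the example described below (the names demo and listen are
-- illustrative): a parent sends a String and then its ProcessId to a
-- listener, which dequeues them selectively by type, not by arrival order.
demo :: Process ()
demo = do
    listener <- spawnLocal listen
    send listener "hello"
    getSelfPid >>= send listener
    () <- expect                 -- wait for the listener to signal completion
    return ()
  where
    listen :: Process ()
    listen = do
      third  <- expect :: Process ProcessId                    -- blocks until the ProcessId arrives
      first  <- expect :: Process String                       -- selectively dequeues the String
      second <- expectTimeout 100000 :: Process (Maybe String) -- no second String ever arrives
      say first
      say (show second)
      say (show third)
      send third ()              -- tell the parent we are done
```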
This program will print "hello"
, then Nothing
and finally pid://...
.
The first expect
- labelled “third” because it is the third value our
listener will print - will succeed, since the parent process
sends its ProcessId
after the string “hello”, yet the listener blocks until it
can dequeue the ProcessId
before “expecting” a string. The second expect
(labelled “first”) also succeeds, demonstrating that the listener has selectively
removed messages from its mailbox based on their type rather than the order in
which they arrived. The third expect
will timeout and evaluate to Nothing
,
because only one string is ever sent to the listener and that has already been
removed from the mailbox. The removal of messages from the process’ mailbox based
on type is what makes this program viable - without this “selective receiving”,
the program would block and never complete.
By contrast, the receive
family of primitives takes a list of Match
objects, each derived from evaluating a match
style primitive. This
subject was covered briefly in the first tutorial. Matching on messages allows
us to separate the type(s) of messages we can handle from the type that the
whole receive
expression evaluates to.
Consider the following snippet:
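```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process

-- Illustrative: handle whichever of the two message types is found first in
-- the mailbox; both branches evaluate to the same type, Process ().
demo :: Process ()
demo =
  receiveWait
    [ match (\(s :: String) -> say s)
    , match (\(i :: Int)    -> say (show i))
    ]
```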
Note that each of the matches in the list must evaluate to the same type,
as the type signature indicates: receiveWait :: [Match b] -> Process b
.
The behaviour of receiveWait
differs from receiveTimeout
in that it
blocks forever (until a match is found in the process’ mailbox), whereas the
variant taking a timeout will return Nothing
unless a match is found within
the specified time interval. Note that as with System.Timeout
, the only
guarantee we have about a timeout-based function is that it will not
expire before the given interval. Both functions scan the mailbox in FIFO
order, evaluating the list of match
expressions in declarative
(i.e., insertion) order until one of the matches succeeds or the operation
times out.
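For instance, a variant of the snippet above that gives up after one second
(timeouts are expressed in microseconds) might look like this sketch:

```haskell
-- Wait up to one second for a String; if none is dequeued within (at least)
-- that interval, receiveTimeout evaluates to Nothing.
demoTimeout :: Process ()
demoTimeout = do
  result <- receiveTimeout 1000000 [ match (\s -> return (s :: String)) ]
  case result of
    Just s  -> say ("got: " ++ s)
    Nothing -> say "timed out waiting for a String"
```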
There are times when it is desirable to take a message from our mailbox without explicitly specifying its type. Not only is this a useful capability, it is the only way to process messages in the precise order they were received.
To see how this works in practice, let’s consider the relay
primitive that
ships with distributed-process. This utility function starts a process that
simply dequeues any messages it receives and forwards them to some other process.
In order to dequeue messages regardless of their type, this code relies on the
matchAny
primitive, which has the following type:
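```haskell
matchAny :: forall b. (Message -> Process b) -> Match b
```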
Since forwarding raw messages (without decoding them first) is a common pattern in Cloud Haskell programs, there is also a primitive to do that for us:
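```haskell
forward :: Message -> ProcessId -> Process ()
```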
Given these types, we can see that in order to combine matchAny
with forward
we need to either flip forward
and apply the ProcessId
(leaving us with
the required type Message -> Process b
) or use a lambda - the actual implementation
does the latter and looks like this:
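```haskell
-- A sketch of relay (the shipped implementation may differ in detail);
-- forever comes from Control.Monad.
relay :: ProcessId -> Process ()
relay pid = forever $ receiveWait [ matchAny (\m -> forward m pid) ]
```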
This is pretty useful, but since matchAny
operates on the raw Message
type,
we’re limited in what we can do with the messages we receive. In order to delve
inside a message, we have to know its type. If we have an expression that operates
on a specific type, we can attempt to decode the message to that type and examine
the result to see whether the decoding succeeds or not. There are two primitives
we can use to that effect: unwrapMessage
and handleMessage
. Their types look like
this:
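```haskell
unwrapMessage :: forall m a. (Monad m, Serializable a)
              => Message -> m (Maybe a)

handleMessage :: forall m a b. (Monad m, Serializable a)
              => Message -> (a -> m b) -> m (Maybe b)
```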
Of the two, unwrapMessage
is the simpler, taking a raw Message
and evaluating to
Maybe a
before returning that value in the monad m
. If the type of the raw Message
does not match our expectation, the result will be Nothing
, otherwise Just a
.
The approach handleMessage
takes is a bit more flexible, taking a function
from a -> m b
and returning Just b
if the underlying message is of type a
(hence the
operation can be executed and evaluate to Maybe b
) or Nothing
if the message’s type
is incompatible with the handler function.
Let’s look at handleMessage
in action. Earlier on we looked at relay
from
distributed-process and now we’ll consider its sibling proxy
- this takes a predicate function that evaluates some input of type a
to a Process Bool
, allowing us to run arbitrary
Process
code in order to decide whether or not the a
is eligible to be forwarded to
the relay ProcessId
. The type of proxy
is thus:
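```haskell
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
```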
Since matchAny
operates on (Message -> Process b)
and handleMessage
operates on
a -> Process b
we can compose these to make our proxy server. We must not forward
messages for which the predicate function evaluates to Just False
, nor can we sensibly
forward messages which the predicate function is unable to evaluate due to type
incompatibility. This leaves us with the definition found in distributed-process:
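```haskell
-- A sketch of proxy (the shipped implementation may differ in detail);
-- forever comes from Control.Monad.
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy pid proc =
  forever $ receiveWait [
      matchAny (\m -> do
        next <- handleMessage m proc
        case next of
          Just True  -> forward m pid  -- the predicate holds: forward the raw message
          Just False -> return ()      -- the predicate fails: drop the message
          Nothing    -> return ())     -- the message type did not match the predicate
    ]
```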
Beyond simple relays and proxies, the raw message handling capabilities available in
distributed-process can be utilised to develop highly generic message processing code.
All the richness of the distributed-process-client-server APIs (such as ManagedProcess
), which will be discussed in later tutorials, is in fact built upon these families of primitives.
While being able to send and receive any Serializable
datum is very powerful, the burden
of decoding types correctly at runtime is levied on the programmer and there are runtime
overheads to be aware of (which will be covered in later tutorials). Fortunately,
distributed-process provides a type-safe alternative to send
and receive
, in the form
of Typed Channels. Represented by distinct ends, a SendPort a
(which is Serializable
)
and ReceivePort a
(which is not), channels are a lightweight and useful abstraction that
provides a type-safe interface for interacting with processes separately from their primary
mailbox.
Channels are created with newChan :: Process (SendPort a, ReceivePort a)
, with
messages sent via sendChan :: SendPort a -> a -> Process ()
. The ReceivePort
can be
passed directly to receiveChan
, or used in a receive{Wait, Timeout}
call via the
matchChan
primitive, so as to combine mailbox scans with channel reads.
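Here is a brief sketch of a channel in use (the names are illustrative):

```haskell
import Control.Distributed.Process

-- Create a typed channel, hand the SendPort to a child process and read the
-- reply from the ReceivePort; only values of the declared type can be sent.
channelDemo :: Process ()
channelDemo = do
  (sp, rp) <- newChan :: Process (SendPort String, ReceivePort String)
  _ <- spawnLocal $ sendChan sp "hello via a typed channel"
  msg <- receiveChan rp
  say msg
```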
A process will continue executing until it has evaluated to some value, or is abruptly
terminated either by crashing (with an un-handled exception) or being instructed to
stop executing. Deliberate stop instructions take one of two forms: a ProcessExitException
or ProcessKillException
. As the names suggest, these signals are delivered in the form
of asynchronous exceptions; however, you should not rely on that fact! After all,
we cannot throw an exception to a thread that is executing in some other operating
system process or on a remote host! Instead, you should use the exit
and kill
primitives from distributed-process, which not only ensure that remote target processes
are handled seamlessly, but also maintain a guarantee that if you send a message and
then an exit signal, the message will be delivered to the destination process (via its
local node controller) before the exception is thrown - note that this does not guarantee
that the destination process will have time to do anything with the message before it
is terminated.
The ProcessExitException
signal is sent from one process to another, indicating that the
receiver is being asked to terminate. A process can choose to tell itself to exit, and the
die
primitive simplifies doing so without worrying about the expected type for the
action. In fact, die
has slightly different semantics from exit
, since the
latter involves sending an internal signal to the local node controller. A direct consequence
of this is that the exit signal may not arrive immediately, since the Node Controller could
be busy processing other events. On the other hand, the die
primitive throws a
ProcessExitException
directly in the calling thread, thus terminating it without delay.
In practice, this means the following two functions could behave quite differently at
runtime:
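```haskell
-- An illustrative sketch: die throws the ProcessExitException in the calling
-- thread immediately, whereas exit routes the signal via the local node
-- controller, so the subsequent actions may get a chance to run first.
demo1 :: Process ()
demo1 = do
  die "Boom"         -- terminates here, immediately
  expect >>= say     -- never reached

demo2 :: Process ()
demo2 = do
  self <- getSelfPid
  exit self "Boom"   -- the exit signal travels via the node controller
  expect >>= say     -- may begin executing before the signal arrives
```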
The ProcessExitException
type holds a reason field, which is serialised as a raw Message
.
This exception type is exported, so it is possible to catch these exit signals and decide how
to respond to them. Catching exit signals is done via a set of primitives in
distributed-process, and the use of them forms a key component of the various fault tolerance
strategies provided by distributed-process-supervisor.
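For instance, catchExit allows us to handle an exit signal whose reason decodes
to a particular type; a minimal sketch:

```haskell
-- Run an action, trapping ProcessExitException signals whose reason is a
-- String (the worker logic here is illustrative).
trapExits :: Process ()
trapExits =
  (expect >>= say) `catchExit` \sender reason ->
    say ("exit signal from " ++ show sender ++ ": " ++ (reason :: String))
```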
A ProcessKillException
is intended to be an untrappable exit signal, so its type is
not exported and therefore you can only handle it by catching all exceptions, which
as we all know is very bad practice. The kill
primitive is intended to be a
brutal means of terminating a process - e.g., it is used to terminate supervised child
processes that haven’t shut down on request, or to terminate processes that don’t require
any special cleanup code to run when exiting - although it does behave like exit
insofar as it is dispatched (to the target process) via the Node Controller.
Processes can be linked to other processes (or nodes or channels). A link, which is
unidirectional, guarantees that once any object we have linked to exits, we will also
be terminated. A simple way to test this is to spawn a child process, link to it and then
terminate it, noting that we will subsequently die ourselves. Here’s a simple example,
in which we link to a child process and then cause it to terminate (by sending it a message
of the type it is waiting for). Even though the child terminates “normally”, our process
is also terminated since link
will link the lifetime of two processes together regardless
of exit reasons.
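A sketch of that example (the names are illustrative):

```haskell
linkDemo :: Process ()
linkDemo = do
  child <- spawnLocal $ do
    () <- expect   -- the child waits for one message, then exits normally
    return ()
  link child       -- from now on, our fate is tied to the child's
  send child ()    -- cause the child to terminate (normally)
  () <- expect     -- never reached: the link terminates us as well
  return ()
```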
The medium that link failures use to signal exit conditions is the same as for exit and kill
signals - asynchronous exceptions. Once again, it is a bad idea to rely on this (not least
because it might change in some future release) and the exception type (ProcessLinkException
)
is not exported so as to prevent developers from abusing exception handling code in this
special case. Since link exit signals cannot be caught directly, if you find yourself wanting
to trap a link failure, you probably want to use a monitor instead.
Whilst the built-in link
primitive terminates the caller regardless of the linked process’s exit reason,
distributed-process-extras provides an alternate function linkOnFailure
, which only
dispatches the ProcessLinkException
if the link-ed process dies abnormally (i.e., with
some DiedReason
other than DiedNormal
).
Monitors, on the other hand, do not cause the listening process to exit at all, instead
putting a ProcessMonitorNotification
into the process’ mailbox. This signal and its
constituent fields can be introspected in order to decide what action (if any) the receiver
can/should take in response to the monitored process’ death. Let’s take a look at how
monitors can be used to determine both when and how a process has terminated. Tucked
away in distributed-process-extras, the linkOnFailure
primitive works in exactly this
way, only terminating the caller if the subject terminates abnormally. Let’s take a look…
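The following is a sketch of its implementation (the source in
distributed-process-extras may differ in detail); void, myThreadId and throwTo
come from Control.Monad, Control.Concurrent and Control.Exception respectively,
and the sketch assumes the ProcessLinkException constructor is in scope:

```haskell
linkOnFailure :: ProcessId -> Process ()
linkOnFailure them = do
  us  <- getSelfPid
  tid <- liftIO myThreadId
  void $ spawnLocal $ do
    callerRef <- monitor us    -- watch the process that called us
    calleeRef <- monitor them  -- watch the process we are linking to
    reason <- receiveWait [
        matchIf (\(ProcessMonitorNotification mRef _ _) -> mRef == callerRef)
                (\_ -> return DiedNormal)
      , matchIf (\(ProcessMonitorNotification mRef _ _) -> mRef == calleeRef)
                (\(ProcessMonitorNotification _ _ r) -> return r)
      ]
    case reason of
      DiedNormal -> return ()  -- the caller died, or the callee exited normally
      _          -> liftIO $ throwTo tid (ProcessLinkException us reason)
```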
As we can see, this code makes use of monitors to track both processes involved in the link. In order to track both processes and react to changes in their status, it is necessary to spawn a third process which will do the monitoring. This doesn’t happen with the built-in link primitive, but is necessary in this case since the link handling code resides outside the Node Controller.
The two matches passed to receiveWait
both handle a ProcessMonitorNotification
, and
the predicate passed to matchIf
is used to determine whether the notification we’re
receiving is for the process that called us, or the linked-to process. If the former
dies, we’ve nothing more to do, since links are unidirectional. If the latter dies
however, we must examine the DiedReason
the ProcessMonitorNotification
provides us
with, to determine whether the subject exited normally (i.e., with DiedNormal
).
If the exit was abnormal, we throw a ProcessLinkException
to the original caller,
which is exactly how an ordinary link would behave.
Linking and monitoring are foundational tools for supervising processes, where a top level process manages a set of children, starting, stopping and restarting them as necessary.
Exit signals in Cloud Haskell, then, are unlike asynchronous exceptions in other
Haskell code. Whilst a process can use asynchronous exceptions - there’s
nothing stopping this since the Process
monad is an instance of MonadIO
-
as we’ve seen, exceptions thrown are not bound by the same ordering guarantees
as messages delivered to a process. Link failures and exit signals might work
via asynchronous exceptions - that is the case in the current implementation - but
these are implemented in such a fashion that if you send a message and then an
exit signal, the message is guaranteed to arrive first.
You should avoid throwing your own exceptions in code where possible. Instead,
you should terminate yourself, or another process, using the built-in primitives
exit
, kill
and die
.
The getProcessInfo
function provides a means for us to obtain information about a running
process. The ProcessInfo
type it returns contains the local node id and a list of
registered names, monitors and links for the process. The call returns Nothing
if the
process in question is not alive.
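For example (an illustrative sketch):

```haskell
import Data.Maybe (isJust)

-- Use getProcessInfo to check whether another process is still alive.
isAlive :: ProcessId -> Process Bool
isAlive pid = isJust <$> getProcessInfo pid
```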
It is not generally necessary, but it may be convenient in your application to use a custom monad transformer stack with the Process monad at the bottom. For example, you may have decided that in various places in your application you will make calls to a network database. You may create a data access module, and it will need configuration information available to it in order to connect to the database server. A ReaderT can be a nice way to make configuration data available throughout an application without schlepping it around by hand.
This example is a bit contrived and over-simplified but
illustrates the concept. Consider the fetchUser
function below, which runs in the AppProcess
monad which provides the configuration settings required to connect to the database:
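```haskell
import Control.Distributed.Process (Process)
import Control.Monad.Reader (ReaderT, asks)

-- Illustrative scaffolding: a hypothetical data access module and the
-- configuration it needs; only the shapes matter for this sketch.
data DBConfig  = DBConfig   -- connection settings (elided)
data DBHandle  = DBHandle   -- an open connection
data User      = User
type UserId    = String

data AppConfig = AppConfig { dbConfig :: DBConfig }

-- Our application monad: configuration via ReaderT, Process at the bottom.
type AppProcess = ReaderT AppConfig Process

-- Stand-ins for the real data access functions.
openDB :: AppProcess DBHandle
openDB = asks dbConfig >> return DBHandle

closeDB :: DBHandle -> AppProcess ()
closeDB _ = return ()

query :: DBHandle -> UserId -> AppProcess User
query _ _ = return User

fetchUser :: UserId -> AppProcess User
fetchUser uid = do
  db   <- openDB        -- acquire a handle using our configuration
  user <- query db uid  -- if this throws, the handle is never closed!
  closeDB db
  return user
```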
So this would mostly work but it is not complete. What happens if an exception
is thrown by the query
function? Your open database handle may not be
closed. Typically we manage this with the bracket function.
In the base library, bracket is defined in Control.Exception with this signature:
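```haskell
bracket :: IO a        -- computation to run first ("acquire resource")
        -> (a -> IO b) -- computation to run last ("release resource")
        -> (a -> IO c) -- computation to run in-between
        -> IO c
```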
Great! We pass an IO action that acquires a resource; bracket
passes that
resource to a function which takes the resource and runs another action.
We also provide a release function which bracket
is guaranteed to run
even if the primary action raises an exception.
Unfortunately, we cannot directly use bracket
in our
fetchUser
function: openDB (resource acquisition) runs in the AppProcess
monad. If our functions ran in IO, we could lift the entire bracket computation into
our monad transformer stack with liftIO; but we cannot do that for the computations
passed to bracket.
It is perfectly possible to write our own bracket; distributed-process
does this
for the Process
monad (which is itself a newtyped ReaderT stack). Here is how that is done:
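```haskell
-- A sketch of how distributed-process defines bracket for the Process monad
-- (the shipped source may differ in detail); throwIO and SomeException come
-- from Control.Exception.
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing = do
  a <- before
  r <- thing a `onException` after a
  _ <- after a
  return r

onException :: Process a -> Process b -> Process a
onException p what =
  p `catch` \e -> do
    _ <- what
    liftIO $ throwIO (e :: SomeException)
```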
distributed-process
needs to do this sort of thing to keep its dependency
list small, but do we really want to write this for every transformer stack
we use in our own applications? No! And we do not have to, thanks to
the monad-control and lifted-base libraries.
monad-control provides several typeclasses and helper functions that make it possible to fully generalize the wrapping/unwrapping required to keep transformer effects stashed away while actions run in the base monad. Of most concern to end users of this library are the typeclasses MonadBase and MonadBaseControl. How it works is beyond the scope of this tutorial, but there is an excellent and thorough explanation written by Michael Snoyman which is available here.
lifted-base takes advantage of these typeclasses to provide lifted versions of many functions in the Haskell base library. For example, Control.Exception.Lifted has a definition of bracket that looks like this:
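```haskell
bracket :: MonadBaseControl IO m
        => m a        -- computation to run first ("acquire resource")
        -> (a -> m b) -- computation to run last ("release resource")
        -> (a -> m c) -- computation to run in-between
        -> m c
```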
It is just the same as the version found in base, except it is generalized to work
with actions in any monad that implements MonadBaseControl IO. monad-control defines
instances for the standard transformers, but those instances require the base monad
(in this case, Process
) to also have an instance of these classes.
To address this, the distributed-process-monad-control package
provides orphan MonadBase IO and MonadBaseControl IO instances for the Process
type.
After importing these, we can rewrite our fetchUser
function to use the instance of bracket
provided by lifted-base.
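Continuing the earlier sketch, that might look like this (the module name below
is the one exposed by distributed-process-monad-control):

```haskell
import Control.Distributed.Process.MonadBaseControl ()  -- orphan instances for Process
import Control.Exception.Lifted (bracket)

-- The handle is now released even if query throws an exception.
fetchUser :: UserId -> AppProcess User
fetchUser uid =
  bracket openDB               -- acquire
          closeDB              -- release, guaranteed to run
          (\db -> query db uid)
```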
lifted-base also provides conveniences like MVar and other concurrency primitives that operate in MonadBase IO. One benefit here is that your code is not sprinkled with liftIO; but MonadBaseControl IO also makes things like a lifted withMVar possible - which is really just a specialization of bracket. You will also find lots of other libraries on hackage which use these instances - at present count there are more than 150 packages using it.
One note of caution: these instances can enable the use of functions such as forkIO
(or fork
from lifted-base) which compromise invariants in the Process
monad and can lead to confusing and subtle issues. Always use the Cloud Haskell functions such as spawnLocal instead.