The source code for this tutorial is based on a module from the distributed-process-task library.
Please note that this tutorial is based on the stable (master) branch.
There may be subtle bugs hiding in code that evaluates the low-level send and receive primitives
directly. Forgetting to monitor the destination whilst waiting for a reply
or failing to match on the correct message types are the most common, but
other, more esoteric problems exist, such as badly formed serialisation instances
for user defined data types, which can crash the sender or, worse, in the
presence of unsafe operations and unevaluated thunks, unexpectedly
crash the receiver.
The /Managed Process/ API handles sending to and receiving from the server process, in-process error handling and message decoding, leaving you to focus on writing code that describes what the server process does when it receives messages, rather than how it receives them. The API also provides a set of pre-defined client interactions, all of which have well defined semantics and failure modes. There is support for sending messages to/from a process’ mailbox, using typed channels for inputs and outputs, RPC calls (i.e., waiting for a reply from the server) and fire-and-forget client-server messages.
Managed processes are defined using record syntax, providing lists of
Dispatcher objects describing how the server handles particular kinds of
client interaction for specific input types. The process definition
also provides hooks for error handling (in case of either server code crashing
or exit signals dispatched to the server process from elsewhere) and cleanup
code to be run on termination/shutdown.
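The following is a rough mental model that uses no libraries and is not the ManagedProcess API itself. It mimics a definition made of typed dispatchers: each dispatcher accepts only one input type, and the first one that matches handles the message. Data.Dynamic from base stands in for real message decoding. The names ToyDispatcher, ToyDefinition, toyHandle, dispatch and UnhandledPolicy are hypothetical.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- Hypothetical stand-in for a Dispatcher: given the current state and a
-- message, it either handles the message (reply, new state) or declines it.
newtype ToyDispatcher s = ToyDispatcher (s -> Dynamic -> Maybe (String, s))

-- Build a dispatcher that only accepts messages of one concrete type 'a'.
toyHandle :: Typeable a => (s -> a -> (String, s)) -> ToyDispatcher s
toyHandle f = ToyDispatcher $ \s msg -> fmap (f s) (fromDynamic msg)

-- Hypothetical policy for messages that no dispatcher accepts.
data UnhandledPolicy = Terminate | Drop deriving (Eq, Show)

-- Hypothetical stand-in for a process definition.
data ToyDefinition s = ToyDefinition
  { toyHandlers  :: [ToyDispatcher s]
  , toyUnhandled :: UnhandledPolicy
  }

-- Offer the message to each dispatcher in order; the first whose input
-- type matches handles it, and later ones never see it.
dispatch :: ToyDefinition s -> s -> Dynamic -> Either String (String, s)
dispatch def s msg = go (toyHandlers def)
  where
    go [] = case toyUnhandled def of
      Drop      -> Right ("dropped", s)
      Terminate -> Left "unhandled input"
    go (ToyDispatcher d : rest) = maybe (go rest) Right (d s msg)

-- Example: two handlers accept Add, but only the first ever runs.
data Add = Add Double Double

mathServer :: ToyDefinition Int  -- state: number of requests served
mathServer = ToyDefinition
  { toyHandlers =
      [ toyHandle (\n (Add x y) -> (show (x + y), n + 1))
      , toyHandle (\n (Add _ _) -> ("never reached", n))
      ]
  , toyUnhandled = Terminate
  }
```

In GHCi, `dispatch mathServer 0 (toDyn (Add 1 2))` evaluates to `Right ("3.0",1)`. By contrast, `dispatch mathServer 0 (toDyn "hi")` gives `Left "unhandled input"`, because no handler accepts a String.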
When defining a protocol between client and server, we typically decide on
a set of types the server will handle and possibly map these to replies we
may wish to send back. The managed process
call mechanisms cater for this
specifically, providing tight control over the domain of input messages from
clients, whilst ensuring that client code handles errors consistently and
input messages are routed to a suitable message handling function in the server.
In the following example, we’ll take a look at this API in action.
Let’s consider the simple math server we encountered in the high level
documentation. We could allow clients to send us a tuple of
(ProcessId, Double, Double), replying to the first tuple element with the
sum of the second and third. What happens if our server process is killed
while the client is waiting for the reply though? The client would deadlock.
Clients could always set up a monitor and wait for the reply or a monitor
signal, and could even write such code generically, but what if the code
calling such a utility function expects the wrong type? We could
use a typed channel to alleviate that ill, but that only helps with the client
receiving messages, not the server. How can we ensure that the server receives
the correct type(s) as well? Creating multiple typed channels (one for each
kind of message we’re expecting) and then distributing those to all our clients
is awkward at best (though we will see how to do something like this in a
later tutorial).
The ManagedProcess call and cast APIs help us to avoid this conundrum, providing a uniform
API for both the client and the server to observe. Here’s a better example of
that math server that does just that:
This style of programming will already be familiar if you’ve used
send in your clients and the
receive [ match ... ]
family of functions to write your servers. The primary difference here
is that the choice of when to return to (potentially blocking on) the
server’s mailbox is taken out of the programmer’s hands, leaving the
implementor to worry only about the logic to be applied once a message
of one type or another is received.
We could even hide the math server behind a newtype and prevent messages
being sent to its
ProcessId altogether, but we will leave that as an
exercise for the reader.
Of course, it would still be possible to write the server and client code
and encounter data type decoding failures, since the
call function takes
any Serializable datum, just like
send. We can solve that for
the return type of the remote call by sending a typed channel and
replying explicitly to it in our server side code. Whilst this doesn’t
make the server code any prettier (since it has to reply to the channel
explicitly, rather than just evaluating to a result), it does reduce the
likelihood of runtime errors somewhat.
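Cloud Haskell's typed channels need the distributed-process package, so the sketch below is a local analogy that uses only base. An MVar plays the part of the reply channel that the client sends along with its request. The server replies to that channel explicitly instead of just evaluating to a result. AddRequest, mathServerLoop, startMathServer and addViaChannel are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever)

-- The request carries its own reply slot; the slot's type fixes the reply type.
data AddRequest = AddRequest Double Double (MVar Double)

-- Server loop: reply explicitly to the slot carried in each request.
mathServerLoop :: Chan AddRequest -> IO ()
mathServerLoop inbox = forever $ do
  AddRequest x y replyTo <- readChan inbox
  putMVar replyTo (x + y)

startMathServer :: IO (Chan AddRequest)
startMathServer = do
  inbox <- newChan
  _ <- forkIO (mathServerLoop inbox)
  pure inbox

-- Client wrapper: allocate a reply slot, send the request, block for the answer.
addViaChannel :: Chan AddRequest -> Double -> Double -> IO Double
addViaChannel inbox x y = do
  reply <- newEmptyMVar
  writeChan inbox (AddRequest x y reply)
  takeMVar reply
```

Clients only ever see `addViaChannel`, so they cannot put a badly typed request on the server's inbox.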
Ensuring that only valid types are sent to the server is relatively simple,
provided that we do not expose clients directly to
call, but write our own
wrapper functions instead. An additional level of isolation and safety is available
when using /control channels/, which will be covered in a subsequent tutorial.
Before we leave the math server behind, let’s take a brief look at the
cast part of the client-server protocol. Unlike its synchronous cousin,
cast does not expect a reply at all - it is a fire and forget call, much like
send, but carries the same additional type information that a
call does (about its
inputs) and is also routed to a
Dispatcher in the
apiHandlers field of
the process definition.
We will use cast with the existing
Add type to implement a function that
takes an /add request/ and prints the result instead of returning it. If we
were implementing this with
call we would be a bit stuck, because there is
nothing to differentiate between two
Add handlers and the server would
choose the first valid (i.e., type safe) handler and ignore the others we’d defined.
Also note that because the client doesn’t wait for a reply, if you execute this function in a test/demo application, you’ll need to block the main thread for a while to wait for the server to receive the message and print out the result.
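The same base-only analogy can illustrate cast. The client enqueues a request and returns at once, and nothing comes back to it. In this hypothetical sketch (castAdd, printingServer and castDemo are illustrative names), the demo does not sleep for an arbitrary period. Instead it waits on a side channel, `printed`, which the server fills after printing. That signal exists only so the demo can wait; the client never receives a reply.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever)

data Add = Add Double Double

-- Fire-and-forget: enqueue the request and return immediately.
castAdd :: Chan Add -> Double -> Double -> IO ()
castAdd inbox x y = writeChan inbox (Add x y)

-- Server loop: print each sum, then signal the demo via 'printed'.
printingServer :: Chan Add -> MVar Double -> IO ()
printingServer inbox printed = forever $ do
  Add x y <- readChan inbox
  let s = x + y
  print s
  putMVar printed s

castDemo :: IO Double
castDemo = do
  inbox   <- newChan
  printed <- newEmptyMVar
  _ <- forkIO (printingServer inbox printed)
  castAdd inbox 1 2   -- returns at once; no reply is delivered to the caller
  -- Without this wait, the program could exit before the server prints.
  takeMVar printed
```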
Of course this is a toy example - why defer simple computations like addition and/or printing results to a separate process? Next, we’ll build something a bit more interesting and useful.
This section of the tutorial is based on a real module from the
distributed-process-task library.
Let’s imagine we want to execute tasks on an arbitrary node, but want
the caller to block whilst the remote task is executing. We also want
to put an upper bound on the number of concurrent tasks/callers that
the server will accept. Let’s use
ManagedProcess to implement a generic
task server with these characteristics.
Once the upper bound is reached, tasks will be queued up for execution. Only when we drop below this limit will tasks be taken from the backlog and executed.
Since we want the server to proceed with its work whilst the client is
blocked, the asynchronous
cast API may sound like the ideal approach,
or we might use an asynchronous variant of the typed-channel approach we saw earlier. The
call API, however, offers exactly the
tools we need to keep the client blocked (waiting for a reply) whilst
the server is allowed to proceed with its work.
We’ll start by thinking about the types we need to consume in the server and client processes: the tasks we’re being asked to perform.
To submit a task, our clients will submit an action in the process
monad, wrapped in a
Closure environment. We will use a
typeclass to allow clients to specify the server’s location in whatever
manner suits them. The type of a task will be
Closure (Process a) and
the server will explicitly return an /either/ value, with a Left carrying a String
for errors and
Right a for successful results.
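The sketch below shows that result convention without any libraries. It assumes a plain IO action in place of a Closure (Process a). The hypothetical runTask runs the task and turns any exception into a Left carrying the exception's text, and a normal result into Right.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Exception (SomeException, evaluate, try)

-- Stand-in for a task; the tutorial's tasks are Closure (Process a).
type Task a = IO a

-- Run a task, mapping any exception (thrown by the action, or while forcing
-- its result to weak head normal form) to Left, and success to Right.
runTask :: Task a -> IO (Either String a)
runTask task = do
  outcome <- try (task >>= evaluate)
  pure $ case outcome of
    Left (e :: SomeException) -> Left (show e)
    Right a                   -> Right a
```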
Remember that in Cloud Haskell, the only way to communicate with a process
(apart from introducing scoped concurrency primitives like
MVar or using
stm) is via its mailbox and typed channels. Also, all communication with
the process is asynchronous from the sender’s perspective and synchronous
from the receiver’s. Although
call is a synchronous (RPC-like) protocol,
communication with the server process has to take place out of band.
The server implementation replies to each request and, when handling a
call, can defer its reply until a later stage, going back to
receiving and processing other messages in the meantime. As far as the client
is concerned, it is simply waiting for a reply. Note that the call API
is implemented so that messages from other processes cannot interleave with
the server’s response. This is very important, since another message of type
Either String a could theoretically arrive in our mailbox from somewhere
else whilst we’re receiving, therefore
call transparently tags the call
message and awaits a specific reply from the server (containing the same
tag). These tags are guaranteed to be unique across multiple nodes, since
they’re based on a
MonitorRef, which holds an
Identifier (here, a ProcessId) and
a node local monitor ref counter. All monitor creation is coordinated by
the caller’s node controller (guaranteeing the uniqueness of the ref
counter for the lifetime of the node) and the references are not easily
forged (i.e., sent by mistake - this is not a security feature of any sort)
since the type is opaque.
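To see why the tag matters, here is a pure, hypothetical model of the reply matching described above. The mailbox is a list, and each reply carries a tag: an Int standing in for the MonitorRef-based tag. The caller takes only the first reply whose tag matches. Every other message stays where it was, even one of the same type.

```haskell
-- Hypothetical tag; in the real API this role is played by a MonitorRef.
newtype Tag = Tag Int deriving (Eq, Show)

data Reply a = Reply Tag a deriving (Eq, Show)

-- Return the payload of the first reply tagged 'tag', plus the mailbox
-- with only that message removed; everything else keeps its order.
selectReply :: Tag -> [Reply a] -> Maybe (a, [Reply a])
selectReply tag = go []
  where
    go _ [] = Nothing
    go skipped (m@(Reply t v) : rest)
      | t == tag  = Just (v, reverse skipped ++ rest)
      | otherwise = go (m : skipped) rest
```

A stray `Either String a` that arrives first is skipped rather than mistaken for the answer.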
In terms of code for the client then, that’s all there is to it!
Note that the type signature we expose to our consumers is specific, and that
we do not expose them to arbitrary messages arriving in their mailbox. Also
note that if a
call fails, a
ProcessExitException will be thrown in the caller’s
thread (since the implementation calls
die if it detects that the server has
died before replying). Other variations of
call exist that return an
Either ExitReason a instead of making the caller’s process exit.
Note that if the server replies to this call with some other type (i.e., a type
other than Either String a) then our client will be blocked indefinitely!
We could alleviate this by using a typed channel as we saw previously with our
math server, but there’s little point since we’re in total charge of both the
client and the server’s code.
To implement the server, we’ll need to hang on to some internal state. As well as
knowing our queue’s size limit, we will need to track the active tasks we’re
currently running. Each task will be submitted as a
Closure (Process a) and we’ll
need to spawn the task (asynchronously), handle the result (once the closure has
run to completion) and communicate the result (or failure) to the original caller.
This means our pool state will need to be parameterised by the result type it will accept in its closures. So now we have the beginnings of our state type:
So how can we execute this
Closure (Process a) without blocking the server
process itself? We can use the
distributed-process-async library to execute each task asynchronously and to obtain a means of waiting on the result.
In order to use an
Async handle to get the result of the computation once it’s
complete, we’ll have to hang on to a reference. We also need a way to associate the
submitter with the handle, so we end up with one field for the active (running)
tasks and another for the queue of accepted (but inactive) ones, as expected.
Since we cannot wait on all these
Async handles at once whilst we’re supposed to
be accepting new messages from clients (distributed-process-async does provide an API for
multiplexing on async results, but that’s no use here), we will instead monitor the
async tasks and pull the results when we receive their monitor signals. So for the
active tasks, we’ll need to store a
MonitorRef and a reference to the original
caller, plus the async handle itself. We’ll use a simple association list for this
state, though we should probably use a more efficient data structure eventually.
For the tasks that we cannot execute immediately (i.e., when we reach the queue’s size limit), we hold the client ref and the closure, but no monitor ref. We’ll use a data structure that supports FIFO ordering semantics for this, since that’s probably what clients will expect of something calling itself a “queue”.
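Putting those two fields together, the state might look like the following sketch, which uses no Cloud Haskell types. The type parameters are placeholders assumed for illustration: ref for a MonitorRef, caller for a CallRef, handle for an Async handle and task for a Closure (Process a). The backlog uses Seq from the containers package as its FIFO structure. Pool, emptyPool and atCapacity are hypothetical names.

```haskell
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq

-- Hypothetical pool state, parameterised over placeholder types.
data Pool ref caller handle task = Pool
  { poolSize :: Int                      -- maximum number of concurrent tasks
  , active   :: [(ref, caller, handle)]  -- running tasks, keyed by monitor ref
  , accepted :: Seq (caller, task)       -- backlog, oldest request at the front
  }

emptyPool :: Int -> Pool ref caller handle task
emptyPool n = Pool { poolSize = n, active = [], accepted = Seq.empty }

-- True when no further task may start until a running one completes.
atCapacity :: Pool ref caller handle task -> Bool
atCapacity p = length (active p) >= poolSize p
```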
Our queue-like behaviour is fairly simple to define.
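One way to get FIFO behaviour is Data.Sequence from containers, which offers amortised constant-time access to both ends. This choice is assumed here, not taken from the original code. The hypothetical helper enqueue appends at the back, and dequeue removes from the front.

```haskell
import Data.Sequence (Seq, ViewL (..), (|>))
import qualified Data.Sequence as Seq

-- Add an item at the back of the queue.
enqueue :: Seq a -> a -> Seq a
enqueue q x = q |> x

-- Remove the item at the front, if any: first in, first out.
dequeue :: Seq a -> Maybe (a, Seq a)
dequeue q = case Seq.viewl q of
  EmptyL    -> Nothing
  x :< rest -> Just (x, rest)
```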
Now, to turn that
Closure environment into a thunk we can evaluate, we’ll use the
unClosure function, and we’ll pass the thunk to
async and get back
a handle to the running async task, which we’ll then need to monitor. We won’t cover
the async API in detail here, except to point out that the call to
async spawns a
new process to do the actual work and returns a handle that we can use to query for the result.
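The sketch below is a base-only analogy of the async idea and is not the distributed-process-async API. forkFinally starts a worker thread, and an MVar acts as the handle. The MVar is filled exactly once with the worker's outcome, which toyWait then blocks on. ToyAsync, toyAsync and toyWait are hypothetical names, and unlike the tutorial's server, nothing here is monitored.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Control.Exception (SomeException)

-- Toy async handle: the slot is filled once, when the worker finishes.
newtype ToyAsync a = ToyAsync (MVar (Either SomeException a))

-- Start the action on a new thread and return a handle immediately.
toyAsync :: IO a -> IO (ToyAsync a)
toyAsync action = do
  slot <- newEmptyMVar
  _ <- forkFinally action (putMVar slot)
  pure (ToyAsync slot)

-- Block until the worker has finished, then return its outcome.
toyWait :: ToyAsync a -> IO (Either SomeException a)
toyWait (ToyAsync slot) = readMVar slot
```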
We can now implement the
acceptTask function, which the server will use to handle
submitted tasks. The signature of our function must be compatible with the message
handling API from
ManagedProcess that we’re going to use it with - in this case
handleCallFrom. This variant of the
handleCall family of functions is specifically
intended for use when the server is going to potentially delay its reply, rather than
replying immediately. It takes an expression that operates over our server’s state, a
CallRef that uniquely identifies the caller and can be used to reply to them later
on and the message that was sent to the server - in this case, a
Closure (Process a).
All managed process handler functions must return either a
ProcessAction, telling
how the server should proceed, or a
ProcessReply, which combines a
ProcessAction
with a possible reply to one of the
call derivatives. Since we’re deferring our reply
until later, we will use
noReply_, which creates a
ProcessReply that carries no reply, only a ProcessAction telling
the server to continue receiving messages.
If we’re at capacity, we add the task (and caller) to the
accepted queue;
otherwise we launch and monitor the task using
async and stash the monitor
ref, caller ref and the async handle together in the
active list.
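The decision just described can be sketched without any Cloud Haskell types. In this hypothetical model, `launch` stands in for unClosure, async and monitor combined, and returns the new monitor ref together with the handle. The Pool type reuses the same placeholder parameters (ref, caller, handle, task). This illustrates the control flow only; it is not the tutorial's acceptTask.

```haskell
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq

-- Hypothetical pool state with placeholder type parameters.
data Pool ref caller handle task = Pool
  { poolSize :: Int
  , active   :: [(ref, caller, handle)]
  , accepted :: Seq (caller, task)
  }

-- Model of acceptTask: queue the request when at capacity, otherwise
-- launch it and record who is waiting for the result.
acceptTask :: Monad m
           => (task -> m (ref, handle))   -- hypothetical: spawn and monitor
           -> Pool ref caller handle task
           -> caller
           -> task
           -> m (Pool ref caller handle task)
acceptTask launch pool caller task
  | length (active pool) >= poolSize pool =
      pure pool { accepted = accepted pool |> (caller, task) }
  | otherwise = do
      (ref, h) <- launch task
      pure pool { active = (ref, caller, h) : active pool }
```

Using Maybe as the monad is enough to exercise both branches in GHCi.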
Now we must write a function that handles the results of these closures. When a monitor signal arrives in our mailbox, we need to look up the async handle associated with it so as to obtain the result and send it back to the caller. Because we’ve now seen a task complete (reducing the number of active tasks by one, even if we were running at capacity), we will also pull a pending task off the backlog (i.e., accepted), if one exists, and execute it.
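The steps, in order, are: look up the async handle for the monitor ref, wait on it for the result, reply to the original caller, start the next task from the backlog (if any) and carry on receiving messages.
This chain looks like
wait >>= respond >> bump-next-task >>= continue.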
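The same hypothetical model can express that chain. Here `wait`, `respond` and `launch` are passed in as functions, which lets the sketch run in any monad. They are assumptions standing in for waiting on the async handle, replying to the CallRef, and spawning plus monitoring a task. A monitor ref that the pool does not know about is simply ignored.

```haskell
import Data.Sequence (Seq, ViewL (..))
import qualified Data.Sequence as Seq

-- Hypothetical pool state with placeholder type parameters.
data Pool ref caller handle task = Pool
  { poolSize :: Int
  , active   :: [(ref, caller, handle)]
  , accepted :: Seq (caller, task)
  }

-- Model of taskComplete: wait >>= respond >> bump-next-task >>= continue.
taskComplete :: (Monad m, Eq ref)
             => (handle -> m result)         -- wait on the finished task
             -> (caller -> result -> m ())   -- reply to the blocked caller
             -> (task -> m (ref, handle))    -- spawn and monitor a task
             -> ref                          -- ref from the monitor signal
             -> Pool ref caller handle task
             -> m (Pool ref caller handle task)
taskComplete wait respond launch ref pool =
  case [ (c, h) | (r, c, h) <- active pool, r == ref ] of
    [] -> pure pool                          -- not one of ours: ignore it
    ((caller, h) : _) -> do
      result <- wait h
      respond caller result
      let remaining = [ t | t@(r, _, _) <- active pool, r /= ref ]
      bumpNext launch pool { active = remaining }

-- Start the oldest backlog entry, if there is one.
bumpNext :: Monad m
         => (task -> m (ref, handle))
         -> Pool ref caller handle task
         -> m (Pool ref caller handle task)
bumpNext launch pool =
  case Seq.viewl (accepted pool) of
    EmptyL -> pure pool
    (caller, task) :< rest -> do
      (ref, h) <- launch task
      pure pool { active = (ref, caller, h) : active pool, accepted = rest }
```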
Replying to the caller requires special API support from
ManagedProcess, because we’re not
just sending any message back to the caller. We’re replying to a specific
call that has taken place and is, from the client’s perspective, still running.
ManagedProcess provides a dedicated API call for exactly this purpose.
There is quite a bit of code in this next function, which we’ll look at in detail.
Firstly, note that the signature is similar to the one we used for
acceptTask, but it
returns just a
ProcessAction instead of a
ProcessReply. This function will not be
wired up to a
call (or even a
cast), because the node controller will send the
monitor signal directly to our mailbox, not using the managed process APIs at all.
This kind of client interaction is called an info call in the managed process API,
and since there’s no expected reply, as with
cast, we simply return a
ProcessAction
telling the server what to do next - in this case, to
continue reading from the
mailbox.
We’ve dealt with mapping the
Either values, which we could have
left to the caller, but this makes the client facing API much simpler to work with.
Note that our use of an association list for the active run queue makes for an
O(n) search for our worker, but that can be optimised with a map or dictionary later.
Worse, we have to scan the list again when deleting the worker from the run queue,
but the same fix (using a Map) should alleviate that problem too. We leave that as
an exercise for the reader.
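As a hint for that exercise, the active tasks could be kept in a Data.Map keyed by monitor ref. Map.alterF can then find and remove an entry in O(log n). This sketch assumes the monitor ref type has an Ord instance, which Map keys require. Active and takeActive are hypothetical names.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical replacement for the association list: active tasks keyed by
-- monitor ref, each entry holding the caller and the async handle.
type Active ref caller handle = Map.Map ref (caller, handle)

-- Return the entry for a monitor ref (if any) together with the map
-- without that entry.
takeActive :: Ord ref
           => ref
           -> Active ref caller handle
           -> (Maybe (caller, handle), Active ref caller handle)
takeActive ref m = Map.alterF (\found -> (found, Nothing)) ref m
```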
Call and cast handlers live in the
apiHandlers list of our
process definition and have the type
Dispatcher s where
s is the state type for the process. We
cannot construct a
Dispatcher ourselves, but a range of functions in the
ManagedProcess.Server module exist to convert functions like the ones we’ve just
defined to the correct type.
In order to spell things out for the compiler, we need to put a type signature
in place at the call site for
storeTask, so our final construct for that
handler is thus:
No such thing is required for
taskComplete, as there’s no ambiguity about its
type. Our process definition is now finished, and here it is:
Starting the server takes a bit of work.
ManagedProcess provides several
utility functions to help with spawning and running processes. The
start function takes an initialising thunk, which must generate the initial
state and set up the server’s receive timeout,
then the process definition which we’ve already encountered. For more details
about starting managed processes, see the haddocks.
Defining tasks is as simple as making them remote-worthy:
And executing them is just as simple too.
Starting up the server itself, locally or on a remote node, is just a matter of
calling start. We can go a step further, though,
and add a bit more type safety to our API by using an opaque handle to communicate
with the server. The motivation is that right now it is possible for
a client to send a
Closure to the server with a return type different from the
one the server is expecting! Since the server won’t recognise that message, the
unhandledMessagePolicy will be applied, which by default crashes the server with
an exit reason referring to “unhandled inputs”!
By returning a handle to the server using a parameterised type, we can ensure that
only closures returning a matching type are sent. To do so, we use a phantom type
parameter and simply stash the real
ProcessId in a newtype. We also need to be
able to pass this handle to the managed process
call API, so we define an
instance of the
Resolvable typeclass for it, which makes a (default) instance of
Routable available, which is exactly what
call is expecting:
Finally, we write a
start function that returns this handle and change the signature of
executeTask to match it.
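Here is a sketch of the phantom-type idea that uses no libraries. Int stands in for a ProcessId, and a wrapped IO action stands in for a Closure (Process a). All names here (ServerId, TaskQueue, Task, startQueue, executeOn, intQueue, routed) are hypothetical. The type parameter `a` never appears in the representation, yet the compiler still refuses to pair a queue with a task of a different result type.

```haskell
-- Stand-in for a ProcessId.
newtype ServerId = ServerId Int deriving (Eq, Show)

-- 'a' is a phantom parameter: it is not stored, only tracked by the types.
newtype TaskQueue a = TaskQueue ServerId

-- Stand-in for a Closure (Process a).
newtype Task a = Task (IO a)

-- Hypothetical start: a real one would spawn the server and wrap its id.
startQueue :: Int -> TaskQueue a
startQueue n = TaskQueue (ServerId n)

-- Hypothetical client call: it only type-checks when the task's result type
-- matches the queue's; here it just reports where the task would be sent.
executeOn :: TaskQueue a -> Task a -> ServerId
executeOn (TaskQueue sid) _ = sid

intQueue :: TaskQueue Int
intQueue = startQueue 1

routed :: ServerId
routed = executeOn intQueue (Task (pure 42))

-- Rejected at compile time, because String does not match the queue's Int:
-- bad = executeOn intQueue (Task (pure "oops"))
```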
In this tutorial, we’ve really just scratched the surface of the
ManagedProcess API. By handing over control of the client/server protocol to the framework, we
are able to focus on the code that matters, such as state transitions and decision
making, without getting bogged down (much) with the business of sending and
receiving messages, handling client/server failures and such like.
We did not take much care over our choice of data structures. Might this have
profound consequences for clients? Perhaps more of a concern is the cost of using
Async everywhere - remember we used this in the server to handle
concurrently executing tasks and obtaining their results. An invocation of
async will create two new processes: one to perform the calculation and
another to monitor the first and handle failures and/or cancellation. Spawning
processes is cheap, but not free, as each process is a Haskell thread plus some
additional bookkeeping data.