The source code for this tutorial is based on the BlockingQueue API from distributed-process-task and can be accessed here. Please note that this tutorial is based on the stable (master) branch of distributed-process-task.
There may be subtle bugs hiding in code that evaluates send and receive directly. Forgetting to monitor the destination whilst waiting for a reply, or failing to match on the correct message types, are the most common; other, more esoteric problems exist too, such as badly formed Binary instances for user-defined data types, which can crash the sender or, worse, in the presence of unsafe operations and unevaluated thunks, unexpectedly crash the receiver.
The /Managed Process/ API handles sending to and receiving from the server process, in-process error handling and message decoding, leaving you to focus on writing code that describes what the server process does when it receives messages, rather than how it receives them. The API also provides a set of pre-defined client interactions, all of which have well defined semantics and failure modes. There is support for sending messages to/from a process’ mailbox, using typed channels for inputs and outputs, RPC calls (i.e., waiting for a reply from the server) and fire-and-forget client-server messages.
Managed processes are defined using record syntax, providing lists of
Dispatcher
objects describing how the server handles particular kinds of
client interaction for specific input types. The ProcessDefinition
record
also provides hooks for error handling (in case of either server code crashing
or exit signals dispatched to the server process from elsewhere) and cleanup
code to be run on termination/shutdown.
When defining a protocol between client and server, we typically decide on
a set of types the server will handle and possibly map these to replies we
may wish to send back. The cast
and call
mechanisms cater for this
specifically, providing tight control over the domain of input messages from
clients, whilst ensuring that client code handles errors consistently and
input messages are routed to a suitable message handling function in the
server process.
In the following example, we’ll take a look at this API in action.
Let’s consider the simple math server we encountered in the high level
documentation. We could allow clients to send us a tuple of
(ProcessId, Double, Double)
, replying to the first tuple element with the
sum of the second and third. What happens if our server process is killed
while the client is waiting for the reply though? The client would deadlock.
Clients could always set up a monitor and wait for the reply or a monitor
signal, and could even write such code generically, but what if the code
evaluating some such utility function then expects the wrong type? We could
use a typed channel to alleviate that ill, but that only helps with the client
receiving messages, not the server. How can we ensure that the server receives
the correct type(s) as well? Creating multiple typed channels (one for each
kind of message we’re expecting) and then distributing those to all our clients
is awkward at best (though we will see how to do something like this using the
API in a later tutorial).
The call
and cast
APIs help us to avoid this conundrum, providing a uniform
API for both the client and the server to observe. Here’s a better example of
that math server that does just that:
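A sketch of such a server might look like the following (a minimal sketch based on the Control.Distributed.Process.ManagedProcess API from distributed-process-client-server; the MathServer module name is illustrative and the imports are abridged):

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}
module MathServer (add, launchMathServer) where

import Control.Distributed.Process
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- The message type stays hidden; clients only ever see the `add` function.
data Add = Add Double Double
  deriving (Typeable, Generic)

instance Binary Add

-- Client facing API: a synchronous call that blocks until the server replies.
add :: ProcessId -> Double -> Double -> Process Double
add sid x y = call sid (Add x y)

-- Server side: a stateless managed process with one call handler for Add.
launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess
        { apiHandlers            = [ handleCall_ (\(Add x y) -> return (x + y)) ]
        , unhandledMessagePolicy = Drop
        }
  in spawnLocal $ serve () (statelessInit Infinity) server
```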
This style of programming will already be familiar if you’ve used some
combination of send
in your clients and the receive [ match ... ]
family of functions to write your servers. The primary difference here
is that the choice of when to return to (potentially blocking on) the
server’s mailbox is taken out of the programmer’s hands, leaving the
implementor to worry only about the logic to be applied once a message
of one type or another is received.
We could even hide the math server behind a newtype and prevent messages
being sent to its ProcessId
altogether, but we will leave that as an
exercise for the reader.
Of course, it would still be possible to write the server and client code
and encounter data type decoding failures, since the call
function takes
an arbitrary Serializable
datum just like send
. We can solve that for
the return type of the remote call by sending a typed channel and
replying explicitly to it in our server side code. Whilst this doesn’t
make the server code any prettier (since it has to reply to the channel
explicitly, rather than just evaluating to a result), it does reduce the
likelihood of runtime errors somewhat.
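One way that might look, as a sketch under the same assumptions as before (note that Add now carries the reply channel, superseding the earlier definition):

```haskell
-- Add now carries a typed channel on which the server must send its answer.
data Add = Add (SendPort Double) Double Double
  deriving (Typeable, Generic)

instance Binary Add

add :: ProcessId -> Double -> Double -> Process Double
add sid x y = do
  (sp, rp) <- newChan
  -- the call itself merely acknowledges receipt with (); the actual result
  -- arrives on the typed channel created above
  () <- call sid (Add sp x y)
  receiveChan rp

launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess
        { apiHandlers            = [ handleCall_ (\(Add sp x y) -> sendChan sp (x + y)) ]
        , unhandledMessagePolicy = Drop
        }
  in spawnLocal $ serve () (statelessInit Infinity) server
```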
Ensuring that only valid types are sent to the server is relatively simple,
given that we do not expose the client directly to call
and write our own
wrapper functions. An additional level of isolation and safety is available
when using /control channels/, which will be covered in a subsequent tutorial.
Before we leave the math server behind, let’s take a brief look at the cast
part of the client-server protocol. Unlike its synchronous cousin, cast
does
not expect a reply at all - it is a fire and forget call, much like send
,
but carries the same additional type information that a call
does (about its
inputs) and is also routed to a Dispatcher
in the apiHandlers
field of
the process definition.
We will use cast with the existing Add
type, to implement a function that
takes an /add request/ and prints the result instead of returning it. If we
were implementing this with call
we would be a bit stuck, because there is
nothing to differentiate between two Add
instances and the server would
choose the first valid (i.e., type safe) handler and ignore the others we’d
declared.
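A sketch of how that might look, reusing the original two-field Add type (the printSum name and the handler wiring are illustrative):

```haskell
-- Client side: fire-and-forget; this returns as soon as the message is sent.
printSum :: ProcessId -> Double -> Double -> Process ()
printSum sid x y = cast sid (Add x y)

-- Server side: a cast handler for Add sits alongside the call handler.
launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess
        { apiHandlers =
            [ handleCall_ (\(Add x y) -> return (x + y))
            , handleCast  (\s (Add x y) ->
                liftIO (putStrLn ("sum: " ++ show (x + y))) >> continue s)
            ]
        , unhandledMessagePolicy = Drop
        }
  in spawnLocal $ serve () (statelessInit Infinity) server
```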
Also note that because the client doesn’t wait for a reply, if you execute this function in a test/demo application, you’ll need to block the main thread for a while to wait for the server to receive the message and print out the result.
Of course this is a toy example - why defer simple computations like addition and/or printing results to a separate process? Next, we’ll build something a bit more interesting and useful.
This section of the tutorial is based on a real module from the
distributed-process-task library, called BlockingQueue
.
Let’s imagine we want to execute tasks on an arbitrary node, but want
the caller to block whilst the remote task is executing. We also want
to put an upper bound on the number of concurrent tasks/callers that
the server will accept. Let’s use ManagedProcess
to implement a generic
task server like this, with the following characteristics:
Once the upper bound is reached, tasks will be queued up for execution. Only when we drop below this limit will tasks be taken from the backlog and executed.
Since we want the server to proceed with its work whilst the client is
blocked, the asynchronous cast
API may sound like the ideal approach,
or we might use the asynchronous cousin of our typed-channel
handling API callChan
. The call
API however, offers exactly the
tools we need to keep the client blocked (waiting for a reply) whilst
the server is allowed to proceed with its work.
We’ll start by thinking about the types we need to consume in the server and client processes: the tasks we’re being asked to perform.
To submit a task, our clients will submit an action in the process
monad, wrapped in a Closure
environment. We will use the Addressable
typeclass to allow clients to specify the server’s location in whatever
manner suits them. The type of a task will be Closure (Process a), and the server will explicitly return an /either/ value, with Left String for errors and Right a for successful results.
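A sketch of that client function, along with the module header assumed by the fragments that follow (the Either String a return type is the one just described; depending on your library version, some of these names may need importing from the .Client and .Server submodules instead):

```haskell
{-# LANGUAGE ConstraintKinds     #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE ScopedTypeVariables #-}
module BlockingQueue where

import Control.Distributed.Process
import Control.Distributed.Process.Async
import Control.Distributed.Process.Extras      (Addressable, Resolvable(..))
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess
import Data.Sequence (Seq, ViewR(..), (<|), viewr)
import qualified Data.Sequence as Seq

-- Enqueue a task in the pool and block the caller until it completes (or fails).
executeTask :: (Addressable s, Serializable a)
            => s
            -> Closure (Process a)
            -> Process (Either String a)
executeTask sid t = call sid t
```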
Remember that in Cloud Haskell, the only way to communicate with a process
(apart from introducing scoped concurrency primitives like MVar
or using
stm) is via its mailbox and typed channels. Also, all communication with
the process is asynchronous from the sender’s perspective and synchronous
from the receiver’s. Although call
is a synchronous (RPC-like) protocol,
communication with the server process has to take place out of band.
The server implementation chooses to reply to each request and when handling
a call
, can defer its reply until a later stage, thus going back to
receiving and processing other messages in the meantime. As far as the client
is concerned, it is simply waiting for a reply. Note that the call
primitive
is implemented so that messages from other processes cannot interleave with
the server’s response. This is very important, since another message of type
Either String a
could theoretically arrive in our mailbox from somewhere
else whilst we’re receiving, therefore call
transparently tags the call
message and awaits a specific reply from the server (containing the same
tag). These tags are guaranteed to be unique across multiple nodes, since
they’re based on a MonitorRef
, which holds an Identifier (a ProcessId, in this case)
and
a node local monitor ref counter. All monitor creation is coordinated by
the caller’s node controller (guaranteeing the uniqueness of the ref
counter for the lifetime of the node) and the references are not easily
forged (i.e., sent by mistake - this is not a security feature of any sort)
since the type is opaque.
In terms of code for the client then, that’s all there is to it!
Note that the type signature we expose to our consumers is specific, and that
we do not expose them to arbitrary messages arriving in their mailbox. Note
that if a call
fails, a ProcessExitException
will be thrown in the caller’s
thread (since the implementation calls die
if it detects that the server has
died before replying). Other variations of call
exist that return a Maybe
or
an Either ExitReason a
instead of making the caller’s process exit.
Note that if the server replies to this call with some other type (i.e., a type
other than Either String a
) then our client will be blocked indefinitely!
We could alleviate this by using a typed channel as we saw previously with our
math server, but there’s little point since we’re in total charge of both the
client and the server’s code.
To implement the server, we’ll need to hang on to some internal state. As well as
knowing our queue’s size limit, we will need to track the active tasks we’re
currently running. Each task will be submitted as a Closure (Process a)
and we’ll
need to spawn the task (asynchronously), handle the result (once the closure has
run to completion) and communicate the result (or failure) to the original caller.
This means our pool state will need to be parameterised by the result type it will accept in its closures. So now we have the beginnings of our state type:
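The first cut might be no more than this (the field name and the bare Int are illustrative):

```haskell
-- Parameterised by the result type of the closures it will accept.
-- The remaining fields are added as the design unfolds below.
data BlockingQueue a = BlockingQueue { poolSize :: Int }
```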
So how can we execute this Closure (Process a)
without blocking the server
process itself? We can use the Control.Distributed.Process.Async
API
to execute each task asynchronously and provide a means for waiting on the result.
In order to use an Async
handle to get the result of the computation once it’s
complete, we’ll have to hang on to a reference. We also need a way to associate the
submitter with the handle, so we end up with one field for the active (running)
tasks and another for the queue of accepted (but inactive) ones, as expected.
Since we cannot wait on all these Async
handles at once whilst we’re supposed to
be accepting new messages from clients (distributed-process-async does provide an API for multiplexing on async results, but it is of no use here), we will instead monitor the async tasks and pull in the results when we receive their monitor signals. So for the
active tasks, we’ll need to store a MonitorRef
and a reference to the original
caller, plus the async handle itself. We’ll use a simple association list for this
state, though we should probably use a more optimal data structure eventually.
For the tasks that we cannot execute immediately (i.e., when we reach the queue’s size limit), we hold the client ref and the closure, but no monitor ref. We’ll use a data structure that supports FIFO ordering semantics for this, since that’s probably what clients will expect of something calling itself a “queue”.
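Putting those decisions together, the state record might end up looking something like this (superseding the cut-down version above; Either String a is the reply type we settled on for clients):

```haskell
data BlockingQueue a = BlockingQueue
  { poolSize :: Int
    -- running tasks: monitor ref, the caller to reply to, and the async handle
  , active   :: [(MonitorRef, CallRef (Either String a), Async a)]
    -- accepted but not yet running tasks, held in FIFO order
  , accepted :: Seq (CallRef (Either String a), Closure (Process a))
  } deriving (Typeable)
```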
Our queue-like behaviour is fairly simple to define using Data.Sequence
:
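For instance (enqueue and dequeue are our own helpers, not library functions):

```haskell
enqueue :: Seq a -> a -> Seq a
enqueue s a = a <| s

dequeue :: Seq a -> Maybe (a, Seq a)
dequeue s =
  case viewr s of
    EmptyR  -> Nothing
    s' :> a -> Just (a, s')
```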
Now, to turn that Closure
environment into a thunk we can evaluate, we’ll use the
built in unClosure
function, and we’ll pass the thunk to async
and get back
a handle to the running async task, which we’ll then need to monitor. We won’t cover
the async API in detail here, except to point out that the call to async
spawns a
new process to do the actual work and returns a handle that we can use to query for
the result.
We can now implement the acceptTask
function, which the server will use to handle
submitted tasks. The signature of our function must be compatible with the message
handling API from ManagedProcess
that we’re going to use it with - in this case
handleCallFrom
. This variant of the handleCall
family of functions is specifically
intended for use when the server is going to potentially delay its reply, rather than
replying immediately. It takes an expression that operates over our server’s state, a
CallRef
that uniquely identifies the caller and can be used to reply to them later
on, and the message that was sent to the server - in this case, a Closure (Process a)
.
All managed process handler functions must return either a ProcessAction
, indicating
how the server should proceed, or a ProcessReply
, which combines a ProcessAction
with a possible reply to one of the call
derivatives. Since we’re deferring our reply
until later, we will use noReply_
, which creates a ProcessAction
for us, telling
the server to continue receiving messages.
If we’re at capacity, we add the task (and caller) to the accepted
queue,
otherwise we launch and monitor the task using async
and stash the monitor
ref, caller ref and the async handle together in the active
field.
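A sketch of that pair of functions, continuing with the types assumed above (the real library code differs in details such as the error type):

```haskell
storeTask :: Serializable a
          => BlockingQueue a
          -> CallRef (Either String a)
          -> Closure (Process a)
          -> Process (ProcessReply (Either String a) (BlockingQueue a))
storeTask s r c = acceptTask s r c >>= noReply_

acceptTask :: Serializable a
           => BlockingQueue a
           -> CallRef (Either String a)
           -> Closure (Process a)
           -> Process (BlockingQueue a)
acceptTask s@(BlockingQueue sz runQueue taskQueue) from task'
    -- at capacity: hold on to the caller and the closure for later
  | length runQueue >= sz =
      return s { accepted = enqueue taskQueue (from, task') }
    -- otherwise: run the closure asynchronously and monitor the worker
  | otherwise = do
      proc        <- unClosure task'
      asyncHandle <- async (task proc)
      ref         <- monitorAsync asyncHandle
      return s { active = (ref, from, asyncHandle) : runQueue }
```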
Now we must write a function that handles the results of these closures. When a monitor signal arrives in our mailbox, we need to lookup the async handle associated with it so as to obtain the result and send it back to the caller. Because, even if we were running at capacity, we’ve now seen a task complete (and therefore reduced the number of active tasks by one), we will also pull off a pending task from the backlog (i.e., accepted), if any exists, and execute it.
The steps then, are:
1. find the async handle that matches the monitor ref
2. obtain the result from it
3. send the result to the original caller
4. bump another task from the backlog (if there is one)
5. carry on
This chain then, looks like wait >>= respond >> bump-next-task >>= continue.
Item (3) requires special API support from ManagedProcess
, because we’re not
just sending any message back to the caller. We’re replying to a specific call
that has taken place and is, from the client’s perspective, still running.
The ManagedProcess
API call for this is replyTo
.
There is quite a bit of code in this next function, which we’ll look at in detail.
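Here is a sketch of it, continuing under the same assumptions (the respond and bumpNext helpers are our own names):

```haskell
taskComplete :: forall a . Serializable a
             => BlockingQueue a
             -> ProcessMonitorNotification
             -> Process (ProcessAction (BlockingQueue a))
taskComplete s@(BlockingQueue _ runQueue _) (ProcessMonitorNotification ref _ _) =
  case [w | w@(r, _, _) <- runQueue, r == ref] of
    (worker@(_, caller, handle):_) ->
      wait handle >>= respond caller >> bumpNext s worker >>= continue
    [] -> continue s  -- not one of our workers; carry on regardless
  where
    -- map the AsyncResult onto the Either we promised our clients
    respond :: CallRef (Either String a) -> AsyncResult a -> Process ()
    respond c (AsyncDone       r) = replyTo c (Right r)
    respond c (AsyncFailed     d) = replyTo c (Left (show d))
    respond c (AsyncLinkFailed d) = replyTo c (Left (show d))
    respond _ _                   = die "IllegalState"

    -- drop the finished worker, then start a queued task if one is waiting
    bumpNext :: BlockingQueue a
             -> (MonitorRef, CallRef (Either String a), Async a)
             -> Process (BlockingQueue a)
    bumpNext st (mRef, _, _) =
      let st' = st { active = [w | w@(r, _, _) <- active st, r /= mRef] }
      in case dequeue (accepted st') of
           Just ((caller, task'), rest) ->
             acceptTask (st' { accepted = rest }) caller task'
           Nothing -> return st'
```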
Firstly, note that the signature is similar to the one we used for storeTask
, but
returns just a ProcessAction
instead of ProcessReply
. This function will not be
wired up to a call
(or even a cast
), because the node controller will send the
monitor signal directly to our mailbox, not using the managed process APIs at all.
This kind of client interaction is called an info call in the managed process API,
and since there’s no expected reply, as with cast
, we simply return a ProcessAction
telling the server what to do next - in this case, to continue
reading from the
mailbox.
We’ve dealt with mapping the AsyncResult
to Either
values, which we could have
left to the caller, but this makes the client facing API much simpler to work with.
Note that our use of an association list for the active run queue makes for an
O(n) search for our worker, but that can be optimised with a map or dictionary later.
Worse, we have to scan the list again when deleting the worker from the run queue,
but the same fix (using a Map) should alleviate that problem too. We leave that as
an exercise for the reader.
Call and cast handlers live in the apiHandlers
list of our ProcessDefinition
and have the type Dispatcher s
where s
is the state type for the process. We
cannot construct a Dispatcher
ourselves, but a range of functions in the
ManagedProcess.Server
module exist to convert functions like the ones we’ve just
defined, to the correct type.
In order to spell things out for the compiler, we need to put a type signature
in place at the call site for storeTask
, so our final construct for that
handler is thus:
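Something like this (the inline annotation on the closure is what needs ScopedTypeVariables):

```haskell
handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
```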
No such thing is required for taskComplete
, as there’s no ambiguity about its
type. Our process definition is now finished, and here it is:
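A sketch of the finished definition (poolServer is our name for it):

```haskell
poolServer :: forall a . (Serializable a) => ProcessDefinition (BlockingQueue a)
poolServer =
  defaultProcess
    { apiHandlers  = [ handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p) ]
      -- monitor notifications arrive as plain info messages, not calls or casts
    , infoHandlers = [ handleInfo taskComplete ]
    }
```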
Starting the server takes a bit of work: ManagedProcess
provides several
utility functions to help with spawning and running processes. The serve
function takes an initialising thunk (which has the type InitHandler
)
that must generate the initial state and set up the server’s receive timeout,
then the process definition which we’ve already encountered. For more details
about starting managed processes, see the haddocks.
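A sketch of that wiring (pool and run are our names; InitOk with Infinity yields an initial state and no receive timeout):

```haskell
-- Build the initial state for a queue bounded at the given size.
pool :: forall a . Serializable a
     => Int
     -> Process (InitResult (BlockingQueue a))
pool sz = return $ InitOk (BlockingQueue sz [] Seq.empty) Infinity

-- Run the server loop in the calling process, using the supplied initialiser.
run :: forall a . Serializable a
    => Process (InitResult (BlockingQueue a))
    -> Process ()
run init' = serve () (\() -> init') poolServer
```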
Defining tasks is as simple as making them remote-worthy:
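For example (sampleTask is a made-up task; remotable and mkClosure come from Control.Distributed.Process.Closure and require TemplateHaskell):

```haskell
{-# LANGUAGE TemplateHaskell #-}

import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Closure (mkClosure, remotable)

-- A trivial task: sleep for the given number of microseconds, then return the string.
sampleTask :: (Int, String) -> Process String
sampleTask (delay, s) = liftIO (threadDelay delay) >> return s

$(remotable ['sampleTask])
```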
And executing them is just as simple too.
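Inside some client Process code, submitting one of these tasks might look like this (assuming the node's remote table includes the table generated by the remotable splice, and that queue refers to a running server):

```haskell
result <- executeTask queue ($(mkClosure 'sampleTask) (2000000, "foobar"))
case result of
  Left err -> say ("task failed: " ++ err)
  Right s  -> say ("task returned: " ++ s)
```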
Starting up the server itself, locally or on a remote node, is just a matter of
combining spawn
or spawnLocal
with start
. We can go a step further though,
and add a bit more type safety to our API by using an opaque handle to communicate
with the server. As things stand, it is possible for
a client to send a Closure
to the server with a return type different from the
one the server is expecting! Since the server won’t recognise that message, the
unhandledMessagePolicy
will be applied, which by default crashes the server with
an exit reason referring to “unhandled inputs”!
By returning a handle to the server using a parameterised type, we can ensure that
only closures returning a matching type are sent. To do so, we use a phantom type
parameter and simply stash the real ProcessId
in a newtype. We also need to be
able to pass this handle to the managed process call
API, so we define an
instance of the Resolvable
typeclass for it, which makes a (default) instance of
Routable
available, which is exactly what call
is expecting:
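A sketch of that handle (the TaskQueue name is ours; Resolvable comes from distributed-process-extras):

```haskell
-- An opaque, type-indexed handle for a queue whose tasks yield results of type a.
newtype TaskQueue a = TaskQueue { queuePid :: ProcessId }

-- Resolving the handle to a ProcessId is all the routing machinery needs;
-- the default Routable behaviour (and hence `call`) works in terms of resolve.
instance Resolvable (TaskQueue a) where
  resolve = return . Just . queuePid
```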
Finally, we write a start
function that returns this handle and change the
signature of executeTask
to match it:
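Roughly like so, building on the pool and run sketches above (executeTask here replaces the earlier, more loosely typed version):

```haskell
-- Spawn the server and hand back a typed handle instead of a raw ProcessId.
start :: forall a . Serializable a => Int -> Process (TaskQueue a)
start limit =
  TaskQueue <$> spawnLocal (run (pool limit :: Process (InitResult (BlockingQueue a))))

-- Only closures whose result type matches the handle can now be submitted.
executeTask :: Serializable a
            => TaskQueue a
            -> Closure (Process a)
            -> Process (Either String a)
executeTask sid t = call sid t
```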
In this tutorial, we’ve really just scratched the surface of the ManagedProcess
API. By handing over control of the client/server protocol to the framework, we
are able to focus on the code that matters, such as state transitions and decision
making, without getting bogged down (much) with the business of sending and
receiving messages, handling client/server failures and such like.
We did not take much care over our choice of data structures. Might this have
profound consequences for clients? Perhaps more of a concern is the cost of
using Async
everywhere - remember we used this in the server to handle
concurrently executing tasks and obtaining their results. An invocation of
async
will create two new processes: one to perform the calculation and
another to monitor the first and handle failures and/or cancellation. Spawning
processes is cheap, but not free, as each process is a Haskell thread plus some additional bookkeeping data.