This is the Cloud Haskell Platform. Cloud Haskell is a set of libraries that bring Erlang-style concurrency and distribution to Haskell programs. This project is an implementation of that distributed computing interface, where processes communicate with one another through explicit message passing rather than shared memory.
Originally described in the joint Towards Haskell in the Cloud paper, Cloud Haskell has been re-written from the ground up and supports a rich and growing set of features, including the ability to link to and monitor processes, typed channels, and Erlang-style managed processes.
There is a presentation on Cloud Haskell and this reimplementation, which is worth reading in conjunction with the documentation and wiki pages on this website.
Cloud Haskell comprises the following components, some of which are complete, others experimental. There are three main parts: the process layer (distributed-process and the libraries built on top of it), the generic transport interface (network-transport), and the concrete transport implementations (network-transport-tcp and friends). The major packages include:

- distributed-process : the base concurrency and distribution layer
- distributed-process-platform : higher-level platform APIs, including gen_server-style managed processes
- distributed-process-async : the asynchronous task API
- distributed-process-task : parts of the task layer
- rank1dynamic : like Data.Dynamic and Data.Typeable, but supporting polymorphic values
- network-transport : the generic Network.Transport API
- network-transport-tcp : a TCP realisation of Network.Transport
- network-transport-inmemory : an in-memory realisation of Network.Transport (incomplete)

One of Cloud Haskell’s goals is to separate the transport layer from the process layer, so that the transport backend is entirely independent. In fact other projects can and do reuse the transport layer, even if they don’t use or have their own process layer (see e.g. HdpH).
Abstracting over the transport layer allows different protocols for
message passing, including TCP/IP, UDP,
MPI,
CCI,
ZeroMQ, SSH, MVars, Unix pipes, and more. Each of these transports provides
its own implementation of the Network.Transport API and a means of creating new connections for use within Control.Distributed.Process.
The following diagram shows dependencies between the various subsystems, in an application using Cloud Haskell, where arrows represent explicit directional dependencies.
+------------------------------------------------------------+
| Application |
+------------------------------------------------------------+
| |
V V
+-------------------------+ +------------------------------+
| Cloud Haskell |<--| Cloud Haskell Backend |
| (distributed-process) | | (distributed-process-...) |
+-------------------------+ +------------------------------+
| ______/ |
V V V
+-------------------------+ +------------------------------+
| Transport Interface |<--| Transport Implementation |
| (network-transport) | | (network-transport-...) |
+-------------------------+ +------------------------------+
|
V
+------------------------------+
| Haskell/C Transport Library |
+------------------------------+
In this diagram, the various nodes roughly correspond to specific modules:
Cloud Haskell : Control.Distributed.Process
Cloud Haskell : Control.Distributed.Process.*
Transport Interface : Network.Transport
Transport Implementation : Network.Transport.*
An application is built using the primitives provided by the Cloud Haskell layer, in particular the Control.Distributed.Process module, which defines abstractions such as nodes and processes.
The application also depends on a Cloud Haskell Backend, which provides functions to allow the initialisation of the transport layer using whatever topology might be appropriate to the application.
It is, of course, possible to create new Cloud Haskell nodes by
using a Network Transport Backend such as Network.Transport.TCP
directly.
The Cloud Haskell interface and backend make use of the Transport
interface provided by the Network.Transport
module.
This also serves as an interface for the Network.Transport.* modules, each of which provides a specific implementation of this transport and may, for example, be based on some external library written in Haskell or C.
Cloud Haskell’s generic network-transport API is entirely independent of the concurrency and message passing capabilities of the process layer. Applications are built using the primitives provided by the process layer (i.e., distributed-process), which provides abstractions such as nodes and processes, and must also depend on a Cloud Haskell backend that initialises the transport layer using whatever topology is appropriate to the application.
Network.Transport
is a network abstraction layer geared towards specific
classes of applications, offering the following high level concepts:
- Nodes in the network are represented by EndPoints. These are heavyweight stateful objects.
- Each EndPoint has an EndPointAddress.
- Connections can be established from one EndPoint to another using the EndPointAddress of the remote end.
- The EndPointAddress can be serialised and sent over the network, whereas EndPoints and connections cannot.
- Connections between EndPoints are unidirectional and lightweight.
- Outgoing messages are sent via a Connection object that represents the sending end of the connection.
- Incoming messages for all of the incoming connections on an EndPoint are collected via a shared receive queue.
- In addition to incoming messages, EndPoints are notified of other Events such as new connections or broken connections.

This design was heavily influenced by the design of the Common Communication Interface (CCI). Important design goals include lightweight connections (creating thousands of connections between endpoints should not be a problem), explicit error handling (each function declares, as part of its type, the errors it can return), and abstract error handling (implementation-specific failures, such as running out of sockets in the TCP transport, are mapped to generic Transport-level errors).
For the purposes of most Cloud Haskell applications, it is sufficient to know enough about the Network.Transport API to instantiate a backend with the required configuration and pass the returned opaque handle to the Node API in order to establish a new, connected, running node. More involved setups are, of course, possible. The simplest use of the API looks something like this:
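A minimal sketch, assuming the three-argument createTransport exported by older network-transport-tcp releases (more recent releases take an extra argument describing the externally reachable address):

    import Control.Distributed.Process.Node
    import Control.Monad.IO.Class (liftIO)
    import Network.Transport.TCP (createTransport, defaultTCPParameters)

    main :: IO ()
    main = do
      -- Instantiate the TCP backend; the Transport handle it returns is opaque to us.
      Right transport <- createTransport "127.0.0.1" "10501" defaultTCPParameters
      -- Start a node on that transport and run a single process on it.
      node <- newLocalNode transport initRemoteTable
      runProcess node $ liftIO (putStrLn "Hello from a Cloud Haskell node")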
Here we can see that the application depends explicitly on the
defaultTCPParameters
and createTransport
functions from
Network.Transport.TCP
, but little else. The application can make use
of other Network.Transport
APIs if required, but for the most part this
is irrelevant and the application will interact with Cloud Haskell through
the Process Layer and Platform.
For more details about Network.Transport
please see the wiki page.
The Process Layer is where Cloud Haskell’s support for concurrency and distributed programming is exposed to application developers. This layer deals explicitly with processes and the messages passed between them.
The core of Cloud Haskell’s concurrency and distribution support resides in the distributed-process library. As well as the APIs necessary for starting nodes and forking processes on them, we find all the basic primitives required to spawn processes locally and remotely, to send and receive messages (optionally using typed channels), and to monitor and/or link to processes, channels and other nodes.
Most of this is easy enough to follow in the haddock documentation and the various tutorials. Here we focus on the essential concepts behind the process layer.
A concurrent process is somewhat like a Haskell thread - in fact it is a
forkIO
thread - but one that can send and receive messages through its
process mailbox. Each process can send messages asynchronously to other
processes, and can receive messages synchronously from its own mailbox.
The conceptual difference between threads and processes is that the latter
do not share state, but communicate only via message passing.
Code that is executed in this manner must run in the Process monad. Our process will look like any other monadic code, and since we provide an instance of MonadIO for Process, you can use liftIO to make IO actions available.
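For example (the doWork name below is purely illustrative):

    import Control.Distributed.Process
    import Control.Monad.IO.Class (liftIO)

    doWork :: Process ()
    doWork = do
      -- ordinary Process primitives...
      self <- getSelfPid
      -- ...mixed with IO actions lifted into the Process monad
      liftIO $ putStrLn ("running as " ++ show self)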
Processes reside on nodes, which in our implementation map directly to the Control.Distributed.Process.Node module. Given a configured Network.Transport backend, starting a new node is fairly simple:
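A sketch, assuming we already hold a Transport from one of the network-transport backends (the startNode name is illustrative):

    import Control.Distributed.Process.Node (LocalNode, initRemoteTable, newLocalNode)
    import Network.Transport (Transport)

    startNode :: Transport -> IO LocalNode
    startNode transport = newLocalNode transport initRemoteTable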
Once this function returns, the node will be up and running and able to
interact with other nodes and host processes. It is possible to start more
than one node in the same running program, though if you do this they will
continue to send messages to one another using the supplied Network.Transport
backend.
Given a new node, there are two primitives for starting a new process.
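These are forkProcess and runProcess from Control.Distributed.Process.Node; a brief sketch (the wrapper name is illustrative):

    import Control.Distributed.Process
    import Control.Distributed.Process.Node

    startProcesses :: LocalNode -> IO ()
    startProcesses node = do
      -- forkProcess starts the process and returns its ProcessId straight away
      _pid <- forkProcess node (say "started via forkProcess")
      -- runProcess blocks until the given process has finished
      runProcess node (say "started via runProcess")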
Once we’ve spawned some processes, they can communicate with one another using the messaging primitives provided by distributed-process, which are well documented in the haddocks.
Processes can send data if the type implements the Serializable
typeclass,
which is done indirectly by implementing Binary
and deriving Typeable
.
Implementations are already provided for primitives and some commonly used
data structures. As programmers, we see the messages in nice high-level form
(e.g., Int
, String
, Ping
, Pong
, etc), however these data have to be
encoded in order to be sent over a communications channel.
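For instance, the Ping and Pong types mentioned above could be made Serializable by deriving Typeable and a Generic-based Binary instance (one common approach; hand-written put/get definitions work just as well):

    {-# LANGUAGE DeriveDataTypeable #-}
    {-# LANGUAGE DeriveGeneric      #-}

    import Data.Binary   (Binary)
    import Data.Typeable (Typeable)
    import GHC.Generics  (Generic)

    data Ping = Ping deriving (Typeable, Generic)
    data Pong = Pong deriving (Typeable, Generic)

    -- the default Binary methods are filled in via the Generic representation
    instance Binary Ping
    instance Binary Pong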
Not all types are Serializable: concurrency primitives such as MVar and TVar, for example, are meaningless outside the context of threads sharing memory. Cloud Haskell programs remain free to use these constructs within a process, or between processes on the same local node, though. If you want to pass data between co-located processes using ordinary concurrency primitives such as STM, then you’re free to do so. Processes spawned locally can share types such as TMVar just as normal Haskell threads would.
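A sketch of two locally spawned processes sharing a TMVar (illustrative names; this only works because both processes live on the same node):

    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TMVar
    import Control.Distributed.Process
    import Control.Monad.IO.Class (liftIO)

    localSharing :: Process ()
    localSharing = do
      box <- liftIO (newEmptyTMVarIO :: IO (TMVar String))
      -- a locally spawned process can simply close over the TMVar
      _ <- spawnLocal $ liftIO (atomically (putTMVar box "done"))
      result <- liftIO (atomically (takeTMVar box))
      say result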
Channels provide an alternative to message transmission with send and expect. While send and expect allow us to transmit messages of any Serializable type, channels require a uniform type. Channels work like a distributed equivalent of Haskell’s Control.Concurrent.Chan, but they have distinct ends: a single receiving port and a corresponding send port.
Channels provide a nice alternative to bare send and receive, which is a bit un-Haskell-ish, since our process’ message queue can contain messages of multiple types, forcing us to undertake dynamic type checking at runtime.
We create channels with a call to newChan
, and send/receive on them using the
{send,receive}Chan
primitives:
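A short sketch (the channelDemo name is illustrative):

    import Control.Distributed.Process

    channelDemo :: Process ()
    channelDemo = do
      -- a typed channel carrying Strings: a send port and a receive port
      (sp, rp) <- newChan :: Process (SendPort String, ReceivePort String)
      sendChan sp "hello"
      msg <- receiveChan rp
      say msg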
Channels are particularly useful when you are sending a message that needs a response, because we know exactly where to look for the reply.
Channels can also allow message types to be simplified, as passing a ProcessId for the reply isn’t required. Channels aren’t so useful when we need to spawn a process, send it a bunch of messages and then wait for replies, however; we can’t send a ReceivePort since it is not Serializable.
ReceivePorts can be merged, so we can listen on several simultaneously. In the latest version of distributed-process, we can listen for regular messages and on multiple channels at the same time, using matchChan in the list of allowed matches passed to receiveWait and receiveTimeout.
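A sketch of waiting on a channel and the regular mailbox at the same time (names illustrative):

    import Control.Distributed.Process

    waitForEither :: ReceivePort String -> Process String
    waitForEither rp =
      receiveWait
        [ -- a message arriving on the typed channel
          matchChan rp return
          -- or a plain String sent straight to our mailbox
        , match (\s -> return (s :: String))
        ]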
Processes can be linked to other processes, nodes or channels. Links are unidirectional,
and guarantee that once the linked object dies, the linked process will also be
terminated. Monitors do not cause the listening process to exit, but rather they
put a ProcessMonitorNotification
into the process’ mailbox. Linking and monitoring
are foundational tools for supervising processes, where a top level process manages
a set of children, starting, stopping and restarting them as necessary.
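A sketch of monitoring another process and reacting to its termination (the watch name is illustrative):

    import Control.Distributed.Process

    watch :: ProcessId -> Process ()
    watch pid = do
      -- monitoring puts a notification in our mailbox rather than killing us
      _ref <- monitor pid
      notification <- expect :: Process ProcessMonitorNotification
      case notification of
        ProcessMonitorNotification _ who reason ->
          say ("process " ++ show who ++ " died: " ++ show reason)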
Because processes are implemented with forkIO
we might be tempted to stop
them by throwing an asynchronous exception to the process, but this is almost
certainly the wrong thing to do. Firstly, processes might reside on a remote
node, in which case throwing an exception is impossible. Secondly, if we send
some messages to a process’ mailbox and then dispatch an exception to kill it,
there is no guarantee that the subject will receive our message before being
terminated by the asynchronous exception.
To terminate a process unconditionally, we use the kill
primitive, which
dispatches an asynchronous exception (killing the subject) safely, respecting
remote calls to processes on disparate nodes and observing message ordering
guarantees such that send pid "hello" >> kill pid "goodbye"
behaves quite
unsurprisingly, delivering the message before the kill signal.
Exit signals come in two flavours, however: those that can be caught and those
that cannot. Whilst a call to kill
results in an un-trappable exception,
a call to exit :: (Serializable a) => ProcessId -> a -> Process ()
will dispatch
an exit signal to the specified process that can be caught. These signals are
intercepted and handled by the destination process using catchExit
, allowing
the receiver to match on the Serializable
datum tucked away in the exit signal
and decide whether to oblige or not.
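A sketch of trapping an exit signal whose reason is a String (the wrapper name is illustrative):

    import Control.Distributed.Process

    tolerant :: Process () -> Process ()
    tolerant body =
      body `catchExit` \_sender reason ->
        -- inspect the Serializable reason and decide whether to oblige;
        -- here we just log it and decline to exit
        say ("asked to exit: " ++ (reason :: String))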
Towards Haskell in the Cloud describes a multi-layered architecture, in which manipulation of concurrent processes and message passing between them is managed in the process layer, whilst a higher level API described as the task layer provides additional features such as automated recovery from failures and data locality.
The distributed-process-task library implements parts of the task layer, but takes a very different approach to that described in the original paper and implemented by the remote package. In particular, we diverge from the original design and defer to many of the principles defined by Erlang’s Open Telecom Platform, taking in some well established Haskell concurrency design patterns along the way.
In fact, distributed-process-async does not really consider the
task layer in great detail. We provide an API comparable to remote’s
Promise
in Control.Distributed.Process.Async. This API, however,
is derived from Simon Marlow’s Control.Concurrent.Async package, and is not
limited to blocking queries on Async
handles in the same way. Instead our
API handles both blocking and non-blocking queries, polling
and working with lists of Async
handles. We also eschew throwing exceptions
to indicate asynchronous task failures, instead handling task and connectivity
failures using monitors. Users of the API need only concern themselves with the
AsyncResult
, which encodes the status and (possibly) outcome of the computation
simply.
Unlike remote’s task layer, we do not exclude IO, allowing tasks to run in
the Process
monad and execute arbitrary code. Providing a monadic wrapper
around Async
that disallows side effects is relatively simple, and we
do not consider the presence of side effects a barrier to fault tolerance
and automated process restarts. Erlang does not forbid IO in its processes,
and yet that doesn’t render supervision trees ineffective. The key is to
provide a rich enough API that stateful processes can recognise whether or
not they need to provide idempotent initialisation routines.
The utility of preventing side effects using the type system is, however, not
to be sniffed at. A substrate of the ManagedProcess
API is under development
that provides a safe process abstraction in which side effect free computations
can be embedded, whilst reaping the benefits of the framework.
Work is also underway to provide abstractions for managing asynchronous tasks at a higher level, focussing on workload distribution and load regulation.
The kinds of task that can be performed by the async implementations in
distributed-process-async are limited only by their return type:
it must be Serializable
- that much should’ve been obvious by now.
The type of asynchronous task definitions comes in two flavours, one for
local nodes which require no remote-table or static serialisation dictionary,
and another for tasks you wish to execute on remote nodes.
The API for Async
is fairly rich, so reading the haddocks is suggested.
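A sketch using the local flavour, assuming the async, task and wait combinators and the AsyncDone result from Control.Distributed.Process.Async (the module lived under Control.Distributed.Process.Platform.Async in older platform releases):

    import Control.Distributed.Process
    import Control.Distributed.Process.Async

    asyncDemo :: Process ()
    asyncDemo = do
      -- run a Serializable-returning computation asynchronously on this node
      hAsync <- async $ task (return (6 * 7) :: Process Int)
      result <- wait hAsync
      case result of
        AsyncDone n -> say ("result: " ++ show n)
        _           -> say "task did not complete"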
The main idea behind a ManagedProcess
is to separate the functional
and non-functional aspects of an actor. By functional, we mean whatever
application specific task the actor performs, and by non-functional
we mean the concurrency or, more precisely, handling of the process’
mailbox and its interaction with other actors (i.e., clients).
Looking at typed channels, we noted that their insistence on a specific input domain was more Haskell-ish than working with bare send and receive primitives.
The Async
sub-package also provides a type safe interface for receiving data,
although it is limited to running a computation and waiting for its result.
The Control.Distributed.Process.Platform.ManagedProcess API provides a
number of different abstractions that can be used to achieve similar benefits
in your code. It works by introducing a standard protocol between your process
and the world outside, which governs how to handle request/reply processing,
exit signals, timeouts, sleeping/hibernation with threadDelay
and even provides
hooks that terminating processes can use to clean up residual state.
The API documentation is quite extensive, so here we will simply point
out the obvious differences. A process implemented with ManagedProcess
can present a type safe API to its callers (and the server side code too!),
although that’s not its primary benefit. For a very simplified example:
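A sketch along the lines of the math-server example from the Managed Processes tutorial, assuming the serve, statelessProcess, statelessInit, handleCall_ and call combinators (exact module names and details vary a little between platform releases):

    {-# LANGUAGE DeriveDataTypeable #-}
    {-# LANGUAGE DeriveGeneric      #-}

    import Control.Distributed.Process
    import Control.Distributed.Process.Platform.ManagedProcess
    import Control.Distributed.Process.Platform.Time (Delay(Infinity))
    import Data.Binary (Binary)
    import Data.Typeable (Typeable)
    import GHC.Generics (Generic)

    data Add = Add Double Double deriving (Typeable, Generic)
    instance Binary Add

    -- server side: the only application-specific ("functional") code is the Add handler
    launchMathServer :: Process ProcessId
    launchMathServer =
      let server = statelessProcess
            { apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ] }
      in spawnLocal $ serve () (statelessInit Infinity) server

    -- client side: call takes care of sending, waiting and decoding the reply
    add :: ProcessId -> Double -> Double -> Process Double
    add sid x y = call sid (Add x y)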
Apart from the types and the imports, that is a complete definition. Whilst
it’s not so obvious what’s going on here, the key point is that the invocation
of call
in the client facing API functions handles all of the relevant
waiting/blocking, converting the async result and so on. Note that the
managed process does not interact with its mailbox at all, but rather
just provides callback functions which take some state and either return a
new state and a reply, or just a new state. The process is managed in the
sense that its mailbox is under someone else’s control.
A NOTE ABOUT THE CALL API AND THAT IT WILL FAIL (WITH UNHANDLED MESSAGE) IF THE CALLER IS EXPECTING A TYPE THAT DIFFERS FROM THE ONE THE SERVER PLANS TO RETURN, SINCE THE RETURN TYPE IS ENCODED IN THE CALL-MESSAGE TYPE ITSELF.
TODO: WRITE A TEST TO PROVE THE ABOVE
TODO: ADD AN API BASED ON SESSION TYPES AS A KIND OF MANAGED PROCESS…..
In a forthcoming tutorial, we’ll look at the Control.Distributed.Process.Platform.Task
API, which looks a lot like Async
but manages exit signals in a single thread and makes
configurable task pools and task supervision strategy part of its API.
More complex examples of the ManagedProcess
API can be seen in the
Managed Processes tutorial. API documentation for HEAD is available
here.
TBC
TBC