This is the Cloud Haskell Platform. Cloud Haskell is a set of libraries that bring Erlang-style concurrency and distribution to Haskell programs. This project is an implementation of that distributed computing interface, where processes communicate with one another through explicit message passing rather than shared memory.
Originally described by the joint Towards Haskell in the Cloud paper, Cloud Haskell has be re-written from the ground up and supports a rich and growing number of features for
There is a presentation on Cloud Haskell and this reimplementation, which is worth reading in conjunction with the documentation and wiki pages on this website..
Cloud Haskell comprises the following components, some of which are complete, others experimental. There are three main parts:
Data.Typeablebut supporting polymorphic values
One of Cloud Haskell’s goals is to separate the transport layer from the process layer, so that the transport backend is entirely independent. In fact other projects can and do reuse the transport layer, even if they don’t use or have their own process layer (see e.g. HdpH).
Abstracting over the transport layer allows different protocols for
message passing, including TCP/IP, UDP,
ZeroMQ, SSH, MVars, Unix pipes, and more. Each of these transports provides
its own implementation of the
Network.Transport API and provide a means of creating
new connections for use within
The following diagram shows dependencies between the various subsystems, in an application using Cloud Haskell, where arrows represent explicit directional dependencies.
+------------------------------------------------------------+ | Application | +------------------------------------------------------------+ | | V V +-------------------------+ +------------------------------+ | Cloud Haskell |<--| Cloud Haskell Backend | | (distributed-process) | | (distributed-process-...) | +-------------------------+ +------------------------------+ | ______/ | V V V +-------------------------+ +------------------------------+ | Transport Interface |<--| Transport Implementation | | (network-transport) | | (network-transport-...) | +-------------------------+ +------------------------------+ | V +------------------------------+ | Haskell/C Transport Library | +------------------------------+
In this diagram, the various nodes roughly correspond to specific modules:
Cloud Haskell : Control.Distributed.Process Cloud Haskell : Control.Distributed.Process.* Transport Interface : Network.Transport Transport Implementation : Network.Transport.*
An application is built using the primitives provided by the Cloud
Haskell layer, provided by the
Control.Distributed.Process module, which
defines abstractions such as nodes and processes.
The application also depends on a Cloud Haskell Backend, which provides functions to allow the initialisation of the transport layer using whatever topology might be appropriate to the application.
It is, of course, possible to create new Cloud Haskell nodes by
using a Network Transport Backend such as
The Cloud Haskell interface and backend make use of the Transport
interface provided by the
This also serves as an interface for the
module, which provides a specific implementation for this transport,
and may, for example, be based on some external library written in
Haskell or C.
Cloud Haskell’s generic network-transport API is entirely independent of the concurrency and messaging passing capabilities of the process layer. Cloud Haskell applications are built using the primitives provided by the process layer (i.e., distributed-process), which provides abstractions such as nodes and processes. Applications must also depend on a Cloud Haskell backend, which provides functions to allow the initialisation of the transport layer using whatever topology might be appropriate to the application.
Network.Transport is a network abstraction layer geared towards specific
classes of applications, offering the following high level concepts:
EndPoints. These are heavyweight stateful objects.
EndPointto another using the
EndPointAddressof the remote end.
EndPointAddresscan be serialised and sent over the network, whereas
EndPoints and connections cannot.
EndPoints are unidirectional and lightweight.
Connectionobject that represents the sending end of the connection.
EndPointare collected via a shared receive queue.
EndPoints are notified of other
Events such as new connections or broken connections.
This design was heavily influenced by the design of the Common Communication Interface (CCI). Important design goals are:
For the purposes of most Cloud Haskell applications, it is sufficient to know
enough about the
Network.Transport API to instantiate a backend with the
required configuration and pass the returned opaque handle to the
in order to establish a new, connected, running node. More involved setups are,
of course, possible; The simplest use of the API is thus
Here we can see that the application depends explicitly on the
createTransport functions from
Network.Transport.TCP, but little else. The application can make use
Network.Transport APIs if required, but for the most part this
is irrelevant and the application will interact with Cloud Haskell through
the Process Layer and Platform.
For more details about
Network.Transport please see the wiki page.
The Process Layer is where Cloud Haskell’s support for concurrency and distributed programming are exposed to application developers. This layer deals explicitly with
The core of Cloud Haskell’s concurrency and distribution support resides in the distributed-process library. As well as the APIs necessary for starting nodes and forking processes on them, we find all the basic primitives required to
Most of this is easy enough to follow in the haddock documentation and the various tutorials. Here we focus on the essential concepts behind the process layer.
A concurrent process is somewhat like a Haskell thread - in fact it is a
forkIO thread - but one that can send and receive messages through its
process mailbox. Each process can send messages asynchronously to other
processes, and can receive messages synchronously from its own mailbox.
The conceptual difference between threads and processes is that the latter
do not share state, but communicate only via message passing.
Code that is executed in this manner must run in the
Process monad. Our
process will look like any other monad code, plus we provide and instance
Process, so you can
liftIO to make IO actions
Processes reside on nodes, which in our implementation map directly to the
Control.Distributed.Processes.Node module. Given a configured
Network.Transport backend, starting a new node is fairly simple:
Once this function returns, the node will be up and running and able to
interact with other nodes and host processes. It is possible to start more
than one node in the same running program, though if you do this they will
continue to send messages to one another using the supplied
Given a new node, there are two primitives for starting a new process.
Once we’ve spawned some processes, they can communicate with one another using the messaging primitives provided by [distributed-processes][distributed-processes], which are well documented in the haddocks.
Processes can send data if the type implements the
which is done indirectly by implementing
Binary and deriving
Implementations are already provided for primitives and some commonly used
data structures. As programmers, we see the messages in nice high-level form
Pong, etc), however these data have to be
encoded in order to be sent over a communications channel.
Not all types are
Serializable, for example concurrency primitives such as
TVar are meaningless outside the context of threads with a shared
memory. Cloud Haskell programs remain free to use these constructs within
processes or within processes on the same machine though. If you want to
pass data between processes using ordinary concurrency primitives such as
STM then you’re free to do so. Processes spawned locally can share
types such as
TMVar just as normal Haskell threads would.
Channels provides an alternative to message transmission with
expect allow us to transmit messages of any
type, channels require a uniform type. Channels work like a distributed equivalent
Control.Concurrent.Chan, however they have distinct ends: a single
receiving port and a corollary send port.
Channels provide a nice alternative to bare send and receive, which is a bit un-Haskell-ish, since our process’ message queue can contain messages of multiple types, forcing us to undertake dynamic type checking at runtime.
We create channels with a call to
newChan, and send/receive on them using the
Channels are particularly useful when you are sending a message that needs a response, because we know exactly where to look for the reply.
Channels can also allow message types to be simplified, as passing a
ProcessId for the reply isn’t required. Channels aren’t so useful when we
need to spawn a process and send a bunch a messages to it, then wait for
replies however; we can’t send a
ReceivePort since it is not
ReceivePorts can be merged, so we can listen on several simultaneously. In the
latest version of distributed-process, we can listen for regular messages
and multiple channels at the same time, using
matchChan in the list of
allowed matches passed
Processes can be linked to other processes, nodes or channels. Links are unidirectional,
and guarantee that once the linked object dies, the linked process will also be
terminated. Monitors do not cause the listening process to exit, but rather they
ProcessMonitorNotification into the process’ mailbox. Linking and monitoring
are foundational tools for supervising processes, where a top level process manages
a set of children, starting, stopping and restarting them as necessary.
Because processes are implemented with
forkIO we might be tempted to stop
them by throwing an asynchronous exception to the process, but this is almost
certainly the wrong thing to do. Firstly, processes might reside on a remote
node, in which case throwing an exception is impossible. Secondly, if we send
some messages to a process’ mailbox and then dispatch an exception to kill it,
there is no guarantee that the subject will receive our message before being
terminated by the asynchronous exception.
To terminate a process unconditionally, we use the
kill primitive, which
dispatches an asynchronous exception (killing the subject) safely, respecting
remote calls to processes on disparate nodes and observing message ordering
guarantees such that
send pid "hello" >> kill pid "goodbye" behaves quite
unsurprisingly, delivering the message before the kill signal.
Exit signals come in two flavours however - those that can be caught and those
that cannot. Whilst a call to
kill results in an un-trappable exception,
a call to
exit :: (Serializable a) => ProcessId -> a -> Process () will dispatch
an exit signal to the specified process that can be caught. These signals are
intercepted and handled by the destination process using
the receiver to match on the
Serializable datum tucked away in the exit signal
and decide whether to oblige or not.
Towards Haskell in the Cloud describes a multi-layered architecture, in which manipulation of concurrent processes and message passing between them is managed in the process layer, whilst a higher level API described as the task layer provides additional features such as
The distributed-process-task library implements parts of the task layer, but takes a very different approach to that described in the original paper and implemented by the remote package. In particular, we diverge from the original design and defer to many of the principles defined by Erlang’s Open Telecom Platform, taking in some well established Haskell concurrency design patterns along the way.
In fact, distributed-process-async does not really consider the
task layer in great detail. We provide an API comparable to remote’s
Control.Distributed.Process.Async. This API however,
is derived from Simon Marlow’s Control.Concurrent.Async package, and is not
limited to blocking queries on
Async handles in the same way. Instead our
API handles both blocking and non-blocking queries, polling
and working with lists of
Async handles. We also eschew throwing exceptions
to indicate asynchronous task failures, instead handling task and connectivity
failures using monitors. Users of the API need only concern themselves with the
AsyncResult, which encodes the status and (possibly) outcome of the computation
Unlike remote’s task layer, we do not exclude IO, allowing tasks to run in
Process monad and execute arbitrary code. Providing a monadic wrapper
Async that disallows side effects is relatively simple, and we
do not consider the presence of side effects a barrier to fault tolerance
and automated process restarts. Erlang does not forbid IO in its processes,
and yet that doesn’t render supervision trees ineffective. They key is to
provide a rich enough API that stateful processes can recognise whether or
not they need to provide idempotent initialisation routines.
The utility of preventing side effects using the type system is, however, not
to be sniffed at. A substrate of the
ManagedProcess API is under development
that provides a safe process abstraction in which side effect free computations
can be embedded, whilst reaping the benefits of the framework.
Work is also underway to provide abstractions for managing asynchronous tasks at a higher level, focussing on workload distribution and load regulation.
The kinds of task that can be performed by the async implementations in
distributed-process-async are limited only by their return type:
it must be
Serializable - that much should’ve been obvious by now.
The type of asynchronous task definitions comes in two flavours, one for
local nodes which require no remote-table or static serialisation dictionary,
and another for tasks you wish to execute on remote nodes.
The API for
Async is fairly rich, so reading the haddocks is suggested.
The main idea behind a
ManagedProcess is to separate the functional
and non-functional aspects of an actor. By functional, we mean whatever
application specific task the actor performs, and by non-functional
we mean the concurrency or, more precisely, handling of the process’
mailbox and its interaction with other actors (i.e., clients).
Looking at typed channels, we noted that their insistence on a specific input
domain was more haskell-ish than working with bare send and receive primitives.
Async sub-package also provides a type safe interface for receiving data,
although it is limited to running a computation and waiting for its result.
The Control.Distributed.Processes.Platform.ManagedProcess API provides a
number of different abstractions that can be used to achieve similar benefits
in your code. It works by introducing a standard protocol between your process
and the world outside, which governs how to handle request/reply processing,
exit signals, timeouts, sleeping/hibernation with
threadDelay and even provides
hooks that terminating processes can use to clean up residual state.
The API documentation is quite extensive, so here we will simply point
out the obvious differences. A process implemented with
can present a type safe API to its callers (and the server side code too!),
although that’s not its primary benefit. For a very simplified example:
Apart from the types and the imports, that is a complete definition. Whilst
it’s not so obvious what’s going on here, the key point is that the invocation
call in the client facing API functions handles all of the relevant
waiting/blocking, converting the async result and so on. Note that the
managed process does not interact with its mailbox at all, but rather
just provides callback functions which take some state and either return a
new state and a reply, or just a new state. The process is managed in the
sense that its mailbox is under someone else’s control.
A NOTE ABOUT THE CALL API AND THAT IT WILL FAIL (WITH UNHANDLED MESSAGE) IF THE CALLER IS EXPECTING A TYPE THAT DIFFERS FROM THE ONE THE SERVER PLANS TO RETURN, SINCE THE RETURN TYPE IS ENCODED IN THE CALL-MESSAGE TYPE ITSELF.
TODO: WRITE A TEST TO PROVE THE ABOVE
TODO: ADD AN API BASED ON SESSION TYPES AS A KIND OF MANAGED PROCESS…..
In a forthcoming tutorial, we’ll look at the
API, which looks a lot like
Async but manages exit signals in a single thread and makes
configurable task pools and task supervision strategy part of its API.