A Tour of Manifold’s Deferred, Stream and Event Bus API

Higher level abstractions for asynchronous things in Clojure

In this piece and I am going to explore the Manifold library to take a look at Manifold’s Deferred, Stream and Event Bus features. This is a really handy tool kit for anyone doing asynchronous programming in Clojure[Script]. We can use deferred values to helps us to manage and abstract away some of the complexity of dealing with values that might arrive asynchronously, streams to connect all sorts of things together and an Event bus to provide a simple event driven messaging architecture.

Towards the end of the article we will also look at how these tools can be combined to perform asynchronous communication between the front and back-end of our app using a web-socket. Although we won’t be going into any topic in particular detail, it might be helpful to get an overview of the library and what it has to offer.

What is Manifold?

Manifold was originally designed by Zach Tellman and it describes itself as a library that provides the basic building blocks for asynchronous programming, which can be used as a translation layer between libraries that use similar but incompatible abstractions.

I like to think of Manifold as offering a slightly higher level of abstraction for our asynchronous programming. We can use it as we would any other asynchronous library. However, the cool part is that it offers far more compatibility and composability than we would have if we were to work with other libraries.

Image for post
Image for post

Here is a brief overview of three key parts of the library:

Manifold.deferred
Methods for creating, transforming, and interacting with asynchronous values.

Manifold.streams
Manifold’s streams provide mechanisms for asynchronous puts and takes, timeouts, and backpressure. They are compatible with Java’s BlockingQueues, Clojure's lazy sequences, and core.async's channels. Methods for converting to and from each are provided.

Manifold.bus
Manifold also provides a simple event bus that can be used for pub/sub communication.

You can download the library here:

Before we go on, Zach also has quite a few interesting talks on asynchronous programming which are well worth a watch:

The two core components of Manifold are deferreds for single asynchronous values and streams, for an ordered sequence of asynchronous values. I will be borrowing shamelessly from the documentation in a bid to understand the topic, so I can’t take any credit for the examples which you can find in the library.

A deferred is a bit like a promise and we can define a deferred as follows:

(def mydeferredvar (d/deferred)

Once we have a deferred object, we can return a success value or throw an error:

(d/success! mydeferredvar :result)
(d/error! mydeferredvar (Exception. "Error Message")

We can then dereference the deferred object using the @ operator:

@mydeferredvar
; Exception: Error Message

We can also get a deferred to do stuff when events happen by attaching callback functions. This mirrors other promise-like behavior by passing a success function followed by an error function.

(d/on-realized d
(fn [x] (println "success!" x))
(fn [x] (println "error!" x)))

However, you shouldn’t really use the call-back handler on-realized, as call-backs are a poor way of doing asynchronous programming. Instead, Manifold deferred features much better ways to handle values.

Chaining and Composition

Once of the cool features of the Deferred library is the ability to chain multiple functions together to process a result, which works a bit like a threading macro. The chain will fire sequentially when the deferred has been given a value via the (d/success!) function. However, as chain functions themselves might contain deferred values or deferrable-like values such as Clojrue futures, Java futures and Clojure promises. If any of the functions in the chain have these values they will pause the processing in the chain until the specified deferred yields a value.

(d/chain mydeferred processfn1 processfn2 #(println "result: " %))

In English this means that we can compose a number of functions together using chain, and have them complete sequentially in order, but only when each function’s deferred values are ready for the next step. If any of the functions in the chain throw an error, the remaining steps are skipped and the deferred returned by the chain function returns the error message. These can be handled using manfold.deferred/catch using Clojure’s threading macro.

(-> d
(d/chain dec #(/ 1 %))
(d/catch Exception #(println "whoops, that didn't work:" %)))

We can also combine multiple deferred values into a single deferred value using the zip operator. This might be useful if we have asynchronous values that we want to pass into another function when all of them have resolved.

(d/zip (future 1) (future 2) (future 3))
; (1 2 3)

We can also create deferred values that expire after x ms, using the d/timeout! function.

@(d/timeout!
(d/future (Thread/sleep 1000) :deferredval)
100
:timedoutvalue)
; :timedoutvalue

We can also use let-flow which some clever wizardry that allows us to treat a deferred value as if it already exists:

(defn deferred-sum []
(let-flow [a (call-service-a)
b (call-service-b)]
(+ a b)))

Voilla, we now have multiple asynchronous deferred’s being passed into the function, which will not execute until both values are present. We can also use asynchronous loops:

(defn my-consume [f stream]
(d/loop []
(d/chain (s/take! stream ::drained)

;; if we got a message, run it through `f`
(fn [msg]
(if (identical? ::drained msg)
::drained
(f msg)))

;; wait for the result from `f` to be realized, and
;; recur, unless the stream is already drained
(fn [result]
(when-not (identical? ::drained result)
(d/recur))))))

The loop magically only processes deferred values when they have been realized, so we get synchronous style programming with asynchronous values.

Introducing Manifold Streams

Manifold streams provide an abstraction layer to work with Java’s BlockingQueues, Clojure’s lazy sequences and core.async’s channels. One of the main benefits of Manifold is that we can convert easily between these, which give us a lot of flexibility if we want to change our architecture in the future or add functionality to it.

We can create a stream in the same way we would create a Core.async channel. We can also put! values and take! values from the stream. put! and take! return deferred’s which we can work with using Manifold’s deferred library functions (e.g. chain / loop).

Key Concepts:

  • All asynchronous values and operations in Manifold are represented as deferreds.
  • Streams can be message sources (consumers) or sinks (producers) or both.
  • Messages from anything which is sourceable can be piped into anything which is sinkable using manifold.stream/connect

If we needed buffering, then we can also create a buffer using (stream buffer-size) or call (buffer size stream) to create a downstream buffer. Messages can be rate-limited using (throttle max-rate stream).

(def s (s/stream))
(s/put! s "hello")
(s/take! s)
; "hello"

The main benefit is that we can also use other libraries with our Manifold streams:

; Core async
(def c (async/chan))
; Call the ->source fn in manifold on the channel to connect it.
(def s (s/->source c))

Now that we have connected the Core.async channel as a source, we can emit messages into it using the core .async put operator (enclosed in a go block):

(async/go (async/>! c 1))

The cool part is that we can now de-reference our manifold stream and call a take on it to get the value:

@(s/take! s)
; 1

We can also do a similar thing the other way around by calling ->sink

(def s (s/->sink c))
@(s/put! s "hello")
(async/go (println (async/<! c)))
;"hello"

The connect function allows us to bridge connections between different streams:

> (def s (s/stream))
#'s
> (def c (a/chan))
#'c
> (s/connect s c)
nil
> (s/put! s 1)
<< true >>
> (a/<!! c)
1

If we have a source that will never produce any further messages then we consider the source drained. We can test for this using drained? and register an on-drained callback. take! on a drained source returns nil by default but we can change this by passing a different value if we wish:

> @(s/take! s ::drained)
::drained

We also might want to set time-limits on how long we are willing to wait for our put or take to complete.

> (def s (s/stream))
> @(s/try-put! s :foo 1000 ::timeout)
> @(s/try-take! s ::drained 1000 ::timeout)

If the put of :foo to our stream hasn’t succeeded within 1,000ms, we return ::timeout. Similarly, if the take! stream is drained, we return ::drained or return a ::timeout value after 1,000ms.

Stream Operations

We can consume messages instantly using s/consume:

(s/consume #(prn 'message! %) s)

We can also create derivative streams:

(->> [1 2 3]
(s/map inc)
s/stream->seq)
; (2 3 4)

In the above example, we are passing the vector into the source, mapping a function (inc) onto each value using s/map and then converting the output back into a sequence. Behind the scenes, Manifold is also calling ->source on the sequence for us. If we weren’t using a map, we might have to be a bit more explicit:

> (->> [1 2 3]
s/->source
s/stream->seq
(map inc))
(2 3 4)

The key thing to remember here is that (s/map inc) automatically calls (s/->source) for us if it is being applied to a sequence and we can convert a stream into a sequence using s/stream->seq.

What if we tried taking a new Manifold stream and defined two downstream functions that act upon the values:

(def sourcestream (s/stream)); Downstream derived stream
(def a (s/map inc sourcestream))
(def b (s/map dec sourcestream))
@(s/put! sourcestream 0)@(s/take! a)
; 1
@(s/take! b)
; -1

We can also use manifold.stream/transform with any Clojure transducer if an operation isn’t supported.

(->> [1 2 3]
(s/transform (map inc))
s/stream->seq)
(2 3 4)

Connecting Streams and Feeding Sinks

We can connect two streams together using the s/connect function. Remember to de-reference the return value as s/put! and s/take! return a Manifold deferred. Connect takes a source and propagates all messages into the sink.

> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect a b)
true
> @(s/put! a 1)
true
> @(s/take! b)
1

Connect also takes an option map:

  • downstream? — Closes the sink if the source closes
  • upstream? — whether the sink closing will close the source, even if there are other sinks downstream of the source (default to false)
  • timeout — The maximum time spent waiting to convey a message into the sink before the connection is severed (default to nil)
  • description — A description of the connection between source and sink
> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect a b {:description "a connection"})
nil
> (s/description a)
{:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, ...}
> (s/downstream a)
(["a connection" << stream: ... >>])

We can also take two streams and apply a function in-between them:

> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect-via a #(s/put! b (inc %)) b)
nil

The value returned by the callback for connect-via provides backpressure, as if a deferred value is returned, further messages will not be passed in until the deferred value is realized.

Introducing the Manifold Event Bus

The Event bus is a simple publish/subscribe mechanism for topics. We can use it to transport our events around our application. Parts of our application that are interested in certain events can register to the event bus by using the subscribe function. The event bus then knows to forward events on to these subscribers when they arrive. This will happen automatically by default if there are subscribers registered, but we can also use a buffer to hold a certain number of messages.

Event buses are great because they are simple to use and they help to decouple our application code. When we post an event, we don’t have to know who the subscribers are or what will happen to our message. We just put it on the bus and away it goes. Think of it as buying a coach ticket for Grandpa. The ticket has an approximate destination, say Ohio, for example. We buy the ticket for Gramps, put him on the bus, and trust that the bus will take him where he needs to go. We know nothing about the route the bus will take or how many stops off in Ohio our Grandpa might end up at. We just put him on the bus and hope that someone tells us when he arrives in Ohio.

In a Manifold Event bus, we also need to know when Gramps has visited all of the destinations. We create a new destination for Gramps using our subscribe function. When we send Grandpa off (publish) we need to wait for feedback to ensure that he has arrived at all of the destinations (subscribers) safely. The default behavior is to sit by the phone and wait for confirmation (a paused thread) before we go about our business. This behavior can change if we supply a buffer. With a buffer of 2 Grandparents, we could send Grandpa and Grandma off and not worry about hearing back from them. However, if we wanted to send a third Grandparent on the bus we would need to wait for one of our first grandparents to be delivered successfully. Buffers change how much we need worry about delivery. We can immediately resume other duties once we drop Grandpa off because we know that he is in a queue waiting to be processed.

We can create a simple event bus as follows:

(def bus (event-bus))

To use our event bus, we can publish to topics . A topic is akin to a logical channel for our messages. When we create a topic, we can split our event messages into events that are purely related to that topic. This gives us the ability to:

  • Ensure that our events are only processed when relavent
  • Route certain messages differently.
  • Scale up our hardware or distributed systems to handle an increased message load.

Here is how we can publish a message to a topic:

(publish! bus topic msg)

Calls to publish! return a Manifold deferred that won’t be realised until all streams have accepted the message. You should be aware that calls to publish! will wait to hear back from all subscribers to ensure that the message has been accepted. If you have a slow consumer then you might want to use a buffer or a connect with a timeout to get around this.

We can also subscribe to topics:

(subscribe bus topic)

By default subscribe returns an unbuffered stream, but we can change this if we need to. Manifold also provides a few extra utility functions:

(active? bus topic) Returns true if there are any subscribers to the topic.

(downstream bus topic) Returns a list of all streams subscribed to the topic.

Connecting things together

We have covered a lot of topics in this article, but the key question is how do we actually use the above to get real work done? As an example, here is how we might initialize and consume a web-socket connection (a stream) and use the Manifold Streams API to connect it to a Manifold Event bus:

(ddo [ws (http/websocket-connection req)]
(stream/consume up-consumer ws)
(stream/connect (bus/subscribe usermsg-bus topic)
ws))))

In the above example, we call consume and pass in a callback up-consumer with the web socket as our source. The up-consumer callback will then receive (consume) messages from the websocket as they arrive. We can also call stream/connect to propagate any messages into a separate event bus subscription we have created. In the code above, we subscribe the usermsg-bus event bus to our topic, and the bus will now receive a stream of data from our web-socket. We can now call (bus/publish! usermsg-bus topic message) to send messages through to our web-socket.

The cool thing is that Manifold is super smart and allows us to take an event bus, with it’s pub/sub event driven architecture, and connect it directly to a web-socket. We can then subscribe to any topics and messages that arrive on that socket from our front-end application.

In our front-end app, we could create a web-socket connection using a web-socket library such as chord as follows:

(go
(let [{ws :ws-channel err :error}
(<! (ws-ch (str "ws://localhost:2200/websockets/" user-id)
{:format :json-kw}))]
(if-not err
(do
(reset! user-websocket ws)
(rx-messages ws)
(js/console.log "subscribed: " user-id))

Here we can see that a call to (ws-ch url format) will give us a web-socket connection that we can use to send messages in a json-kw format. We can then send a message as follows:

(defn tx-message
[ws msg]
(go (let [r (>! ws msg)]
(js/console.log "tx-message" msg r))))

Our tx-message function takes our web socket ws and a message ms and uses core.async put to send the message into the web socket connection. These messages will then be passed to our Event bus that we set up on the server.

We can also listen for messages as follows:

(defn rx-messages
[ws]
(go-loop [{content :message :as msg} (<! ws)]
(when msg
(js/console.log "rx-messages" content)
(re-frame/dispatch
[::rx-message content])
(recur (<! ws)))))

We use a go-loop macro to constantly take values from our web socket.

(<! ws) is the bit of code that returns a value which then gets destructured into our let binding. The content value holds the useful content of :message and :msg contains the entire message envelope. We can then process each message as it arrives and dispatch re-frame events when they are received. In our example, we will dispatch an ::rx-message with the content of the message. We will then continue to loop indefinitely until the connection to ws is terminated.

The go-loop helps to our make our code easier to read. It allows us to bind values and create a loop in a Core.async go function without too much boiler plate. We need the go macro to execute the body and return control without blocking.

You can read more about core.async in my Asynchronous Clojure article:

Functional Programming for Humans

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store