
Apache Apollo is a next-generation, high-performance, multi-protocol messaging broker built from the ground up to one day be a drop-in replacement for ActiveMQ 5.x. I have blogged about it in the past. Apollo's non-blocking, asynchronous architecture allows it to be super fast and scale very well on multi-core systems using a minimal number of threads. The supported protocols include AMQP, STOMP, MQTT, and ActiveMQ's native binary protocol, OpenWire. In this post, we take a closer look at the threading model that Apollo is built upon: HawtDispatch.

As Hiram pointed out on his blog a couple of years ago, improving the performance of ActiveMQ required a deep look at the threading implementation and the known bottlenecks and thread-contention points. ActiveMQ was designed and implemented (around 2004) when blocking IO and multithreaded shared-state systems were well known and understood, but as you try to squeeze more and more performance out of it, the foundational limitations start to show. In more recent years, non-blocking techniques (which have been around much longer, especially in operating-system development) and the libraries that support them have started to emerge in the Java ecosystem. After much research, prototyping, benchmarking, analysis, scrub, rinse, and repeat, Hiram and the original Apollo designers settled on a non-blocking, event-based approach pioneered by the engineers at Apple, known as Grand Central Dispatch. This approach minimizes shared-state locking, uses OS resources more efficiently (this is huge), provides better scalability on big multi-core boxes, and is very well suited to messaging servers.

To the rescue comes HawtDispatch with its GCD model, right? Well, at the time (2008-2009-ish) there weren't very many options for non-blocking, asynchronous event-processing engines optimized for messaging systems. Grand Central Dispatch was a native Mac OS implementation (libdispatch) and there wasn't any application-level equivalent. So the original Apollo developer(s) set out to build this model, keeping in mind the close relationship it would have with the future messaging product.

And now we have it. (It's been around for a couple years)

And it's open source.

And it has an Apache license!

So what is it?

A non-blocking, asynchronous event/task processing engine

HawtDispatch implements a threading model based on an event-handling loop (actually, multiple event-handling loops... see the section below on pooling). An "event" or "task" (which is just a Runnable in Java-speak) is submitted to a dispatcher and scheduled to run on any of the HawtDispatch threads in the thread pool. The same concepts from Grand Central Dispatch are used for submitting tasks and executing them either in parallel or serially. A "Global Dispatch Queue" is a structure to which tasks are submitted and from which they are dequeued and run on any of the available event-handling threads. These tasks are not guaranteed to run in any particular order, since they are dispersed to whichever threads in the pool are available and run in parallel.

A "Serial Dispatch Queue" is a simple data structure that runs its tasks in FIFO order. If tasks A, B, and C are added to a serial queue, A will run first, then B, then C. You can use serial queues to protect shared state and enforce operation ordering, and they're so lightweight you can create a large number of them as needed (for example, one per connection or one per destination on a message broker :) ). A serial dispatch queue does not create any new threads; its tasks run on the existing HawtDispatch threads in the thread pool.
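Here's a minimal sketch of the two queue types in Java, assuming the org.fusesource.hawtdispatch.Dispatch factory methods (getGlobalQueue() and createQueue()); the queue label is just an arbitrary example name:

[java]
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;

public class QueueSketch {
    public static void main(String[] args) throws Exception {
        // Global queue: tasks may run on any pooled thread, in any order, in parallel.
        DispatchQueue global = Dispatch.getGlobalQueue();
        global.execute(new Runnable() {
            public void run() {
                System.out.println("runs on whichever HawtDispatch thread is free");
            }
        });

        // Serial queue: tasks run FIFO (A then B), so they can safely touch the
        // same state without locks.
        DispatchQueue serial = Dispatch.createQueue("connection-42");
        serial.execute(new Runnable() {
            public void run() { System.out.println("A"); }
        });
        serial.execute(new Runnable() {
            public void run() { System.out.println("B"); }
        });

        Thread.sleep(500); // give the pool a moment to run the tasks before main exits
    }
}
[/java]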

For more details about Grand Central Dispatch, see the GCD reference docs.

In order for this event processing/task execution to work correctly, none of the tasks are allowed to block (on IO) or synchronize on any shared state (using locks). But for that you'll need a non-blocking IO (NIO) library... HawtDispatch has you covered!

NIO support built right in

Java NIO support is built right into the library and, more importantly, right into each one of the HawtDispatch threads. This means that if a thread is not busy running tasks, it will be servicing readiness selectors and executing the associated event handlers.

Adding selectors is super simple with HawtDispatch's API:

[java]
DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue)
[/java]

This method creates a new DispatchSource object that will listen on the specified channel for the events you're interested in. Before the DispatchSource starts handling events, you need to give it an event handler and call resume():

[java]
source.setEventHandler(Runnable runnable)
source.resume()
[/java]

And that's it! No need to deal directly with selectors or set up the loop that handles readiness. Just a straightforward API that fits very nicely with the overall HawtDispatch threading model.
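Putting those pieces together, here's a rough end-to-end sketch. The channel setup and connect logic are elided, and the package and method names are assumed to be the standard org.fusesource.hawtdispatch ones described above:

[java]
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;

public class ReadSketch {

    public static DispatchSource watch(final SocketChannel channel) throws Exception {
        channel.configureBlocking(false);
        // ... connect the channel elsewhere ...

        // One serial queue per connection keeps the handler from racing with itself.
        DispatchQueue queue = Dispatch.createQueue("connection-reader");

        // Fire the handler on 'queue' whenever the channel becomes readable.
        final DispatchSource source = Dispatch.createSource(channel, SelectionKey.OP_READ, queue);
        source.setEventHandler(new Runnable() {
            public void run() {
                ByteBuffer buffer = ByteBuffer.allocate(4096);
                try {
                    int read = channel.read(buffer);
                    System.out.println("read " + read + " bytes");
                } catch (Exception e) {
                    source.cancel(); // stop watching the channel on error
                }
            }
        });
        source.resume();
        return source;
    }
}
[/java]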

Custom event source dispatching

In the same way that you can use the NIO support with HawtDispatch, you can create your own custom event sources and associated handlers.
One of the keys to making this work smoothly is allowing multiple events to be fired and merged together before they are actually delivered to the event handler.

Creating a custom dispatch source is simple:

[java]
DispatchSource createSource(EventAggregator eventAggregator, DispatchQueue dispatchQueue)
[/java]

The EventAggregator controls how multiple events can be merged together before delivering to the event handler.

Then you specify an event handler and call resume() just like above:

[java]
source.setEventHandler(Runnable runnable)
source.resume()
[/java]

From within your event handler, you can access the merged events (now represented as a single value) with source.getData().
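As a concrete sketch, here's a coalescing counter built on a custom dispatch source. It assumes the EventAggregators.INTEGER_ADD helper (an aggregator that sums pending integer events) and the CustomDispatchSource type that the library ships for this pattern:

[java]
import org.fusesource.hawtdispatch.CustomDispatchSource;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.EventAggregators;

public class CounterSketch {
    public static void main(String[] args) throws Exception {
        DispatchQueue queue = Dispatch.createQueue("counter");

        final CustomDispatchSource<Integer, Integer> source =
                Dispatch.createSource(EventAggregators.INTEGER_ADD, queue);

        source.setEventHandler(new Runnable() {
            public void run() {
                // getData() returns the merged result: the sum of everything
                // fired since the handler last ran.
                System.out.println("processed " + source.getData() + " events");
            }
        });
        source.resume();

        // Producers can fire events from any thread; HawtDispatch merges them,
        // so the handler runs far fewer times than merge() is called.
        for (int i = 0; i < 1000; i++) {
            source.merge(1);
        }

        Thread.sleep(500); // let the handler drain before main exits
    }
}
[/java]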

See the HawtDispatch documentation on Custom Dispatch Sources for more information.

Based on optimized thread pooling

HawtDispatch decides how many threads to use for its pool based on the number of cores available on your machine. So if you have 8 cores, HawtDispatch will create and use 8 threads for its task-running pool. Pinning threads to specific cores isn't configurable through HawtDispatch (a deviation from the original libdispatch), but the underlying principles still hold: keeping the number of threads bounded by the number of cores keeps each core fairly hot, with fewer context switches and fewer level-1 cache misses.

Scala and Java API

Both Java and Scala are supported. The benefit of using Scala with HawtDispatch is drastically reduced boilerplate when submitting runnables. For example, check out how much code it takes to execute a task in Java:

[java]
queue.execute(new Runnable() {
    public void run() {
        System.out.println("Hi!");
    }
});
[/java]

Now check out the Scala code:

[scala]
queue {
  System.out.println("Hi!");
}
[/scala]

Writing closures and inline functions is much cleaner in Scala.

Out-of-the-box Transport implementations for TCP, SSL, and UDP, with "fill in the blank" abstractions for custom protocol handling

This is an awesome feature and makes it much easier to deal with different transports within your own applications. Let's look at TcpTransport as a quick example.

The TcpTransport is responsible for reading from and writing to an underlying TCP stream. Under the covers it uses HawtDispatch's NIO integration as well as the custom dispatch sources to drive protocol handling. Each TcpTransport object has an associated ProtocolCodec object, which is responsible for encoding/decoding the bytes that go over the wire. This is where you plug in your custom wire implementation (for example, the MQTT or AMQP binary protocols, or ActiveMQ's OpenWire). An abstract class, AbstractProtocolCodec, already does a lot of the encoding/decoding heavy lifting; all you have to do is implement the encode() and initialDecodeAction() methods.
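To make that concrete, here's a rough sketch of a trivial newline-delimited codec. The two abstract methods are the ones named above; the buffer helpers used inside them (nextWriteBuffer, readUntil) are my recollection of the hawtdispatch-transport base class and should be treated as assumptions rather than the exact API:

[java]
import java.io.IOException;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;

public class LineCodec extends AbstractProtocolCodec {

    // Called when the transport wants to put a command on the wire.
    @Override
    protected void encode(Object value) throws IOException {
        nextWriteBuffer.write(((String) value).getBytes("UTF-8")); // assumed buffer field
        nextWriteBuffer.write('\n');
    }

    // Returns the first decode step; each Action decodes one frame (here, one
    // newline-terminated line) or returns null when more bytes are needed.
    @Override
    protected Action initialDecodeAction() {
        return new Action() {
            public Object apply() throws IOException {
                Buffer line = readUntil((byte) '\n'); // assumed helper
                return line == null ? null : line.ascii().toString().trim();
            }
        };
    }
}
[/java]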

Using HawtDispatch's Transport abstractions makes it easier to quickly implement a wire-level protocol in a non-blocking way.

Give it a shot!

HawtDispatch is simple, lightweight, and easy to use. I highly recommend taking a look at it especially if you're looking to build scalable applications using a non-blocking approach.