Christian Posta bio photo

Christian Posta

Field CTO at solo.io, author Istio in Action and Microservices for Java Developers, open-source enthusiast, cloud application development, committer @ Apache, Serverless, Cloud, Integration, Kubernetes, Docker, Istio, Envoy #blogger

Twitter Google+ LinkedIn Github Stackoverflow

There's usually a steady drip of questions on the mailing list surrounding ActiveMQ's message-priority support as well as good questions about observed behaviors and "what's really supported"? I hope to help you understand what happens under the covers and what levels of priority can be supported. The details could get gory for some. If you're not interested in the details, take a look at the ActiveMQ wiki for the high-level overview.

First, since ActiveMQ supports JMS 1.1, let's take a look at what the JMS Spec says about support for "JMSPriority":


JMS defines a ten-level priority value, with 0 as the lowest priority and 9
as the highest. In addition, clients should consider priorities 0-4 as
gradations of normal priority and priorities 5-9 as gradations of expedited
priority.

JMS does not require that a provider strictly implement priority ordering
of messages; however, it should do its best to deliver expedited messages
ahead of normal messages.

ActiveMQ observes three distinct levels of "Priority":

  • Default (JMSPriority == 4)
  • High (JMSPriority > 4 && <= 9)
  • Low (JMSPriority > 0 && < 4)

If you don't specify a priority for your MessageProducer or individual messages (see MessageProducer#send(message, deliveryMode, priority, timeToLive)), ActiveMQ's client will default to using a JMSPriority == 4. As a JMS consumer, you can expect a FIFO ordering if the producers aren't using priority or you're not using some other form of selection criteria on the destination.

ActiveMQ also "does its best" to deliver expedited messages ahead of "normal" messages, as the spec states. The message store that your broker uses greatly contributes to how that's exactly done, but in general you can expect the broker to honor strict (0-9) priority support for only the JDBC backed messages stores. For KahaDB-backed message stores, only "category priority" is supported (Low, Default, High, where priorities in each category are not always differentiated, that is 5 and 9 are considered "High"). However, with the right settings and messaging profile, you can affect how [strict] prioritization happens even with KahaDB, so let's take a quick look.

Enabling Message Priority

You can enable message priority on your Queues with the following setting in your activemq.xml configuration file:

[xml]<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="queueName" prioritizedMessages="true" />
</policyEntries>
</policyMap>
</destinationPolicy>[/xml]

For queueName there is wildcard support, so you can enable priority support on a hierarchy of messages.

When you enable priority support, the broker will use prioritized linked-list structures in its messages cursors as well as give KahaDB a hint to use priority categories when storing messages onto disk. There are varying levels of how strict the priority ordering can get, but at worst, you can assume priorities will be upheld by category. The following factors come into play which control how strict the priority ordering can get when using the KahaDB store:

  • Caching enabled/disabled in the queue cursor
  • MaxPageInSize for how many messages to page from the store in a batch
  • Consumer prefetching
  • Expired-message checking
  • Broker Memory settings
  • Persistent/Non-persistent messages

The next section presents a little detail about what happens in KahaDB to support priority, while the following sections will go into how things happen in broker memory and are finally dispatched to a consumer and will point out along the way how the different factors from above come into play.

KahaDB Prioritization Categories

First we'll start with how messages are stored on disk and loaded into a destination.

KahaDB (the default message store) is a file-based message database that the broker uses to persist messages in a "log" or "journal". The broker also keeps track of which messages are in the log by keeping a separate "index" that holds information about messages (like its location in the log, to which destination it's associated, ordering, etc). The index also has a notion of message "priority", which is implemented with three B+Tree structures, one for each priority level (see MessageOrderIndex in org.apache.activemq.store.kahadb.MessageDatabase). This implementation detail is the root of message prioritization and has implications for the rest of the broker as messages are removed from the store.

When messages are retrieved from the store, they are done so in batches (maxPageInSize), and messages that are in the "highPriority" BTree are retrieved first. When the high-priority messages are exhausted, the store will then offer up the default priority and subsequently the low priority messages.

You can set the maxPageInSize like so:

[xml]<policyEntry queue="queueName" prioritizedMessages="true"
maxPageSize="500">[/xml]

The larger the page size, the larger the number of messages in a batch and the more messages you can see at a time per "snapshot". For each batch that's brought into memory, it's messages are going to be strictly prioritized as described below by the store cursor. The downside is that if your messages are large, bringing in 500 at a time could exhaust your broker memory. The default setting is 200.

Message Cursor Priority Lists

When persistent messages come into the broker from a producer, they will be stored onto disk, but they will also be cached in memory waiting to be dispatched to a consumer. This is a default setting, so no need to explicitly set it. The idea behind this is to be able to dispatch to fast consumers without having to retrieve it directly from disk (if consumers become slow, the broker will auto-tune itself to not use the cache once it's filled so as to not OOM). The good thing about this is that when prioritization support is used for a queue, the internal lists used for the cursors will support strict priority (0-9), so for all of the messages that are currently in memory (in the cache), they will be sorted properly from highest to lowest. The trick is what happens when all of the messages in the cache are "lower priority messages" and then a high-priority message comes in to the broker but won't fit in the cache because it's full... in that case the message will go directly to the store, be indexed in the "high-priority" index, but won't be available for dispatch ahead of the lower priority messages until it's paged into memory in the next batch.

When NON persistent messages come into the broker, they will not go to the message store. They will be kept in memory for as long as possible and only pushed to disk (in a temporary store) when memory has passed a defined threshold (> 70% by default). So the same behaviors for cached messages above apply for non-persistent messages, namely, those that are in memory will be ordered strictly (0-9), but once they get pushed to disk, only categories are observed.

If you disable the cursor's cache (with the following setting)

[xml]<policyEntry queue="queueName" prioritizedMessages="true"
useCache="false" />[/xml]

then you could help to eliminate the above scenario where the cache becomes full with lower priority messages right when a high-priority message comes in (and becomes stuck on disk because it cannot be paged into memory). However, doing this will slow down your throughput because messages must be paged in from disk before sending to consumers which will slow down the dispatch. But note, when doing this, you are more likely to see messages not following "strict" priority even with the priority lists in the cursor. They will, however, follow the priority categories (High, Default, Low) properly. So to recap, if you disable the cache, you can get higher priority messages delivered more timely than you can if the cache is enabled and it's filled with lower priority messages. But disabling the cache, by itself, won't get you to strict priority.

Disabling the cache helps getting high priority messages to consumers ahead of lower priority messages, however for this to work as intended (and has bitten me), you'll want to disable the asynchronous message expiry check. This expiry check pages messages into memory every 30 seconds regardless if they're ready to be dispatched (by default) and performs a TTL check (time to live) on them and discards those messages that should be expired. This sort of checking effectively brings messages into memory and will stall the normal "page in for dispatch" just enough to miss higher priority messages.

[xml]<policyEntry queue="queueName" prioritizedMessages="true"
useCache="false" expireMessagesPeriod="0" >[/xml]

Turning off expiry checking, however, will keep expired messages in the store longer because the only expiry check would be done right before dispatch, so make an educated decision on this, and all ActiveMQ settings you tinker with. But to move in the direction of strict(er) order priorities, you'll want to disable this.

Lastly, consumer prefetch plays a role in achieving "strict ordering." By default, prefetch is set to 1000 for queue consumers, which means they will be sent 1000 messages in a batch. This helps speed up the consumer when it's consuming messages, but in terms of priority handling it in essence also acts like a cache of messages (discussed above) and could contribute to not seeing "strict ordering". "category priority" could also be violated if your prefetch is filled with lower priority messages, and there is a new high-priority message that came in to the broker, you wouldn't see it until the next message dispatch to the consumer. So the lower the prefetch, the better chance of seeing higher priority messages ahead of lower ones. With prefetch of 1, you'll always get the highest priority message that the store cursor knows about.

[xml]<policyEntry queue="queueName" prioritizedMessages="true"
useCache="false" expireMessagesPeriod="0" queuePrefetch="1" >[/xml]

Client side message priority

ActiveMQ also has priority support built right into the message client and it's enabled by default.

This means, when messages are being sent to your consumer (even before your consumer is receiving them, using prefetch), they will be cached on the consumer side and prioritized by default. This is regardless of whether you're using priority support on the broker side. This could impact the ordering you see on the consumer so just keep this in mind. To disable it, set the following configuration option on your broker URL, e.g., tcp://0.0.0.0:61616?jms.messagePrioritySupported=false

But as mentioned above, you'll want to lower the prefetch to 1 to get the best chance of achieving strict ordering.

Tradeoffs

So ultimately, getting strictly ordered messages with KahaDB is possible but there are significant tradeoffs to consider and it won't apply for every messaging situation. Do you want optimized, fast messaging? or do you want to slow down the messaging to achieve strict(er) ordering for priorities. Each situation is different and should be evaluated on a case-by-case basis. In general, however, you can rely on category level priorities.

Reordering messages across large queues AND keeping high performance is problematic, and most Message Queue vendors do not do that very well. ActiveMQ's priority support is strong, but another good alternative exists as discussed on the ActiveMQ wiki describing message priority and that is: using message selectors and balancing out the consumers in such a way that high priority messages end up getting consumed first. This approach tends to give more flexibility and control, but that's for another post :)

Leave me some comments if something wasn't clear, or drop an email in the mailing list!