Modern systems must handle loads that change quickly, and applications are expected to adapt accordingly. Axon applications process a lot of events, so to keep up with those load changes we should be able to tune our event processing dynamically.
Axon is capable of parallel and distributed event processing. It uses segmentation to split the event stream and then processes the resulting segments in different threads, which may be located in different JVMs, thereby achieving distributed processing.
Event Handlers implement the business logic of event handling. They are triggered whenever a new event is published in an application. Each Event Handler is assigned to an Event Processor, which provides the processing configuration. Inside the Event Processor, Event Handlers are grouped into Processing Groups, which are responsible for non-functional configuration (see Image 1).
Image 1 - Event Processing
At the Event Processor level we can configure the threading mode, error handling, segmentation, etc. At the Processing Group level we can customize a second form of error handling but, more interestingly for this article, we can also set a Sequencing Policy.
To make better use of the available resources, we may process events concurrently. However, not all events can be processed concurrently; some of them must be processed sequentially. For example, events published by the same aggregate may need to be processed sequentially to ensure that the model they update stays consistent.
The Sequencing Policy defines which events must be processed sequentially and which may be processed concurrently. It boils down to a single value that the policy determines for each event message: all event messages with the same value are processed sequentially.
Axon Framework provides several implementations that cover the most common use cases:
- Sequential per aggregate - all events published by the same aggregate are processed sequentially. This is the default Sequencing Policy.
- Fully sequential - all events are processed sequentially.
- Fully concurrent - all events may be processed concurrently.
The Sequencing Policy is configured per Processing Group, since it may differ from one Processing Group to another.
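To make the "one value per event" idea concrete, here is a minimal sketch of the three policies as plain functions. The `Event` class, the policy constants and `mustBeSequential` are hypothetical names invented for this illustration; they are not Axon Framework classes and only mimic the behaviour described above.

```java
import java.util.function.Function;

final class SequencingPolicySketch {
    // Hypothetical, simplified event type used only for this illustration.
    static final class Event {
        final String eventId;      // unique per event
        final String aggregateId;  // identifies the publishing aggregate

        Event(String eventId, String aggregateId) {
            this.eventId = eventId;
            this.aggregateId = aggregateId;
        }
    }

    // Sequential per aggregate: events of the same aggregate share one value.
    static final Function<Event, Object> PER_AGGREGATE = e -> e.aggregateId;
    // Fully sequential: every event maps to the same constant value.
    static final Function<Event, Object> FULLY_SEQUENTIAL = e -> "all-events";
    // Fully concurrent: every event gets its own value.
    static final Function<Event, Object> FULLY_CONCURRENT = e -> e.eventId;

    // Two events must be handled sequentially when the policy yields equal values.
    static boolean mustBeSequential(Function<Event, Object> policy, Event a, Event b) {
        return policy.apply(a).equals(policy.apply(b));
    }

    public static void main(String[] args) {
        Event first = new Event("e1", "order-1");
        Event second = new Event("e2", "order-1");
        Event other = new Event("e3", "order-2");
        System.out.println(mustBeSequential(PER_AGGREGATE, first, second));   // same aggregate
        System.out.println(mustBeSequential(PER_AGGREGATE, first, other));    // different aggregates
        System.out.println(mustBeSequential(FULLY_SEQUENTIAL, first, other));
        System.out.println(mustBeSequential(FULLY_CONCURRENT, first, second));
    }
}
```

Modelling a policy as a function keeps the rule itself trivial: the processor only ever compares the returned values.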
Any time the sequencing policy allows any kind of concurrency, it may make sense to segment the event stream into multiple sub-streams that can be handled independently.
A segment is a sub-stream of the main event stream. Each segment belongs to a specific Tracking Event Processor (TEP for short) and is defined by an identifier and a mask. During the processing of events, each segment uses the Sequencing Policy to ensure that, for each Processing Group, the following rules are consistently applied:
- All events in the same segment must be processed sequentially
- Events in different segments may be processed concurrently (even by different JVMs)
- Each event belongs to one segment only (this way we prevent events being processed more than once)
Furthermore, the Tracking Event Processor allows you to assign several threads to it in order to process events concurrently. Each of these threads claims a single segment and processes all events that belong to it. However, this does not change anything about event storage. The segments described here are involved only in event processing; they have nothing to do with event persistence. Indeed, the segments and the Sequencing Policy configuration can change over time.
Under the Hood
As previously mentioned, each segment is defined by a mask and an identifier. They are both needed to determine whether a certain event should be processed by a given segment for a specific Processing Group. Let’s analyze Image 2 to understand how it works.
We can configure the maximum number of threads per Tracking Event Processor. A Tracking Event Processor regularly checks whether a segment may be claimed by one of its threads. It is perfectly fine if not all available threads are used, but a thread can claim one and only one segment. In essence, this means that the number of threads represents the maximum number of segments the Tracking Event Processor may claim. Once a thread claims a segment, it can start processing events. In Image 2, threads are represented in light gray and segments in dark gray. Each thread reads all events (marked in green) in the stream and uses its own claimed segment to determine whether it needs to process a given event. The mask is used to preserve only the last portion of bits of the sequencing policy value; if this last portion matches the segment identifier, the thread is responsible for processing that event.
Image 2 - Threading and Segmentation inside Tracking Event Processor
For example, the event with sequence 13902345 has the Sequencing Policy value 011010. When we mask this value with the masks provided by the segments, we get the following values:
Table 1 - Assigning events to segments
Now we can match those results with the Segment Identifiers. Only the identifier of Segment 2 matches the masked Sequencing Policy value, meaning that the thread that claimed Segment 2 (in this case Thread #2) is responsible for processing this event.
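The matching rule can be sketched in a few lines. This is an illustration only: the class and method names are hypothetical, and the segment layout (four segments sharing mask 000011 with identifiers 00, 01, 10 and 11) is an assumption made for the example, not a reproduction of Image 2.

```java
final class SegmentMatchSketch {
    // An event belongs to a segment when the bits kept by the mask equal the segment identifier.
    static boolean matches(int segmentId, int mask, int sequencingPolicyValue) {
        return (sequencingPolicyValue & mask) == segmentId;
    }

    // Returns the identifier of the first matching segment, or -1 if none matches.
    static int owningSegment(int[] segmentIds, int mask, int sequencingPolicyValue) {
        for (int id : segmentIds) {
            if (matches(id, mask, sequencingPolicyValue)) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int value = 0b011010;                    // Sequencing Policy value from the example
        int mask = 0b000011;                     // assumed mask shared by all four segments
        int[] ids = {0b00, 0b01, 0b10, 0b11};    // assumed segment identifiers
        // 011010 & 000011 = 000010, so only the segment with identifier 10 (decimal 2) matches.
        System.out.println(owningSegment(ids, mask, value)); // prints 2
    }
}
```

Because every identifier under one mask is distinct, exactly one segment can match a given value, which is what guarantees that an event is processed only once.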
The number of threads for a Tracking Event Processor cannot be changed at runtime. On the other hand, the number of segments can.
In order to specify the number of threads for a TEP, it is necessary to define the following property:
By default, when the TEP is initialized, it checks for the current segments inside the Token Store (for more details check: Demystifying Tracking Event Processors in Axon Framework). If the segments are not yet initialized, it creates a number of segments according to the value of the property:
Keep in mind that this property is taken into account only during the first initialization of the segments. After that, the number of segments can only be changed at runtime, through the split and merge operations that the Tracking Event Processor provides. By default, if you don’t specify the initialSegmentCount, any Tracking Event Processor is initialized with one single segment: segment 0. This segment has mask 0, and all events are assigned to it.
When the number of threads is greater than or equal to the number of segments, we are in a good place to process events: no segment of the event stream can be left unprocessed. The usual misstep is to have more segments than threads, in which case the surplus segments cannot be claimed by this processor and their events stay unprocessed until some thread claims them. This is not always caused by incorrect configuration; more often it arises dynamically, when segments are split until their number exceeds the number of threads.
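As an illustration, in a Spring Boot application the thread count and the initial segment count are typically set per processor in `application.properties`. The fragment below is a sketch under assumptions: `my-processor` is a hypothetical processor name, and the exact property keys should be checked against the reference guide for the Axon Framework version in use.

```properties
# Assumption: Axon Spring Boot auto-configuration; "my-processor" is a hypothetical processor name.
axon.eventhandling.processors.my-processor.mode=tracking
# Maximum number of threads, and therefore of claimed segments, for this processor instance.
axon.eventhandling.processors.my-processor.threadCount=4
# Only used the first time the segments are initialized in the Token Store.
axon.eventhandling.processors.my-processor.initialSegmentCount=4
```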
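The relationship between threads and segments is simple arithmetic, sketched below for a single processor instance. The class and method names are hypothetical, and the sketch deliberately ignores other JVMs that might claim the remaining segments.

```java
final class ClaimSketch {
    // Each thread claims at most one segment, so at most min(threads, segments) are claimed.
    static int claimed(int threads, int segments) {
        return Math.min(threads, segments);
    }

    // Segments beyond the thread count stay unclaimed on this instance.
    static int unclaimed(int threads, int segments) {
        return segments - claimed(threads, segments);
    }

    public static void main(String[] args) {
        System.out.println(unclaimed(4, 4)); // every segment has a thread
        System.out.println(unclaimed(4, 2)); // two threads stay idle, nothing is left unprocessed
        System.out.println(unclaimed(2, 4)); // two segments have no thread on this instance
    }
}
```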
Split & Merge
The Split and Merge operations change the number of segments at runtime and, with it, the amount of concurrent processing. Note that both operations need to be performed by a single thread.
The split operation allows one segment to be split into two segments. In order to equally distribute sequencing policy values, the mask of the new segments needs to preserve one bit more than the original mask. This additional bit will be used to determine the identifier of the new segment and it will act as a differentiator bit from the original segment identifier. In Image 3, the differentiator bits are marked in red.
Image 3 - Split Operation
For example, if we split the initial segment (identifier=000000 / mask=000000), we obtain two segments that both have mask 000001. In other words, the new mask preserves one extra bit compared to the original segment. One of the two new segments keeps exactly the same identifier as the original, while the other gets an identifier that differs from the original in the differentiator position, that is, the most significant bit preserved by the new mask. In this case, the new identifier is 000001.
Any time we split a segment, each of the resulting parts is responsible for half of the spectrum of Sequencing Policy values that the parent was responsible for. In our example, the segment <identifier=000000 / mask=000000> is responsible for all Sequencing Policy values, whilst the two segments derived from splitting it are each responsible for half (½) of the whole spectrum of values. If we split one of these segments again, we get two further segments, each responsible for a quarter (¼) of the whole spectrum. Each split operation creates two segments that replace the parent and are each responsible for half of the parent’s portion.
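The split rule described above can be sketched as follows. This is an illustration under assumptions, not the framework's implementation: segments are represented as `{identifier, mask}` pairs, masks are assumed to consist of contiguous low-order bits (0, 1, 11, 111, ...), and all names are hypothetical.

```java
final class SplitSketch {
    // Splits a segment into two {identifier, mask} pairs.
    // Index 0 keeps the parent's identifier; index 1 additionally sets the differentiator bit.
    static int[][] split(int id, int mask) {
        int newMask = (mask << 1) | 1;        // preserve one more bit than the parent
        int differentiator = newMask ^ mask;  // the newly preserved, most significant bit
        return new int[][] {
            {id, newMask},
            {id | differentiator, newMask}
        };
    }

    // Share of all sequencing policy values a segment covers: 1 / 2^(number of mask bits).
    static double share(int mask) {
        return 1.0 / (1L << Integer.bitCount(mask));
    }

    public static void main(String[] args) {
        int[][] halves = split(0b000000, 0b000000);
        for (int[] s : halves) {
            System.out.println(Integer.toBinaryString(s[0]) + " / " + Integer.toBinaryString(s[1])
                    + " covers " + share(s[1]));
        }
        int[][] quarters = split(halves[1][0], halves[1][1]);
        for (int[] s : quarters) {
            System.out.println(Integer.toBinaryString(s[0]) + " / " + Integer.toBinaryString(s[1])
                    + " covers " + share(s[1]));
        }
    }
}
```

Splitting the initial segment yields identifiers 0 and 1 with mask 1; splitting the segment with identifier 1 again yields identifiers 01 and 11 with mask 11, each covering a quarter of the values.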
Increasing the number of segments improves the parallelism in event processing. That is great of course, but any time the load decreases, the number of segments should be reduced in order to optimize resource utilization (by reducing the number of active threads). The way the TEP helps us in achieving this goal is by providing the Merge Operation.
The Merge Operation always merges exactly two segments at a time. The nature of the Split Operation dictates the requirements for the Merge Operation: not just any two segments can be merged, only the two that originated from the same parent (see Image 4). We call these two segments siblings.
Image 4 - Disallowed Merge Operation
In other words, we can merge only segments that have the same mask and whose identifiers differ only in the bit at the most significant position of that mask (see Image 5).
Image 5 - Allowed Merge Operation
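The sibling check and the merge itself can be sketched in the same style. Again, this is a hedged illustration: the names are hypothetical, segments are plain `(identifier, mask)` values, and masks are assumed to consist of contiguous low-order bits.

```java
final class MergeSketch {
    // Two segments are siblings when they share a non-zero mask and their identifiers
    // differ only in the most significant bit of that mask.
    static boolean canMerge(int idA, int maskA, int idB, int maskB) {
        if (maskA != maskB || maskA == 0) {
            return false;
        }
        int differentiator = Integer.highestOneBit(maskA);
        return (idA ^ idB) == differentiator;
    }

    // Returns the parent as {identifier, mask}: the mask loses its top bit and the
    // identifier keeps only the bits the parent mask still preserves.
    static int[] merge(int idA, int maskA, int idB, int maskB) {
        if (!canMerge(idA, maskA, idB, maskB)) {
            throw new IllegalArgumentException("segments are not siblings");
        }
        int parentMask = maskA >>> 1;
        return new int[] {idA & parentMask, parentMask};
    }

    public static void main(String[] args) {
        System.out.println(canMerge(0b01, 0b11, 0b11, 0b11)); // siblings: differ only in the top mask bit
        System.out.println(canMerge(0b01, 0b11, 0b10, 0b11)); // same mask, but different parents
        System.out.println(canMerge(0b0, 0b1, 0b01, 0b11));   // different masks
        int[] parent = merge(0b01, 0b11, 0b11, 0b11);
        System.out.println(parent[0] + " / " + parent[1]);
    }
}
```

Merging is the exact inverse of the split: the differentiator bit is dropped from both the mask and the identifier, which restores the parent segment.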
In this article we explained how a Tracking Event Processor can help with achieving concurrency in event processing. In the next article we are going to explain how Axon Server supports the Split and Merge operations in a distributed environment. Stay tuned!