Split & merge fundamentals
Nowadays, systems must be designed to handle loads that change quickly and dynamically, and our applications are expected to adapt accordingly. For example, Axon applications process many events, and to adjust to those load changes, we should be able to tune our event processing dynamically.
Axon is capable of parallel and distributed event processing. It uses segmentation to separate the event stream and then processes those segments in different threads, which may be located in different JVMs, thereby achieving distributed processing.
Event Processing
Event Handlers implement the business logic of event handling. They are triggered any time a new event is fired in an application. Each Event Handler is assigned to an Event Processor, which provides the processing configuration. Inside the Event Processor, Event Handlers are grouped into Processing Groups, which are responsible for non-functional configuration (see image below).
At the Event Processor level, we may configure the threading mode, error handling, segmentation, etc. At the Processing Group level, we can customize a second form of error handling and, more interestingly for this article, set a Sequencing Policy.
Sequencing Policy
To have better utilization of event processing, we may process events concurrently. However, not all events can be processed concurrently. Some of them must be processed sequentially. For example, processing events published by the same aggregate may need to be done sequentially to ensure the model they update stays consistent.
The Sequencing Policy defines which events must be processed sequentially and which may be processed concurrently. It boils down to one single value determined for each event message by the policy - all event messages with the same value will be processed sequentially.
Axon Framework provides several implementations that cover the most common use cases:
- Sequential per aggregate - all events published by the same aggregate are processed sequentially. This is the default Sequencing Policy.
- Fully sequential - all events are processed sequentially.
- Fully concurrent - all events may be processed concurrently.
The Sequencing Policy is configured per Processing Group, since it may differ from one Processing Group to another.
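The idea that a policy boils down to one value per event can be sketched in plain Java. This is a simplified model and not the Axon Framework API: the `Event` and `Policy` types and all method names are hypothetical, chosen only to illustrate how the three policies listed above differ.

```java
// Simplified model of the three policies; NOT the Axon Framework API.
// Event, Policy, and every method name here are hypothetical.
final class SequencingPolicySketch {

    /** Minimal stand-in for an event message. */
    static final class Event {
        final String eventId;      // unique per event
        final String aggregateId;  // shared by all events of one aggregate

        Event(String eventId, String aggregateId) {
            this.eventId = eventId;
            this.aggregateId = aggregateId;
        }
    }

    /** Maps each event to one value; events with equal values are processed sequentially. */
    interface Policy {
        Object sequenceValueFor(Event event);
    }

    private static final Object SINGLE_SEQUENCE = new Object();

    /** Same aggregate -> same value -> sequential. */
    static Policy sequentialPerAggregate() {
        return event -> event.aggregateId;
    }

    /** Every event gets the same value -> everything is sequential. */
    static Policy fullySequential() {
        return event -> SINGLE_SEQUENCE;
    }

    /** Every event gets its own value -> nothing is forced to be sequential. */
    static Policy fullyConcurrent() {
        return event -> event.eventId;
    }

    /** True when the policy requires the two events to be handled one after the other. */
    static boolean mustBeSequential(Policy policy, Event first, Event second) {
        return policy.sequenceValueFor(first).equals(policy.sequenceValueFor(second));
    }
}
```

With this model, two events of the same aggregate must be sequential under `sequentialPerAggregate()` but not under `fullyConcurrent()`, while `fullySequential()` forces an order even between events of different aggregates.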
Segments
Any time the sequencing policy allows any concurrency, it may make sense to segment the event stream into multiple sub-streams that can be handled independently.
A segment is a sub-stream of the main event stream. Each segment belongs to a specific Tracking Event Processor (TEP for short) and is defined by an identifier and a mask. During the processing of events, each segment uses the Sequencing Policy to ensure that, for each Processing Group, the following rules are consistently applied:
- All events in the same segment must be processed sequentially
- Events in different segments may be processed concurrently (even by different JVMs)
- Each event belongs to one segment only (this way, we prevent events from being processed more than once)
Furthermore, the Tracking Event Processor allows you to assign several threads to it to process events concurrently. Each of these threads will claim one single segment and process all events that belong to it. However, this does not mean that anything changes when it comes to event storage. Segments described here are involved only during event processing. They have nothing to do with event persistence. Indeed, the segments and Sequencing Policy configuration can change over time.
Under the Hood
As previously mentioned, each segment is defined by a mask and an identifier. They are both needed to determine whether a particular event should be processed by a given segment for a specific Processing Group. First, let’s analyze the image below to understand how it works.
We can configure the maximum number of threads per Tracking Event Processor. A Tracking Event Processor regularly checks whether a segment may be claimed by one of its threads. It is possible that not all available threads are used, but a thread can claim one and only one segment. In essence, the number of threads represents the maximum number of segments the Tracking Event Processor may claim. Once a thread claims a segment, it can start processing events.
For example, in the image below, threads are represented in light gray and segments in dark gray. Each thread reads all events (marked in green) in the stream and uses its own claimed segment to determine whether it needs to process a given event: the mask is used to preserve only the last bits of the Sequencing Policy value. If these bits match the segment identifier, the thread is responsible for processing that event.
For example, the event with sequence 13902345 has Sequencing Policy Value 011010. When we mask this value with masks provided by segments, we get the following values:
Table 1 - Assigning events to segments
Now, we can match those results with the Segment Identifiers. Only the identifier of Segment 2 matches the masked Sequencing Policy value, meaning that the thread that claimed Segment 2 (in this case Thread #2) is responsible for processing this event.
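The matching rule can be sketched as a bitwise AND followed by a comparison. The class and method names below are hypothetical, and the segment layout is an assumption made for illustration (four segments with identifiers 0 to 3 that all share mask 000011), not a reproduction of the table above.

```java
// Minimal sketch of segment matching; names and segment layout are assumptions.
final class SegmentMatchSketch {

    /** A segment owns an event when the masked policy value equals the segment identifier. */
    static boolean matches(int segmentId, int mask, int sequencingPolicyValue) {
        return (sequencingPolicyValue & mask) == segmentId;
    }

    /** Returns the identifier of the segment that owns the value, or -1 if none does. */
    static int owningSegment(int[] segmentIds, int mask, int sequencingPolicyValue) {
        for (int segmentId : segmentIds) {
            if (matches(segmentId, mask, sequencingPolicyValue)) {
                return segmentId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int value = 0b011010;            // Sequencing Policy value from the example
        int mask = 0b000011;             // assumed mask: keep the last two bits
        int[] segments = {0, 1, 2, 3};   // assumed segment identifiers
        // 011010 & 000011 = 000010, so only segment 2 matches.
        System.out.println("owning segment: " + owningSegment(segments, mask, value));
    }
}
```

Because every value has exactly one pattern in its last two bits, exactly one of the four assumed segments matches, which is what keeps an event from being processed twice.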
Configuration
The number of threads for a Tracking Event Processor cannot be changed at runtime. The number of segments, on the other hand, can.
To specify the number of threads for a TEP, it is necessary to define the following property:
axon.eventhandling.processors.[name-of-the-processor].threadCount
By default, when the TEP is initialized, it checks for the current segments inside the Token Store (for more details, check: Demystifying Tracking Event Processors in Axon Framework). However, if the segments are not yet initialized, it creates several segments according to the value of the property:
axon.eventhandling.processors.[name-of-the-processor].initialSegmentCount
Keep in mind that this property is taken into account only during the first initialization of the segments. After the initialization, the number of segments can only change at runtime, thanks to the split and merge abilities that the Tracking Event Processor provides. By default, if you don’t specify the initialSegmentCount, any Tracking Event Processor is initialized with one single segment: segment 0. This segment has mask 0, and all events are assigned to this segment.
When the number of threads is greater than (or equal to) the number of segments, we are in a good place to process events: in this scenario, no segment of the Event Stream can be left unprocessed. The usual misstep is to have more segments than threads. This is not always caused by incorrect configuration; more often, it happens dynamically, when segments are split until their number exceeds the number of threads.
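The relationship between threads and segments can be checked with simple arithmetic. The sketch below is illustrative only and assumes that all threads belong to a single Tracking Event Processor instance and that each thread claims at most one segment; the class and method names are hypothetical.

```java
// Illustrative arithmetic only; assumes one TEP instance and one segment per thread.
final class ClaimSketch {

    /** Number of segments that no thread is left to claim. */
    static int unclaimedSegments(int threadCount, int segmentCount) {
        return Math.max(0, segmentCount - threadCount);
    }

    /** Number of threads that stay idle because there is no segment left for them. */
    static int idleThreads(int threadCount, int segmentCount) {
        return Math.max(0, threadCount - segmentCount);
    }

    public static void main(String[] args) {
        // 4 threads, 2 segments: every segment is processed, two threads are idle.
        System.out.println(unclaimedSegments(4, 2) + " unclaimed, " + idleThreads(4, 2) + " idle");
        // 2 threads, 4 segments (for example after two splits): two segments are not processed.
        System.out.println(unclaimedSegments(2, 4) + " unclaimed, " + idleThreads(2, 4) + " idle");
    }
}
```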
Split & Merge
Split and Merge operations change the number of segments at runtime, and with it the amount of concurrent processing. Note that both operations need to be performed by a single thread.
Split operation
The split operation allows one segment to be split into two segments. To equally distribute sequencing policy values, the mask of the new segments needs to preserve one bit more than the original mask. This extra bit will be used to determine the new segment's identifier and act as a differentiator bit from the initial segment identifier. In the image below, the differentiator bits are marked in red.
For example, if we split the initial segment (identifier=000000 / mask=000000), we obtain two segments that both have mask 000001. In other words, the new mask preserves one extra bit compared to the original segment. One of the two new segments keeps the same identifier as the original, while the other gets an identifier that differs from the original in the differentiator position - the most significant bit preserved by the new mask. In this case, the new identifier is 000001.
Any time we split a segment, the resulting parts will each be responsible for half of the spectrum of the Sequencing Policy values that the parent was responsible for. In other words, in our example, the segment <identifier=000000 / mask=000000> is responsible for all Sequencing Policy values. In contrast, the two segments produced by splitting it are each responsible for half (½) of the whole spectrum of values. If we split one of these segments again, we get two more segments, each responsible for a quarter (¼) of the whole value spectrum. Thus, each split operation creates two segments that replace the parent and are each responsible for half of the parent’s portion.
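The split arithmetic described above can be sketched with a few bit operations. This is a minimal model under the assumption that masks always consist of contiguous low bits (000000, 000001, 000011, and so on); the class and method names are hypothetical and not taken from Axon Framework.

```java
// Minimal sketch of the split arithmetic; names are hypothetical.
// Assumes masks consist of contiguous low bits (0, 1, 3, 7, ...).
final class SegmentSplitSketch {

    /** Splits a segment; returns {keptIdentifier, newIdentifier, newMask}. */
    static int[] split(int segmentId, int mask) {
        int newMask = (mask << 1) | 1;           // preserve one more bit than before
        int differentiatorBit = newMask ^ mask;  // the bit that was just added to the mask
        return new int[] {segmentId, segmentId | differentiatorBit, newMask};
    }

    /** Counts how many values with the given number of bits fall into a segment. */
    static int countMatching(int segmentId, int mask, int bits) {
        int count = 0;
        for (int value = 0; value < (1 << bits); value++) {
            if ((value & mask) == segmentId) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[] halves = split(0b000000, 0b000000);  // identifiers 0 and 1, mask 000001
        int[] quarters = split(halves[1], halves[2]);  // identifiers 1 and 3, mask 000011
        System.out.println(countMatching(0, 0, 6));                        // whole 6-bit spectrum
        System.out.println(countMatching(halves[1], halves[2], 6));        // half of it
        System.out.println(countMatching(quarters[1], quarters[2], 6));    // a quarter of it
    }
}
```

Counting over all 6-bit values makes the halving visible: the root segment covers all 64 values, each child of the first split covers 32, and each grandchild covers 16.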
Merge operation
Increasing the number of segments improves the parallelism in event processing. That is great, of course, but any time the load decreases, the number of segments should be reduced to optimize resource utilization (reducing active threads). The way the TEP helps us in achieving this goal is by providing the Merge Operation.
The Merge Operation always merges two and only two segments at a time. The nature of the Split Operation dictates the requirements for the Merge Operation: not just any two segments can be combined, but only two that originated from the same parent (see image below). We call these two segments siblings.
In other words, we can merge only segments that have the same mask and whose identifiers differ only in the most significant bit of the mask (see image below).
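The sibling rule can be expressed as the inverse of the split arithmetic. The sketch below assumes masks made of contiguous low bits and uses hypothetical class and method names; it is an illustration of the rule, not the Axon Framework implementation.

```java
// Minimal sketch of the merge rule; names are hypothetical.
// Assumes masks consist of contiguous low bits (0, 1, 3, 7, ...).
final class SegmentMergeSketch {

    /** Identifier of the only segment that the given segment can be merged with. */
    static int siblingId(int segmentId, int mask) {
        int parentMask = mask >>> 1;                // the mask before the split
        int differentiatorBit = mask ^ parentMask;  // most significant bit of the mask
        return segmentId ^ differentiatorBit;       // flip that single bit
    }

    /** Two segments are siblings when the masks are equal and only the top mask bit differs. */
    static boolean canMerge(int idA, int maskA, int idB, int maskB) {
        return maskA == maskB && maskA != 0 && siblingId(idA, maskA) == idB;
    }

    /** Merges two siblings; returns {parentIdentifier, parentMask}. */
    static int[] merge(int idA, int idB, int mask) {
        if (!canMerge(idA, mask, idB, mask)) {
            throw new IllegalArgumentException("segments are not siblings");
        }
        return new int[] {Math.min(idA, idB), mask >>> 1};
    }

    public static void main(String[] args) {
        // Segments 1 and 3 with mask 000011 differ only in the top mask bit: mergeable.
        System.out.println(canMerge(0b01, 0b11, 0b11, 0b11));
        // Segments 0 and 1 with mask 000011 differ in the lowest bit: not mergeable.
        System.out.println(canMerge(0b00, 0b11, 0b01, 0b11));
    }
}
```

The merged segment keeps the smaller of the two identifiers, which is the identifier the parent had before the split, and drops the most significant bit from the mask.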
Conclusion
This article explained how a Tracking Event Processor can help achieve concurrency in event processing. The next article will explain how Axon Server supports the Split and Merge operations in a distributed environment. Stay tuned!