Demystifying tracking event processors in Axon Framework
Axon is a popular framework for writing Java applications following DDD, event sourcing, and CQRS principles. While especially useful in a microservices context, Axon provides great value in building structured monoliths that can be broken down into microservices when needed. A good place to see how is in our webinar recording section.
An important ingredient in the Framework is the explicit use of messages, and events are one type of message. Axon Framework supports two ways of processing events, subscribing and tracking, and provides the Subscribing Event Processor (SEP) and Tracking Event Processor (TEP) components for them.
Events published on the Event Bus are supplied to the SEP in the thread that published them. Depending on the Event Processing Strategy, these events might be processed in the same thread or a different one. Processing events in the publishing thread allows us to roll back the whole transaction if event processing is unsuccessful.
TEPs use their own threads and are in full control of how handling happens. Handling is always independent of the transaction that published the event, and that transaction must have been committed before the TEP sees the event. So when we want to replay events that have happened in the past (usually to build or rebuild a read model), this is the mechanism to use. The rest of this blog discusses the features of the TEP and how they are implemented.
Tracking Token
A Tracking Event Processor uses a Tracking Token to keep track of the events that have been processed. A Tracking Token represents the position of an event in the event stream. Different Event Store implementations may use different implementations of the Tracking Token to represent this position reliably. To continue event processing after the process restarts (we’ll see later that this is not the only reason), the Tracking Token is stored in a Token Store. There are several implementations of the Token Store - JPA, JDBC, Mongo - and, of course, you can provide your own. Usually, the best place to store the Tracking Token is where the projection (or Saga) is also stored. Figure 1 shows how these concepts fit together.
Each TEP claims its Tracking Token, so that the same event is not processed more than once by different threads or nodes. Claiming a Tracking Token means setting its owner in the Token Store. The owner is not set indefinitely but for a configurable timeout. When this timeout expires and the current owner has not renewed its claim, a different owner (TEP) can claim the token. A TEP can also release its claim explicitly, which lets other TEPs continue with the processing. A conscious release of the Tracking Token is the basis for TEP rebalancing, that is, distributing the event processing load evenly across TEPs.
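To make the claim rules concrete, here is a minimal in-memory sketch. It is not Axon's Token Store API: the class ClaimSketch, its methods, and the explicit nowMillis parameter are all hypothetical and only model the "owner plus timeout" behaviour described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of claim semantics; not Axon's TokenStore API.
class ClaimSketch {
    private static final class Claim {
        final String owner;
        final long claimedAtMillis;

        Claim(String owner, long claimedAtMillis) {
            this.owner = owner;
            this.claimedAtMillis = claimedAtMillis;
        }
    }

    private final long claimTimeoutMillis;
    private final Map<Integer, Claim> claimsBySegment = new HashMap<>();

    ClaimSketch(long claimTimeoutMillis) {
        this.claimTimeoutMillis = claimTimeoutMillis;
    }

    // Returns true if 'owner' holds the claim after this call.
    // A claim succeeds when the token is unclaimed, already owned by the
    // caller (this renews the claim), or the previous claim has timed out.
    synchronized boolean claim(int segment, String owner, long nowMillis) {
        Claim current = claimsBySegment.get(segment);
        boolean available = current == null
                || current.owner.equals(owner)
                || nowMillis - current.claimedAtMillis >= claimTimeoutMillis;
        if (available) {
            claimsBySegment.put(segment, new Claim(owner, nowMillis));
        }
        return available;
    }

    // A conscious release: only the current owner can give up its claim.
    synchronized void release(int segment, String owner) {
        Claim current = claimsBySegment.get(segment);
        if (current != null && current.owner.equals(owner)) {
            claimsBySegment.remove(segment);
        }
    }
}
```

In this sketch a second node can only take over once the owner stops renewing its claim or releases it, which is the hook that rebalancing relies on.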
Parallel Processing
In Axon, parallel processing is achieved by segmenting an event stream. For a given TEP, we start several threads, each of which works on its own segment of the event stream in parallel. The number of segments per TEP is configurable. Let’s define what a segment actually is.
A segment is a fraction of the total stream of events (see Figure 2). In other words, the Tracking Token is split into several portions, which means that the Token Store contains one entry per combination of Tracking Token and segment. A segment consists of a segment identifier and a mask. The mask is used to determine whether a certain event belongs to the given segment. Segments play a significant role in parallel and distributed event processing. Each TEP can have several segments assigned to it, and each segment is processed in a separate thread, which gives us parallel processing. Furthermore, segments can be distributed among TEPs on several nodes, giving us the possibility to process events in a distributed fashion (note that TEPs on different nodes still have to claim the Tracking Token, so they don’t process the same event).
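The following sketch shows one way a segment identifier and a mask can decide which segment an event belongs to. It assumes that the number of segments is a power of two and that events are routed by the hash code of some key, such as the aggregate identifier. SegmentSketch and its methods are hypothetical names, not Axon's own classes.

```java
// Hypothetical model of segment matching; assumes a power-of-two segment count.
class SegmentSketch {
    final int segmentId;
    final int mask;

    SegmentSketch(int segmentId, int mask) {
        this.segmentId = segmentId;
        this.mask = mask;
    }

    // An event belongs to this segment when the masked hash of its
    // routing key equals the segment identifier.
    boolean matches(Object routingKey) {
        return (routingKey.hashCode() & mask) == segmentId;
    }

    // Splits the stream into 'count' segments with ids 0..count-1.
    // With a power-of-two count, the mask count-1 keeps exactly the low
    // bits needed to tell the segments apart.
    static SegmentSketch[] split(int count) {
        if (count < 1 || Integer.bitCount(count) != 1) {
            throw new IllegalArgumentException("count must be a power of two");
        }
        SegmentSketch[] segments = new SegmentSketch[count];
        for (int id = 0; id < count; id++) {
            segments[id] = new SegmentSketch(id, count - 1);
        }
        return segments;
    }
}
```

Because the same key always yields the same masked hash, all events for one aggregate land in the same segment and are therefore handled by a single thread, while different aggregates spread across segments.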
Replay
When we want to rebuild projections (view models), replaying past events comes in handy. The idea is to start from the beginning of time (or from a certain point in time) and invoke all event handlers anew. To do so, one has to reset the TEP to a certain point in time (do note that the TEP should be stopped before resetting), which means that the Tracking Token gets updated. You might ask whether manually updating the Tracking Token in the Token Store is enough to make a TEP re-process past events. It is: events are replayed that way too. The benefit of using the Replay API is that the TEP can tell newly published events apart from replayed ones. This puts you in control of which events get replayed and which don’t. Sagas are (by default) not replayable. For replay purposes, Axon provides a Replay Token.
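The idea behind telling replayed events apart from new ones can be sketched as follows. This is a simplified model that uses plain numbers as stream positions; ReplaySketch, positionAtReset, and handlerAllowsReplay are hypothetical names and do not reflect the actual Replay Token class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: remembers where the processor was when it was reset.
class ReplaySketch {
    private final long positionAtReset; // last position handled before the reset
    private long nextPosition;          // where processing resumes after the reset

    ReplaySketch(long positionAtReset, long resetTo) {
        this.positionAtReset = positionAtReset;
        this.nextPosition = resetTo;
    }

    // Events at or before the reset position were already handled once.
    boolean isReplay(long eventPosition) {
        return eventPosition <= positionAtReset;
    }

    // Processes positions up to and including lastAvailablePosition and
    // returns a label per handled event. Handlers that do not allow replay
    // (a Saga, for example) only see the new events.
    List<String> process(long lastAvailablePosition, boolean handlerAllowsReplay) {
        List<String> handled = new ArrayList<>();
        for (; nextPosition <= lastAvailablePosition; nextPosition++) {
            boolean replay = isReplay(nextPosition);
            if (replay && !handlerAllowsReplay) {
                continue; // skip replayed events for non-replayable handlers
            }
            handled.add((replay ? "replay:" : "new:") + nextPosition);
        }
        return handled;
    }
}
```

A manual token update would lose positionAtReset, which is why, in this model, only a reset through a replay-aware token lets handlers distinguish the two kinds of events.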
Processing Loop
When TEP is started, it starts a worker for each segment in a separate thread. Each one of them tries to claim a Tracking Token for a certain amount of time. If the claim is successful, a processing loop is started. The happy flow of the processing loop contains the steps shown below.
- The event stream is opened at the position the Tracking Token points to. If no events become available in the stream within one second, the Tracking Token claim gets extended, and we start the loop from the beginning. Otherwise, we proceed with the loop.
- Events are read from the stream and put into a batch. Events for which this TEP has no subscribed handler capable of handling them are left out of the batch. If, after this step, the batch is still empty, we extend the claim on the Tracking Token, update the TEP status, and start the loop from the beginning. Otherwise, we proceed with the loop.
- The batch of events gets processed by the event handlers subscribed to this TEP in a Unit of Work (UoW). Before the UoW is committed, the Tracking Token gets stored in the Token Store.
- The TEP status is updated. It contains information about the progress of event processing: the segment, whether the TEP has caught up with the event stream, whether the TEP is replaying events, and the current Tracking Token.
After event processing has stopped (either explicitly or by an error), the TEP releases the segment so another processor can claim it and continue processing.
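The steps above can be sketched as a single-segment loop over an in-memory list. This is a simplified model and not Axon's implementation: ProcessingLoopSketch and its fields are hypothetical, the one-second wait is left out, and the sketch assumes that an empty batch advances only the in-memory position while the stored token stays unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical single-segment model of the processing loop.
class ProcessingLoopSketch {
    private final List<String> stream;          // stands in for the event stream
    private final Predicate<String> canHandle;  // "is there a handler for this event?"
    private final Consumer<String> handler;     // the subscribed event handlers
    private final int batchSize;

    int position = -1;       // in-memory progress, part of the status
    int storedToken = -1;    // what the Token Store would contain
    int claimExtensions = 0; // how often the claim was extended without work

    ProcessingLoopSketch(List<String> stream, Predicate<String> canHandle,
                         Consumer<String> handler, int batchSize) {
        this.stream = stream;
        this.canHandle = canHandle;
        this.handler = handler;
        this.batchSize = batchSize;
    }

    // Runs one pass of the loop and returns the number of handled events.
    int runOnce() {
        int from = position + 1;
        if (from >= stream.size()) {   // step 1: no events available
            claimExtensions++;
            return 0;
        }
        int to = Math.min(stream.size(), from + batchSize);
        List<String> batch = new ArrayList<>();
        for (String event : stream.subList(from, to)) {   // step 2: build the batch
            if (canHandle.test(event)) {
                batch.add(event);
            }
        }
        if (batch.isEmpty()) {         // nothing to handle: extend claim, update status
            claimExtensions++;
            position = to - 1;
            return 0;
        }
        batch.forEach(handler);        // step 3: handle the batch (the Unit of Work)
        position = to - 1;
        storedToken = position;        // token is stored as part of the same unit
        return batch.size();
    }
}
```

Calling runOnce() repeatedly walks through the list in batches, and storedToken only moves when a batch was actually handled.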
Error Handling
In Tracking Event Processing, events are processed in different threads, which makes error handling more complicated. To address this, Axon provides Error Handlers that can be configured on the TEP and that act when an exception occurs in the event handling component. By default, exceptions are propagated, ultimately causing the TEP to release its claims and start retrying. If necessary, custom Error Handlers may be provided to the TEP. The recommendation is to have error cases clearly defined and represented by corresponding exceptions. With that in place, Error Handlers can apply a different strategy to each kind of exception.
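The recommendation to map exception types to strategies can be illustrated with a small sketch. The ErrorHandler interface below is a hypothetical stand-in and not Axon's error handler API, and treating IllegalArgumentException as "malformed event, safe to skip" is an assumption made only for this example.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of per-exception error handling strategies.
class ErrorHandlingSketch {
    // Either returns normally (processing continues) or rethrows (processing stops).
    interface ErrorHandler {
        void onError(RuntimeException error, String event);
    }

    // Default-like strategy: every exception is propagated.
    static final ErrorHandler PROPAGATE = (error, event) -> {
        throw error;
    };

    // Custom strategy: record and skip malformed events, propagate everything else.
    static ErrorHandler skipMalformed(List<String> skipped) {
        return (error, event) -> {
            if (error instanceof IllegalArgumentException) {
                skipped.add(event);
                return;
            }
            throw error;
        };
    }

    // Handles each event and hands failures to the configured error handler.
    static void processBatch(List<String> batch, Consumer<String> handler,
                             ErrorHandler errorHandler) {
        for (String event : batch) {
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                errorHandler.onError(e, event);
            }
        }
    }
}
```

With PROPAGATE the first failing event aborts the batch, which corresponds to the release-and-retry behaviour described above. With skipMalformed the batch continues past events that are known to be unprocessable.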
Conclusion
Tracking event processing is a powerful mechanism in the Axon Framework, which gives us:
- Ability to replay events - an important advantage of event-sourced systems is building new projections based on the past or rebuilding existing ones if requirements change.
- Location transparency - process your events in whichever node you want. Just make sure you have access to the event stream and token store.
- Performance - start several segments on the same node to process them in parallel and speed up the processing.
In CQRS architectures, query models are updated separately from command models, which gives us the possibility to scale our query models differently. In such cases (and many others), Tracking Processing is the preferred way of event processing. Hence, Tracking Processing is the default when using an Event Store.
We hope the above is useful to Axon practitioners. Feel free to reach out to us with any questions or comments!