Axon Framework 4.6.0: Replay context propagation

We recently released Axon Framework 4.6.0. Besides more significant features like the Dead-letter Queue, OpenTelemetry, Jakarta support, and streaming queries, the release also contains a new minor feature: Replay context propagation. This blog will revisit how replays work, and you will learn about this new feature.

Replays

When using Axon Framework, we save the events and create projections out of those events. This decoupling of events and data allows us to create new views of historical events at any time in the future.
Sometimes we make mistakes that cause the data in a view to be incorrect. To correct those mistakes, we use replays. A replay means preparing it (such as cleaning out the existing data) and restarting the processor from an earlier position of the event store, processing all historical events since that point to rebuild the projection.


We might need to alter an event handler's logic based on whether the processor is replaying to prevent unwanted behavior. You can see how this works in the following code sample, which sends an email, but only when we are not replaying.


    @RequiredArgsConstructor
    @Service
    public class MailProjection {
        private final EmailSender emailSender;
        private final InformationRepository repository;
        
        public void handle(EmailUpdatedEvent event, ReplayStatus status) {
            repository.updateEmail(event.getUserId(), event.getNewEmail());
            
            if(!status.isReplay()) {
                emailSender.sendEmail(event.getNewEmail(), "Your email address was successfull updated!");
            }
        }
    }

If we didn't check whether we were replaying, the user would get an email whenever we replay the event processor. The customer will probably get confused, suspect they have been hacked, call your customer support, or do all those things. Axon Framework will tell us whether it's a replay using the ReplayStatus argument you can use in any event handler. 
We can now safely start a replay without worrying that customers will get angry. You can see this in the following code sample; We stop the processor, reset it, and then start it again. It will now start from the beginning of the event store again. 


    @Configuration
    @RequiredArgsConstructor
    public class MailReplayStarter {
        private final EventProcessingConfiguration configuration;
    
        public void startReplay() {
            // This only works when running one node. We first need to stop all instances of the processor, on all nodes.
            configuration.eventProcessor("mail", StreamingEventProcessor.class)
                    .ifPresent(processor -> {
                        processor.shutDown();
                        processor.resetTokens();
                        processor.start();
                    });
        }
    }

As you can see, programmatically triggering a reset is relatively easy. Please note that if your application runs on multiple instances, you will need to stop the processor on all instances before invoking the reset on one of them. If you don't do that, another instance will automatically claim the token, preventing the reset.
 

New event processors

When introducing a new event processor, it can be useful to immediately introduce it as a replay for the same reason outlined in the previous section. A new event processor will start from the event stream's tail (index 0), which means all past events are processed as if it was not replaying. When we introduce the MailProjection of code sample 1, this would still send an email to all customers who changed their email in the past since the ReplayStatus will not show it as a replay. 

What determines the ReplayStatus is the TrackingToken attached to the message that we're currently handling. If it's a ReplayToken, it will register as a replay. If it's not, it will register as a regular (non-replay) invocation. 
When we introduce the MailProjection, we want to save the data of all events to the database. However, we only want to send emails for the events after the moment we deployed it for the first time.
By initializing the event processor's token as a ReplayToken right from the start, we can ensure the ReplayStatus shows as a replay until it catches up to the index when the event processor was created. You can see how to configure this in the following sample.


    @Configuration
    public class MailConfiguration {
        @Autowired
        public void configure(EventProcessingConfigurer configurer) {
            configurer.registerTrackingEventProcessor(
                    "mail",
                    config -> config.eventStore(),
                    config -> TrackingEventProcessorConfiguration
                            .forParallelProcessing(2)
                            .andInitialTrackingToken(source -> ReplayToken.createReplayToken(source.createHeadToken(), source.createTailToken()))
            );
        }
    }

In code sample 3, we create a ReplayToken as the initial token. The head token, as the first argument, is the token at which it will stop being a replay and become a regular event processor again. The second argument is the token the replay starts, which is the tail of the event store (index 0) by default. 

Imagine that we have an event store of 1.000.000 events and introduce the MailProjection using the configuration of the previous sample. The ReplayToken will start at index 0 and end at index 999.999. After event 999.999, the ReplayToken will change into a regular token and no longer represent a replay. Any events written to the event store during the replay will be handled as regular invocations. 
 

More customization

For some customers, the level of customization that the ReplayStatus parameter brings is insufficient to cover their use case. For example, we encountered cases where the customer only wanted to replay a specific range of aggregate identifiers. 

Axon Framework already contained the notion of a reset context. You can define methods in your processing group annotated with @ResetHandler. These methods are called when resetting a token, and you can use them, for example, to clear the data in the projection. These methods could be passed a context by providing a context to the resetToken method of the event processor.

However, there was no way to propagate the reset context information to an event handler, which left the customer to create a self-built mechanism instead. For example, by saving data into the database beforehand.

With Axon Framework 4.6.0, we are changing this. The information provided in the resetTokens call will now be saved in the ReplayToken. You can access this information in a variety of ways. But first, we must create a context and provide it during the reset; the following sample shows how you can do this.
 


    @Getter
    public class MailReplayContext {
        private final List userIdentifiers;
    }
    
    @Configuration
    @RequiredArgsConstructor
    public class MailReplayStarter {
        private final EventProcessingConfiguration configuration;
    
        public void startReplay() {
            List userIdentifiers = List.of("UserId1", "UserId5");
            configuration.eventProcessor("mail", StreamingEventProcessor.class)
                    .ifPresent(processor -> {
                        processor.shutDown();
                        processor.resetTokens(new MailReplayContext(userIdentifiers));
                        processor.start();
                    });
        }
    }

Now, we have a list of user identifiers in our ReplayToken and can use this to make decisions during a replay. Let's dive into some examples. 

Reset context injection

You can inject the context as a parameter in your event handler and do a manual check.


    @RequiredArgsConstructor
    @Service
    public class MailProjection {
        private final EmailSender emailSender;
        private final InformationRepository repository;
        
        public void handle(EmailUpdatedEvent event, @ReplayContext MailReplayContext context) {
            if(context != null && context.getUserIdentifiers().contains(event.getUserId())) {
                return;
            }
            repository.updateEmail(event.getUserId(), event.getNewEmail());
            emailSender.sendEmail(event.getNewEmail(), "Your email address was successfull updated!");
        }
    }

The context parameter in this code sample will be null when the processor is not replaying. When replaying, it will check whether the userIdentifiers contain the user's identifier and do nothing if it's not the case. This way, we can replay only for a specific list of users.
 

Reusing Behavior

Suppose you want a wide range of similar event processors to have the same behavior. In that case, it might be better to go for a MessageHandlerInterceptor that checks the context of whether the event needs to be processed. You can see such an interceptor in the following code.


    @Service
    @RequiredArgsConstructor
    public class MailHandlerInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {
        private final EventProcessingConfigurer configurer;
    
        @PostConstruct
        public void register() {
            configurer.registerHandlerInterceptor("mail", c -> this);
        }
    
        @Override
        public Object handle(@NotNull final UnitOfWork> unitOfWork, @NotNull final InterceptorChain interceptorChain) throws Exception {
            final TrackedEventMessage trackedMessage = (TrackedEventMessage) unitOfWork.getMessage();
            final DomainEventMessage domainMessage = (DomainEventMessage) unitOfWork.getMessage();
    
            final Optional<MailReplayContext> myResetContext = ReplayToken.replayContext(trackedMessage.trackingToken(), MailReplayContext.class);
            if (myResetContext.isEmpty()) {
                // No replay with this context type
                return interceptorChain.proceed();
            }
            if (myResetContext.get().getUserIdentifiers().contains(domainMessage.getAggregateIdentifier())) {
                // Execute replay, aggregate identifier was in list
                return interceptorChain.proceed();
            }
            // Do nothing since it was a replay and is not in the list
            return null;
        }
    }

The MailHandlerInterceptor will register as a handler interceptor of the mail event processor. For every event, the interceptor will check whether there is a replay active at the moment. If that's the case, it will check whether the aggregate identifier is contained in the context. If it's not present, it will skip the event. We can register this interceptor to as many event processors we would like, allowing the reuse of this beavior.

Conclusion

If you want to customize replays of an event processor, Axon Framework 4.6.0 has you covered. You can now propagate reset-time context information to event handlers and interceptors, allowing you to customize them to your heart's content.

Mitchell Herrijgers
Solutions Architect for Axon Framework at AxonIQ Mitchell is a software craftsman, passionate coder, and eager to learn. He likes complex challenges and he doesn't get discouraged easily. He is interested in the following topics; Kotlin/Java, Event-Sourcing / CQRS / Axon Framework, Cloud/AWS/Infrastructure as Code, and performance tuning. After his study, in Computer Science at the Rotterdam University of Applied Sciences, he worked as a software engineer at ING Bank, Codecentric, and the Port of Rotterdam.
Mitchell Herrijgers

Share: