Axon Framework 3.3 has just been released and it has several interesting new features. In this blog, I'd like to highlight one that I’m particularly excited about: subscription queries. Before we look into the implementation and some sample code, let’s have a quick look at the background of this feature.

Queries in Axon Framework

In any application using a CQRS and Event Sourcing (CQRS+ES) architecture, there are fundamentally three types of messages being exchanged: commands (representing an intent to do something), events (representing something that has happened) and queries (representing a desire for information). Axon Framework versions before 3.1 only represented commands and events explicitly (by offering corresponding Bus interfaces and Handler annotations). For queries, developers were left to figure out something on their own.

Axon Framework 3.1 fixed this situation by introducing the QueryBus and the corresponding QueryGateway facade and @QueryHandler annotation. Now, there was a mechanism to implement location transparency for query execution in a similar way as it had been available for commands and events.

The need for subscription queries

Queries are usually understood to be simple request/response-type interactions: there’s a question (the query) which is then answered (the query response). This is also how they were implemented in Axon Framework 3.1 and 3.2. But in reality, there’s more to it than that. Modern UIs often update the information displayed to the user in response to something happening on the server. For instance, a user could be monitoring the current price of a particular stock in a trading application, where this price would be updated in real time as it changes on the stock market.

So how to model this? Traditional request/response queries can be used to capture the stock price once, but not continuously. We can resort to polling, repeating this request/response query periodically, but this is very inefficient. Or, we could send an event to the client whenever the stock price has changed. If we go for the event-based approach, we have two options:

  1. Let the UI respond to the domain event. This creates a few problems in a CQRS setup. When the domain event is received by the UI, there’s no guarantee that the read model serving queries is already updated, since it is only eventually consistent. Also, the UI would now be getting information from the query side (the initial response) as well as from the command side (the events), which leads to maintainability issues. Most likely, the UI would end up replicating some of the logic that’s also in the query model.
  2. To bypass these challenges, we could have the read model publish an event, different from the domain event, to communicate that the read model has been updated. This works technically, but it’s conceptually very ugly. In DDD terms, this is no real event, not something that happened in the domain. It’s an abuse of events that we’re more or less forced to make, because it’s the only thing that we can push from the read model to the client.

So we see that all of the above approaches to the continuous updates challenge have some severe drawbacks. This is the problem that subscription queries solve. A subscription query is a query that consists of two simultaneous aspects:

  • Please provide the current result of this particular query.
  • Until I cancel, if anything changes in these results, please let me know.

This allows us to cleanly update query results, avoiding polling and avoiding pollution of our model with technical events. Commands, events and queries can all continue to play the role that they should play in a CQRS+ES architecture.
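To make those two aspects concrete before we get to Axon's own API, here's a minimal plain-Java sketch using the stock price example from earlier. The names LiveValue and LiveSubscription are made up for this illustration and are not Axon classes; the sketch only shows the shape of the contract: one call hands back the current result and keeps delivering changes until we cancel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical illustration only; neither class is part of Axon Framework.
final class LiveSubscription<T> {
    final T initialResult;            // aspect 1: the current result
    private final Runnable onCancel;  // ends aspect 2: the stream of updates

    LiveSubscription(T initialResult, Runnable onCancel) {
        this.initialResult = initialResult;
        this.onCancel = onCancel;
    }

    void cancel() {
        onCancel.run();
    }
}

final class LiveValue<T> {
    private T current;
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    LiveValue(T initial) {
        this.current = initial;
    }

    // Registers the listener and captures the current value in one synchronized
    // step, so no change can slip in between the two.
    synchronized LiveSubscription<T> subscribe(Consumer<T> onUpdate) {
        listeners.add(onUpdate);
        return new LiveSubscription<>(current, () -> listeners.remove(onUpdate));
    }

    // Stores the new value and pushes it to everyone still subscribed.
    synchronized void set(T newValue) {
        current = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
    }
}
```

A client would call subscribe once, render initialResult, handle each pushed value as it arrives, and call cancel() when the user navigates away.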

Implementation in Axon Framework 3.3 – a demo application

We’ve created a sample application that illustrates the use of subscription queries in a simple gift card demo domain. It can be found on GitHub. It has a single-page UI (see screenshot below), which can be used to issue and redeem gift cards. It also has a table showing gift cards that have been issued and their remaining value. Issuing new gift cards leads to new entries in this table. Redeeming gift cards causes updates to entries in this table. These updates are handled through subscription queries.

The table also has a filter, filtering on the initial characters of the gift card identifier. If a user is currently filtering on gift cards that have an identifier that starts with ‘B’, then the issuing or redeeming of a gift card with identifier ‘A2321’ shouldn’t lead to any query update. This is also something that has been implemented in Axon Framework’s subscription query implementation, and used in the demo code.

The implementation of all of this naturally consists of a client side and a read side. In CQRS, we construct the read side based on the client’s needs, so we’ll start with discussing the client side.

Client side

We chose to implement the UI using Vaadin. Vaadin has a standard component to show tabular data, called a Grid. We provide the Grid with a DataProvider, which needs to be able to execute two queries: one for the count of records, and one to get the record data using offset and limit. From a UI perspective, this is a highly scalable model that can continue to work with millions of records.

When a card is updated (as a result of redeeming), we can update that single record in our UI by firing a DataRefreshEvent with the new record. This is how the subscription query model ideally works. When a new card is issued, we can’t do such a targeted update. Instead, we just need to let the DataProvider know that the data has changed by firing a DataChangeEvent. As a result, the DataProvider will again do the initial queries, and subscribe again. The DataChangeEvent doesn’t contain any data, and we don’t need any specific data from the subscription query to create it.

This being the case, we define our projection’s API using the following classes:

  • CardSummary is our core data class holding the records;
  • CardSummaryFilter is a value object representing the filtering, right now only on initial characters of the card identifier;
  • CountCardSummariesQuery represents a request to count the cards; it contains a CardSummaryFilter instance. This query results in an initial Integer, and updates of type CountChangedUpdate, which by itself is an empty class, for reasons described above;
  • FetchCardSummariesQuery represents a request to get card data; it contains an offset, a limit and again the filter. This query results in an initial List of CardSummary objects, and updates will have the form of a single CardSummary.
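As a rough sketch, these classes could look something like the following. The accessors getIdStartsWith, getFilter, getOffset and getLimit match what the snippets in this post call; the fields of CardSummary (id and remainingValue) are assumptions for illustration, and the real demo classes may well differ.

```java
// Sketch of the projection API. CardSummary's fields are assumed, not taken from the demo.
final class CardSummaryFilter {
    private final String idStartsWith;

    CardSummaryFilter(String idStartsWith) {
        this.idStartsWith = idStartsWith;
    }

    String getIdStartsWith() {
        return idStartsWith;
    }
}

final class CardSummary {
    private final String id;
    private final int remainingValue;

    CardSummary(String id, int remainingValue) {
        this.id = id;
        this.remainingValue = remainingValue;
    }

    String getId() {
        return id;
    }

    int getRemainingValue() {
        return remainingValue;
    }
}

// Initial result: an Integer count. Updates: CountChangedUpdate.
final class CountCardSummariesQuery {
    private final CardSummaryFilter filter;

    CountCardSummariesQuery(CardSummaryFilter filter) {
        this.filter = filter;
    }

    CardSummaryFilter getFilter() {
        return filter;
    }
}

// Deliberately empty: it only signals that the count has changed.
final class CountChangedUpdate {
}

// Initial result: a List of CardSummary. Updates: a single CardSummary.
final class FetchCardSummariesQuery {
    private final int offset;
    private final int limit;
    private final CardSummaryFilter filter;

    FetchCardSummariesQuery(int offset, int limit, CardSummaryFilter filter) {
        this.offset = offset;
        this.limit = limit;
        this.filter = filter;
    }

    int getOffset() {
        return offset;
    }

    int getLimit() {
        return limit;
    }

    CardSummaryFilter getFilter() {
        return filter;
    }
}
```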

Now, in our CardSummaryDataProvider, we can perform querying like this:

Create a query object:

FetchCardSummariesQuery fetchCardSummariesQuery =
       new FetchCardSummariesQuery(query.getOffset(), query.getLimit(), filter);

Executing the subscription query by submitting it to the query gateway yields a subscription query result:

SubscriptionQueryResult<List<CardSummary>, CardSummary> fetchQueryResult =
       queryGateway.subscriptionQuery(fetchCardSummariesQuery,
               ResponseTypes.multipleInstancesOf(CardSummary.class),
               ResponseTypes.instanceOf(CardSummary.class));

The subscription query result is a holder object of two Reactor Core objects: a Mono publisher providing access to the initial result, and a Flux publisher providing the updates. The fact that this is based on Project Reactor provides tremendous power in terms of what we can do with the stream of updates coming in. For instance: if cards are being issued at rates of hundreds per second, we would get a corresponding stream of CountChangedUpdate messages. For reasons described above, each of those messages would naively trigger a new initial query by the DataProvider. This is very inefficient. With just a single method invocation on the Flux, we can limit this to at most one update per 250 milliseconds.
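To get a feel for what that throttling buys us, here's a small plain-Java sketch that groups update timestamps into fixed windows and counts how many refreshes remain. It deliberately leaves Reactor out, and the names RefreshWindows and refreshCount are hypothetical; it only models the arithmetic and is not how Reactor implements its operators.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; not part of Reactor or Axon Framework.
final class RefreshWindows {

    // Returns how many refreshes are needed when all updates that fall into the
    // same fixed window of windowMillis trigger a single refresh.
    // Assumes non-negative timestamps in milliseconds and windowMillis > 0.
    static int refreshCount(long[] updateTimesMillis, long windowMillis) {
        Set<Long> windows = new HashSet<>();
        for (long time : updateTimesMillis) {
            windows.add(time / windowMillis); // index of the window this update lands in
        }
        return windows.size();
    }
}
```

With one update every 5 milliseconds for a full second (200 updates) and a window of 250 milliseconds, refreshCount returns 4: the DataProvider redoes its initial queries 4 times instead of 200.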

Once we have issued the subscription query, the initial query is performed and all updates are being collected. We can retrieve both from the SubscriptionQueryResult, but the order in which we do this matters.

We should first subscribe to the updates, then get the initial results. There is a small chance that we’ll be getting an update that is already taken into account when calculating the initial result. Therefore, updates should be idempotent: if they have been already taken into account, processing them again shouldn’t make a difference. In practice, this means sending whole new records in their new state rather than sending their delta. This is how we designed our project’s query API: we expect updates to be a new CardSummary, rather than the updates representing the amount that has been redeemed. So we’re safe here.
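The difference between the two styles of update is easy to show in a few lines of plain Java. This is only a sketch with a hypothetical CardTable class standing in for whatever the client keeps on screen: applying a whole-record update twice is harmless, while applying a delta twice corrupts the value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side table for illustration; not part of the demo code.
final class CardTable {
    private final Map<String, Integer> remainingById = new HashMap<>();

    // Whole-record update: idempotent, replaying it changes nothing.
    void applyState(String id, int remainingValue) {
        remainingById.put(id, remainingValue);
    }

    // Delta update: not idempotent, every replay subtracts the amount again.
    void applyDelta(String id, int redeemedAmount) {
        remainingById.merge(id, -redeemedAmount, Integer::sum);
    }

    int remaining(String id) {
        return remainingById.getOrDefault(id, 0);
    }
}
```

Suppose the initial result already shows card A1 at 70 after a redemption of 30, and the update for that same redemption arrives afterwards. applyState("A1", 70) leaves the card at 70, whereas applyDelta("A1", 30) would wrongly bring it down to 40.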

To subscribe to updates, we do:

fetchQueryResult.updates().subscribe(
       cardSummary -> {
           log.trace("processing query update for {}: {}", fetchCardSummariesQuery, cardSummary);
           fireEvent(new DataChangeEvent.DataRefreshEvent(this, cardSummary));
       });

Finally, to return the initial results, we do:

return fetchQueryResult.initialResult().block().stream();

Read model side

So, how to implement our read model so that it can serve these queries? Interestingly, the @QueryHandler methods don’t change as a result of subscription queries. They just need to serve the initial results. For instance, we have (in CardSummaryProjection):

@QueryHandler
public List<CardSummary> handle(FetchCardSummariesQuery query) {
   log.trace("handling {}", query);
   TypedQuery<CardSummary> jpaQuery = entityManager.createNamedQuery("CardSummary.fetch", CardSummary.class);
   jpaQuery.setParameter("idStartsWith", StringUtils.defaultString(query.getFilter().getIdStartsWith()));
   jpaQuery.setFirstResult(query.getOffset());
   jpaQuery.setMaxResults(query.getLimit());
   return log.exit(jpaQuery.getResultList());
}

The magic of the subscription queries has to happen when we’re projecting new events into our read model. There, we’ll be using a new Axon 3.3 object called the QueryUpdateEmitter. Normally in a projection, when processing an event, we would update the read model data. We still need to do that, but additionally we need to publish an update to relevant subscribed queries.

To determine which subscribed queries are eligible to receive an update, we have full access to the original query message of the query. But in practice, it will often be sufficient to filter on the class of the query payload, as well as on a certain predicate on queries. In our case, if a new card has been issued, we need to update subscribed CountCardSummariesQuery instances, but only if the newly issued card matches their current filter status. This leads to the following code in the projection:

queryUpdateEmitter.emit(CountCardSummariesQuery.class,
       query -> event.getId().startsWith(StringUtils.defaultString(query.getFilter().getIdStartsWith())),
       new CountChangedUpdate());

(You might think it makes sense to add the new count here as a field in the update. But it’s really of no use to our Vaadin client, which will have to do full initial queries anyway. At the query handler side, we don’t have the count readily available in the event handler, so it would require another call to the database. So we simply send an empty object instead.)

In the call to the emit method, the first argument is the type of subscription queries to look for, the second argument is a Predicate on the query objects, and the third argument is the update payload to send to matching subscription queries.  
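To illustrate how those three arguments work together, here's a stripped-down, plain-Java stand-in for the matching logic. SketchUpdateEmitter and PrefixCountQuery are hypothetical names invented for this sketch; this is not how Axon's QueryUpdateEmitter is implemented, it only mirrors the idea of selecting subscriptions by query type plus predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical query payload carrying the prefix filter.
final class PrefixCountQuery {
    final String idStartsWith;

    PrefixCountQuery(String idStartsWith) {
        this.idStartsWith = idStartsWith;
    }
}

// Hypothetical stand-in for illustration; not Axon's QueryUpdateEmitter.
final class SketchUpdateEmitter {

    private static final class Registration {
        final Object query;
        final Consumer<Object> sink;

        Registration(Object query, Consumer<Object> sink) {
            this.query = query;
            this.sink = sink;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    // Remembers an active subscription query and where its updates should go.
    void register(Object query, Consumer<Object> sink) {
        registrations.add(new Registration(query, sink));
    }

    // Delivers the update to every subscription whose query has the given type
    // and passes the predicate; returns the number of deliveries.
    <Q> int emit(Class<Q> queryType, Predicate<? super Q> filter, Object update) {
        int delivered = 0;
        for (Registration registration : registrations) {
            if (queryType.isInstance(registration.query)
                    && filter.test(queryType.cast(registration.query))) {
                registration.sink.accept(update);
                delivered++;
            }
        }
        return delivered;
    }
}
```

If one client is subscribed with prefix ‘A’ and another with prefix ‘B’, emitting for a card with identifier ‘A2321’ and the predicate q -> "A2321".startsWith(q.idStartsWith) reaches only the first client.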

As a final note: subscriptions to queries may be ended in one of two ways. They can be cancelled by the client side; this is what we saw happening in our DataProvider when it’s doing new initial queries, for instance for accessing a new page of results. But subscriptions can also be ended by the read side. For instance, maybe a card that has been fully redeemed will never receive any more events. In that case, the read side can call complete() on the QueryUpdateEmitter to signal this. In case something goes wrong and no more updates will be sent because of this, it can also call completeExceptionally().

Conclusions

We believe this new feature will greatly help in delivering modern, reactive, CQRS+ES based applications using Axon Framework. Be sure to check out the full code on GitHub, and ask any question you like about it on our public user group. Also, keep an eye on AxonHub: the team is currently finalizing the work to reflect the subscription queries in AxonHub. Once this is done, you’ll have a very simple way to distribute the execution of subscription queries as well.
