Introducing Subscription Queries

Axon Framework 3.3 has just been released, and it has several interesting new features. In this blog, I'd like to highlight one that I’m particularly excited about: subscription queries. Before we look into the implementation and some sample code, let’s have a quick look at the background of this feature.

Queries in Axon Framework

In any application using a CQRS and Event Sourcing (CQRS+ES) architecture, there are fundamentally three types of messages being exchanged: commands (representing an intent to do something), events (representing something that has happened), and queries (representing a desire for information). Axon Framework versions before 3.1 only represented commands and events explicitly (by offering corresponding Bus interfaces and Handler annotations). For queries, developers were left to figure out something on their own.

Axon Framework 3.1 fixed this situation by introducing the QueryBus and the corresponding QueryGateway facade and @QueryHandler annotation. Now there was a mechanism to implement location transparency for query execution, just as had already been available for commands and events.

The need for subscription queries

Queries are usually understood as simple request/response-type interactions: there’s a question (the query) that is then answered (the query response). This is also how they were implemented in Axon Framework 3.1 and 3.2. But in reality, there’s more to it than that. Modern UIs often update the information displayed to the user in response to something happening on the server. For instance, a user could be monitoring the current price of a particular stock in a trading application, where this price would be updated in real-time as it changes on the stock market.

So how to model this? Traditional request/response queries can be used to capture the stock price once but not continuously. We can resort to polling, repeating this request/response query periodically, but this is inefficient. Or, we could send an event to the client whenever the stock price has changed. If we go for the event-based approach, we have two options:

  1. Let the UI respond to the domain event. This creates a few problems in a CQRS setup. First, when the UI receives the domain event, there’s no guarantee that the read model serving queries is already updated, because it is only eventually consistent. Also, the UI would now be getting information from the query side (the initial response) and the command side (the events), which leads to maintainability issues. Finally, the UI would be replicating some of the logic that’s also in the query model.
  2. To bypass these challenges, we could have the read model publish an event different from the domain event to communicate that the read model has been updated. This works technically, but it isn’t conceptually clean. In DDD terms, this is not a real event: it isn’t something that happened in the domain. It’s an abuse of events that we’re more or less forced into, because an event is the only thing that we can push from the read model to the client.

So we see that all of these approaches to the continuous updates challenge have some severe drawbacks. This is the problem that subscription queries solve. A subscription query is a query that consists of two simultaneous aspects:

  • Please provide the current result of this particular query.
  • Until I cancel, if anything changes in these results, please let me know.

This allows us to cleanly update query results, avoid polling, and avoid pollution of our model with technical events. Commands, events, and queries can all continue to play the role that they should play in a CQRS and Event Sourcing architecture.


Implementation in Axon Framework 3.3 – a demo application

We’ve created a sample application that illustrates the use of subscription queries in a simple gift card demo domain. It can be found on GitHub. It has a single-page UI (see screenshot below), which can issue and redeem gift cards. It also has a table showing gift cards that have been issued and their remaining value. Issuing new gift cards leads to new entries in this table. Likewise, redeeming gift cards causes updates to entries in this table. These updates are handled through subscription queries.

[Screenshot: the gift card demo UI]

The table also has a filter, filtering on the initial characters of the gift card identifier. So, for example, if a user is currently filtering on gift cards with an identifier that starts with 'B', then issuing or redeeming a gift card with identifier ‘A2321’ shouldn’t lead to any query update. This is also something that has been implemented in Axon Framework’s subscription query implementation and used in the demo code.

The implementation of all of this consists of a client side and a read side. In CQRS, we construct the read side based on the client’s needs, so we’ll discuss the client side first.

Client-side

We chose to implement the UI using Vaadin. Vaadin has a standard component to show tabular data, called a Grid. We provide the Grid with a DataProvider, which needs to execute two queries: one for the count of records and one to get the record data using offset and limit. From a UI perspective, this is a highly scalable model that can continue to work with millions of records.

When a card is updated (resulting from redeeming), we can update that single record in our UI by firing a DataRefreshEvent with the new record. This is how the subscription query model ideally works. However, when a new card is issued, we can’t do such a targeted update. Instead, we need to let DataProvider know that the data has changed by firing a DataChangeEvent. As a result, the DataProvider will again do the initial queries and subscribe again. The DataChangeEvent doesn’t contain any data; we don’t need any specific data from the subscription query to create it.

This being the case, we define our projection’s API using the following classes:

  • CardSummary is our core data class holding the records;
  • CardSummaryFilter is a value object representing the filtering, right now only on initial characters of the card identifier;
  • CountCardSummariesQuery represents a request to count the cards; it contains a CardSummaryFilter instance. This query results in an initial Integer and updates of type CountChangedUpdate, which by itself is an empty class, for reasons described above;
  • FetchCardSummariesQuery represents a request to get card data; it contains an offset, limit, and filter. This query results in an initial List of CardSummary objects, and updates will have the form of a single CardSummary.
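
As a rough sketch, the API classes listed above could look like this in plain Java. The getters getOffset(), getLimit(), getFilter(), and getIdStartsWith() match the calls used in the snippets in this post; the fields of CardSummary (id, initialValue, remainingValue) are assumptions for illustration, and the demo project may define these classes differently.

```java
import java.util.Objects;

// Core record shown in the table. The field names are illustrative assumptions.
final class CardSummary {
    private final String id;
    private final int initialValue;
    private final int remainingValue;

    CardSummary(String id, int initialValue, int remainingValue) {
        this.id = Objects.requireNonNull(id);
        this.initialValue = initialValue;
        this.remainingValue = remainingValue;
    }

    String getId() { return id; }
    int getInitialValue() { return initialValue; }
    int getRemainingValue() { return remainingValue; }
}

// Value object for filtering on the initial characters of the card identifier.
final class CardSummaryFilter {
    private final String idStartsWith;

    CardSummaryFilter(String idStartsWith) { this.idStartsWith = idStartsWith; }

    String getIdStartsWith() { return idStartsWith; }
}

// Request to count the cards matching a filter.
final class CountCardSummariesQuery {
    private final CardSummaryFilter filter;

    CountCardSummariesQuery(CardSummaryFilter filter) { this.filter = filter; }

    CardSummaryFilter getFilter() { return filter; }
}

// Deliberately empty: it only signals that the count has changed.
final class CountChangedUpdate {
}

// Request for one page of card data.
final class FetchCardSummariesQuery {
    private final int offset;
    private final int limit;
    private final CardSummaryFilter filter;

    FetchCardSummariesQuery(int offset, int limit, CardSummaryFilter filter) {
        this.offset = offset;
        this.limit = limit;
        this.filter = filter;
    }

    int getOffset() { return offset; }
    int getLimit() { return limit; }
    CardSummaryFilter getFilter() { return filter; }
}
```

Keeping the queries and updates as small immutable value classes makes them easy to send over a query bus and to compare in filters.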

Now, in our CardSummaryDataProvider, we can perform queries like this:

Create a query object:


    FetchCardSummariesQuery fetchCardSummariesQuery =
        new FetchCardSummariesQuery(query.getOffset(), query.getLimit(), filter);

Executing the subscription query by submitting it to the query gateway yields a subscription query result:


    SubscriptionQueryResult<List<CardSummary>, CardSummary> fetchQueryResult =
        queryGateway.subscriptionQuery(fetchCardSummariesQuery,
            ResponseTypes.multipleInstancesOf(CardSummary.class),
            ResponseTypes.instanceOf(CardSummary.class));

A subscription query result is a holder object for two Reactor Core objects: a Mono publisher providing access to the initial result and a Flux publisher providing the updates. Being based on Project Reactor gives us tremendous power in terms of what we can do with the stream of updates coming in. For instance, if cards are being issued at hundreds per second, we will get a corresponding stream of CountChangedUpdate messages. For the reasons described above, each of those messages would naively trigger a new initial query by the DataProvider, which is very inefficient. With just a single method invocation, we can limit this to at most one update per 250 milliseconds.
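
To make the rate-limiting idea concrete, here is a minimal plain-Java sketch. It is not Reactor code and not the demo's implementation: it only models the effect of collapsing all updates that arrive within the same fixed time window into a single refresh. On a real Flux, this kind of coalescing would typically be done with a time-based operator such as buffer(Duration) or sample(Duration). The class and method names (UpdateWindows, refreshesNeeded) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java illustration of time-window coalescing; names are hypothetical.
final class UpdateWindows {

    // Counts how many refreshes are needed when all updates that fall into
    // the same fixed window are collapsed into a single refresh.
    static int refreshesNeeded(long[] updateTimesMillis, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        Set<Long> windowsWithUpdates = new HashSet<>();
        for (long time : updateTimesMillis) {
            // Every timestamp maps to the index of the window it falls into.
            windowsWithUpdates.add(Math.floorDiv(time, windowMillis));
        }
        return windowsWithUpdates.size();
    }
}
```

With one update per millisecond for a full second and a 250 millisecond window, this yields four refreshes instead of a thousand.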

Once we have issued the subscription query, the initial query is performed, and all updates are being collected. We can retrieve both from the SubscriptionQueryResult, but the order in which we subscribe matters.

We should first subscribe to the updates, then get the initial results. With this order, there is a small chance that we’ll get an update that was already taken into account when calculating the initial result. Therefore, updates should be idempotent: if they have already been taken into account, processing them again shouldn’t make a difference. In practice, this means sending whole records in their new state rather than sending a delta. This is how we designed our projection’s query API: we expect an update to be a new CardSummary, rather than the amount that has been redeemed. So we’re safe here.
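
The difference between idempotent and non-idempotent updates can be illustrated with a small plain-Java sketch. The CardTable class and its methods are hypothetical and only stand in for whatever the client does with incoming updates; they are not part of Axon Framework or the demo.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side table of remaining values, keyed by card identifier.
final class CardTable {
    private final Map<String, Integer> remainingById = new HashMap<>();

    // Idempotent: the update carries the full new state of the record,
    // so applying the same update twice leaves the table unchanged.
    void applyState(String cardId, int remainingValue) {
        remainingById.put(cardId, remainingValue);
    }

    // Not idempotent: the update carries only the redeemed amount,
    // so applying the same update twice subtracts the amount twice.
    void applyDelta(String cardId, int redeemedAmount) {
        remainingById.merge(cardId, -redeemedAmount, Integer::sum);
    }

    int remaining(String cardId) {
        return remainingById.getOrDefault(cardId, 0);
    }
}
```

If the initial result already reflects a redemption and the matching update arrives afterwards, applyState simply rewrites the same value, whereas applyDelta would deduct the amount a second time.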

To subscribe to updates, we do:


    fetchQueryResult.updates().subscribe(
        cardSummary -> {
            log.trace("processing query update for {}: {}", fetchCardSummariesQuery, cardSummary);
            fireEvent(new DataChangeEvent.DataRefreshEvent(this, cardSummary));
        });

Finally, to return the initial results, we do:


    return fetchQueryResult.initialResult().block().stream();

Read model side

So, how do we implement our read model so that it can serve these queries? Interestingly, the @QueryHandler methods don’t change as a result of subscription queries. They only need to serve the initial results. For instance, we have (in CardSummaryProjection):


    @QueryHandler
    public List<CardSummary> handle(FetchCardSummariesQuery query) {
        log.trace("handling {}", query);
        TypedQuery<CardSummary> jpaQuery = entityManager.createNamedQuery("CardSummary.fetch", CardSummary.class);
        jpaQuery.setParameter("idStartsWith", StringUtils.defaultString(query.getFilter().getIdStartsWith()));
        jpaQuery.setFirstResult(query.getOffset());
        jpaQuery.setMaxResults(query.getLimit());
        return log.exit(jpaQuery.getResultList());
    }

The magic of the subscription queries has to happen when we’re projecting new events into our read model. There, we’ll be using a new Axon 3.3 object called the QueryUpdateEmitter. Normally in a projection, when processing an event, we would update the read model data. We still need to do that, but we also need to update relevant subscribed queries.

To determine which subscribed queries are eligible to receive an update, we have full access to the original query message. But in practice, it will often be sufficient to filter on the class of the query payload and a predicate on that payload. For example, in our case, if a new card has been issued, we need to update subscribed CountCardSummariesQuery instances, but only if the newly issued card matches their current filter. This leads to the following code in the projection:


    queryUpdateEmitter.emit(CountCardSummariesQuery.class,
        query -> event.getId().startsWith(StringUtils.defaultString(query.getFilter().getIdStartsWith())),
        new CountChangedUpdate());

(You might think it makes sense to add the new count here as a field in the update. But it’s really of no use to our Vaadin client, which will have to do full initial queries anyway. The count also isn’t readily available in the event handler on the read side, so including it would require another call to the database. So we send an empty object instead.)

In the call to the emit method, the first argument is the type of subscription queries to look for. The second argument is a Predicate on the query objects. Finally, the third argument is the update payload to send to matching subscription queries.
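
The matching behavior described above can be modeled with a small plain-Java sketch. This is a toy stand-in, not Axon's QueryUpdateEmitter: ToyUpdateEmitter, its subscribe method, and PrefixQuery are hypothetical names used only to show how a query type plus a predicate select the subscriptions that receive an update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical query type with a prefix filter, standing in for CountCardSummariesQuery.
final class PrefixQuery {
    final String idStartsWith;

    PrefixQuery(String idStartsWith) { this.idStartsWith = idStartsWith; }
}

// Toy model of the matching logic; not the Axon implementation.
final class ToyUpdateEmitter {
    private static final class Subscription {
        final Object query;
        final Consumer<Object> sink;

        Subscription(Object query, Consumer<Object> sink) {
            this.query = query;
            this.sink = sink;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    // Registers a subscribed query together with the sink that receives its updates.
    void subscribe(Object query, Consumer<Object> sink) {
        subscriptions.add(new Subscription(query, sink));
    }

    // Delivers the update to every subscribed query of the given type that
    // passes the filter, and returns the number of deliveries.
    <Q> int emit(Class<Q> queryType, Predicate<? super Q> filter, Object update) {
        int delivered = 0;
        for (Subscription subscription : subscriptions) {
            if (queryType.isInstance(subscription.query)
                    && filter.test(queryType.cast(subscription.query))) {
                subscription.sink.accept(update);
                delivered++;
            }
        }
        return delivered;
    }
}
```

In this model, a client filtering on 'B' never sees an update emitted for card 'A2321', which mirrors the filtering behavior of the table in the demo.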

As a final note: subscriptions to queries may be ended in one of two ways. First, they can be canceled by the client side; this is what we saw happening in our DataProvider when it’s doing new initial queries, such as when accessing a new page of results. But subscriptions can also be ended by the read side. For instance, a card that has been fully redeemed may never receive any more events. In that case, the read side can call complete() on the QueryUpdateEmitter to signal this. If something goes wrong and no more updates will be sent because of this, it can also call completeExceptionally().

Conclusions

We believe this new feature will greatly help deliver modern, reactive, CQRS+ES based applications using Axon Framework. Be sure to check out the full code on GitHub and ask any question you like about it on our public user group.

Frans van Buul
Frans was an evangelist at AxonIQ. He worked with existing and prospective Axon Framework users, specifically looking at how AxonIQ's products and services can help them be successful. Also, he told the world about Axon by speaking at conferences, running webinars, writing blogs, etc. Before joining AxonIQ, Frans was a presales architect representing Fortify, the world's leading application security testing portfolio, having worked as both a Java architect and security consultant before that.