Axon Framework 4.6.0: Streaming queries
Introduction
Axon Framework 4.6.0 has recently been released. Besides other significant features like the Dead Letter Queue, OpenTelemetry, and Jakarta support, the release also contains a new type of query: the streaming query. This blog explains why streaming queries were added and how you can use them.
Current Query Types
Before the 4.6.0 release, there were three query types: Point-to-Point queries, Scatter-Gather queries, and Subscription queries. You can read more about them in the Axon Reference Guide or on Baeldung. The Reference Guide goes into more depth on how they are implemented, while Baeldung focuses on how to use them in code.
Need for Streaming queries
For some queries, the response can contain a lot of data. Typically there is a Point-to-Point query that returns all entities, with either all the details or just a summary. As the data volume grows, returning everything in a single response can become problematic. With a streaming query, the result can be returned in parts. For the API, Server-Sent Events can then be used, for example, to pass those parts on to the client.
Requirements for Streaming queries
To be able to use streaming queries in a distributed setup, you need a QueryBus implementation that supports them. Currently, that’s only the AxonServerQueryBus, which uses Axon Server to route the queries. If it’s not already one of your dependencies, you also need to add reactor-core, as that is what provides the streaming capabilities for Axon Framework. Optionally, you can use the Reactor extension, which provides an easier-to-use API. We will use the extension in the examples of this blog.
Example implementation
To make it more concrete, we show a few snippets of how a streaming query can be used. Let’s use the hotel booking example from the hotel-demo. In that project, we already have a FindAccounts query, which is currently handled as a Point-to-Point query. Let's implement it as a streaming query instead.
Query handler
For the query handler, we need to return a Publisher. The implementation is similar to the Point-to-Point implementation which returns a list. We implement the handler in the AccountHandler class.
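    @QueryHandler
    Publisher<AccountResponseData> handle(FindAccounts query) {
        // fromCallable defers the repository call until a subscriber arrives
        return Mono.fromCallable(accountEntityRepository::findAll)
                   // turn the single list into a stream of individual entities
                   .flatMapMany(Flux::fromIterable)
                   // map each entity to the response type sent to the caller
                   .map(this::convert);
    }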
This will send the list in pieces through Axon Server, instead of sending the whole list in one packet.
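To illustrate what "in pieces" means for a Publisher, here is a minimal, self-contained sketch. It deliberately does not use Axon or Reactor: it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Publisher contract that Reactor implements. The class name FlowStreamingSketch, the helper methods, and the account names are all hypothetical and only serve the illustration. The subscriber asks for one item at a time, and the publisher hands over the next item only when it is requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration, not part of Axon Framework or the hotel-demo.
final class FlowStreamingSketch {

    /** Publishes the items one by one, only as many as the subscriber has requested. */
    public static <T> Flow.Publisher<T> fromIterable(Iterable<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> iterator = items.iterator();
            private long pending;      // items requested but not yet delivered
            private boolean emitting;  // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                pending += n;
                if (emitting) {
                    return; // called from inside onNext; the loop below picks up the demand
                }
                emitting = true;
                while (!done && pending > 0 && iterator.hasNext()) {
                    pending--;
                    subscriber.onNext(iterator.next());
                }
                emitting = false;
                if (!done && !iterator.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes and requests a single item at a time, collecting what arrives. */
    public static <T> List<T> collectOneByOne(Flow.Publisher<T> publisher) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first item only
            }

            @Override
            public void onNext(T item) {
                received.add(item);
                subscription.request(1); // ask for the next item once this one is handled
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
                // nothing to clean up in this sketch
            }
        });
        return received; // delivery is synchronous here, so the list is complete
    }

    public static void main(String[] args) {
        List<String> accounts = List.of("account-1", "account-2", "account-3");
        System.out.println(collectOneByOne(fromIterable(accounts)));
        // prints [account-1, account-2, account-3]
    }
}
```

In the handler above, Reactor's Flux plays the role of the publisher, so none of this plumbing has to be written by hand.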
REST controller
We can now use the streaming query directly in endpoints. We do this by adding a method to the AccountQueryController that returns a Flux.
    @GetMapping(path = "/accounts-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<AccountResponseData> allstreaming() {
        // each element of the Flux is written to the response as its own event
        return reactorQueryGateway.streamingQuery(new FindAccounts(), AccountResponseData.class);
    }
This will also pass each AccountResponseData as a separate message to the client calling the REST endpoint.
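On the wire, a text/event-stream response is a sequence of events separated by blank lines, with the payload carried on lines starting with "data:". As a rough sketch of what a client does with such a response, the snippet below splits a response body into the payload of each event. The class name SseSketch and the JSON field accountId are assumptions made for the illustration; the actual fields depend on how AccountResponseData is serialized, and a real client would read the stream incrementally rather than from a complete String.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of Server-Sent Events framing, not code from the hotel-demo.
final class SseSketch {

    /** Returns the data payload of every complete event in a text/event-stream body. */
    public static List<String> dataPayloads(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // a blank line ends the current event
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one optional space after the colon is dropped
                }
                if (hasData) {
                    data.append('\n'); // several data lines in one event are joined by newlines
                }
                data.append(value);
                hasData = true;
            }
            // comment lines (starting with ':') and other fields are ignored in this sketch
        }
        return events;
    }

    public static void main(String[] args) {
        // Assumed example payloads; the real JSON shape is not specified in this blog.
        String body = "data:{\"accountId\":\"a-1\"}\n\ndata:{\"accountId\":\"a-2\"}\n\n";
        for (String payload : dataPayloads(body)) {
            System.out.println(payload);
        }
    }
}
```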
Conclusion
We hope this brief introduction to Streaming queries was helpful. Please reach out on Discuss with any questions you have.