Simple (yet complete) example of game implementation using CQRS and event sourcing backed by Akka persistence and RabbitMQ. Part 1/3
Hi, in this post series we’ll create a really simple (yet complete) event-sourced game. It won’t be anything spectacular, so its rules are as simple as:
With these simple rules it’s barely playable but who cares, at least it’s event-sourced!
Full source code is available on GitHub.
We’ll split our application into three separate modules: game, webapp and statistics.
Game is our core module; it’s where all the business logic lives. It has a REST API to handle commands and it publishes events to RabbitMQ.
Webapp is an example user interface client consuming our game module. It’ll be created using Play Framework, AngularJS and WebSocket.
Additionally, we’ll create a statistics module that’ll collect data from the game’s events and expose a simple REST API to get calculated dice roll statistics.
In this part we’ll take a look at the game module, followed by webapp and statistics in the 2nd and 3rd parts of this series.
We’ll begin with implementing the main portion of the game’s logic - our domain. We’ll do it using the event sourcing technique.
The basic idea behind event sourcing is quite simple: an aggregate root (our game) receives (non-persistent) commands.
As a result it generates persistent events that are saved to an event store (this is what Akka persistence takes care of). Right after saving, events are applied to the aggregate root, changing its state - it’s the only way to modify it.
Having this in mind, we can restore any past state of an aggregate just by successively applying its saved events. On the other hand, it means that if we want to recreate the current state of an aggregate we need to take all of its saved events and apply them one by one. And yes, you are right - this may be inefficient.
Here’s where snapshots come into play - they are state dumps taking into account events up to a certain point in the event stream. If we have a snapshot we only need to reapply the events that happened after its creation to obtain the current state. Usually it’s a good idea to use them but here we’ll leave them out for clarity.
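To make the replay idea concrete, here’s a minimal, dependency-free sketch. It is not taken from the project: State, Snapshot, replay and replayFrom are hypothetical names, and the DiceRolled event is reduced to two assumed fields just for illustration.

```scala
object ReplaySketch {
  // Hypothetical, simplified event; the real game has a richer set of events.
  sealed trait Event
  final case class DiceRolled(player: String, rolled: Int) extends Event

  // Hypothetical state: the sum of rolled numbers per player.
  final case class State(totals: Map[String, Int] = Map.empty) {
    def applyEvent(event: Event): State = event match {
      case DiceRolled(player, rolled) =>
        copy(totals = totals.updated(player, totals.getOrElse(player, 0) + rolled))
    }
  }

  // A snapshot remembers a state and how many events it already includes.
  final case class Snapshot(state: State, eventCount: Int)

  // Full replay: apply every saved event, one by one, starting from the empty state.
  def replay(events: Seq[Event]): State =
    events.foldLeft(State())((state, event) => state.applyEvent(event))

  // Replay from a snapshot: only events recorded after the snapshot are applied.
  def replayFrom(snapshot: Snapshot, events: Seq[Event]): State =
    events.drop(snapshot.eventCount)
      .foldLeft(snapshot.state)((state, event) => state.applyEvent(event))
}
```

Both functions should end up with the same state; the snapshot variant just skips the events it has already accounted for.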
Summing up, here are the ingredients that we’ll use:
Our game is an event-driven aggregate root. It’s immutable and most of its public methods return a new copy of itself with some changes. The applied changes are reflected in generated events that are added to the returned game.
Here’s how our game’s interface looks:
As you can see, the game can be in one of 3 states:
There are just two commands that our game needs to handle:
I decided to make the Game trait responsible for handling these commands; that’s what the handleCommand method does.
It simply dispatches commands to the corresponding methods (either
If a command cannot be applied in the current state (for example UninitializedGame) it returns the suitable violation.
Some may prefer to have handleCommand implemented in each subclass and not in the Game trait. Either way is fine.
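As a rough illustration of dispatching in the trait, here’s a small sketch. It is an assumption-heavy simplification rather than the project’s code: the command names StartGame and RollDice, their fields, the RunningGame state and the GameRulesViolation parent type are hypothetical, only two states are modelled, and rolling just passes the turn on.

```scala
object CommandDispatchSketch {
  // Hypothetical command shapes.
  sealed trait GameCommand
  final case class StartGame(players: Seq[String]) extends GameCommand
  final case class RollDice(player: String) extends GameCommand

  sealed trait GameRulesViolation
  case object NotEnoughPlayersViolation extends GameRulesViolation
  case object NotCurrentPlayerViolation extends GameRulesViolation
  case object GameAlreadyStartedViolation extends GameRulesViolation
  case object GameNotRunningViolation extends GameRulesViolation

  sealed trait Game {
    // Dispatch lives in the trait; a command that makes no sense
    // in the current state is answered with a violation.
    def handleCommand(command: GameCommand): Either[GameRulesViolation, Game] =
      (this, command) match {
        case (g: UninitializedGame, StartGame(players)) => g.start(players)
        case (_: UninitializedGame, _: RollDice)        => Left(GameNotRunningViolation)
        case (g: RunningGame, RollDice(player))         => g.roll(player)
        case (_: RunningGame, _: StartGame)             => Left(GameAlreadyStartedViolation)
      }
  }

  final case class UninitializedGame() extends Game {
    def start(players: Seq[String]): Either[GameRulesViolation, Game] =
      if (players.size < 2) Left(NotEnoughPlayersViolation)
      else Right(RunningGame(players, players.head))
  }

  // Hypothetical running state; rolling only hands the turn to the next player here.
  final case class RunningGame(players: Seq[String], currentPlayer: String) extends Game {
    def roll(player: String): Either[GameRulesViolation, Game] =
      if (player != currentPlayer) Left(NotCurrentPlayerViolation)
      else {
        val next = players((players.indexOf(currentPlayer) + 1) % players.size)
        Right(copy(currentPlayer = next))
      }
  }
}
```

Keeping the match in one place makes it easy to see which command is legal in which state, at the cost of the trait knowing about all of its subclasses.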
Apart from commands we’ll have a method to update the current turn countdown (tickCountdown).
It’ll take care of updating the time left in a player’s turn as well as the turn timeout.
We’ll execute this method periodically as time passes.
You might have already noticed that most public methods of a game return
We’ll use the following violations to indicate failure:
Here’s a quick explanation:
NotEnoughPlayersViolation - returned by start if there are not enough players to start a game (i.e., fewer than 2)
NotCurrentPlayerViolation - returned by roll if the player who tries to roll is not the one whose turn it is
GameAlreadyStartedViolation - created when you try to pass a Start command to a game that’s already started
GameNotRunningViolation - indicates that a player tried to roll before the game was started
If a command result is a valid game (not a violation), all events generated during the command’s processing will be added to the returned game’s uncommittedEvents.
You can later publish, persist or use them in any other way.
Events that we’ll use are:
I hope they are self-explanatory.
Appropriate events are generated by
Here’s a simplified version of the roll method that generates the event:
We can see that if everything is fine (it’s the currentPlayer that rolls) we generate one event, DiceRolled. The roll method then returns the result of applying this event to the current game’s state.
What does the applyEvent method do then?
Here’s example for
A new copy of the game is created with an updated state (here we add the rolled number to the
We also add the new event to uncommitedEvents so that the caller of roll knows what events were generated.
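The relationship between roll, applyEvent and the list of uncommitted events can be sketched roughly as below. This is a simplified assumption, not the project’s code: the rolled number is passed in instead of being generated, the field names rolledNumbers and rolledNumber are hypothetical, and markCommitted is a hypothetical helper for clearing the list.

```scala
object ApplyEventSketch {
  sealed trait GameEvent
  final case class DiceRolled(rolledNumber: Int) extends GameEvent

  final case class RunningGame(
      rolledNumbers: Vector[Int] = Vector.empty,
      uncommittedEvents: Vector[GameEvent] = Vector.empty) {

    // A command only decides which event to emit; it never changes state directly.
    def roll(rolledNumber: Int): RunningGame =
      applyEvent(DiceRolled(rolledNumber))

    // All state changes happen here, and the event is remembered for the caller.
    def applyEvent(event: GameEvent): RunningGame = event match {
      case DiceRolled(n) =>
        copy(rolledNumbers = rolledNumbers :+ n,
             uncommittedEvents = uncommittedEvents :+ event)
    }

    // Hypothetical helper: once events are persisted or published, forget them.
    def markCommitted: RunningGame = copy(uncommittedEvents = Vector.empty)
  }
}
```

Because every instance is immutable, the caller always gets a fresh game together with the events that led to it.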
All state changes happen as a result of events being applied. If we want to change the game state we need to generate an event and apply it.
applyEvent is overridden in each of the game subclasses, and each of them handles a different set of events.
A DiceRolled event would be invalid in the uninitialized state, so it’s not handled there.
The boon of this approach is that having a list of past events we can easily restore the current game state, just by applying them one by one.
This method is defined in the
AggregateRoot trait that
Let’s get back to tickCountdown for a moment. Although it doesn’t handle commands, it too will generate some events.
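One way such a method could generate events is sketched below. Treat it as a guess at the shape, not the actual implementation: TurnCountdownUpdated, TurnLengthSeconds and the field names are hypothetical, and the turn length is kept very short for illustration. Only TurnTimedOut is a name used in this post.

```scala
object TickCountdownSketch {
  sealed trait GameEvent
  final case class TurnCountdownUpdated(secondsLeft: Int) extends GameEvent // hypothetical name
  case object TurnTimedOut extends GameEvent

  val TurnLengthSeconds = 3 // hypothetical value, short on purpose

  final case class RunningGame(
      players: Vector[String],
      currentPlayer: Int = 0,
      secondsLeft: Int = TurnLengthSeconds,
      uncommittedEvents: Vector[GameEvent] = Vector.empty) {

    // Called once per tick: either the countdown shrinks or the turn times out.
    def tickCountdown: RunningGame =
      if (secondsLeft > 1) applyEvent(TurnCountdownUpdated(secondsLeft - 1))
      else applyEvent(TurnTimedOut)

    def applyEvent(event: GameEvent): RunningGame = {
      val next = event match {
        case TurnCountdownUpdated(left) => copy(secondsLeft = left)
        case TurnTimedOut =>
          // Pass the turn to the next player and reset the countdown.
          copy(currentPlayer = (currentPlayer + 1) % players.size,
               secondsLeft = TurnLengthSeconds)
      }
      next.copy(uncommittedEvents = uncommittedEvents :+ event)
    }
  }
}
```

The point is that even a time-driven change goes through an event, so it can be persisted and replayed like any other.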
Now that we have our game logic, we can bring it to life. We’ll use Akka to run games and persist events using Akka persistence.
A key part of Akka persistence is PersistentActor. Its role is to manage event-driven state.
When a PersistentActor is created, the first thing it does is recover its state from its previously saved events (or snapshots).
If we create an actor that had some events saved before, all these events are replayed to it before any other message is handled (incoming messages are stashed internally). Only after all past events have been processed can the PersistentActor handle regular messages.
Where do the events come from then? Akka persistence gives us (amongst other things) a persist method which we can use to save events to the journal.
Journals are pluggable and there are quite a few of them available (including MongoDB, Cassandra, Kafka). The default one writes to the local filesystem - it’s fine for our simple project.
Knowing this, there are several things we’ll need to implement in our GameActor:
persistenceId - the unique identifier of our persistent actor, used by the persistence mechanism to associate the stored events with our actor; here we’ll use the gameId (which happens to be a UUID)
receiveCommand - handles regular messages received by the actor; we’ll implement this to handle game commands as well as a self-sent message to update the time
receiveRecover - messages that we’ll handle here are either previously saved game events, from which we’ll rebuild the most recent state, or RecoveryCompleted, which tells us that state recovery has finished
GameActor will be responsible for managing a running game’s state (stored in its game field) - feeding it commands and ticking the time.
What’s going on here?
CommandRejected) A note here: when using CQRS it’s not always immediately possible to tell if something went wrong. We’re lucky that our game is simple enough for this not to be an issue. If we weren’t, we could just acknowledge that we started processing a command that might be completed at some later point in time.
persist is a callback) we apply it to the current game state (the one from before the command). We also mark the new state as committed, that is, we remove all events from uncommittedEvents. We don’t want them anymore (if we left them, then when processing the next command we wouldn’t know which events are “fresh” and which ones remained from previous commands).
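The flow described above can be modelled without Akka at all. The sketch below is only a stand-in under stated assumptions: GameHolder plays the role of the actor, a mutable buffer plays the role of the journal, appending to it stands in for persist, and the Game class, its String violation and the CommandAccepted reply are hypothetical simplifications.

```scala
import scala.collection.mutable

object PersistFlowSketch {
  sealed trait GameEvent
  final case class DiceRolled(rolledNumber: Int) extends GameEvent

  // Hypothetical, heavily simplified game.
  final case class Game(
      rolled: Vector[Int] = Vector.empty,
      uncommittedEvents: Vector[GameEvent] = Vector.empty) {
    def roll(n: Int): Either[String, Game] =
      if (n < 1 || n > 6) Left("invalid roll") else Right(applyEvent(DiceRolled(n)))
    def applyEvent(event: GameEvent): Game = event match {
      case DiceRolled(n) => copy(rolled = rolled :+ n, uncommittedEvents = uncommittedEvents :+ event)
    }
    def markCommitted: Game = copy(uncommittedEvents = Vector.empty)
  }

  // Stand-in for the actor; `journal` stands in for the event store.
  final class GameHolder(journal: mutable.Buffer[GameEvent]) {
    // Recovery: rebuild the state from whatever the journal already holds.
    private var game: Game =
      journal.foldLeft(Game())((g, e) => g.applyEvent(e)).markCommitted

    def current: Game = game

    def handle(rolledNumber: Int): String = game.roll(rolledNumber) match {
      case Left(_) => "CommandRejected"
      case Right(updated) =>
        updated.uncommittedEvents.foreach { event =>
          journal += event                            // stands in for persist
          game = game.applyEvent(event).markCommitted // the "callback": apply to the pre-command state
        }
        "CommandAccepted"
    }
  }
}
```

A second GameHolder created over the same journal ends up in the same state as the first one, which is exactly what recovery is about.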
GameActor will also take care of updating the game’s time (remember the
We’ll use the built-in scheduler for that. It will call game.tickCountdown every second.
Once it does, it will handle the changes the same way it does in the case of a command.
Here’s the related code:
To create new games and pass commands to existing ones, we’ll expose a REST API. We’ll use spray-can as a server with the following routes:
I realize the last two are not very RESTful, but for our simple example let’s just stick with them.
We’ll use the actor-per-request pattern, creating one actor to handle each request.
We handle the /game URL by creating a new actor as shown below.
The request-handling actor is defined as follows
As you can see, all commands go through a GameManager. It’s responsible for creating new games and passing commands to existing ones.
SendCommand is sent by instances of the second per-request actor we have - GameCommandRequestActor - created to handle the
Once a command is processed, the actor handling the request is stopped.
We already know how to take some actions in our games, but how do we know what’s actually happening in them? Let’s get back to our events.
For now, events from our games just go to the eventStream and get forgotten.
That’s not very useful. We’d prefer them to be published for consumption by external clients.
There are many possibilities: from exposing a REST API for fetching recent events (polling), through a custom socket-based pub/sub implementation, to full-blown message queue systems. We’ll go with the latter and use RabbitMQ to publish our events.
I’ll use the Reactive rabbit library to easily bind a stream (of game events) to a rabbit exchange.
Our events will be sent to a headers exchange and will have type headers (which we can route on in our queues).
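To give a feel for what routing on headers means, here’s a tiny model of how a headers exchange decides which queues get a message. It is a simplified illustration in plain Scala, not the RabbitMQ client API: Binding, matches and route are hypothetical names, matchAll mirrors the “all”/“any” modes of the x-match binding argument, and using the event name as the type header value is an assumption.

```scala
object HeadersRoutingSketch {
  // A binding lists the header values a queue is interested in.
  final case class Binding(queue: String, headers: Map[String, String], matchAll: Boolean = true)

  // "all" mode: every listed header must match; "any" mode: one match is enough.
  def matches(binding: Binding, messageHeaders: Map[String, String]): Boolean = {
    val hits = binding.headers.toSeq.map { case (key, value) =>
      messageHeaders.get(key).contains(value)
    }
    if (binding.matchAll) hits.forall(identity) else hits.exists(identity)
  }

  // A message is delivered to every queue whose binding matches its headers.
  def route(bindings: Seq[Binding], messageHeaders: Map[String, String]): Seq[String] =
    bindings.filter(binding => matches(binding, messageHeaders)).map(_.queue)
}
```

With a binding on type = DiceRolled, for instance, a queue would only receive dice roll events and ignore everything else.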
Here’s how we create an exchange and bind a publisher to it:
Our publisher is an actor that catches game events from eventStream and publishes them (onNext) in accordance with the requested demand. Once we pass an event to onNext, Reactive rabbit takes care of moving it to RabbitMQ.
Now that we have all the building blocks ready it’s time to wire everything up and get it running.
First, let’s start RabbitMQ. I prefer to use a docker container to avoid the tedium of setting it up from scratch.
Before we start creating games, we’ll bind a queue to the game_events exchange so that we can see passing events:
Later, when there are messages going to this queue, you can read them in the Get messages section of the queue view.
Just set how many messages you want to see in the Messages field and hit
We are now ready to run the game application:
Let’s try creating a new game using the REST API we have:
The game has been created and we got its id in response. Great! Let’s start it:
We just started a game with two players: “Player1” and “Player2”. The game should be running and generating events now.
Let’s check that… Go back to the RabbitMQ console and get some messages in the queue overview. Here’s what I got:
We can clearly see that the game has indeed started and is running. Voila!
The turn countdown is being updated every second, and if you wait long enough without any action you’ll surely see the TurnTimedOut event in a few seconds.
Let’s try rolling the dice:
Good! We’ve just rolled the dice. A DiceRolled event should have been generated.
Let’s check our validation: what will happen if we try to roll as a player who is not supposed to roll at the moment?
That’s it. Everything works fine.
For the impatient there is also a web interface. Just run sbt "project webapp" run and point your browser to http://localhost:9000/ to play around with it.
I realize this post didn’t cover all aspects in detail. I tried to focus on the most important ones, and I hope it sheds some light on them. My goal was to give you a basic idea of what a CQRS/ES-based application could look like.
As a reminder, you can access the full source code on GitHub.
In the next part, we’ll create a simple web application. We’ll see how to combine REST API calls with event listening to create a decent user experience.