Building an Event Sourcing Crate for Rust
In one of my recent blog posts, I talked about Event Sourcing with Aggregates in Rust. In that post, I was just beginning to explore how the Rust language and its strongly typed native data structures would allow me to express event sourcing concepts and primitives.
In that article, there was an apply function on the aggregate which would, depending on how you implemented it, either mutate its own state or consume its previous self and return a new version of itself. After thinking some more about that and how I might build an event sourcing crate for production-grade applications, I decided that my original samples needed some reworking.
Whenever I sit down to code, I always ask myself if what I’m building is applicable to more than just my own needs. If it is, then I will try and figure out how to open source it. I don’t get to use Rust in my day job (yet), but if the community continues to contribute open source libraries at the rate it has been, this should change soon enough. By creating this library, I not only solved my own need, but hopefully the next time a Rust developer goes looking for a slim implementation of ES primitives, they’ll find and use my crate.
Scratching together what little spare time I have, I’ve created an initial version of an Event Sourcing crate that you can explore on crates.io, Rust’s central package repository. My first version of this crate forced me to write a lot of redundant boilerplate code. As a result, I learned how to create custom derive macros (procedural macros, currently available in nightly but stabilizing soon!) that take care of the boilerplate for you, leaving you to spend more time worrying about your domain and less worrying about how to interact with my crate.
So let’s take a look at how you might structure your domain using the event sourcing crate. One thing that often gets in my way is a crate that forces me to do things only its way, so my crate tries to stay out of your hair as much as possible, while still making you use concepts like command, event, aggregate, aggregate state, and dispatcher.
Let’s start with the concept of a command. The command is a fairly simple enum, where each variant is a different type of command:
#[derive(Debug)]
pub enum CombatCommand {
    Attack(String, u32),
}
There’s no boilerplate here, and there are no special macros. It’s just a plain old enum. Remember that commands should be named using imperatives, or action verbs in the present tense: Attack versus “attacked” or “attacks”, Move versus “moved” or “moves”.
To define an event, we’ll create an enum with all of the variants of that event. You can use one enum per category of events, or you can have an enum carry all of your event variants. In the case of my sample “combat” demo, my event looks like this:
#[derive(Serialize, Deserialize, Debug, Clone, Event)]
#[event_type_version("1.0")]
#[event_source("events://.../samples/combat")]
pub enum CombatEvent {
    EntityAttacked(String, u32),
    RandomEvent { a: u32, b: u32 },
    UnitEvent,
}
Here we’re starting to see a little bit of the event sourcing library at work. First, we’re deriving an Event, and we’re specifying the event type version and the source of that event. You can pick anything for that source, but I prefer a URI syntax with a scheme of events. Events, once they leave a bounded context, have to adhere to a fixed public contract. As a result, we need to know what version of that contract a particular event was published under.
I was originally writing this boilerplate by hand, but custom derive macros made it significantly cleaner. Every derived event will automatically get the functions from the Event trait:
pub trait Event: Serialize {
    fn event_type_version(&self) -> &str;
    fn event_type(&self) -> &str;
    fn event_source(&self) -> &str;
}
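To make the derive a little less magical, here is a rough sketch of the kind of implementation you would otherwise write by hand. It is only an approximation: the trait below is a local stand-in that drops the Serialize bound so the snippet compiles on its own, and the event_type strings are an assumed naming scheme, not necessarily what the macro actually generates.

```rust
/// Local stand-in for the crate's `Event` trait (the real one also requires `Serialize`).
pub trait Event {
    fn event_type_version(&self) -> &str;
    fn event_type(&self) -> &str;
    fn event_source(&self) -> &str;
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum CombatEvent {
    EntityAttacked(String, u32),
    RandomEvent { a: u32, b: u32 },
    UnitEvent,
}

impl Event for CombatEvent {
    // Mirrors the #[event_type_version("1.0")] attribute.
    fn event_type_version(&self) -> &str {
        "1.0"
    }

    // Assumption: one type name per variant; the derive's real naming may differ.
    fn event_type(&self) -> &str {
        match self {
            CombatEvent::EntityAttacked(..) => "combatevent.entityattacked",
            CombatEvent::RandomEvent { .. } => "combatevent.randomevent",
            CombatEvent::UnitEvent => "combatevent.unitevent",
        }
    }

    // Mirrors the #[event_source("...")] attribute.
    fn event_source(&self) -> &str {
        "events://.../samples/combat"
    }
}

fn main() {
    let evt = CombatEvent::EntityAttacked("ogre".to_string(), 150);
    println!(
        "{} v{} from {}",
        evt.event_type(),
        evt.event_type_version(),
        evt.event_source()
    );
}
```

Multiply that by every event enum in a domain and the appeal of a derive becomes obvious.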
Now that we’ve got an event and a command, if we remember from the previous blog post, we should be able to handle an event within an aggregate and emit a new state. Likewise, we should be able to apply a command to the aggregate and get one or more events. This is where you, as the consumer of the event sourcing library and author of your own domain application, supply the core of your business logic.
Let’s define some state for the combat sample:
pub struct CombatState {
    pub entity_id: String,
    pub hitpoints: u32,
    pub generation: u64,
}

impl AggregateState for CombatState {
    fn generation(&self) -> u64 {
        self.generation
    }
}
The idea is that when we handle combat events, we get a new version of state. For example, if we Attack (command) a given entity, that has the potential to produce a new version (generation) of that entity’s state. I have a generation field on my state struct in order to comply with the AggregateState trait. Once I figure out how to create a macro to automatically handle monotonic increments of state generations, I’ll put that in the crate as well.
This represents a bit of a departure from my previous ideas. Here, I’m taking a more functional approach. My aggregate is stateless. It neither consumes nor borrows its own fields. Instead, apply_event takes the previous state and returns a new (generation incremented) state, while handle_command only reads the state and returns events.
The reason I have this generation field is to give application developers a potential tool for resolving concurrency conflicts. If, for whatever reason, they are running multiple threads that each handle an event, comparing the generation number on multiple state instances might give a clue as to how to resolve the state. Optimistic locking/concurrency in many databases and services is handled this way.
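Here is a minimal sketch of how a generation check might be used for optimistic concurrency. Nothing in it comes from the crate: StateStore, SaveError, and the rule "accept only a state exactly one generation ahead" are all hypothetical choices made for illustration.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct CombatState {
    pub entity_id: String,
    pub hitpoints: u32,
    pub generation: u64,
}

/// Hypothetical in-memory store holding the latest state for one entity.
pub struct StateStore {
    current: CombatState,
}

#[derive(Debug, PartialEq)]
pub enum SaveError {
    /// The candidate was derived from a generation that is no longer current.
    Conflict { expected: u64, found: u64 },
}

impl StateStore {
    pub fn new(initial: CombatState) -> Self {
        StateStore { current: initial }
    }

    /// Accept `candidate` only if it is exactly one generation ahead of the stored state.
    pub fn save(&mut self, candidate: CombatState) -> Result<(), SaveError> {
        let expected = self.current.generation + 1;
        if candidate.generation != expected {
            return Err(SaveError::Conflict {
                expected,
                found: candidate.generation,
            });
        }
        self.current = candidate;
        Ok(())
    }
}

fn main() {
    let base = CombatState {
        entity_id: "ogre".to_owned(),
        hitpoints: 900,
        generation: 0,
    };
    let mut store = StateStore::new(base.clone());

    // Two writers both start from generation 0 and each produce generation 1.
    let first = CombatState { hitpoints: 750, generation: 1, ..base.clone() };
    let second = CombatState { hitpoints: 800, generation: 1, ..base.clone() };

    println!("{:?}", store.save(first)); // accepted
    println!("{:?}", store.save(second)); // rejected: the store has moved on
}
```

The second writer loses because it was working from stale state. What it does next (reload and retry, merge, or give up) is an application decision.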
Now let’s implement the business logic, the core of our event sourcing domain:
pub struct Combat;

impl Aggregate for Combat {
    type Event = CombatEvent;
    type Command = CombatCommand;
    type State = CombatState;

    fn apply_event(state: &Self::State, evt: Self::Event) -> Result<Self::State> {
        unimplemented!()
    }

    fn handle_command(_state: &Self::State, cmd: Self::Command) -> Result<Vec<Self::Event>> {
        println!("Command handled: {:#?}", cmd);
        // SHOULD DO: validate state and command

        // if validation passes...
        Ok(vec![cmd.into()])
    }
}
Obviously I’ve got placeholders here for now, but you can imagine that I would be invoking some math and maybe looking up damage charts to resolve the combat event handling and to validate handling commands (e.g. rejecting the attack of an immortal entity or a non-existent one).
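To give a feel for what filled-in logic could look like, here is a self-contained sketch. The Aggregate trait and Result alias below are local stand-ins shaped like the signatures above (the error type is just a String here), and the rules themselves (reject unknown or already defeated targets, subtract damage without underflow) are invented for the example.

```rust
// Local stand-ins so the sketch compiles without the crate.
pub type Result<T> = std::result::Result<T, String>;

pub trait Aggregate {
    type Event;
    type Command;
    type State;
    fn apply_event(state: &Self::State, evt: Self::Event) -> Result<Self::State>;
    fn handle_command(state: &Self::State, cmd: Self::Command) -> Result<Vec<Self::Event>>;
}

#[derive(Debug)]
pub enum CombatCommand {
    Attack(String, u32),
}

#[derive(Debug, Clone, PartialEq)]
pub enum CombatEvent {
    EntityAttacked(String, u32),
}

#[derive(Debug, Clone, PartialEq)]
pub struct CombatState {
    pub entity_id: String,
    pub hitpoints: u32,
    pub generation: u64,
}

pub struct Combat;

impl Aggregate for Combat {
    type Event = CombatEvent;
    type Command = CombatCommand;
    type State = CombatState;

    // Fold one event into the previous state and return the next generation.
    fn apply_event(state: &Self::State, evt: Self::Event) -> Result<Self::State> {
        match evt {
            CombatEvent::EntityAttacked(_, damage) => Ok(CombatState {
                entity_id: state.entity_id.clone(),
                hitpoints: state.hitpoints.saturating_sub(damage),
                generation: state.generation + 1,
            }),
        }
    }

    // Validate the command against the current state, then emit events.
    fn handle_command(state: &Self::State, cmd: Self::Command) -> Result<Vec<Self::Event>> {
        match cmd {
            CombatCommand::Attack(target, damage) => {
                if target != state.entity_id {
                    return Err(format!("unknown entity: {}", target));
                }
                if state.hitpoints == 0 {
                    return Err(format!("{} is already defeated", target));
                }
                Ok(vec![CombatEvent::EntityAttacked(target, damage)])
            }
        }
    }
}

fn main() {
    let state = CombatState {
        entity_id: "ogre".to_owned(),
        hitpoints: 900,
        generation: 0,
    };
    let swing = CombatCommand::Attack("ogre".to_string(), 150);
    let events = Combat::handle_command(&state, swing).unwrap();
    // Replay the emitted events over the starting state.
    let next = events
        .into_iter()
        .try_fold(state, |s, e| Combat::apply_event(&s, e))
        .unwrap();
    println!("{:?}", next);
}
```

Note that handle_command never touches the state it is given. It only decides which events happened, and apply_event is the single place where a new generation is produced.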
So far, I think this has dramatically simplified the use of event sourcing and makes it very easy for any Rust application to adopt this pattern for some or all of its internal plumbing. But we still have one more thing to deal with, and that’s dispatch — doing something with the events that come out of a handle_command function call.
In many scenarios, you’re going to take that vector and immediately dispatch it to an event store or a queue or some kind of messaging system. To make this easier, I’ve made the concept of a dispatcher a pluggable thing that also comes with some boilerplate-eliminating macros:
#[derive(Dispatcher)]
#[aggregate(Combat)]
pub struct CombatDispatcher;
The current version of the crate already comes with an implementation of a dispatcher that sends to the eventstore.org event store. This is a very popular, open source, lightweight, and easy to use event store. Here’s how we might see this in action:
let combat_store = OrgEventStore::new("localhost", 2113);

let swing = CombatCommand::Attack("ogre".to_string(), 150);

// set up initial state (likely pre-fetched from a DB in a production scenario)
let state = CombatState {
    entity_id: "ogre".to_owned(),
    hitpoints: 900,
    generation: 0,
};

// every successful event that comes out of this will automatically be sent to the eventstore DB
let res = CombatDispatcher::dispatch(&state, swing, &combat_store, "ogre");
There’s some subtlety going on here that makes this work. First, we have a couple of associated types on our aggregate:
type Event = CombatEvent;
type Command = CombatCommand;
type State = CombatState;
These tell the aggregate what kinds of event sourcing components it’s responsible for. Next, the dispatcher custom derive macro lets us tell it the aggregate for which it will be dispatching:
#[derive(Dispatcher)]
#[aggregate(Combat)]
pub struct CombatDispatcher;
With all of that in place, the dispatch function can actually call handle_command on the aggregate, obtain the results, and send those results to a named stream on the dispatcher:
CombatDispatcher::dispatch(&state, swing, &combat_store, "ogre");
The eventstore.org dispatcher sends to a database, but you can imagine we might also have other kinds of dispatchers like a Kafka or NATS one that sends events to a topic. This is also important: you can have multiple dispatchers for the same aggregate, so if you want to persist and publish your events, just set up two dispatchers!
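The "persist and publish" idea can be sketched with two in-memory destinations. This is not the crate's dispatcher API: EventSink, MemoryStore, MemoryTopic, and dispatch_all are hypothetical names used only to show the same events fanning out to more than one place.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum CombatEvent {
    EntityAttacked(String, u32),
}

/// Hypothetical destination for events; not a trait from the crate.
pub trait EventSink {
    fn send(&mut self, stream: &str, evt: &CombatEvent);
}

/// Stands in for a persistent event store.
#[derive(Default)]
pub struct MemoryStore {
    pub saved: Vec<(String, CombatEvent)>,
}

/// Stands in for a message broker topic.
#[derive(Default)]
pub struct MemoryTopic {
    pub published: Vec<String>,
}

impl EventSink for MemoryStore {
    fn send(&mut self, stream: &str, evt: &CombatEvent) {
        self.saved.push((stream.to_string(), evt.clone()));
    }
}

impl EventSink for MemoryTopic {
    fn send(&mut self, stream: &str, evt: &CombatEvent) {
        self.published.push(format!("{}: {:?}", stream, evt));
    }
}

/// Send every event to every sink, preserving event order.
pub fn dispatch_all(events: &[CombatEvent], stream: &str, sinks: &mut [&mut dyn EventSink]) {
    for evt in events {
        for sink in sinks.iter_mut() {
            sink.send(stream, evt);
        }
    }
}

fn main() {
    let events = vec![CombatEvent::EntityAttacked("ogre".to_string(), 150)];
    let mut store = MemoryStore::default();
    let mut topic = MemoryTopic::default();

    {
        // Both sinks receive the same events for the "ogre" stream.
        let mut sinks: [&mut dyn EventSink; 2] = [&mut store, &mut topic];
        dispatch_all(&events, "ogre", &mut sinks);
    }

    println!("saved: {}, published: {}", store.saved.len(), topic.published.len());
}
```

A real setup would also have to decide what happens when one destination succeeds and the other fails, which this sketch ignores entirely.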
An interesting extra bonus with this crate is that I’ve included an implementation of Cloud Events. This is an open spec providing a universal, standard form for publishing and sharing events in the cloud. You can simply write code like this to get a cloud event from a “regular” event:
let attacked = CombatEvent::EntityAttacked("ogre".to_string(), 150);
let ce: CloudEvent = attacked.into();
Since all of your events have custom derived boilerplate, I can take advantage of that to get enough metadata from it to produce a cloud event instance without you needing to do any work. With this CloudEvent you can now interact with any system that adheres to the cloud event JSON format (this crate’s CloudEvents serialize/de-serialize according to the Cloud Event JSON spec).
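As a rough picture of why the derived metadata is enough, the sketch below wraps an event using only what the Event trait exposes. Everything here is an assumption made for illustration: the trait is a local stand-in, Envelope is a hypothetical type rather than the crate's CloudEvent, its field names are not taken from the Cloud Events spec, and the event ID is passed in by the caller instead of being generated.

```rust
/// Local stand-in for the crate's `Event` trait (the real one also requires `Serialize`).
pub trait Event {
    fn event_type_version(&self) -> &str;
    fn event_type(&self) -> &str;
    fn event_source(&self) -> &str;
}

/// Hypothetical event type used only for this sketch.
pub struct EntityAttacked;

impl Event for EntityAttacked {
    fn event_type_version(&self) -> &str {
        "1.0"
    }
    fn event_type(&self) -> &str {
        "combatevent.entityattacked"
    }
    fn event_source(&self) -> &str {
        "events://.../samples/combat"
    }
}

/// Hypothetical envelope; field names are illustrative, not the spec's or the crate's.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub event_id: String,
    pub event_type: String,
    pub event_type_version: String,
    pub source: String,
}

/// Build an envelope from nothing but the metadata the trait exposes.
pub fn wrap<E: Event>(evt: &E, event_id: &str) -> Envelope {
    Envelope {
        event_id: event_id.to_string(),
        event_type: evt.event_type().to_string(),
        event_type_version: evt.event_type_version().to_string(),
        source: evt.event_source().to_string(),
    }
}

fn main() {
    let envelope = wrap(&EntityAttacked, "evt-0001");
    println!("{:?}", envelope);
}
```

Because wrap is generic over Event, any type that derives the trait gets this conversion without extra work.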
Wrapping up, I am currently using this crate in two different side projects that I will eventually be pushing to real, live production infrastructure. I would love for more folks to use this crate so we can harden it and get ideas for different types of dispatchers (I’m probably going to build a NATS one very soon).
I also want to leave potential crate/library authors with a lesson learned: I would absolutely not have arrived at where I am now had I not been able to build “quick and dirty” iterations, think about them, and make small adjustments over time. After spending a lot of time refactoring, I’m finally at a foundation I think I can build on. I would never have this foundation had I not been able to start with an imperfect initial milestone.