Writing a Plugin
Plugins are storage backends for events and projection state. Sourcing provides two plugin roles:
- Sourcing::Plugin::EventStore — handles event storage and retrieval
- Sourcing::Plugin::StateCache — handles projection state caching
This guide shows you how to implement either or both roles.
Option 1: Implement Both Roles
For a complete plugin, implement both EventStore and StateCache (or use Sourcing::Plugin::Memory):
use Sourcing::Plugin::EventStore;
use Sourcing::Plugin::StateCache;
unit class MyPlugin does Sourcing::Plugin::EventStore;
also does Sourcing::Plugin::StateCache;
# EventStore methods
method emit($event, :$current-version) { ... }
method get-events(%ids, %map) { ... }
method get-events-after($id, %ids, %map) { ... }
method supply { ... }
# StateCache methods
method store-cached-data(Mu:U $proj, %) { ... }
method get-cached-data(Mu:U $proj, %) is rw { ... }
# Activation
method use(|c) {
PROCESS::<$SourcingConfig> = self.new: |c;
}
Option 2: Implement Only EventStore
If you only need event storage (no state caching):
use Sourcing::Plugin::EventStore;
unit class MyEventStore does Sourcing::Plugin::EventStore;
has Supplier $.supplier .= new;
has Supply() $.supply = $!supplier;
has @.events;
method emit($event, :$current-version) {
# Store event
@!events.push: $event;
# Broadcast to subscriptions
$!supplier.emit: $event;
}
method get-events(%ids, %map) { ... }
method get-events-after(Int $id, %ids, %map) { ... }
method supply { $!supply }
method use(|c) {
PROCESS::<$SourcingConfig> = self.new: |c;
}
Option 3: Implement Only StateCache
If you only need state caching (use an existing event store):
use Sourcing::Plugin::StateCache;
unit class MyStateCache does Sourcing::Plugin::StateCache;
has %.store;
method store-cached-data(Mu:U $proj, %data, Int :$last-id!) {
my $key = $proj.^name;
%!store{$key} = { data => %data, last-id => $last-id };
}
method get-cached-data(Mu:U $proj, %ids) is rw {
%!store{$proj.^name} //= { data => %(), last-id => -1 };
}
method use(|c) {
PROCESS::<$SourcingConfig> = self.new: |c;
}
Version-Checked Emit
For optimistic locking, implement a multi method with :$type, :%ids!, and :$current-version!:
use Sourcing::Aggregation;
use Sourcing::X::OptimisticLocked;
multi method emit($event, :$type, :%ids!, :$current-version!) {
# Only aggregations can emit
unless $type ~~ Sourcing::Aggregation {
die "Only aggregations can emit events";
}
# CAS version check
my $stored-version = get-stored-version($type, %ids);
if $stored-version != $current-version {
Sourcing::X::OptimisticLocked.new(
:type($type), :ids(%ids),
:expected-version($current-version),
:actual-version($stored-version)
).throw;
}
# Store event and update version
store-event($event);
update-version($type, %ids, $current-version + 1);
# Broadcast on supply
$!supplier.emit: $event;
}
Activation
All plugins should provide a use class method:
method use(|c) {
PROCESS::<$SourcingConfig> = self.new: |c;
}
Usage
use MyPlugin;
MyPlugin.use; # Activates your plugin
# Now all sourcing operations use your plugin
my $account = sourcing BankAccount, :account-id(1);
Next Steps
- Study the Memory Plugin source for a complete implementation
- Read the EventStore API Reference for method signatures
- Read the StateCache API Reference for method signatures