- Introduction
- Architecture
- Features
- Quick Start
- Example
- Migration Guide
- Philosophy
- Installation
- Support
- Contributing
- License
Eventsourcing is a Gleam library for building robust, concurrent event-sourced systems using OTP supervision trees. Event sourcing stores changes to application state as a sequence of immutable events, providing excellent auditability, debugging capabilities, and system resilience.
Version 9.0 introduces API simplifications with cleaner function signatures and streamlined message passing for improved developer experience while maintaining all the production-ready features from v8.0.
The library is built on a supervised actor architecture:
- Supervision Trees: Production-ready fault tolerance with automatic actor restarts
- Query Actors: Event processing runs in separate supervised actors for non-blocking processing
- Domain Error Resilience: Business rule violations don't crash the system
- Asynchronous Processing: Commands and queries are processed independently
- Supervised architecture with built-in supervision trees
- Graceful domain error handling - business rule violations don't crash actors
- Automatic actor recovery - failed components restart automatically
- Fault isolation - actor crashes don't cascade through the system
- Asynchronous query processing - queries don't block command execution
- Multi-aggregate support - process multiple aggregates simultaneously
- Actor-based isolation - queries run in independent processes
- Command validation and event generation
- Event persistence with multiple store implementations
- Aggregate reconstruction from event streams
- Partial event loading from specific sequence numbers
- Snapshot support for performance optimization
- In-memory Store: Development and testing
- Postgres Store: PostgreSQL persistence
- SQLite Store: SQLite persistence
- Comprehensive error types with Result-based API
- Input validation for timeouts and frequencies
- Type-safe event sourcing pipeline
import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process
// 1. Create memory store with supervision
let events_actor_name = process.new_name("events_actor")
let snapshot_actor_name = process.new_name("snapshot_actor")
let #(eventstore, memory_store_spec) = memory_store.supervised(
events_actor_name,
snapshot_actor_name,
static_supervisor.OneForOne
)
// 2. Start memory store supervisor
let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(memory_store_spec)
|> static_supervisor.start()
// 3. Create event sourcing system
let name = process.new_name("eventsourcing_actor")
let queries = [#(process.new_name("my_query"), my_query)]
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
name: name,
eventstore: eventstore,
handle: my_handle,
apply: my_apply,
empty_state: MyEmptyState,
queries: queries,
snapshot_config: None
)
// 4. Start event sourcing supervisor
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(eventsourcing_spec)
|> static_supervisor.start()
// 5. Get actor from name
let eventsourcing_actor = process.named_subject(name)
// 6. Execute commands
eventsourcing.execute(eventsourcing_actor, "aggregate-123", MyCommand)
// 7. Load events and monitor system
let events_subject = eventsourcing.load_events(eventsourcing_actor, "aggregate-123")
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
pub type BankAccount {
BankAccount(balance: Float)
UnopenedBankAccount
}
pub type BankAccountCommand {
OpenAccount(account_id: String)
DepositMoney(amount: Float)
WithDrawMoney(amount: Float)
}
pub type BankAccountEvent {
AccountOpened(account_id: String)
CustomerDepositedCash(amount: Float, balance: Float)
CustomerWithdrewCash(amount: Float, balance: Float)
}
pub type BankAccountError {
CantDepositNegativeAmount
CantOperateOnUnopenedAccount
CantWithdrawMoreThanCurrentBalance
}
pub fn handle(
bank_account: BankAccount,
command: BankAccountCommand,
) -> Result(List(BankAccountEvent), BankAccountError) {
case bank_account, command {
UnopenedBankAccount, OpenAccount(account_id) ->
Ok([AccountOpened(account_id)])
BankAccount(balance), DepositMoney(amount) -> {
case amount >. 0.0 {
True -> {
let new_balance = balance +. amount
Ok([CustomerDepositedCash(amount, new_balance)])
}
False -> Error(CantDepositNegativeAmount)
}
}
BankAccount(balance), WithDrawMoney(amount) -> {
case amount >. 0.0 && balance >=. amount {
True -> {
let new_balance = balance -. amount
Ok([CustomerWithdrewCash(amount, new_balance)])
}
False -> Error(CantWithdrawMoreThanCurrentBalance)
}
}
_, _ -> Error(CantOperateOnUnopenedAccount)
}
}
pub fn apply(bank_account: BankAccount, event: BankAccountEvent) -> BankAccount {
case event {
AccountOpened(_) -> BankAccount(0.0)
CustomerDepositedCash(_, balance) -> BankAccount(balance)
CustomerWithdrewCash(_, balance) -> BankAccount(balance)
}
}
import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process
pub fn main() {
// Define query for read model updates
let balance_query = fn(aggregate_id, events) {
io.println(
"Account " <> aggregate_id <> " processed "
<> int.to_string(list.length(events)) <> " events"
)
}
// 1. Create memory store with supervision
let events_actor_name = process.new_name("events_actor")
let snapshot_actor_name = process.new_name("snapshot_actor")
let #(eventstore, memory_store_spec) = memory_store.supervised(
events_actor_name,
snapshot_actor_name,
static_supervisor.OneForOne
)
// 2. Start memory store supervisor
let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(memory_store_spec)
|> static_supervisor.start()
// 3. Create supervised event sourcing system
let name = process.new_name("eventsourcing_actor")
let queries = [#(process.new_name("balance_query"), balance_query)]
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
name: name,
eventstore: eventstore,
handle: handle,
apply: apply,
empty_state: UnopenedBankAccount,
queries: queries,
snapshot_config: None
)
// 4. Start event sourcing supervision tree
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(eventsourcing_spec)
|> static_supervisor.start()
// 5. Get actor from name
let eventsourcing_actor = process.named_subject(name)
// Give time for actors to start
process.sleep(100)
// 6. Execute commands - they will be processed asynchronously
eventsourcing.execute(
eventsourcing_actor,
"account-123",
OpenAccount("account-123")
)
eventsourcing.execute(
eventsourcing_actor,
"account-123",
DepositMoney(100.0)
)
}
All data operations now use an asynchronous message-passing pattern:
pub fn async_example() {
// Set up supervised system (memory store setup omitted for brevity)
let name = process.new_name("eventsourcing_actor")
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
name: name,
eventstore: eventstore, // from memory_store.supervised()
handle: handle,
apply: apply,
empty_state: UnopenedBankAccount,
queries: [],
snapshot_config: None
)
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(eventsourcing_spec)
|> static_supervisor.start()
let eventsourcing_actor = process.named_subject(name)
process.sleep(100) // Give time for actors to start
// Load aggregate state asynchronously
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "account-123")
case process.receive(load_subject, 1000) {
Ok(Ok(aggregate)) -> io.println("Account loaded: " <> aggregate.aggregate_id)
Ok(Error(eventsourcing.EntityNotFound)) -> io.println("Account not found")
Ok(Error(other)) -> io.println("Error: " <> string.inspect(other))
Error(_) -> io.println("Timeout waiting for response")
}
// Load events asynchronously
let events_subject = eventsourcing.load_events_from(eventsourcing_actor, "account-123", 0)
case process.receive(events_subject, 1000) {
Ok(Ok(events)) -> io.println("Loaded " <> int.to_string(list.length(events)) <> " events")
Ok(Error(error)) -> io.println("Error loading events: " <> string.inspect(error))
Error(_) -> io.println("Timeout waiting for events")
}
}
// Create validated frequency (snapshots every 100 events)
let assert Ok(frequency) = eventsourcing.frequency(100)
let snapshot_config = eventsourcing.SnapshotConfig(frequency)
// Enable snapshots during supervised system setup
let name = process.new_name("eventsourcing_actor")
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
name: name,
eventstore: eventstore, // from memory_store.supervised()
handle: handle,
apply: apply,
empty_state: UnopenedBankAccount,
queries: [],
snapshot_config: Some(snapshot_config) // Enable snapshots
)
// Get actor and load latest snapshot asynchronously
let eventsourcing_actor = process.named_subject(name)
let snapshot_subject = eventsourcing.latest_snapshot(eventsourcing_actor, "account-123")
case process.receive(snapshot_subject, 1000) {
Ok(Ok(Some(snapshot))) -> {
io.println("Using snapshot from sequence " <> int.to_string(snapshot.sequence))
}
Ok(Ok(None)) -> io.println("No snapshot available, loading from events")
Ok(Error(error)) -> io.println("Error loading snapshot: " <> string.inspect(error))
Error(_) -> io.println("Timeout waiting for snapshot")
}
Execute Commands with Metadata
// Execute with additional tracking information
eventsourcing.execute_with_metadata(
eventsourcing_actor,
"account-123",
DepositMoney(100.0),
[#("user_id", "user-456"), #("source", "mobile_app"), #("trace_id", "abc-123")]
)
System Monitoring and Stats
// Get system health statistics
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
case process.receive(stats_subject, 1000) {
Ok(stats) -> {
io.println("Query actors: " <> int.to_string(stats.query_actors_count))
io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
}
Error(_) -> io.println("Timeout getting stats")
}
// Get individual aggregate statistics
let agg_stats_subject = eventsourcing.aggregate_stats(eventsourcing_actor, "account-123")
case process.receive(agg_stats_subject, 1000) {
Ok(Ok(stats)) -> {
io.println("Aggregate: " <> stats.aggregate_id)
io.println("Has snapshot: " <> string.inspect(stats.has_snapshot))
}
Ok(Error(error)) -> io.println("Error: " <> string.inspect(error))
Error(_) -> io.println("Timeout")
}
Version 9.0 introduces API simplifications to improve the developer experience:
-
Simplified function signatures: All public functions now accept
process.Subject(...)
directly instead ofactor.Started(...)
-
Cleaner function names: Removed
get_
prefixes from stats functions (system_stats
,aggregate_stats
,latest_snapshot
) -
Direct message passing: Functions send messages directly without
.data
accessor
1. Update Function Calls - Remove .data accessor
// v8.0
eventsourcing.execute(eventsourcing_actor.data, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor.data, "agg-123")
// v9.0 - Direct subject passing
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
2. Update Statistics Function Names
// v8.0
let stats = eventsourcing.get_system_stats(eventsourcing_actor.data)
let agg_stats = eventsourcing.get_aggregate_stats(eventsourcing_actor.data, "agg-123")
let snapshot = eventsourcing.get_latest_snapshot(eventsourcing_actor.data, "agg-123")
// v9.0 - Cleaner names
let stats = eventsourcing.system_stats(eventsourcing_actor)
let agg_stats = eventsourcing.aggregate_stats(eventsourcing_actor, "agg-123")
let snapshot = eventsourcing.latest_snapshot(eventsourcing_actor, "agg-123")
3. Add Named Actor Support
// v8.0
let assert Ok(spec) = eventsourcing.supervised(
eventstore, handle: handle, apply: apply,
empty_state: state, queries: [balance_query],
eventsourcing_actor_receiver: receiver1,
query_actors_receiver: receiver2,
snapshot_config: None
)
// v9.0 - Named actors required
let assert Ok(spec) = eventsourcing.supervised(
name: process.new_name("eventsourcing_actor"),
eventstore: eventstore, handle: handle, apply: apply,
empty_state: state,
queries: [#(process.new_name("balance_query"), balance_query)],
snapshot_config: None
)
- Supervision required for production use
- Query registration needed after supervisor startup
- Asynchronous execution via actor message passing
-
Async data loading - all load operations return subjects and require
process.receive()
- Enhanced error handling with graceful domain error recovery
- Snapshot config now passed during system initialization
1. Update Initialization
// v7.x
let eventsourcing = eventsourcing.new(store, queries, handle, apply, empty_state)
// v8.0 - Supervised (Recommended)
let eventsourcing_spec = eventsourcing.supervised(
store, handle, apply, empty_state, queries,
eventsourcing_receiver, query_receiver
)
2. Query Registration (No Longer Needed)
// v8.0 - Required after supervisor start
eventsourcing.register_queries(eventsourcing_actor, query_actors)
// v9.0 - Not needed! Queries are automatically registered through supervised() function
3. Update Command Execution
// v7.x
eventsourcing.execute(eventsourcing, "agg-123", command)
// v8.0
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
4. Update Data Loading (Now Async)
// v7.x - Synchronous
let assert Ok(aggregate) = eventsourcing.load_aggregate(eventsourcing, "agg-123")
// v8.0 - Asynchronous message passing
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
let assert Ok(Ok(aggregate)) = process.receive(load_subject, 1000)
5. Update Snapshot Configuration
// v7.x
let config = eventsourcing.SnapshotConfig(5)
// v8.0
let assert Ok(frequency) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(frequency)
Eventsourcing v8 embraces OTP supervision principles to build production-ready, fault-tolerant event-sourced systems:
- Fault Tolerance First: Supervision trees ensure system resilience
- Let It Crash: System errors crash actors for clean recovery, business errors don't
- Clear Separation: Commands, events, and queries are distinct supervised processes
- Concurrent by Design: Asynchronous processing maximizes throughput
- Type Safety: Full Gleam type safety with comprehensive error handling
- Production Ready: Built for real-world concurrent systems
This design makes your event-sourced systems naturally concurrent, fault-tolerant, and maintainable while preserving Gleam's excellent type safety.
Eventsourcing is published on Hex! You can add it to your Gleam projects from the command line:
gleam add eventsourcing
Eventsourcing is built by Renatillas. Contributions are very welcome! If you've spotted a bug, or would like to suggest a feature, please open an issue or a pull request.
Contributions are welcome! Please follow these steps:
- Fork the repository.
- Create a new branch (
git checkout -b my-feature-branch
). - Make your changes and commit them (
git commit -m 'Add new feature'
). - Push to the branch (
git push origin my-feature-branch
). - Open a pull request.
Please ensure your code adheres to the project's coding standards and includes appropriate tests.
This project is licensed under the MIT License. See the LICENSE file for details.