eventsourcing

CQRS and Event Soucing in Gleam!


License
Other

Documentation

Eventsourcing

Event Sourcing Library for Gleam
A Gleam library for building event-sourced systems with supervision trees.


Table of contents

Introduction

Eventsourcing is a Gleam library for building robust, concurrent event-sourced systems using OTP supervision trees. Event sourcing stores changes to application state as a sequence of immutable events, providing excellent auditability, debugging capabilities, and system resilience.

Version 9.0 introduces API simplifications with cleaner function signatures and streamlined message passing for improved developer experience while maintaining all the production-ready features from v8.0.

Architecture

The library is built on a supervised actor architecture:

  • Supervision Trees: Production-ready fault tolerance with automatic actor restarts
  • Query Actors: Event processing runs in separate supervised actors for non-blocking processing
  • Domain Error Resilience: Business rule violations don't crash the system
  • Asynchronous Processing: Commands and queries are processed independently

Features

🏗️ Supervision & Fault Tolerance

  • Supervised architecture with built-in supervision trees
  • Graceful domain error handling - business rule violations don't crash actors
  • Automatic actor recovery - failed components restart automatically
  • Fault isolation - actor crashes don't cascade through the system

Concurrent Processing

  • Asynchronous query processing - queries don't block command execution
  • Multi-aggregate support - process multiple aggregates simultaneously
  • Actor-based isolation - queries run in independent processes

📊 Event Sourcing Core

  • Command validation and event generation
  • Event persistence with multiple store implementations
  • Aggregate reconstruction from event streams
  • Partial event loading from specific sequence numbers
  • Snapshot support for performance optimization

🔧 Event Store Support

🛡️ Type Safety

  • Comprehensive error types with Result-based API
  • Input validation for timeouts and frequencies
  • Type-safe event sourcing pipeline

Quick Start

import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process

// 1. Create memory store with supervision
let events_actor_name = process.new_name("events_actor")
let snapshot_actor_name = process.new_name("snapshot_actor")
let #(eventstore, memory_store_spec) = memory_store.supervised(
  events_actor_name,
  snapshot_actor_name, 
  static_supervisor.OneForOne
)

// 2. Start memory store supervisor
let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(memory_store_spec)
  |> static_supervisor.start()

// 3. Create event sourcing system
let name = process.new_name("eventsourcing_actor")
let queries = [#(process.new_name("my_query"), my_query)]
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore,
  handle: my_handle,
  apply: my_apply,
  empty_state: MyEmptyState,
  queries: queries,
  snapshot_config: None
)

// 4. Start event sourcing supervisor
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(eventsourcing_spec)
  |> static_supervisor.start()

// 5. Get actor from name
let eventsourcing_actor = process.named_subject(name)

// 6. Execute commands
eventsourcing.execute(eventsourcing_actor, "aggregate-123", MyCommand)

// 7. Load events and monitor system
let events_subject = eventsourcing.load_events(eventsourcing_actor, "aggregate-123")
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)

Example

Define Your Domain

pub type BankAccount {
  BankAccount(balance: Float)
  UnopenedBankAccount
}

pub type BankAccountCommand {
  OpenAccount(account_id: String)
  DepositMoney(amount: Float)
  WithDrawMoney(amount: Float)
}

pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}

pub type BankAccountError {
  CantDepositNegativeAmount
  CantOperateOnUnopenedAccount
  CantWithdrawMoreThanCurrentBalance
}

Command Handling

pub fn handle(
  bank_account: BankAccount,
  command: BankAccountCommand,
) -> Result(List(BankAccountEvent), BankAccountError) {
  case bank_account, command {
    UnopenedBankAccount, OpenAccount(account_id) ->
      Ok([AccountOpened(account_id)])
    BankAccount(balance), DepositMoney(amount) -> {
      case amount >. 0.0 {
        True -> {
          let new_balance = balance +. amount
          Ok([CustomerDepositedCash(amount, new_balance)])
        }
        False -> Error(CantDepositNegativeAmount)
      }
    }
    BankAccount(balance), WithDrawMoney(amount) -> {
      case amount >. 0.0 && balance >=. amount {
        True -> {
          let new_balance = balance -. amount
          Ok([CustomerWithdrewCash(amount, new_balance)])
        }
        False -> Error(CantWithdrawMoreThanCurrentBalance)
      }
    }
    _, _ -> Error(CantOperateOnUnopenedAccount)
  }
}

Event Application

pub fn apply(bank_account: BankAccount, event: BankAccountEvent) -> BankAccount {
  case event {
    AccountOpened(_) -> BankAccount(0.0)
    CustomerDepositedCash(_, balance) -> BankAccount(balance)
    CustomerWithdrewCash(_, balance) -> BankAccount(balance)
  }
}

Supervised Usage (Recommended)

import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process

pub fn main() {
  // Define query for read model updates
  let balance_query = fn(aggregate_id, events) {
    io.println(
      "Account " <> aggregate_id <> " processed " 
      <> int.to_string(list.length(events)) <> " events"
    )
  }

  // 1. Create memory store with supervision
  let events_actor_name = process.new_name("events_actor")
  let snapshot_actor_name = process.new_name("snapshot_actor")
  let #(eventstore, memory_store_spec) = memory_store.supervised(
    events_actor_name,
    snapshot_actor_name,
    static_supervisor.OneForOne
  )

  // 2. Start memory store supervisor
  let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(memory_store_spec)
    |> static_supervisor.start()

  // 3. Create supervised event sourcing system
  let name = process.new_name("eventsourcing_actor")
  let queries = [#(process.new_name("balance_query"), balance_query)]
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore,
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: queries,
    snapshot_config: None
  )

  // 4. Start event sourcing supervision tree
  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  // 5. Get actor from name
  let eventsourcing_actor = process.named_subject(name)

  // Give time for actors to start
  process.sleep(100)

  // 6. Execute commands - they will be processed asynchronously
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    OpenAccount("account-123")
  )
  
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    DepositMoney(100.0)
  )
}

Async API Usage

All data operations now use an asynchronous message-passing pattern:

pub fn async_example() {
  // Set up supervised system (memory store setup omitted for brevity)
  let name = process.new_name("eventsourcing_actor")
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore, // from memory_store.supervised()
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: [],
    snapshot_config: None
  )

  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  let eventsourcing_actor = process.named_subject(name)
  process.sleep(100) // Give time for actors to start

  // Load aggregate state asynchronously
  let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "account-123")
  case process.receive(load_subject, 1000) {
    Ok(Ok(aggregate)) -> io.println("Account loaded: " <> aggregate.aggregate_id)
    Ok(Error(eventsourcing.EntityNotFound)) -> io.println("Account not found")
    Ok(Error(other)) -> io.println("Error: " <> string.inspect(other))
    Error(_) -> io.println("Timeout waiting for response")
  }
  
  // Load events asynchronously
  let events_subject = eventsourcing.load_events_from(eventsourcing_actor, "account-123", 0)
  case process.receive(events_subject, 1000) {
    Ok(Ok(events)) -> io.println("Loaded " <> int.to_string(list.length(events)) <> " events")
    Ok(Error(error)) -> io.println("Error loading events: " <> string.inspect(error))
    Error(_) -> io.println("Timeout waiting for events")
  }
}

Snapshot Configuration

// Create validated frequency (snapshots every 100 events)
let assert Ok(frequency) = eventsourcing.frequency(100)
let snapshot_config = eventsourcing.SnapshotConfig(frequency)

// Enable snapshots during supervised system setup
let name = process.new_name("eventsourcing_actor")
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore, // from memory_store.supervised()
  handle: handle,
  apply: apply,
  empty_state: UnopenedBankAccount,
  queries: [],
  snapshot_config: Some(snapshot_config) // Enable snapshots
)

// Get actor and load latest snapshot asynchronously
let eventsourcing_actor = process.named_subject(name)
let snapshot_subject = eventsourcing.latest_snapshot(eventsourcing_actor, "account-123")
case process.receive(snapshot_subject, 1000) {
  Ok(Ok(Some(snapshot))) -> {
    io.println("Using snapshot from sequence " <> int.to_string(snapshot.sequence))
  }
  Ok(Ok(None)) -> io.println("No snapshot available, loading from events")
  Ok(Error(error)) -> io.println("Error loading snapshot: " <> string.inspect(error))
  Error(_) -> io.println("Timeout waiting for snapshot")
}

Enhanced API Features

Execute Commands with Metadata

// Execute with additional tracking information
eventsourcing.execute_with_metadata(
  eventsourcing_actor,
  "account-123",
  DepositMoney(100.0),
  [#("user_id", "user-456"), #("source", "mobile_app"), #("trace_id", "abc-123")]
)

System Monitoring and Stats

// Get system health statistics
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
case process.receive(stats_subject, 1000) {
  Ok(stats) -> {
    io.println("Query actors: " <> int.to_string(stats.query_actors_count))
    io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
  }
  Error(_) -> io.println("Timeout getting stats")
}

// Get individual aggregate statistics  
let agg_stats_subject = eventsourcing.aggregate_stats(eventsourcing_actor, "account-123")
case process.receive(agg_stats_subject, 1000) {
  Ok(Ok(stats)) -> {
    io.println("Aggregate: " <> stats.aggregate_id)
    io.println("Has snapshot: " <> string.inspect(stats.has_snapshot))
  }
  Ok(Error(error)) -> io.println("Error: " <> string.inspect(error))
  Error(_) -> io.println("Timeout")
}

Migration Guide

Migrating from v8 to v9

Version 9.0 introduces API simplifications to improve the developer experience:

Key Changes

  • Simplified function signatures: All public functions now accept process.Subject(...) directly instead of actor.Started(...)
  • Cleaner function names: Removed get_ prefixes from stats functions (system_stats, aggregate_stats, latest_snapshot)
  • Direct message passing: Functions send messages directly without .data accessor

Migration Steps

1. Update Function Calls - Remove .data accessor

// v8.0
eventsourcing.execute(eventsourcing_actor.data, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor.data, "agg-123")

// v9.0 - Direct subject passing
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")

2. Update Statistics Function Names

// v8.0
let stats = eventsourcing.get_system_stats(eventsourcing_actor.data)
let agg_stats = eventsourcing.get_aggregate_stats(eventsourcing_actor.data, "agg-123")
let snapshot = eventsourcing.get_latest_snapshot(eventsourcing_actor.data, "agg-123")

// v9.0 - Cleaner names
let stats = eventsourcing.system_stats(eventsourcing_actor)
let agg_stats = eventsourcing.aggregate_stats(eventsourcing_actor, "agg-123") 
let snapshot = eventsourcing.latest_snapshot(eventsourcing_actor, "agg-123")

3. Add Named Actor Support

// v8.0
let assert Ok(spec) = eventsourcing.supervised(
  eventstore, handle: handle, apply: apply,
  empty_state: state, queries: [balance_query],
  eventsourcing_actor_receiver: receiver1,
  query_actors_receiver: receiver2,
  snapshot_config: None
)

// v9.0 - Named actors required
let assert Ok(spec) = eventsourcing.supervised(
  name: process.new_name("eventsourcing_actor"),
  eventstore: eventstore, handle: handle, apply: apply,
  empty_state: state, 
  queries: [#(process.new_name("balance_query"), balance_query)],
  snapshot_config: None
)

Migrating from v7 to v9

Key Changes

  • Supervision required for production use
  • Query registration needed after supervisor startup
  • Asynchronous execution via actor message passing
  • Async data loading - all load operations return subjects and require process.receive()
  • Enhanced error handling with graceful domain error recovery
  • Snapshot config now passed during system initialization

Migration Steps

1. Update Initialization

// v7.x
let eventsourcing = eventsourcing.new(store, queries, handle, apply, empty_state)

// v8.0 - Supervised (Recommended)  
let eventsourcing_spec = eventsourcing.supervised(
  store, handle, apply, empty_state, queries,
  eventsourcing_receiver, query_receiver
)

2. Query Registration (No Longer Needed)

// v8.0 - Required after supervisor start
eventsourcing.register_queries(eventsourcing_actor, query_actors)

// v9.0 - Not needed! Queries are automatically registered through supervised() function

3. Update Command Execution

// v7.x
eventsourcing.execute(eventsourcing, "agg-123", command)

// v8.0  
eventsourcing.execute(eventsourcing_actor, "agg-123", command)

4. Update Data Loading (Now Async)

// v7.x - Synchronous
let assert Ok(aggregate) = eventsourcing.load_aggregate(eventsourcing, "agg-123")

// v8.0 - Asynchronous message passing
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
let assert Ok(Ok(aggregate)) = process.receive(load_subject, 1000)

5. Update Snapshot Configuration

// v7.x
let config = eventsourcing.SnapshotConfig(5)

// v8.0
let assert Ok(frequency) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(frequency)

Philosophy

Eventsourcing v8 embraces OTP supervision principles to build production-ready, fault-tolerant event-sourced systems:

  • Fault Tolerance First: Supervision trees ensure system resilience
  • Let It Crash: System errors crash actors for clean recovery, business errors don't
  • Clear Separation: Commands, events, and queries are distinct supervised processes
  • Concurrent by Design: Asynchronous processing maximizes throughput
  • Type Safety: Full Gleam type safety with comprehensive error handling
  • Production Ready: Built for real-world concurrent systems

This design makes your event-sourced systems naturally concurrent, fault-tolerant, and maintainable while preserving Gleam's excellent type safety.

Installation

Eventsourcing is published on Hex! You can add it to your Gleam projects from the command line:

gleam add eventsourcing

Support

Eventsourcing is built by Renatillas. Contributions are very welcome! If you've spotted a bug, or would like to suggest a feature, please open an issue or a pull request.

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch (git checkout -b my-feature-branch).
  3. Make your changes and commit them (git commit -m 'Add new feature').
  4. Push to the branch (git push origin my-feature-branch).
  5. Open a pull request.

Please ensure your code adheres to the project's coding standards and includes appropriate tests.

License

This project is licensed under the MIT License. See the LICENSE file for details.