____ ____ _ __
/ __ \__ __/ __ \____ _____ (_)___/ /__
/ /_/ / / / / /_/ / __ `/ __ \/ / __ / _ \
/ ____/ /_/ / _, _/ /_/ / /_/ / / /_/ / __/
/_/ \__, /_/ |_|\__,_/ .___/_/\__,_/\___/
/____/ /_/
⟨e₁⟩ ──causes──▶ ⟨e₂⟩
│ │
▼ ▼
⟨e₃⟩ ⟨e₄⟩ ──causes──▶ ⟨e₅⟩
Causal event-driven architecture modeling for Python, based on the RAPIDE 1.0 specification.
PyRapide is a Python library for modeling, executing, and analyzing causal event-driven architectures. It gives you first-class tools to define system components, connect them through typed event patterns, enforce behavioral constraints, and inspect the full causal history of every event that flows through your system.
PyRapide is based on RAPIDE (Rapid Prototyping for Application Domain-specific Integrated Environments), an architecture description language developed at Stanford University by David Luckham and his team. RAPIDE introduced the idea that the causal relationships between events, not just their order in time, are the fundamental structure worth capturing in a distributed system. PyRapide brings these ideas to modern Python with async-native execution, Pydantic-based events, and a composable pattern algebra.
This matters today because MCP (Model Context Protocol) servers and agentic AI systems generate complex, concurrent event streams where understanding why something happened is as important as what happened. PyRapide lets you define constraints like "every tool call must produce a result," detect anomalies like unexpected causal chains, and trace root causes across multiple interacting servers — all with a clean, declarative Python API.
PyRapide also includes drop-in adapters for 8 popular agentic frameworks — AutoGen, LangGraph, CrewAI, LlamaIndex, Agno, OpenAI Swarm, MetaGPT, and FlowiseAI — so you can add causal traceability to an existing project in 1-3 lines of code.
pip install pyrapideimport asyncio
from pyrapide import (
Event, Computation, Poset, Pattern,
interface, action, architecture, connect,
module, when, get_context, Engine,
must_match, visualization,
)
# 1. Define interfaces
@interface
class Sender:
@action
async def send(self, msg: str) -> None: ...
@interface
class Receiver:
@action
async def receive(self, msg: str) -> None: ...
# 2. Define module implementations
@module(implements=Sender)
class SenderModule:
async def start(self):
ctx = get_context(self)
ctx.generate_event("Sender.send", payload={"msg": "hello"})
@module(implements=Receiver)
class ReceiverModule:
@when(Pattern.match("Sender.send"))
async def on_send(self, match):
ctx = get_context(self)
event = list(match.events)[0]
ctx.generate_event("Receiver.receive",
payload={"msg": event.payload["msg"]},
caused_by=list(match.events))
# 3. Define the architecture
@architecture
class MessagingArch:
sender: Sender
receiver: Receiver
def connections(self):
return [connect(Pattern.match("Sender.send"), "receiver")]
# 4. Run it
async def main():
arch = MessagingArch()
engine = Engine()
engine.bind(arch, "sender", SenderModule)
engine.bind(arch, "receiver", ReceiverModule)
comp = await engine.run(arch, timeout=2.0)
# 5. Inspect the result
print(visualization.summary(comp))
# 6. Check a constraint
constraint = must_match(Pattern.match("Sender.send") >> Pattern.match("Receiver.receive"),
message="Every send must cause a receive")
violations = constraint.check(comp)
print(f"Violations: {len(violations)}")
asyncio.run(main())An Event is an immutable, uniquely identified record of something that happened: a tool call, a message, a state change. Events carry a name, payload, source, and timestamp. Crucially, events are linked to their causes, forming a causal history rather than a flat log.
A Poset is a directed acyclic graph (DAG) of events ordered by causality. Unlike a total-order log, a poset captures that some events are causally independent; they happened concurrently with no causal link. This is the foundational data structure in PyRapide.
An @interface declares the event types a component can produce (actions) and consume (requires). A @module implements an interface with concrete logic — lifecycle hooks (start, finish) and reactive handlers (@when). Modules are checked at decoration time for conformance to their interface.
An @architecture composes interfaces into a system. Connections (connect, pipe, agent) route events between components by pattern matching. The Engine orchestrates module lifecycle, event propagation, and connection dispatch.
Pattern provides a composable algebra for matching events in a poset. Basic patterns match by name and payload fields. Composite operators combine patterns: >> (causal sequence), & (join), | (independence), or_ (disjunction). Patterns can be guarded with predicates, timed with deadlines, and repeated.
Constraints declaratively specify behavioral rules: must_match asserts a pattern must occur, never forbids it. The ConstraintMonitor checks constraints in real time as events arrive. The @constraint decorator enables custom constraint logic with full access to the computation history.
StreamProcessor consumes events from multiple concurrent EventSource instances in real time, applying pattern watches and constraint enforcement with a configurable sliding window. MCPEventAdapter translates MCP protocol events (tool calls, results, errors) into PyRapide events with automatic causal linking via correlation IDs. LLMEventAdapter does the same for LLM request/response cycles.
import asyncio
from pyrapide import (
MCPEventAdapter, MCPEventTypes, MCPPatterns,
StreamProcessor, must_match, never,
)
from pyrapide.integrations.mcp import MockMCPClient
async def monitor_mcp_servers():
# Set up two MCP server adapters
db_client = MockMCPClient("database-server")
ai_client = MockMCPClient("ai-server")
db_adapter = MCPEventAdapter("database-server", db_client)
ai_adapter = MCPEventAdapter("ai-server", ai_client)
# Define constraints
no_errors = never(MCPPatterns.tool_error(), message="No tool errors allowed")
# Run the stream processor
processor = StreamProcessor()
processor.add_source("db", db_adapter)
processor.add_source("ai", ai_adapter)
processor.enforce(no_errors)
violations = []
processor.on_violation(lambda v: violations.append(v))
# Simulate server activity then close
async def simulate():
await db_client.connect()
await db_client.call_tool("query", {"sql": "SELECT * FROM users"})
await ai_client.connect()
await ai_client.call_tool_error("generate", error="rate limited")
await db_client.close()
await ai_client.close()
await asyncio.gather(simulate(), processor.run())
print(f"Events processed: {processor.stats['event_count']}")
print(f"Violations: {len(violations)}")
asyncio.run(monitor_mcp_servers())PyRapide includes adapter templates for 8 agentic AI frameworks, each translating framework-native events into a unified event taxonomy with automatic causal linking. Adding PyRapide to an existing project typically takes 1-3 lines of code.
| Framework | Adapter | Integration Code |
|---|---|---|
| AutoGen | AutoGenAdapter |
adapter = AutoGenAdapter("app", runtime=runtime) |
| LangGraph | LangGraphAdapter |
graph.invoke(input, config={"callbacks": [adapter.callback_handler]}) |
| CrewAI | CrewAIAdapter |
Crew(..., step_callback=adapter.on_step, task_callback=adapter.on_task) |
| LlamaIndex | LlamaIndexAdapter |
adapter = LlamaIndexAdapter("app"); adapter.attach() |
| Agno | AgnoAdapter |
adapter = AgnoAdapter("app", agent=agent) |
| OpenAI Swarm | SwarmAdapter |
client = adapter.create_instrumented_client() |
| MetaGPT | MetaGPTAdapter |
adapter = MetaGPTAdapter("app").patch() |
| FlowiseAI | FlowiseAdapter |
adapter = FlowiseAdapter("app", base_url=url, chatflow_id=id) |
import asyncio
from pyrapide.agent_templates.langgraph import LangGraphAdapter
from pyrapide.agent_templates.core import AgentPatterns
from pyrapide import StreamProcessor, never, queries, visualization
# Create adapter and processor
adapter = LangGraphAdapter("my-graph")
processor = StreamProcessor()
processor.add_source("langgraph", adapter)
processor.enforce(never(AgentPatterns.tool_error(), name="no_tool_errors"))
# Run your LangGraph app (pass the callback handler)
result = graph.invoke(
{"messages": [("user", "What is the weather?")]},
config={"callbacks": [adapter.callback_handler]},
)
# Analyze the causal event history
comp = adapter.computation
print(visualization.summary(comp))
# Trace root causes of any errors
for err in comp.events_by_name("agent.tool.error"):
roots = queries.root_causes(comp, err)
print(f"Error caused by: {[r.name for r in roots]}")All adapters normalize events into a unified taxonomy (agent.tool.call, agent.llm.request, agent.handoff, etc.) and share the same pattern library (AgentPatterns). See pyrapide-agent-documentation.md for full documentation, or the INTEGRATION.md file in each adapter's directory for framework-specific guides.
All public symbols are available from the top-level pyrapide package:
import pyrapide
print(pyrapide.__version__) # "0.3.0"| Category | Imports |
|---|---|
| Core |
Event, Poset, Computation, Clock, ClockManager, CausalCycleError
|
| Types |
interface, action, provides, requires, behavior, is_subtype, conforms_to
|
| Patterns |
Pattern, BasicPattern, PatternMatch, placeholder
|
| Architecture |
architecture, connect, pipe, agent
|
| Constraints |
Constraint, MustMatch, Never, must_match, never, constraint, ConstraintMonitor
|
| Executable |
module, when
|
| Runtime |
Engine, StreamProcessor, InMemoryEventSource
|
| Integrations |
MCPEventAdapter, MCPEventTypes, MCPPatterns, LLMEventAdapter, LLMEventTypes
|
| Analysis |
queries, visualization, CausalPredictor, AnomalyDetector
|
| Agent Templates |
AgentEventTypes, AgentPatterns, BaseAgentAdapter, CausalTracker, ResultBridge (from agent_templates.core) |
Full API documentation is available via docstrings on every public class and function. Sphinx documentation is planned for a future release.
pyrapide/
core/ Core primitives: Event, Poset, Computation, Clock, exceptions
types/ RAPIDE type system: interfaces, actions, subtype conformance
patterns/ Causal event pattern algebra: basic, composite, guarded, timed
architecture/ Architecture definitions: composition, connections, bindings
constraints/ Constraint enforcement: monitors, filters, conformance checks
executable/ Module lifecycle: @module, @when reactive handlers, processes
runtime/ Execution engine, event scheduling, streaming, serialization
integrations/ MCP and LLM protocol adapters with mock clients
analysis/ Queries, visualization (DOT/Mermaid/ASCII), prediction, anomaly detection
utils/ Internal utilities (ID generation)
agent_templates/
core/ Shared: AgentEventTypes, BaseAgentAdapter, CausalTracker, AgentPatterns
autogen/ Microsoft AutoGen adapter (InterventionHandler)
langgraph/ LangGraph / LangChain adapter (BaseCallbackHandler)
crewai/ CrewAI adapter (tool hooks + callbacks)
llamaindex/ LlamaIndex adapter (Dispatcher event handler)
agno/ Agno adapter (pre/post hooks + RunEvent stream)
swarm/ OpenAI Swarm adapter (InstrumentedSwarm subclass)
metagpt/ MetaGPT adapter (monkey-patch wrappers)
flowise/ FlowiseAI adapter (REST API + SSE consumer)
tests/ Core adapter tests (no framework dependencies required)
The package is organized around the RAPIDE specification's layered concepts: types define component interfaces, architectures compose them, executables implement behavior, and the runtime orchestrates everything. The agent_templates subpackage provides framework-specific adapters that translate agentic framework events into PyRapide's unified taxonomy, each with an INTEGRATION.md guide, an adapter.py implementation, and a runnable template.py example.
git clone https://github.com/ShaneDolphin/pyrapide.git
cd PyRapide
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"pytest tests/ -vAll tests use pytest-asyncio with asyncio_mode = "auto". The test suite includes unit tests for every module and end-to-end scenario tests.
mypy pyrapide/ --ignore-missing-imports
ruff check pyrapide/Both must pass with zero errors.
PyRapide is inspired by the RAPIDE architecture description language developed at the Stanford University Program Analysis and Verification Group by David Luckham, John Kenney, Larry Augustin, James Vera, and Doug Bryan.
Key references:
- Luckham, D.C. et al. "Specification and Analysis of System Architecture Using Rapide." IEEE Transactions on Software Engineering, 1995.
- Luckham, D.C. "The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems." Addison-Wesley, 2002.
- Luckham, D.C. and Vera, J. "An Event-Based Architecture Definition Language." IEEE Transactions on Software Engineering, 1995.
PyRapide is an independent implementation that adapts RAPIDE's core ideas (causal event posets, interface types, architecture composition, and pattern-based constraints) to modern Python with async execution, Pydantic models, and integrations for MCP and LLM protocols.
MIT License. See LICENSE for details.