Loading...
Loading...
Event sourcing patterns for storing state as a sequence of events. Use when implementing event-driven architectures, CQRS, audit trails, or building systems requiring full history reconstruction.
npx skill4agent add yonatangross/orchestkit event-sourcing

from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4
class DomainEvent(BaseModel):
event_id: UUID = Field(default_factory=uuid4)
aggregate_id: UUID
event_type: str
version: int
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Config:
frozen = True # Events are immutableclass Account:
def __init__(self):
self._changes, self._version, self.balance = [], 0, 0.0
def deposit(self, amount: float):
self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))
def _apply(self, event):
match event:
case MoneyDeposited(): self.balance += event.amount
case MoneyWithdrawn(): self.balance -= event.amount
def load_from_history(self, events):
for e in events: self._apply(e); self._version = e.versionasync def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
current = await self.get_version(aggregate_id)
if current != expected_version:
raise ConcurrencyError(f"Expected {expected_version}, got {current}")
for event in events:
await self.session.execute(insert(event_store).values(
event_id=event.event_id, aggregate_id=aggregate_id,
event_type=event.event_type, version=event.version, data=event.model_dump()
))| Decision | Recommendation |
|---|---|
| Event naming | Past tense (e.g. `MoneyDeposited`, not `DepositMoney`) |
| Concurrency | Optimistic locking with version check |
| Snapshots | Every 100-500 events for large aggregates |
| Event schema | Version events, support upcasting |
| Projections | Async handlers, idempotent updates |
| Storage | PostgreSQL + JSONB or dedicated event store |
# NEVER modify stored events
await event_store.update(event_id, new_data) # Destroys audit trail
# NEVER include computed data in events
class OrderPlaced(DomainEvent):
total: float # WRONG - compute from line items
# NEVER ignore event ordering
async for event in events: # May arrive out of order
await handle(event) # Must check version/sequence
# ALWAYS use immutable events
class Event(BaseModel):
class Config:
frozen = True # Correct
# ALWAYS version your events
event_schema_version: int = 1 # Support schema evolution

Related skills: message-queues, database-schema-designer, integration-testing