event-sourcing-cqrs
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEvent Sourcing & CQRS
Event Sourcing & CQRS
Core Concepts
核心概念
Command ──▶ Command Handler ──▶ Aggregate ──▶ Events ──▶ Event Store
│
┌──────┘
▼
Projections ──▶ Read Models ──▶ QueriesCommand ──▶ Command Handler ──▶ Aggregate ──▶ Events ──▶ Event Store
│
┌──────┘
▼
Projections ──▶ Read Models ──▶ QueriesEvent Sourcing (TypeScript)
Event Sourcing(TypeScript实现)
Event Store
Event Store
typescript
interface DomainEvent {
eventId: string;
aggregateId: string;
type: string;
data: unknown;
version: number;
timestamp: Date;
}
class EventStore {
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
// Optimistic concurrency: check current version matches expected
const current = await this.getCurrentVersion(aggregateId);
if (current !== expectedVersion) throw new ConcurrencyError(aggregateId);
await db.events.createMany({
data: events.map((e, i) => ({ ...e, version: expectedVersion + i + 1 })),
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
return db.events.findMany({
where: { aggregateId },
orderBy: { version: 'asc' },
});
}
}typescript
interface DomainEvent {
eventId: string;
aggregateId: string;
type: string;
data: unknown;
version: number;
timestamp: Date;
}
class EventStore {
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
// Optimistic concurrency: check current version matches expected
const current = await this.getCurrentVersion(aggregateId);
if (current !== expectedVersion) throw new ConcurrencyError(aggregateId);
await db.events.createMany({
data: events.map((e, i) => ({ ...e, version: expectedVersion + i + 1 })),
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
return db.events.findMany({
where: { aggregateId },
orderBy: { version: 'asc' },
});
}
}Event-Sourced Aggregate
事件溯源聚合根
typescript
class OrderAggregate {
private id!: string;
private status!: string;
private items: OrderItem[] = [];
private version = 0;
private uncommittedEvents: DomainEvent[] = [];
static fromHistory(events: DomainEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}
createOrder(id: string, items: OrderItem[]) {
this.apply({ type: 'OrderCreated', data: { id, items } }, true);
}
confirmOrder() {
if (this.status !== 'pending') throw new DomainError('Cannot confirm');
this.apply({ type: 'OrderConfirmed', data: { id: this.id } }, true);
}
private apply(event: Partial<DomainEvent>, isNew: boolean) {
switch (event.type) {
case 'OrderCreated':
this.id = event.data.id;
this.items = event.data.items;
this.status = 'pending';
break;
case 'OrderConfirmed':
this.status = 'confirmed';
break;
}
this.version++;
if (isNew) this.uncommittedEvents.push(event as DomainEvent);
}
}typescript
class OrderAggregate {
private id!: string;
private status!: string;
private items: OrderItem[] = [];
private version = 0;
private uncommittedEvents: DomainEvent[] = [];
static fromHistory(events: DomainEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}
createOrder(id: string, items: OrderItem[]) {
this.apply({ type: 'OrderCreated', data: { id, items } }, true);
}
confirmOrder() {
if (this.status !== 'pending') throw new DomainError('Cannot confirm');
this.apply({ type: 'OrderConfirmed', data: { id: this.id } }, true);
}
private apply(event: Partial<DomainEvent>, isNew: boolean) {
switch (event.type) {
case 'OrderCreated':
this.id = event.data.id;
this.items = event.data.items;
this.status = 'pending';
break;
case 'OrderConfirmed':
this.status = 'confirmed';
break;
}
this.version++;
if (isNew) this.uncommittedEvents.push(event as DomainEvent);
}
}CQRS
CQRS
Command Side
命令端
typescript
class ConfirmOrderHandler {
constructor(private eventStore: EventStore) {}
async handle(cmd: ConfirmOrderCommand): Promise<void> {
const events = await this.eventStore.getEvents(cmd.orderId);
const aggregate = OrderAggregate.fromHistory(events);
aggregate.confirmOrder();
await this.eventStore.append(
cmd.orderId,
aggregate.uncommittedEvents,
aggregate.version - aggregate.uncommittedEvents.length,
);
}
}typescript
class ConfirmOrderHandler {
constructor(private eventStore: EventStore) {}
async handle(cmd: ConfirmOrderCommand): Promise<void> {
const events = await this.eventStore.getEvents(cmd.orderId);
const aggregate = OrderAggregate.fromHistory(events);
aggregate.confirmOrder();
await this.eventStore.append(
cmd.orderId,
aggregate.uncommittedEvents,
aggregate.version - aggregate.uncommittedEvents.length,
);
}
}Query Side (Projection)
查询端(投影)
typescript
class OrderProjection {
async handle(event: DomainEvent): Promise<void> {
switch (event.type) {
case 'OrderCreated':
await readDb.orders.create({
data: { id: event.data.id, status: 'pending', total: calcTotal(event.data.items) },
});
break;
case 'OrderConfirmed':
await readDb.orders.update({
where: { id: event.aggregateId },
data: { status: 'confirmed', confirmedAt: event.timestamp },
});
break;
}
}
}typescript
class OrderProjection {
async handle(event: DomainEvent): Promise<void> {
switch (event.type) {
case 'OrderCreated':
await readDb.orders.create({
data: { id: event.data.id, status: 'pending', total: calcTotal(event.data.items) },
});
break;
case 'OrderConfirmed':
await readDb.orders.update({
where: { id: event.aggregateId },
data: { status: 'confirmed', confirmedAt: event.timestamp },
});
break;
}
}
}Snapshots (for long event streams)
快照(用于长事件流)
typescript
async function loadAggregate(id: string): Promise<OrderAggregate> {
const snapshot = await snapshotStore.getLatest(id);
const events = await eventStore.getEvents(id, snapshot?.version ?? 0);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}typescript
async function loadAggregate(id: string): Promise<OrderAggregate> {
const snapshot = await snapshotStore.getLatest(id);
const events = await eventStore.getEvents(id, snapshot?.version ?? 0);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}Anti-Patterns
反模式
| Anti-Pattern | Fix |
|---|---|
| Querying event store for reads | Build read models via projections |
| No optimistic concurrency | Check expected version on append |
| Deleting events | Events are immutable; use compensating events |
| Projections coupled to write model | Projections consume events independently |
| No snapshots for long streams | Snapshot every N events (e.g., 100) |
| 反模式 | 修复方案 |
|---|---|
| 直接查询事件存储用于读操作 | 通过投影构建读模型 |
| 未实现乐观并发 | 追加事件时检查预期版本 |
| 删除事件 | 事件是不可变的;使用补偿事件 |
| 投影与写模型耦合 | 投影独立消费事件 |
| 长事件流未使用快照 | 每N个事件创建一次快照(例如100个) |
Production Checklist
生产环境检查清单
- Event store with optimistic concurrency
- Projections for each read model
- Snapshots for aggregates with many events
- Idempotent projection handlers
- Event versioning strategy (upcasting)
- Monitoring: projection lag, event throughput
- 支持乐观并发的事件存储
- 为每个读模型配置投影
- 为包含大量事件的聚合根配置快照
- 实现幂等的投影处理器
- 事件版本化策略(向上转换)
- 监控:投影延迟、事件吞吐量