event-sourcing-cqrs

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Event Sourcing & CQRS

Event Sourcing & CQRS

Core Concepts

核心概念

Command ──▶ Command Handler ──▶ Aggregate ──▶ Events ──▶ Event Store
                                                    ┌──────┘
                                              Projections ──▶ Read Models ──▶ Queries
Command ──▶ Command Handler ──▶ Aggregate ──▶ Events ──▶ Event Store
                                                    ┌──────┘
                                              Projections ──▶ Read Models ──▶ Queries

Event Sourcing (TypeScript)

Event Sourcing(TypeScript实现)

Event Store

Event Store

typescript
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  type: string;
  data: unknown;
  version: number;
  timestamp: Date;
}

class EventStore {
  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    // Optimistic concurrency: check current version matches expected
    const current = await this.getCurrentVersion(aggregateId);
    if (current !== expectedVersion) throw new ConcurrencyError(aggregateId);

    await db.events.createMany({
      data: events.map((e, i) => ({ ...e, version: expectedVersion + i + 1 })),
    });
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return db.events.findMany({
      where: { aggregateId },
      orderBy: { version: 'asc' },
    });
  }
}
typescript
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  type: string;
  data: unknown;
  version: number;
  timestamp: Date;
}

class EventStore {
  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    // Optimistic concurrency: check current version matches expected
    const current = await this.getCurrentVersion(aggregateId);
    if (current !== expectedVersion) throw new ConcurrencyError(aggregateId);

    await db.events.createMany({
      data: events.map((e, i) => ({ ...e, version: expectedVersion + i + 1 })),
    });
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return db.events.findMany({
      where: { aggregateId },
      orderBy: { version: 'asc' },
    });
  }
}

Event-Sourced Aggregate

事件溯源聚合根

typescript
class OrderAggregate {
  private id!: string;
  private status!: string;
  private items: OrderItem[] = [];
  private version = 0;
  private uncommittedEvents: DomainEvent[] = [];

  static fromHistory(events: DomainEvent[]): OrderAggregate {
    const aggregate = new OrderAggregate();
    events.forEach((e) => aggregate.apply(e, false));
    return aggregate;
  }

  createOrder(id: string, items: OrderItem[]) {
    this.apply({ type: 'OrderCreated', data: { id, items } }, true);
  }

  confirmOrder() {
    if (this.status !== 'pending') throw new DomainError('Cannot confirm');
    this.apply({ type: 'OrderConfirmed', data: { id: this.id } }, true);
  }

  private apply(event: Partial<DomainEvent>, isNew: boolean) {
    switch (event.type) {
      case 'OrderCreated':
        this.id = event.data.id;
        this.items = event.data.items;
        this.status = 'pending';
        break;
      case 'OrderConfirmed':
        this.status = 'confirmed';
        break;
    }
    this.version++;
    if (isNew) this.uncommittedEvents.push(event as DomainEvent);
  }
}
typescript
class OrderAggregate {
  private id!: string;
  private status!: string;
  private items: OrderItem[] = [];
  private version = 0;
  private uncommittedEvents: DomainEvent[] = [];

  static fromHistory(events: DomainEvent[]): OrderAggregate {
    const aggregate = new OrderAggregate();
    events.forEach((e) => aggregate.apply(e, false));
    return aggregate;
  }

  createOrder(id: string, items: OrderItem[]) {
    this.apply({ type: 'OrderCreated', data: { id, items } }, true);
  }

  confirmOrder() {
    if (this.status !== 'pending') throw new DomainError('Cannot confirm');
    this.apply({ type: 'OrderConfirmed', data: { id: this.id } }, true);
  }

  private apply(event: Partial<DomainEvent>, isNew: boolean) {
    switch (event.type) {
      case 'OrderCreated':
        this.id = event.data.id;
        this.items = event.data.items;
        this.status = 'pending';
        break;
      case 'OrderConfirmed':
        this.status = 'confirmed';
        break;
    }
    this.version++;
    if (isNew) this.uncommittedEvents.push(event as DomainEvent);
  }
}

CQRS

CQRS

Command Side

命令端

typescript
class ConfirmOrderHandler {
  constructor(private eventStore: EventStore) {}

  async handle(cmd: ConfirmOrderCommand): Promise<void> {
    const events = await this.eventStore.getEvents(cmd.orderId);
    const aggregate = OrderAggregate.fromHistory(events);

    aggregate.confirmOrder();

    await this.eventStore.append(
      cmd.orderId,
      aggregate.uncommittedEvents,
      aggregate.version - aggregate.uncommittedEvents.length,
    );
  }
}
typescript
class ConfirmOrderHandler {
  constructor(private eventStore: EventStore) {}

  async handle(cmd: ConfirmOrderCommand): Promise<void> {
    const events = await this.eventStore.getEvents(cmd.orderId);
    const aggregate = OrderAggregate.fromHistory(events);

    aggregate.confirmOrder();

    await this.eventStore.append(
      cmd.orderId,
      aggregate.uncommittedEvents,
      aggregate.version - aggregate.uncommittedEvents.length,
    );
  }
}

Query Side (Projection)

查询端(投影)

typescript
class OrderProjection {
  async handle(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        await readDb.orders.create({
          data: { id: event.data.id, status: 'pending', total: calcTotal(event.data.items) },
        });
        break;
      case 'OrderConfirmed':
        await readDb.orders.update({
          where: { id: event.aggregateId },
          data: { status: 'confirmed', confirmedAt: event.timestamp },
        });
        break;
    }
  }
}
typescript
class OrderProjection {
  async handle(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        await readDb.orders.create({
          data: { id: event.data.id, status: 'pending', total: calcTotal(event.data.items) },
        });
        break;
      case 'OrderConfirmed':
        await readDb.orders.update({
          where: { id: event.aggregateId },
          data: { status: 'confirmed', confirmedAt: event.timestamp },
        });
        break;
    }
  }
}

Snapshots (for long event streams)

快照(用于长事件流)

typescript
async function loadAggregate(id: string): Promise<OrderAggregate> {
  const snapshot = await snapshotStore.getLatest(id);
  const events = await eventStore.getEvents(id, snapshot?.version ?? 0);

  const aggregate = snapshot
    ? OrderAggregate.fromSnapshot(snapshot)
    : new OrderAggregate();

  events.forEach((e) => aggregate.apply(e, false));
  return aggregate;
}
typescript
async function loadAggregate(id: string): Promise<OrderAggregate> {
  const snapshot = await snapshotStore.getLatest(id);
  const events = await eventStore.getEvents(id, snapshot?.version ?? 0);

  const aggregate = snapshot
    ? OrderAggregate.fromSnapshot(snapshot)
    : new OrderAggregate();

  events.forEach((e) => aggregate.apply(e, false));
  return aggregate;
}

Anti-Patterns

反模式

Anti-PatternFix
Querying event store for readsBuild read models via projections
No optimistic concurrencyCheck expected version on append
Deleting eventsEvents are immutable; use compensating events
Projections coupled to write modelProjections consume events independently
No snapshots for long streamsSnapshot every N events (e.g., 100)
反模式修复方案
直接查询事件存储用于读操作通过投影构建读模型
未实现乐观并发追加事件时检查预期版本
删除事件事件是不可变的;使用补偿事件
投影与写模型耦合投影独立消费事件
长事件流未使用快照每N个事件创建一次快照(例如100个)

Production Checklist

生产环境检查清单

  • Event store with optimistic concurrency
  • Projections for each read model
  • Snapshots for aggregates with many events
  • Idempotent projection handlers
  • Event versioning strategy (upcasting)
  • Monitoring: projection lag, event throughput
  • 支持乐观并发的事件存储
  • 为每个读模型配置投影
  • 为包含大量事件的聚合根配置快照
  • 实现幂等的投影处理器
  • 事件版本化策略(向上转换)
  • 监控:投影延迟、事件吞吐量