pekko-cqrs-es-implementation
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePekko CQRS/ES 実装ガイド
Pekko CQRS/ES 实现指南
Apache Pekko + Scala 3でCQRS/Event Sourcingを実装する際の具体的なパターン集。
Apache Pekko + Scala 3实现CQRS/Event Sourcing时的具体模式集。
適用条件
适用条件
このスキルは以下のすべてを満たす場合にのみ使用する:
- プログラミング言語がScala(3.x推奨)であること
- CQRS/Event Sourcingアーキテクチャを採用していること
- Apache Pekko(またはAkka)をアクターフレームワークとして使用していること
上記を満たさない場合は、、、など
言語非依存のスキルを使用すること。
cqrs-tradeoffscqrs-to-event-sourcingcqrs-aggregate-modeling本技能仅在满足以下所有条件时使用:
- 编程语言为Scala(推荐3.x版本)
- 采用CQRS/Event Sourcing架构
- 使用Apache Pekko(或Akka)作为Actor框架
不满足上述条件时,请使用、、等语言无关的技能。
cqrs-tradeoffscqrs-to-event-sourcingcqrs-aggregate-modelingアーキテクチャ概要
架构概要
システム全体のデータフロー
系统整体数据流
クライアント
│
▼
コマンドAPI(GraphQL Mutation)
│
▼
ユースケース層(ZIO)
│
▼
集約アクター(Pekko Typed)
│
▼
ドメインモデル(純粋Scala)→ イベント生成
│
▼
イベントストア(DynamoDB)
│
▼
DynamoDB Streams
│
▼
リードモデルアップデータ(AWS Lambda)
│
▼
リードモデル(PostgreSQL)
│
▼
クエリAPI(GraphQL Query)客户端
│
▼
命令API(GraphQL Mutation)
│
▼
用例层(ZIO)
│
▼
聚合Actor(Pekko Typed)
│
▼
领域模型(纯Scala)→ 生成事件
│
▼
事件存储(DynamoDB)
│
▼
DynamoDB Streams
│
▼
读模型更新器(AWS Lambda)
│
▼
读模型(PostgreSQL)
│
▼
查询API(GraphQL Query)コマンド側とクエリ側の分離
命令端与查询端分离
| コマンド側 | クエリ側 | |
|---|---|---|
| 目的 | ビジネスルール実行 | 効率的なデータ取得 |
| データストア | DynamoDB(イベントストア) | PostgreSQL(リードモデル) |
| API | GraphQL Mutation | GraphQL Query |
| レイヤー | ドメイン → ユースケース → インターフェースアダプタ | インターフェースアダプタのみ |
| 整合性 | 強い整合性(集約内) | 結果整合性 |
クエリ側にユースケース層はない。 GraphQL自体がユースケースに相当する。
| 命令端 | 查询端 | |
|---|---|---|
| 目的 | 执行业务规则 | 高效获取数据 |
| 数据存储 | DynamoDB(事件存储) | PostgreSQL(读模型) |
| API | GraphQL Mutation | GraphQL Query |
| 层级 | 领域 → 用例 → 接口适配器 | 仅接口适配器 |
| 一致性 | 强一致性(聚合内) | 最终一致性 |
查询端没有用例层,GraphQL本身就相当于用例层。
モジュール構成
模块结构
modules/
├── command/
│ ├── domain/ # ドメイン層(純粋Scala)
│ ├── use-case/ # ユースケース層(ZIO)
│ ├── interface-adapter/ # アクター、GraphQL、シリアライザ
│ ├── interface-adapter-contract/ # コマンド/リプライのプロトコル定義
│ └── interface-adapter-event-serializer/ # Protocol Buffersシリアライザ
├── query/
│ ├── interface-adapter/ # DAO(Slick)、GraphQL
│ └── flyway-migration/ # DBマイグレーション
└── infrastructure/ # 共有ユーティリティ
apps/
├── command-api/ # コマンド側HTTPサーバー
├── query-api/ # クエリ側HTTPサーバー
└── read-model-updater/ # AWS Lambdamodules/
├── command/
│ ├── domain/ # 领域层(纯Scala)
│ ├── use-case/ # 用例层(ZIO)
│ ├── interface-adapter/ # Actor、GraphQL、序列化器
│ ├── interface-adapter-contract/ # 命令/响应协议定义
│ └── interface-adapter-event-serializer/ # Protocol Buffers序列化器
├── query/
│ ├── interface-adapter/ # DAO(Slick)、GraphQL
│ └── flyway-migration/ # 数据库迁移
└── infrastructure/ # 公共工具
apps/
├── command-api/ # 命令端HTTP服务
├── query-api/ # 查询端HTTP服务
└── read-model-updater/ # AWS Lambda依存方向
依赖方向
domain ← use-case ← interface-adapter ← apps
│ ↑
│ interface-adapter-contract
│ interface-adapter-event-serializer
└── Scalaのみに依存。フレームワーク非依存。domain ← use-case ← interface-adapter ← apps
│ ↑
│ interface-adapter-contract
│ interface-adapter-event-serializer
└── 仅依赖Scala,与框架无关コマンド側実装パターン
命令端实现模式
1. ドメインモデル
1. 领域模型
原則: ドメインモデルは純粋なScalaコードで、Pekkoに一切依存しない。
原则:领域模型为纯Scala代码,完全不依赖Pekko。
集約(trait + private case class)
聚合(trait + private case class)
scala
trait UserAccount extends Entity {
override type IdType = UserAccountId
def id: UserAccountId
def name: UserAccountName
def emailAddress: EmailAddress
def createdAt: DateTime
def updatedAt: DateTime
def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}
object UserAccount {
// ファクトリ: 新しい状態とイベントのペアを返す
def apply(
id: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime = DateTime.now(),
updatedAt: DateTime = DateTime.now()
): (UserAccount, UserAccountEvent) =
(
UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
UserAccountEvent.Created_V1(
id = DomainEventId.generate(),
entityId = id,
name = name,
emailAddress = emailAddress,
occurredAt = DateTime.now()
))
// 実装はprivateで隠蔽
private final case class UserAccountImpl(
id: UserAccountId,
deleted: Boolean,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime,
updatedAt: DateTime
) extends UserAccount {
override def rename(
newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
if (name == newName) {
Left(RenameError.FamilyNameSame)
} else {
val updated = this.copy(name = newName, updatedAt = DateTime.now())
val event = UserAccountEvent.Renamed_V1(
id = DomainEventId.generate(),
entityId = id,
oldName = name,
newName = newName,
occurredAt = DateTime.now()
)
Right((updated, event))
}
override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
if (deleted) {
Left(DeleteError.AlreadyDeleted)
} else {
val updated = copy(deleted = true, updatedAt = DateTime.now())
val event = UserAccountEvent.Deleted_V1(
id = DomainEventId.generate(),
entityId = id,
occurredAt = DateTime.now()
)
Right((updated, event))
}
}
}重要なパターン:
- 状態変更メソッドは を返す
Either[Error, (NewState, Event)] - ファクトリも のペアを返す
(State, Event) - 実装クラスは で外部から直接構築できない
private - ドメインモデル内でPekkoのimportは一切ない
scala
trait UserAccount extends Entity {
override type IdType = UserAccountId
def id: UserAccountId
def name: UserAccountName
def emailAddress: EmailAddress
def createdAt: DateTime
def updatedAt: DateTime
def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}
object UserAccount {
// 工厂方法:返回新状态与事件的对
def apply(
id: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime = DateTime.now(),
updatedAt: DateTime = DateTime.now()
): (UserAccount, UserAccountEvent) =
(
UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
UserAccountEvent.Created_V1(
id = DomainEventId.generate(),
entityId = id,
name = name,
emailAddress = emailAddress,
occurredAt = DateTime.now()
))
// 实现类用private隐藏,禁止外部直接构造
private final case class UserAccountImpl(
id: UserAccountId,
deleted: Boolean,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime,
updatedAt: DateTime
) extends UserAccount {
override def rename(
newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
if (name == newName) {
Left(RenameError.FamilyNameSame)
} else {
val updated = this.copy(name = newName, updatedAt = DateTime.now())
val event = UserAccountEvent.Renamed_V1(
id = DomainEventId.generate(),
entityId = id,
oldName = name,
newName = newName,
occurredAt = DateTime.now()
)
Right((updated, event))
}
override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
if (deleted) {
Left(DeleteError.AlreadyDeleted)
} else {
val updated = copy(deleted = true, updatedAt = DateTime.now())
val event = UserAccountEvent.Deleted_V1(
id = DomainEventId.generate(),
entityId = id,
occurredAt = DateTime.now()
)
Right((updated, event))
}
}
}核心模式:
- 状态变更方法返回
Either[Error, (NewState, Event)] - 工厂方法也返回 对
(State, Event) - 实现类为 ,禁止外部直接构造
private - 领域模型内完全不引入Pekko相关依赖
イベント定義(enum + バージョニング)
事件定义(enum + 版本控制)
scala
enum UserAccountEvent extends DomainEvent {
override type EntityIdType = UserAccountId
case Created_V1(
id: DomainEventId,
entityId: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
occurredAt: DateTime
)
case Renamed_V1(
id: DomainEventId,
entityId: UserAccountId,
oldName: UserAccountName,
newName: UserAccountName,
occurredAt: DateTime
)
case Deleted_V1(
id: DomainEventId,
entityId: UserAccountId,
occurredAt: DateTime
)
}イベント設計ルール:
| ルール | 説明 | 例 |
|---|---|---|
| 過去形で命名 | 「何が起きたか」を表す | |
| スキーマ進化に対応 | |
| 不変 | case classで自動的に保証 | |
| 自己完結 | 変更前後の値を含む | |
| 必須フィールド | | すべてのイベントに共通 |
scala
enum UserAccountEvent extends DomainEvent {
override type EntityIdType = UserAccountId
case Created_V1(
id: DomainEventId,
entityId: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
occurredAt: DateTime
)
case Renamed_V1(
id: DomainEventId,
entityId: UserAccountId,
oldName: UserAccountName,
newName: UserAccountName,
occurredAt: DateTime
)
case Deleted_V1(
id: DomainEventId,
entityId: UserAccountId,
occurredAt: DateTime
)
}事件设计规则:
| 规则 | 说明 | 示例 |
|---|---|---|
| 过去式命名 | 表达「已发生的事件」 | |
带 | 适配Schema演进 | |
| 不可变 | 由case class自动保证 | |
| 自包含 | 包含变更前后的所有值 | |
| 必填字段 | | 所有事件通用 |
2. 集約の状態(enum)
2. 聚合状态(enum)
scala
enum UserAccountAggregateState {
case NotCreated(id: UserAccountId)
case Created(user: UserAccount)
case Deleted(user: UserAccount)
def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
if id == entityId =>
Created(UserAccount(entityId, name, emailAddress)._1)
case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
if user.id == entityId =>
Created(user.rename(newName) match {
case Right((u, _)) => u
case Left(error) =>
throw new IllegalStateException(s"Failed to rename user: $error")
})
case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
if user.id == entityId =>
Deleted(user.delete match {
case Right((deletedUser, _)) => deletedUser
case Left(error) =>
throw new IllegalStateException(s"Failed to delete user: $error")
})
case _ =>
throw new IllegalStateException(s"Cannot apply event $event to state $this")
}
}状態遷移の型安全性:
- →
NotCreated(Created_V1イベントのみ)Created - →
Created(Renamed_V1)/Created(Deleted_V1)Deleted - → 遷移なし(どのイベントも受け付けない)
Deleted
scala
enum UserAccountAggregateState {
case NotCreated(id: UserAccountId)
case Created(user: UserAccount)
case Deleted(user: UserAccount)
def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
if id == entityId =>
Created(UserAccount(entityId, name, emailAddress)._1)
case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
if user.id == entityId =>
Created(user.rename(newName) match {
case Right((u, _)) => u
case Left(error) =>
throw new IllegalStateException(s"Failed to rename user: $error")
})
case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
if user.id == entityId =>
Deleted(user.delete match {
case Right((deletedUser, _)) => deletedUser
case Left(error) =>
throw new IllegalStateException(s"Failed to delete user: $error")
})
case _ =>
throw new IllegalStateException(s"Cannot apply event $event to state $this")
}
}状态迁移的类型安全:
- →
NotCreated(仅接受Created_V1事件)Created - →
Created(Renamed_V1)/Created(Deleted_V1)Deleted - → 无迁移(不接受任何事件)
Deleted
3. プロトコル定義(コマンド/リプライ)
3. 协议定义(命令/响应)
scala
object UserAccountProtocol {
// コマンド: すべてidを持つ
sealed trait Command { def id: UserAccountId }
final case class Create(
id: UserAccountId, name: UserAccountName,
emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
final case class Rename(
id: UserAccountId, newName: UserAccountName,
replyTo: ActorRef[RenameReply]) extends Command
final case class Delete(
id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
final case class Get(
id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command
// リプライ: コマンドごとに専用の型
sealed trait CreateReply
final case class CreateSucceeded(id: UserAccountId) extends CreateReply
sealed trait RenameReply
final case class RenameSucceeded(id: UserAccountId) extends RenameReply
final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply
sealed trait DeleteReply
final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply
sealed trait GetReply
final case class GetSucceeded(value: UserAccount) extends GetReply
final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}プロトコル設計ルール:
- コマンドはすべて を継承し、
sealed trait Commandを持つid - リプライはコマンドごとに専用の を定義する(
sealed trait,CreateReply等)RenameReply - で型安全な応答を保証
replyTo: ActorRef[XxxReply] - 成功/失敗をcase classで表現し、パターンマッチで網羅性チェック
scala
object UserAccountProtocol {
// 命令:所有命令都携带id
sealed trait Command { def id: UserAccountId }
final case class Create(
id: UserAccountId, name: UserAccountName,
emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
final case class Rename(
id: UserAccountId, newName: UserAccountName,
replyTo: ActorRef[RenameReply]) extends Command
final case class Delete(
id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
final case class Get(
id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command
// 响应:每个命令对应专用的响应类型
sealed trait CreateReply
final case class CreateSucceeded(id: UserAccountId) extends CreateReply
sealed trait RenameReply
final case class RenameSucceeded(id: UserAccountId) extends RenameReply
final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply
sealed trait DeleteReply
final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply
sealed trait GetReply
final case class GetSucceeded(value: UserAccount) extends GetReply
final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}协议设计规则:
- 所有命令继承,且携带
sealed trait Command字段id - 每个命令对应独立的响应类型(如
sealed trait、CreateReply)RenameReply - 通过保证类型安全的响应
replyTo: ActorRef[XxxReply] - 用case class表达成功/失败状态,通过模式匹配做完整性检查
4. 集約アクター(PersistenceEffector)
4. 聚合Actor(PersistenceEffector)
scala
object UserAccountAggregate {
// 状態ごとにハンドラ関数を分離
private def handleNotCreated(
state: UserAccountAggregateState.NotCreated,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Create(id, name, emailAddress, replyTo) if state.id == id =>
val (newState, event) = UserAccount(id, name, emailAddress)
effector.persistEvent(event) { _ =>
replyTo ! CreateSucceeded(id)
handleCreated(UserAccountAggregateState.Created(newState), effector)
}
case Get(id, replyTo) if state.id == id =>
replyTo ! GetNotFoundFailed(id)
Behaviors.same
}
private def handleCreated(
state: UserAccountAggregateState.Created,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Rename(id, newName, replyTo) if state.user.id == id =>
// ドメインモデルに委譲
state.user.rename(newName) match {
case Left(reason) =>
replyTo ! RenameFailed(id, reason)
Behaviors.same
case Right((newUser, event)) =>
effector.persistEvent(event) { _ =>
replyTo ! RenameSucceeded(id)
handleCreated(state.copy(user = newUser), effector)
}
}
// ... Delete, Get も同様
}
// エントリポイント
def apply(id: UserAccountId): Behavior[Command] = {
val config = PersistenceEffectorConfig
.create[UserAccountAggregateState, UserAccountEvent, Command](
persistenceId = s"${id.entityTypeName}-${id.asString}",
initialState = UserAccountAggregateState.NotCreated(id),
applyEvent = (state, event) => state.applyEvent(event)
)
.withPersistenceMode(PersistenceMode.Persisted)
.withSnapshotCriteria(SnapshotCriteria.every(1000))
.withRetentionCriteria(RetentionCriteria.snapshotEvery(2))
Behaviors.setup[Command] { implicit ctx =>
Behaviors
.supervise(
PersistenceEffector.fromConfig(config) {
case (state: UserAccountAggregateState.NotCreated, effector) =>
handleNotCreated(state, effector)
case (state: UserAccountAggregateState.Created, effector) =>
handleCreated(state, effector)
case (state: UserAccountAggregateState.Deleted, effector) =>
handleDeleted(state, effector)
})
.onFailure[IllegalArgumentException](SupervisorStrategy.restart)
}
}
}アクター実装ルール:
- ドメインロジックをアクターに書かない。 アクターは永続化とライフサイクル管理に徹する
- 状態ごとにハンドラ関数を分離し、受け付けるコマンドを限定する
- でイベント永続化後にリプライ
effector.persistEvent(event) { _ => ... } - ドメインモデルが を返したらリプライのみ(永続化しない)
Left - ドメインモデルが を返したらイベントを永続化してからリプライ
Right
scala
object UserAccountAggregate {
// 按状态拆分处理函数
private def handleNotCreated(
state: UserAccountAggregateState.NotCreated,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Create(id, name, emailAddress, replyTo) if state.id == id =>
val (newState, event) = UserAccount(id, name, emailAddress)
effector.persistEvent(event) { _ =>
replyTo ! CreateSucceeded(id)
handleCreated(UserAccountAggregateState.Created(newState), effector)
}
case Get(id, replyTo) if state.id == id =>
replyTo ! GetNotFoundFailed(id)
Behaviors.same
}
private def handleCreated(
state: UserAccountAggregateState.Created,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Rename(id, newName, replyTo) if state.user.id == id =>
// 逻辑委托给领域模型
state.user.rename(newName) match {
case Left(reason) =>
replyTo ! RenameFailed(id, reason)
Behaviors.same
case Right((newUser, event)) =>
effector.persistEvent(event) { _ =>
replyTo ! RenameSucceeded(id)
handleCreated(state.copy(user = newUser), effector)
}
}
// ... Delete、Get逻辑同理
}
// 入口方法
def apply(id: UserAccountId): Behavior[Command] = {
val config = PersistenceEffectorConfig
.create[UserAccountAggregateState, UserAccountEvent, Command](
persistenceId = s"${id.entityTypeName}-${id.asString}",
initialState = UserAccountAggregateState.NotCreated(id),
applyEvent = (state, event) => state.applyEvent(event)
)
.withPersistenceMode(PersistenceMode.Persisted)
.withSnapshotCriteria(SnapshotCriteria.every(1000))
.withRetentionCriteria(RetentionCriteria.snapshotEvery(2))
Behaviors.setup[Command] { implicit ctx =>
Behaviors
.supervise(
PersistenceEffector.fromConfig(config) {
case (state: UserAccountAggregateState.NotCreated, effector) =>
handleNotCreated(state, effector)
case (state: UserAccountAggregateState.Created, effector) =>
handleCreated(state, effector)
case (state: UserAccountAggregateState.Deleted, effector) =>
handleDeleted(state, effector)
})
.onFailure[IllegalArgumentException](SupervisorStrategy.restart)
}
}
}Actor实现规则:
- 不要在Actor中编写领域逻辑,Actor仅负责持久化和生命周期管理
- 按状态拆分处理函数,限定可接收的命令范围
- 通过实现事件持久化后再返回响应
effector.persistEvent(event) { _ => ... } - 领域模型返回时仅返回错误响应,不做持久化
Left - 领域模型返回时先持久化事件再返回成功响应
Right
5. ユースケース層(ZIO)
5. 用例层(ZIO)
scala
private[users] final class UserAccountUseCaseImpl(
userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
timeout: Timeout,
scheduler: Scheduler,
ec: ExecutionContext
) extends UserAccountUseCase {
override def createUserAccount(
userAccountName: UserAccountName,
emailAddress: EmailAddress
): IO[UserAccountUseCaseError, UserAccountId] =
for {
userAccountId <- ZIO.succeed(UserAccountId.generate())
reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
UserAccountProtocol.Create(
id = userAccountId,
name = userAccountName,
emailAddress = emailAddress,
replyTo = replyTo
)
}.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
result <- reply match {
case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
}
} yield result
private def askActor[R](
createMessage: ActorRef[R] => UserAccountProtocol.Command
): Task[R] =
PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}ユースケース層のルール:
- ビジネスロジックを書く場所ではない。 処理ステップの調整役に徹する
- ZIOのfor式でアクターとの通信を型安全に記述
- ヘルパーでPekko Ask → ZIO Task変換を共通化
askActor - エラーは にマッピング
UserAccountUseCaseError
scala
private[users] final class UserAccountUseCaseImpl(
userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
timeout: Timeout,
scheduler: Scheduler,
ec: ExecutionContext
) extends UserAccountUseCase {
override def createUserAccount(
userAccountName: UserAccountName,
emailAddress: EmailAddress
): IO[UserAccountUseCaseError, UserAccountId] =
for {
userAccountId <- ZIO.succeed(UserAccountId.generate())
reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
UserAccountProtocol.Create(
id = userAccountId,
name = userAccountName,
emailAddress = emailAddress,
replyTo = replyTo
)
}.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
result <- reply match {
case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
}
} yield result
private def askActor[R](
createMessage: ActorRef[R] => UserAccountProtocol.Command
): Task[R] =
PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}用例层规则:
- 不要在此处编写业务逻辑,仅承担处理流程的协调角色
- 通过ZIO的for表达式类型安全地编写与Actor的通信逻辑
- 通过工具方法统一实现Pekko Ask到ZIO Task的转换
askActor - 所有错误映射为类型
UserAccountUseCaseError
シリアライズ
序列化
Protocol Buffersによるイベントシリアライズ
基于Protocol Buffers的事件序列化
protobuf
// event.proto
syntax = "proto3";
message UserAccountCreatedV1 {
string id = 1;
string entity_id = 2;
string name = 3;
string email_address = 4;
string occurred_at = 5;
}
message UserAccountRenamedV1 {
string id = 1;
string entity_id = 2;
string old_name = 3;
string new_name = 4;
string occurred_at = 5;
}シリアライズ方針:
- イベントとスナップショットの両方をProtocol Buffersで定義
- ScalaPBでScalaコードを自動生成
- カスタムシリアライザでドメインイベント ↔ Protobufメッセージを変換
- バージョニングはProtobufのフィールド番号で後方互換性を確保
protobuf
// event.proto
syntax = "proto3";
message UserAccountCreatedV1 {
string id = 1;
string entity_id = 2;
string name = 3;
string email_address = 4;
string occurred_at = 5;
}
message UserAccountRenamedV1 {
string id = 1;
string entity_id = 2;
string old_name = 3;
string new_name = 4;
string occurred_at = 5;
}序列化策略:
- 事件和快照都通过Protocol Buffers定义
- 使用ScalaPB自动生成Scala代码
- 通过自定义序列化器实现领域事件 ↔ Protobuf消息的转换
- 利用Protobuf的字段编号保证版本向后兼容
スナップショット戦略
快照策略
| 設定 | 値 | 理由 |
|---|---|---|
| 保存頻度 | 1000イベントごと | 起動時間とストレージのバランス |
| 保持数 | 最新2つ | リカバリ安全性の確保 |
| 配置 | 值 | 原因 |
|---|---|---|
| 保存频率 | 每1000个事件 | 平衡启动时间与存储成本 |
| 保留数量 | 最新2个 | 保证恢复安全性 |
テスト戦略
测试策略
ドメインモデルテスト(Pekko非依存)
领域模型测试(无Pekko依赖)
scala
test("ユーザー名を変更できる") {
val (user, _) = UserAccount(id, name, email)
val result = user.rename(newName)
result match {
case Right((updated, event)) =>
updated.name shouldBe newName
event shouldBe a[UserAccountEvent.Renamed_V1]
case Left(error) => fail(s"Unexpected error: $error")
}
}scala
test("可修改用户名") {
val (user, _) = UserAccount(id, name, email)
val result = user.rename(newName)
result match {
case Right((updated, event)) =>
updated.name shouldBe newName
event shouldBe a[UserAccountEvent.Renamed_V1]
case Left(error) => fail(s"Unexpected error: $error")
}
}アクターテスト(ActorTestKit)
Actor测试(ActorTestKit)
scala
// PersistenceEffectorのテスト
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))scala
// PersistenceEffector测试
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))テストの分離
测试分层
| レベル | 対象 | ツール | Pekko依存 |
|---|---|---|---|
| ドメイン | ビジネスロジック | ScalaTest | なし |
| アクター | メッセージング、永続化 | ActorTestKit | あり |
| シリアライザ | イベント/スナップショットの変換 | ScalaTest | なし |
| 統合 | エンドツーエンド | LocalStack | あり |
| 层级 | 测试对象 | 工具 | 是否依赖Pekko |
|---|---|---|---|
| 领域层 | 业务逻辑 | ScalaTest | 否 |
| Actor层 | 消息处理、持久化逻辑 | ActorTestKit | 是 |
| 序列化层 | 事件/快照转换逻辑 | ScalaTest | 否 |
| 集成测试 | 端到端流程 | LocalStack | 是 |
関連スキルとの使い分け
与相关技能的区分使用
| スキル | フォーカス | 使うタイミング |
|---|---|---|
| 本スキル | Pekko + Scala 3での具体的実装 | CQRS/ESをScalaで実装するとき |
| cqrs-tradeoffs | CQRS採用判断のトレードオフ分析 | CQRS導入の是非を検討するとき |
| cqrs-to-event-sourcing | なぜESが必要になるかの論理的説明 | ESの必然性を理解したいとき |
| cqrs-aggregate-modeling | CQRS導入時の集約境界再定義 | 集約の粒度を見直すとき |
| aggregate-design | 集約設計ルール全般 | 集約の新規設計やレビューのとき |
| domain-building-blocks | VO/Entity/Aggregate等の設計 | ドメインモデル全体を設計するとき |
| 技能 | 侧重点 | 使用场景 |
|---|---|---|
| 本技能 | Pekko + Scala 3的具体实现 | 用Scala实现CQRS/ES时 |
| cqrs-tradeoffs | CQRS选型的利弊分析 | 评估是否引入CQRS时 |
| cqrs-to-event-sourcing | 事件溯源的必要性说明 | 需要理解ES的适用场景时 |
| cqrs-aggregate-modeling | CQRS落地时的聚合边界重定义 | 调整聚合粒度时 |
| aggregate-design | 通用聚合设计规则 | 新建或评审聚合设计时 |
| domain-building-blocks | VO/Entity/Aggregate等基础构件设计 | 设计整体领域模型时 |
参考文献
参考文献
- かとじゅん「Apache Pekko(もしくはAkka)で実現するCQRS/Event Sourcingシステム設計の完全開発ガイド」 - https://tech-book.precena.co.jp/entry/pekko-cqrs-es-guide
- リファレンス実装: https://github.com/j5ik2o/pekko-cqrs-es-example
- pekko-persistence-effector: https://github.com/j5ik2o/pekko-persistence-effector
- 加藤淳《Apache Pekko(或Akka)实现CQRS/Event Sourcing系统设计完全开发指南》 - https://tech-book.precena.co.jp/entry/pekko-cqrs-es-guide
- 参考实现: https://github.com/j5ik2o/pekko-cqrs-es-example
- pekko-persistence-effector: https://github.com/j5ik2o/pekko-persistence-effector
関連スキル(併読推奨)
相关技能(推荐搭配阅读)
このスキルを使用する際は、以下のスキルも併せて参照すること:
- : CQRS/ES採用判断のトレードオフ分析
cqrs-tradeoffs - : CQRSからイベントソーシングへの必然性
cqrs-to-event-sourcing - : ドメインモデルとしての集約設計ルール
aggregate-design
使用本技能时,建议同时参考以下技能:
- : CQRS/ES选型的利弊分析
cqrs-tradeoffs - : 从CQRS到事件溯源的必要性
cqrs-to-event-sourcing - : 作为领域模型的聚合设计规则
aggregate-design