atlas-stream-processing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

MongoDB Atlas Streams

MongoDB Atlas Streams

Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
通过MongoDB MCP Server提供的4款MCP工具构建、运维和调试Atlas Stream Processing (ASP) 管道。

Prerequisites

前置要求

This skill requires the MongoDB MCP Server connected with:
  • Atlas API credentials (
    apiClientId
    and
    apiClientSecret
    )
The 4 tools:
atlas-streams-discover
,
atlas-streams-build
,
atlas-streams-manage
,
atlas-streams-teardown
.
All operations require an Atlas project ID. If unknown, call
atlas-list-projects
first to find your project ID.
使用本功能需要MongoDB MCP Server已连接以下凭证:
  • Atlas API凭证(
    apiClientId
    apiClientSecret
4款工具分别是:
atlas-streams-discover
atlas-streams-build
atlas-streams-manage
atlas-streams-teardown
所有操作都需要Atlas项目ID。 如果不知道项目ID,先调用
atlas-list-projects
查询。

If MCP tools are unavailable

MCP工具不可用时的处理

If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
如果MongoDB MCP Server未连接或缺少流处理工具,查看references/mcp-troubleshooting.md获取诊断步骤和备选方案。

Tool Selection Matrix

工具选择矩阵

atlas-streams-discover — ALL read operations

atlas-streams-discover — 所有读操作

ActionUse when
list-workspaces
See all workspaces in a project
inspect-workspace
Review workspace config, state, region
list-connections
See all connections in a workspace
inspect-connection
Check connection state, config, health
list-processors
See all processors in a workspace
inspect-processor
Check processor state, pipeline, config
diagnose-processor
Full health report: state, stats, errors
get-networking
PrivateLink and VPC peering details. Optional:
cloudProvider
+
region
to get Atlas account details for PrivateLink setup
Pagination (all list actions):
limit
(1-100, default 20),
pageNum
(default 1). Response format:
responseFormat
"concise"
(default for list actions) or
"detailed"
(default for inspect/diagnose).
操作适用场景
list-workspaces
查看项目下的所有工作空间
inspect-workspace
查看工作空间的配置、状态、区域信息
list-connections
查看工作空间下的所有连接
inspect-connection
检查连接的状态、配置、健康度
list-processors
查看工作空间下的所有处理器
inspect-processor
检查处理器的状态、管道、配置
diagnose-processor
获取完整健康报告:状态、统计数据、错误信息
get-networking
获取PrivateLink和VPC对等连接详情。可选参数:
cloudProvider
+
region
,用于获取Atlas账户的PrivateLink配置信息
分页(所有列表操作适用):
limit
(取值1-100,默认20)、
pageNum
(默认1)。 响应格式
responseFormat
可选值为
"concise"
(列表操作默认)或
"detailed"
(查询/诊断操作默认)。

atlas-streams-build — ALL create operations

atlas-streams-build — 所有创建操作

ResourceKey parameters
workspace
cloudProvider
,
region
,
tier
(default SP10),
includeSampleData
connection
connectionName
,
connectionType
(Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample),
connectionConfig
processor
processorName
,
pipeline
(must start with
$source
, end with
$merge
/
$emit
),
dlq
,
autoStart
privatelink
privateLinkConfig
(project-level, not tied to a specific workspace)
Field mapping — only fill fields for the selected resource type:
  • resource = "workspace": Fill:
    projectId
    ,
    workspaceName
    ,
    cloudProvider
    ,
    region
    ,
    tier
    ,
    includeSampleData
    . Leave empty: all connection and processor fields.
  • resource = "connection": Fill:
    projectId
    ,
    workspaceName
    ,
    connectionName
    ,
    connectionType
    ,
    connectionConfig
    . Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
  • resource = "processor": Fill:
    projectId
    ,
    workspaceName
    ,
    processorName
    ,
    pipeline
    ,
    dlq
    (recommended),
    autoStart
    (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
  • resource = "privatelink": Fill:
    projectId
    ,
    privateLinkConfig
    . Note: PrivateLink is project-level, not workspace-level.
    workspaceName
    is not required — omit it. Leave empty: all connection and processor fields.
资源核心参数
workspace
cloudProvider
region
tier
(默认SP10)、
includeSampleData
connection
connectionName
connectionType
(Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample)、
connectionConfig
processor
processorName
pipeline
(必须以
$source
开头,以
$merge
/
$emit
结尾)、
dlq
autoStart
privatelink
privateLinkConfig
(项目级别,不绑定到特定工作空间)
字段映射规则 — 仅填写所选资源类型对应的字段:
  • resource = "workspace": 填写字段:
    projectId
    workspaceName
    cloudProvider
    region
    tier
    includeSampleData
    。留空所有连接和处理器相关字段。
  • resource = "connection": 填写字段:
    projectId
    workspaceName
    connectionName
    connectionType
    connectionConfig
    。留空所有工作空间和处理器相关字段。(不同连接类型的配置 schema 参见references/connection-configs.md
  • resource = "processor": 填写字段:
    projectId
    workspaceName
    processorName
    pipeline
    dlq
    (推荐)、
    autoStart
    (可选)。留空所有工作空间和连接相关字段。(管道示例参见references/pipeline-patterns.md
  • resource = "privatelink": 填写字段:
    projectId
    privateLinkConfig
    。注意:PrivateLink是项目级别资源,不属于工作空间,不需要填写
    workspaceName
    。留空所有连接和处理器相关字段。

atlas-streams-manage — ALL update/state operations

atlas-streams-manage — 所有更新/状态操作

ActionNotes
start-processor
Begins billing. Optional
tier
override,
resumeFromCheckpoint
stop-processor
Stops billing. Retains state 45 days
modify-processor
Processor must be stopped first. Change pipeline, DLQ, or name
update-workspace
Change tier or region
update-connection
Update config (networking is immutable — must delete and recreate)
accept-peering
/
reject-peering
VPC peering management
Field mapping — always fill
projectId
,
workspaceName
, then by action:
  • "start-processor"
    resourceName
    . Optional:
    tier
    ,
    resumeFromCheckpoint
    ,
    startAtOperationTime
    (ISO 8601 timestamp to resume from a specific point)
  • "stop-processor"
    resourceName
  • "modify-processor"
    resourceName
    . At least one of:
    pipeline
    ,
    dlq
    ,
    newName
  • "update-workspace"
    newRegion
    or
    newTier
  • "update-connection"
    resourceName
    ,
    connectionConfig
    . Exception: networking config (e.g., PrivateLink) cannot be modified after creation — delete and recreate.
  • "accept-peering"
    peeringId
    ,
    requesterAccountId
    ,
    requesterVpcId
  • "reject-peering"
    peeringId
State pre-checks:
  • start-processor
    → errors if processor is already STARTED
  • stop-processor
    → no-ops if already STOPPED or CREATED (not an error)
  • modify-processor
    → errors if processor is STARTED (must stop first)
Processor states:
CREATED
STARTED
(via start) →
STOPPED
(via stop). Can also enter
FAILED
on runtime errors. Modify requires STOPPED or CREATED state.
Teardown safety checks:
  • Processor deletion → auto-stops before deleting (no need to stop manually first)
  • Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
  • Workspace deletion → See detailed workflow below (lines 108-111).
操作说明
start-processor
开始计费。可选参数:
tier
规格覆盖、
resumeFromCheckpoint
从检查点恢复
stop-processor
停止计费。状态保留45天
modify-processor
必须先停止处理器。可修改管道、DLQ或名称
update-workspace
修改层级规格或区域
update-connection
更新配置(网络配置不可修改,必须删除重建)
accept-peering
/
reject-peering
VPC对等连接管理
字段映射规则 — 始终填写
projectId
workspaceName
,再根据操作填写对应字段:
  • "start-processor"
    resourceName
    。可选参数:
    tier
    resumeFromCheckpoint
    startAtOperationTime
    (ISO 8601格式时间戳,用于从特定时间点恢复)
  • "stop-processor"
    resourceName
  • "modify-processor"
    resourceName
    。至少填写以下一个字段:
    pipeline
    dlq
    newName
  • "update-workspace"
    newRegion
    newTier
  • "update-connection"
    resourceName
    connectionConfig
    例外:网络配置(如PrivateLink)创建后不可修改,需要删除重建。
  • "accept-peering"
    peeringId
    requesterAccountId
    requesterVpcId
  • "reject-peering"
    peeringId
状态前置检查:
  • start-processor
    → 如果处理器已经处于STARTED状态会报错
  • stop-processor
    → 如果处理器已经处于STOPPED或CREATED状态,操作无响应(不会报错)
  • modify-processor
    → 如果处理器处于STARTED状态会报错,必须先停止
处理器状态:
CREATED
STARTED
(通过start操作) →
STOPPED
(通过stop操作)。运行时出错会进入
FAILED
状态。修改操作需要处理器处于STOPPED或CREATED状态。
销毁安全检查:
  • 处理器删除 → 删除前自动停止,无需手动停止
  • 连接删除 → 如果有运行中的处理器引用该连接,删除操作会被阻止。需要先停止/删除引用该连接的处理器
  • 工作空间删除 → 参见下文详细流程(第108-111行)。

atlas-streams-teardown — ALL delete operations

atlas-streams-teardown — 所有删除操作

ResourceSafety behavior
processor
Auto-stops before deleting
connection
Blocks if referenced by running processor
workspace
Cascading delete of all connections and processors
privatelink
/
peering
Remove networking resources
Field mapping — always fill
projectId
,
resource
, then:
  • resource: "workspace"
    workspaceName
  • resource: "connection"
    or
    "processor"
    workspaceName
    ,
    resourceName
  • resource: "privatelink"
    or
    "peering"
    resourceName
    (the ID). These are project-level resources, not tied to a specific workspace.
Before deleting a workspace, inspect it first:
  1. atlas-streams-discover
    inspect-workspace
    — get connection/processor counts
  2. Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
  3. Wait for confirmation before calling
    atlas-streams-teardown
资源安全机制
processor
删除前自动停止
connection
如果被运行中的处理器引用则阻止删除
workspace
级联删除所有连接和处理器
privatelink
/
peering
移除网络资源
字段映射规则 — 始终填写
projectId
resource
,再根据资源类型填写对应字段:
  • resource: "workspace"
    workspaceName
  • resource: "connection"
    "processor"
    workspaceName
    resourceName
  • resource: "privatelink"
    "peering"
    resourceName
    (资源ID)。这些是项目级别资源,不绑定到特定工作空间。
删除工作空间前,需要先进行查询:
  1. 调用
    atlas-streams-discover
    inspect-workspace
    接口,获取连接和处理器数量
  2. 向用户确认:"工作空间X包含N个连接和M个处理器,删除后将永久移除所有资源,是否继续?"
  3. 获得用户确认后再调用
    atlas-streams-teardown

CRITICAL: Validate Before Creating Processors

重要提示:创建处理器前必须校验

You MUST call
search-knowledge
before composing any processor pipeline.
This is not optional.
  • Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like
    prefix
    vs
    path
    for S3
    $emit
    .
  • Pattern examples: Query with
    dataSources: [{"name": "devcenter"}]
    for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with
example_processors/README.md
for the full pattern catalog.
Key quickstarts:
QuickstartPattern
00_hello_world.json
Inline
$source.documents
with
$match
(zero infra, ephemeral)
01_changestream_basic.json
Change stream → tumbling window →
$merge
to Atlas
03_kafka_to_mongo.json
Kafka source → tumbling window rollup →
$merge
to Atlas
04_mongo_to_mongo.json
Chained processors: rollup → archive to separate collection
05_kafka_tail.json
Real-time Kafka topic monitoring (sinkless, like
tail -f
)
编写任何处理器管道前必须调用
search-knowledge
,这是强制要求。
  • 字段校验: 根据sink/source类型查询,例如"Atlas Stream Processing $emit S3 fields"或"Atlas Stream Processing Kafka $source configuration",可提前发现类似S3
    $emit
    应该用
    path
    而不是
    prefix
    的错误。
  • 模式示例: 携带
    dataSources: [{"name": "devcenter"}]
    参数查询可用管道,例如"Atlas Stream Processing tumbling window example"。
核心快速入门示例:
快速入门文件模式
00_hello_world.json
内置
$source.documents
+
$match
(无需基础设施,临时运行)
01_changestream_basic.json
变更流 → 滚动窗口 →
$merge
到Atlas
03_kafka_to_mongo.json
Kafka源 → 滚动窗口聚合 →
$merge
到Atlas
04_mongo_to_mongo.json
链式处理器:聚合 → 归档到独立集合
05_kafka_tail.json
实时Kafka Topic监控(无sink,类似
tail -f
命令)

Pipeline Rules & Warnings

管道规则与注意事项

Invalid constructs — these are NOT valid in streaming pipelines:
  • $$NOW
    ,
    $$ROOT
    ,
    $$CURRENT
    — NOT available in stream processing. NEVER use these. Use the document's own timestamp field or
    _stream_meta
    metadata for event time instead of
    $$NOW
    .
  • HTTPS connections as
    $source
    — HTTPS is for
    $https
    enrichment or sink only, NOT as a data source
  • Kafka
    $source
    without
    topic
    — topic field is required
  • Pipelines without a sink — terminal stage (
    $merge
    ,
    $emit
    ,
    $https
    , or
    $externalFunction
    async) required for deployed processors (sinkless only works via
    sp.process()
    )
  • Lambda as
    $emit
    target
    — Lambda uses
    $externalFunction
    (mid-pipeline enrichment), not
    $emit
  • $validate
    with
    validationAction: "error"
    — crashes processor; use
    "dlq"
    instead
Required fields by stage:
  • $source
    (change stream)
    : include
    fullDocument: "updateLookup"
    to get the full document content
  • $source
    (Kinesis)
    : use
    stream
    (NOT
    streamName
    or
    topic
    )
  • $emit
    (Kinesis)
    : MUST include
    partitionKey
  • $emit
    (S3)
    : use
    path
    (NOT
    prefix
    )
  • $https
    : must include
    connectionName
    ,
    path
    ,
    method
    ,
    as
    ,
    onError: "dlq"
  • $externalFunction
    : must include
    connectionName
    ,
    functionName
    ,
    execution
    ,
    as
    ,
    onError: "dlq"
  • $validate
    : must include
    validator
    with
    $jsonSchema
    and
    validationAction: "dlq"
  • $lookup
    : include
    parallelism
    setting (e.g.,
    parallelism: 2
    ) for concurrent I/O
  • AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.
See references/pipeline-patterns.md for stage field examples with JSON syntax.
SchemaRegistry connection:
connectionType
must be
"SchemaRegistry"
(not
"Kafka"
). Schema type values are case-sensitive (use lowercase
avro
, not
AVRO
). See references/connection-configs.md for required fields and auth types.
无效构造 — 以下内容在流处理管道中不合法:
  • $$NOW
    $$ROOT
    $$CURRENT
    — 流处理中不可用,严禁使用。请使用文档自身的时间戳字段或
    _stream_meta
    元数据作为事件时间,不要使用
    $$NOW
  • HTTPS连接作为
    $source
    — HTTPS仅用于
    $https
    enrichment或sink,不能作为数据源。
  • Kafka
    $source
    未指定
    topic
    — topic字段为必填项。
  • 管道没有sink — 部署的处理器必须有终止阶段(
    $merge
    $emit
    $https
    或异步
    $externalFunction
    ),无sink模式仅支持通过
    sp.process()
    运行。
  • Lambda作为
    $emit
    目标
    — Lambda使用
    $externalFunction
    (管道中间enrichment),不能用
    $emit
  • $validate
    使用
    validationAction: "error"
    — 会导致处理器崩溃,请改用
    "dlq"
各阶段必填字段:
  • $source
    (变更流):
    需配置
    fullDocument: "updateLookup"
    以获取完整文档内容。
  • $source
    (Kinesis):
    使用
    stream
    字段,不要用
    streamName
    topic
  • $emit
    (Kinesis):
    必须包含
    partitionKey
    字段。
  • $emit
    (S3):
    使用
    path
    字段,不要用
    prefix
  • $https
    必须包含
    connectionName
    path
    method
    as
    onError: "dlq"
  • $externalFunction
    必须包含
    connectionName
    functionName
    execution
    as
    onError: "dlq"
  • $validate
    必须包含带
    $jsonSchema
    validator
    ,以及
    validationAction: "dlq"
  • $lookup
    需配置
    parallelism
    参数(例如
    parallelism: 2
    )实现并发I/O。
  • AWS连接(S3、Kinesis、Lambda):IAM角色ARN必须先通过Atlas云提供商访问功能注册,请务必和用户确认。详情参见references/connection-configs.md
各阶段字段的JSON语法示例参见references/pipeline-patterns.md
SchemaRegistry连接:
connectionType
必须为
"SchemaRegistry"
(不是
"Kafka"
)。Schema类型值区分大小写(使用小写
avro
,不要用
AVRO
)。必填字段和认证类型参见references/connection-configs.md

MCP Tool Behaviors

MCP工具特性

Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.
Auto-normalization:
  • bootstrapServers
    array → auto-converted to comma-separated string
  • schemaRegistryUrls
    string → auto-wrapped in array
  • dbRoleToExecute
    → defaults to
    {role: "readWriteAnyDatabase", type: "BUILT_IN"}
    for Cluster connections
Workspace creation:
includeSampleData
defaults to
true
, which auto-creates the
sample_stream_solar
connection.
Region naming: The
region
field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic
dataProcessRegion
error.
ProviderCloud RegionStreams
region
Value
AWSus-east-1
VIRGINIA_USA
AWSus-east-2
OHIO_USA
AWSeu-west-1
DUBLIN_IRL
GCPus-central1
US_CENTRAL1
GCPeurope-west1
EUROPE_WEST1
Azureeastus
eastus
Azurewesteurope
westeurope
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with
atlas-streams-discover
inspect-workspace
and check
dataProcessRegion.region
.
信息采集: 创建连接时,构建工具会通过MCP信息采集功能自动收集缺失的敏感字段(密码、bootstrap服务器地址),不要向用户索要这些信息,交由工具自动收集。
自动标准化:
  • bootstrapServers
    数组 → 自动转换为逗号分隔的字符串
  • schemaRegistryUrls
    字符串 → 自动封装为数组
  • dbRoleToExecute
    → Cluster连接默认值为
    {role: "readWriteAnyDatabase", type: "BUILT_IN"}
工作空间创建:
includeSampleData
默认值为
true
,会自动创建
sample_stream_solar
连接。
区域命名:
region
字段使用Atlas专属命名规则,不同云提供商的命名不同。格式错误会返回含义模糊的
dataProcessRegion
错误。
云提供商云区域流处理
region
取值
AWSus-east-1
VIRGINIA_USA
AWSus-east-2
OHIO_USA
AWSeu-west-1
DUBLIN_IRL
GCPus-central1
US_CENTRAL1
GCPeurope-west1
EUROPE_WEST1
Azureeastus
eastus
Azurewesteurope
westeurope
完整的区域映射表参见references/connection-configs.md。如果不确定取值,可以调用
atlas-streams-discover
inspect-workspace
接口查询现有工作空间的
dataProcessRegion.region
字段。

Connection Capabilities — Source/Sink Reference

连接能力 — 源/Sink参考

Know what each connection type can do before creating pipelines:
Connection TypeAs Source ($source)As Sink ($merge / $emit)Mid-PipelineNotes
Cluster✅ Change streams✅ $merge to collections✅ $lookupChange streams monitor insert/update/delete/replace operations
Kafka✅ Topic consumer✅ $emit to topicsSource MUST include
topic
field
Sample Stream✅ Sample data❌ Not validTesting/demo only
S3❌ Not valid✅ $emit to bucketsSink only - use
path
,
format
,
compression
. Supports AWS PrivateLink.
Https❌ Not valid✅ $https as sink✅ $https enrichmentCan be used mid-pipeline for enrichment OR as final sink stage
AWSLambda❌ Not valid✅ $externalFunction (async only)✅ $externalFunction (sync or async)Sink:
execution: "async"
required. Mid-pipeline:
execution: "sync"
or
"async"
AWS Kinesis✅ Stream consumer✅ $emit to streamsSimilar to Kafka pattern
SchemaRegistry❌ Not valid❌ Not valid✅ Schema resolutionMetadata only - used by Kafka connections for Avro schemas
Common connection usage mistakes to avoid:
  • ❌ Using
    $externalFunction
    as sink with
    execution: "sync"
    → Must use
    execution: "async"
    for sink stage
  • ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
  • ❌ Using
    $merge
    with Kafka → Use
    $emit
    for Kafka sinks
See references/connection-configs.md for detailed connection configuration schemas by type.
创建管道前请了解各连接类型的能力:
连接类型作为源($source)作为Sink($merge / $emit)管道中间使用说明
Cluster✅ 变更流✅ $merge到集合✅ $lookup变更流可监控插入/更新/删除/替换操作
Kafka✅ Topic消费者✅ $emit到Topic源必须包含
topic
字段
Sample Stream✅ 示例数据❌ 不支持仅用于测试/演示
S3❌ 不支持✅ $emit到存储桶仅作为Sink,使用
path
format
compression
字段,支持AWS PrivateLink
Https❌ 不支持✅ $https作为Sink✅ $https enrichment可用于管道中间enrichment或作为最终Sink阶段
AWSLambda❌ 不支持✅ $externalFunction(仅异步)✅ $externalFunction(同步或异步)Sink场景: 必须配置
execution: "async"
管道中间场景:
execution
可选
"sync"
"async"
AWS Kinesis✅ 流消费者✅ $emit到流与Kafka模式类似
SchemaRegistry❌ 不支持❌ 不支持✅ Schema解析仅作为元数据使用,供Kafka连接处理Avro Schema
需要避免的常见连接使用错误:
  • ❌ 作为Sink使用
    $externalFunction
    时配置
    execution: "sync"
    → Sink阶段必须使用
    execution: "async"
  • ❌ 忽略变更流能力 → Atlas Cluster是强大的数据源,不只是Sink
  • ❌ Kafka Sink使用
    $merge
    → Kafka Sink应该用
    $emit
不同类型连接的详细配置Schema参见references/connection-configs.md

Core Workflows

核心工作流

Setup from scratch

从零开始搭建

  1. atlas-streams-discover
    list-workspaces
    (check existing)
  2. atlas-streams-build
    resource: "workspace"
    (region near data, SP10 for dev)
  3. atlas-streams-build
    resource: "connection"
    (for each source/sink/enrichment)
  4. Validate connections:
    atlas-streams-discover
    list-connections
    +
    inspect-connection
    for each — verify names match targets, present summary to user
  5. Call
    search-knowledge
    to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
  6. atlas-streams-build
    resource: "processor"
    (with DLQ configured)
  7. atlas-streams-manage
    start-processor
    (warn about billing)
  1. 调用
    atlas-streams-discover
    list-workspaces
    接口,检查现有工作空间
  2. 调用
    atlas-streams-build
    创建
    resource: "workspace"
    (区域选离数据近的位置,开发环境选SP10规格)
  3. 调用
    atlas-streams-build
    创建
    resource: "connection"
    ,每个源/Sink/enrichment对应一个连接
  4. 连接校验: 调用
    atlas-streams-discover
    list-connections
    和每个连接的
    inspect-connection
    接口,验证名称与目标匹配,向用户展示校验汇总
  5. 调用
    search-knowledge
    校验字段名,从https://github.com/mongodb/ASP_example获取相关示例
  6. 调用
    atlas-streams-build
    创建
    resource: "processor"
    (配置DLQ)
  7. 调用
    atlas-streams-manage
    start-processor
    接口(提示用户计费相关说明)

Workflow Patterns

工作流模式

Incremental pipeline development (recommended): See references/development-workflow.md for the full 5-phase lifecycle.
  1. Start with basic
    $source
    $merge
    pipeline (validate connectivity)
  2. Add
    $match
    stages (validate filtering)
  3. Add
    $addFields
    /
    $project
    transforms (validate reshaping)
  4. Add windowing or enrichment (validate aggregation logic)
  5. Add error handling / DLQ configuration
Modify a processor pipeline:
  1. atlas-streams-manage
    action: "stop-processor"
    processor MUST be stopped first
  2. atlas-streams-manage
    action: "modify-processor"
    — provide new pipeline
  3. atlas-streams-manage
    action: "start-processor"
    — restart
Debug a failing processor:
  1. atlas-streams-discover
    diagnose-processor
    — one-shot health report. Always call this first.
  2. Commit to a specific root cause. Match symptoms to diagnostic patterns:
    • Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
    • State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
    • State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add
      partitionIdleTimeout
      to Kafka
      $source
      (e.g.,
      {"size": 30, "unit": "second"}
      )
    • State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
    • High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
    • DLQ count increasing → per-document errors; use MongoDB
      find
      on DLQ collection See references/output-diagnostics.md for the full pattern table.
  3. Classify processor type before interpreting output volume (alert vs transformation vs filter).
  4. Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
  5. If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
增量管道开发(推荐): 完整的5阶段生命周期参见references/development-workflow.md
  1. 从基础的
    $source
    $merge
    管道开始,验证连通性
  2. 添加
    $match
    阶段,验证过滤逻辑
  3. 添加
    $addFields
    /
    $project
    转换,验证数据格式重塑
  4. 添加窗口或enrichment逻辑,验证聚合逻辑
  5. 添加错误处理/DLQ配置
修改处理器管道:
  1. 调用
    atlas-streams-manage
    action: "stop-processor"
    接口,必须先停止处理器
  2. 调用
    atlas-streams-manage
    action: "modify-processor"
    接口,提供新的管道
  3. 调用
    atlas-streams-manage
    action: "start-processor"
    接口重启
调试故障处理器:
  1. 调用
    atlas-streams-discover
    diagnose-processor
    接口获取一键健康报告,必须优先调用此接口
  2. 定位具体根因,将症状与诊断模式匹配:
    • 错误419 + "no partitions found" → Kafka Topic不存在或名称拼写错误
    • 状态:FAILED + 多次重启 → 连接级别错误(不经过DLQ),检查连接配置
    • 状态:STARTED + 零输出 + 窗口管道 → 可能是空闲Kafka分区阻塞了窗口关闭,给Kafka
      $source
      添加
      partitionIdleTimeout
      参数(例如
      {"size": 30, "unit": "second"}
    • 状态:STARTED + 零输出 + 非窗口管道 → 检查数据源是否有数据,查看Kafka消费偏移延迟
    • memoryUsageBytes过高接近规格上限 → OOM风险,建议升级规格
    • DLQ数量持续增加 → 单文档错误,对DLQ集合执行MongoDB
      find
      查询排查 完整模式表参见references/output-diagnostics.md
  3. 解读输出量前先对处理器类型分类(告警类/转换类/过滤类)
  4. 针对诊断出的根因提供具体、有序的修复步骤,不要给出假设性场景列表
  5. 如果需要详细日志,引导用户到Atlas UI查看:Atlas → Stream Processing → Workspace → Processor → Logs标签页

Chained processors (multi-sink pattern)

链式处理器(多Sink模式)

CRITICAL: A single pipeline can only have ONE terminal sink (
$merge
or
$emit
). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
重要提示:单个管道只能有一个终止Sink
$merge
$emit
)。当用户要求多个输出目标时(例如"写入Atlas同时发送到Kafka"),必须告知用户单Sink限制,建议使用中间目标实现链式处理器。完整模式和示例参见references/pipeline-patterns.md

Pre-Deploy & Post-Deploy Checklists

部署前后检查清单

See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
完整的部署前质量检查清单(连接校验、管道校验)和部署后验证工作流参见references/development-workflow.md

Tier Sizing & Performance

规格选型与性能

See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
规格参数、并行度计算公式、复杂度评分和性能优化策略参见references/sizing-and-parallelism.md

Troubleshooting

故障排查

See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
涵盖处理器故障、API错误、配置问题和性能问题的完整故障排查表参见references/development-workflow.md

Billing & Cost

计费与成本

Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
  • Charges are per-hour, calculated per-second, only while the processor is running
  • stop-processor
    stops billing; stopped processors retain state for 45 days at no charge
  • For prototyping without billing: Use
    sp.process()
    in mongosh — runs pipelines ephemerally without deploying a processor
  • See
    references/sizing-and-parallelism.md
    for tier pricing and cost optimization strategies
Atlas Stream Processing没有免费 tier,所有已部署的处理器运行时会持续产生费用。
  • 按小时计费,按秒结算,仅在处理器运行时收费
  • stop-processor
    停止计费,已停止的处理器状态免费保留45天
  • 零成本原型开发: 在mongosh中使用
    sp.process()
    ,临时运行管道无需部署处理器
  • tier定价和成本优化策略参见
    references/sizing-and-parallelism.md

Safety Rules

安全规则

  • atlas-streams-teardown
    and
    atlas-streams-manage
    require user confirmation — do not bypass
  • BEFORE calling
    atlas-streams-teardown
    for a workspace
    , you MUST first inspect the workspace with
    atlas-streams-discover
    to count connections and processors, then present this information to the user before requesting confirmation
  • BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
  • Deleting a workspace removes ALL connections and processors permanently
  • After stopping a processor, state is preserved 45 days — then checkpoints are discarded
  • resumeFromCheckpoint: false
    drops all window state — warn user first
  • Moving processors between workspaces is not supported (must recreate)
  • Dry-run / simulation is not supported — explain what you would do and ask for confirmation
  • Always warn users about billing before starting processors
  • Store API authentication credentials in connection settings, never hardcode in processor pipelines
  • atlas-streams-teardown
    atlas-streams-manage
    操作需要用户确认,不要跳过
  • 调用
    atlas-streams-teardown
    删除工作空间前
    ,必须先调用
    atlas-streams-discover
    查询工作空间的连接和处理器数量,向用户展示该信息后再请求确认
  • 创建任何处理器前,必须按照references/development-workflow.md中的"部署前校验"章节校验所有连接
  • 删除工作空间会永久移除所有连接和处理器
  • 停止处理器后,状态保留45天,之后检查点会被丢弃
  • resumeFromCheckpoint: false
    会丢弃所有窗口状态,需要提前告知用户
  • 不支持在工作空间之间迁移处理器,必须重新创建
  • 不支持试运行/模拟操作,需要向用户说明操作内容并请求确认
  • 启动处理器前必须告知用户计费相关说明
  • API认证凭证存储在连接设置中,严禁硬编码在处理器管道里

Reference Files

参考文件

FileRead when...
references/pipeline-patterns.md
Building or modifying processor pipelines
references/connection-configs.md
Creating connections (type-specific schemas)
references/development-workflow.md
Following lifecycle management or debugging decision trees
references/output-diagnostics.md
Processor output is unexpected (zero, low, or wrong)
references/sizing-and-parallelism.md
Choosing tiers, tuning parallelism, or optimizing cost
文件适用场景
references/pipeline-patterns.md
构建或修改处理器管道时
references/connection-configs.md
创建连接时(不同类型的配置Schema)
references/development-workflow.md
遵循生命周期管理或调试决策流程时
references/output-diagnostics.md
处理器输出不符合预期时(零输出、输出过少或错误)
references/sizing-and-parallelism.md
选择规格、调优并行度或优化成本时