# atlas-stream-processing — MongoDB Atlas Streams

Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
## Prerequisites

This skill requires the MongoDB MCP Server connected with:

- Atlas API credentials (`apiClientId` and `apiClientSecret`)

The 4 tools: `atlas-streams-discover`, `atlas-streams-build`, `atlas-streams-manage`, `atlas-streams-teardown`.

All operations require an Atlas project ID. If unknown, call `atlas-list-projects` first to find your project ID.

### If MCP tools are unavailable

If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
## Tool Selection Matrix

### atlas-streams-discover — ALL read operations

| Action | Use when |
|---|---|
| `list-workspaces` | See all workspaces in a project |
| `inspect-workspace` | Review workspace config, state, region |
| `list-connections` | See all connections in a workspace |
| `inspect-connection` | Check connection state, config, health |
| `list-processors` | See all processors in a workspace |
| `inspect-processor` | Check processor state, pipeline, config |
| `diagnose-processor` | Full health report: state, stats, errors |
| | PrivateLink and VPC peering details |

Pagination (all list actions): `limit` (1-100, default 20), `pageNum` (default 1).

Response format: `responseFormat` — `"concise"` (default for list actions) or `"detailed"` (default for inspect/diagnose).

### atlas-streams-build — ALL create operations
| Resource | Key parameters |
|---|---|
| `workspace` | `projectId`, `workspaceName`, `cloudProvider`, `region`, `tier`, `includeSampleData` |
| `connection` | `projectId`, `workspaceName`, `connectionName`, `connectionType`, `connectionConfig` |
| `processor` | `projectId`, `workspaceName`, `processorName`, `pipeline`, `dlq`, `autoStart` |
| `privatelink` | `projectId`, `privateLinkConfig` |

Field mapping — only fill fields for the selected resource type:

- resource = "workspace": Fill `projectId`, `workspaceName`, `cloudProvider`, `region`, `tier`, `includeSampleData`. Leave empty: all connection and processor fields.
- resource = "connection": Fill `projectId`, `workspaceName`, `connectionName`, `connectionType`, `connectionConfig`. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
- resource = "processor": Fill `projectId`, `workspaceName`, `processorName`, `pipeline`, `dlq` (recommended), `autoStart` (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
- resource = "privatelink": Fill `projectId`, `privateLinkConfig`. Note: PrivateLink is project-level, not workspace-level; `workspaceName` is not required — omit it. Leave empty: all connection and processor fields.
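As an illustration, a `resource: "processor"` build call might be shaped like the following sketch. The project ID, workspace, connection, and collection names and the pipeline contents are hypothetical placeholders, not values from this skill:

```json
{
  "resource": "processor",
  "projectId": "<project-id>",
  "workspaceName": "demo-workspace",
  "processorName": "orders-archiver",
  "pipeline": [
    { "$source": { "connectionName": "myCluster", "db": "shop", "coll": "orders" } },
    { "$match": { "status": "completed" } },
    { "$merge": { "into": { "connectionName": "myCluster", "db": "shop", "coll": "orders_archive" } } }
  ],
  "dlq": { "connectionName": "myCluster", "db": "shop", "coll": "orders_dlq" },
  "autoStart": false
}
```

All workspace-only and connection-only fields are left out, per the field-mapping rules above.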
### atlas-streams-manage — ALL update/state operations
| Action | Notes |
|---|---|
| `start-processor` | Begins billing. Optional parameters listed below |
| `stop-processor` | Stops billing. Retains state 45 days |
| `modify-processor` | Processor must be stopped first. Change pipeline, DLQ, or name |
| `update-workspace` | Change tier or region |
| `update-connection` | Update config (networking is immutable — must delete and recreate) |
| `accept-peering` / `reject-peering` | VPC peering management |

Field mapping — always fill `projectId`, `workspaceName`, then by action:

- `"start-processor"` → `resourceName`. Optional: `tier`, `resumeFromCheckpoint`, `startAtOperationTime` (ISO 8601 timestamp to resume from a specific point)
- `"stop-processor"` → `resourceName`
- `"modify-processor"` → `resourceName`. At least one of: `pipeline`, `dlq`, `newName`
- `"update-workspace"` → `newRegion` or `newTier`
- `"update-connection"` → `resourceName`, `connectionConfig`. Exception: networking config (e.g., PrivateLink) cannot be modified after creation — delete and recreate.
- `"accept-peering"` → `peeringId`, `requesterAccountId`, `requesterVpcId`
- `"reject-peering"` → `peeringId`

State pre-checks:

- `start-processor` → errors if processor is already STARTED
- `stop-processor` → no-ops if already STOPPED or CREATED (not an error)
- `modify-processor` → errors if processor is STARTED (must stop first)

Processor states: CREATED → STARTED (via start) → STOPPED (via stop). Can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.

Teardown safety checks:

- Processor deletion → auto-stops before deleting (no need to stop manually first)
- Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
- Workspace deletion → see the detailed workflow under atlas-streams-teardown below.
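Because modify requires a stopped processor, a typical edit maps onto three `atlas-streams-manage` calls in sequence. A sketch with placeholder values (here the change supplies a new `dlq`, satisfying the at-least-one rule):

```json
[
  { "action": "stop-processor",   "projectId": "<project-id>", "workspaceName": "demo-workspace", "resourceName": "orders-archiver" },
  { "action": "modify-processor", "projectId": "<project-id>", "workspaceName": "demo-workspace", "resourceName": "orders-archiver",
    "dlq": { "connectionName": "myCluster", "db": "shop", "coll": "orders_dlq" } },
  { "action": "start-processor",  "projectId": "<project-id>", "workspaceName": "demo-workspace", "resourceName": "orders-archiver" }
]
```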
### atlas-streams-teardown — ALL delete operations

| Resource | Safety behavior |
|---|---|
| `processor` | Auto-stops before deleting |
| `connection` | Blocks if referenced by running processor |
| `workspace` | Cascading delete of all connections and processors |
| `privatelink` / `peering` | Remove networking resources |

Field mapping — always fill `projectId`, `resource`, then:

- `resource: "workspace"` → `workspaceName`
- `resource: "connection"` or `"processor"` → `workspaceName`, `resourceName`
- `resource: "privatelink"` or `"peering"` → `resourceName` (the ID). These are project-level resources, not tied to a specific workspace.

Before deleting a workspace, inspect it first:

- `atlas-streams-discover` → `inspect-workspace` — get connection/processor counts
- Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
- Wait for confirmation before calling `atlas-streams-teardown`
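For example, deleting a single processor (which auto-stops it first) uses only the fields from the mapping above; the ID and names here are placeholders:

```json
{
  "resource": "processor",
  "projectId": "<project-id>",
  "workspaceName": "demo-workspace",
  "resourceName": "orders-archiver"
}
```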
## CRITICAL: Validate Before Creating Processors

You MUST call `search-knowledge` before composing any processor pipeline. This is not optional.

- Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like `prefix` vs `path` for S3 `$emit`.
- Pattern examples: Query with `dataSources: [{"name": "devcenter"}]` for working pipelines, e.g. "Atlas Stream Processing tumbling window example".

Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with `example_processors/README.md` for the full pattern catalog.

Key quickstarts:

| Quickstart | Pattern |
|---|---|
| | Inline |
| | Change stream → tumbling window → |
| | Kafka source → tumbling window rollup → |
| | Chained processors: rollup → archive to separate collection |
| | Real-time Kafka topic monitoring (sinkless, like `sp.process()`) |
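Though the exact parameter shape of `search-knowledge` is not spelled out here, a field-validation query combining both tips above might look roughly like this (the `query` key is an assumption; `dataSources` comes from this section):

```json
{
  "query": "Atlas Stream Processing $emit S3 fields",
  "dataSources": [ { "name": "devcenter" } ]
}
```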
## Pipeline Rules & Warnings

Invalid constructs — these are NOT valid in streaming pipelines:

- `$$NOW`, `$$ROOT`, `$$CURRENT` — NOT available in stream processing. NEVER use these. Use the document's own timestamp field or `_stream_meta` metadata for event time instead of `$$NOW`.
- HTTPS connections as `$source` — HTTPS is for `$https` enrichment or sink only, NOT as a data source
- Kafka `$source` without `topic` — the topic field is required
- Pipelines without a sink — a terminal stage (`$merge`, `$emit`, `$https`, or async `$externalFunction`) is required for deployed processors (sinkless only works via `sp.process()`)
- Lambda as `$emit` target — Lambda uses `$externalFunction` (mid-pipeline enrichment), not `$emit`
- `$validate` with `validationAction: "error"` — crashes processor; use `"dlq"` instead

Required fields by stage:

- `$source` (change stream): include `fullDocument: "updateLookup"` to get the full document content
- `$source` (Kinesis): use `stream` (NOT `streamName` or `topic`)
- `$emit` (Kinesis): MUST include `partitionKey`
- `$emit` (S3): use `path` (NOT `prefix`)
- `$https`: must include `connectionName`, `path`, `method`, `as`, `onError: "dlq"`
- `$externalFunction`: must include `connectionName`, `functionName`, `execution`, `as`, `onError: "dlq"`
- `$validate`: must include `validator` with `$jsonSchema` and `validationAction: "dlq"`
- `$lookup`: include `parallelism` setting (e.g., `parallelism: 2`) for concurrent I/O
- AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.

See references/pipeline-patterns.md for stage field examples with JSON syntax.

SchemaRegistry connection: `connectionType` must be `"SchemaRegistry"` (not `"Kafka"`). Schema type values are case-sensitive (use lowercase `avro`, not `AVRO`). See references/connection-configs.md for required fields and auth types.
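Putting several of these rules together, a hypothetical change-stream pipeline with validation and a DLQ-safe HTTPS enrichment might look like the sketch below. Connection, database, and path names are placeholders, and the nesting of `fullDocument` under `config` is an assumption; verify stage fields via `search-knowledge` and references/pipeline-patterns.md:

```json
[
  { "$source": { "connectionName": "myCluster", "db": "shop", "coll": "orders",
                 "config": { "fullDocument": "updateLookup" } } },
  { "$validate": { "validator": { "$jsonSchema": { "required": ["fullDocument"] } },
                   "validationAction": "dlq" } },
  { "$https": { "connectionName": "pricingApi", "path": "/enrich", "method": "POST",
                "as": "enriched", "onError": "dlq" } },
  { "$merge": { "into": { "connectionName": "myCluster", "db": "shop", "coll": "orders_enriched" } } }
]
```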
## MCP Tool Behaviors

Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.

Auto-normalization:

- `bootstrapServers` array → auto-converted to comma-separated string
- `schemaRegistryUrls` string → auto-wrapped in array
- `dbRoleToExecute` → defaults to `{role: "readWriteAnyDatabase", type: "BUILT_IN"}` for Cluster connections

Workspace creation: `includeSampleData` defaults to `true`, which auto-creates the `sample_stream_solar` connection.

Region naming: The `region` field uses Atlas-specific `dataProcessRegion` names that differ by cloud provider. Using the wrong format returns a cryptic error.

| Provider | Cloud Region | Streams |
|---|---|---|
| AWS | us-east-1 | |
| AWS | us-east-2 | |
| AWS | eu-west-1 | |
| GCP | us-central1 | |
| GCP | europe-west1 | |
| Azure | eastus | |
| Azure | westeurope | |

See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with `atlas-streams-discover` → `inspect-workspace` and check `dataProcessRegion.region`.
## Connection Capabilities — Source/Sink Reference

Know what each connection type can do before creating pipelines:

| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | ❌ | Source MUST include `topic` |
| Sample Stream | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | ❌ | Sink only - use `path` |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: `execution: "async"` required |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | ❌ | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only - used by Kafka connections for Avro schemas |

Common connection usage mistakes to avoid:

- ❌ Using `$externalFunction` as sink with `execution: "sync"` → Must use `execution: "async"` for sink stage
- ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
- ❌ Using `$merge` with Kafka → Use `$emit` for Kafka sinks

See references/connection-configs.md for detailed connection configuration schemas by type.
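On the connection side, a Kafka connection build call might be shaped like this sketch. Only fields named in this skill are shown; anything beyond `bootstrapServers` (and any credentials) should come from references/connection-configs.md or be gathered by MCP elicitation, and the names here are placeholders:

```json
{
  "resource": "connection",
  "projectId": "<project-id>",
  "workspaceName": "demo-workspace",
  "connectionName": "myKafka",
  "connectionType": "Kafka",
  "connectionConfig": {
    "bootstrapServers": "broker1:9092,broker2:9092"
  }
}
```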
## Core Workflows

### Setup from scratch

- `atlas-streams-discover` → `list-workspaces` (check existing)
- `atlas-streams-build` → `resource: "workspace"` (region near data, SP10 for dev)
- `atlas-streams-build` → `resource: "connection"` (for each source/sink/enrichment)
- Validate connections: `atlas-streams-discover` → `list-connections` + `inspect-connection` for each — verify names match targets, present summary to user
- Call `search-knowledge` to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
- `atlas-streams-build` → `resource: "processor"` (with DLQ configured)
- `atlas-streams-manage` → `start-processor` (warn about billing)
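The workspace-creation step above might be issued as the following sketch. The Streams-specific `region` value must come from the mapping in references/connection-configs.md; the other values are placeholders:

```json
{
  "resource": "workspace",
  "projectId": "<project-id>",
  "workspaceName": "demo-workspace",
  "cloudProvider": "AWS",
  "region": "<streams-region-name>",
  "tier": "SP10",
  "includeSampleData": true
}
```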
## Workflow Patterns

Incremental pipeline development (recommended):

See references/development-workflow.md for the full 5-phase lifecycle.

- Start with a basic `$source` → `$merge` pipeline (validate connectivity)
- Add `$match` stages (validate filtering)
- Add `$addFields`/`$project` transforms (validate reshaping)
- Add windowing or enrichment (validate aggregation logic)
- Add error handling / DLQ configuration

Modify a processor pipeline:

- `atlas-streams-manage` → `action: "stop-processor"` — processor MUST be stopped first
- `atlas-streams-manage` → `action: "modify-processor"` — provide new pipeline
- `atlas-streams-manage` → `action: "start-processor"` — restart

Debug a failing processor:

- `atlas-streams-discover` → `diagnose-processor` — one-shot health report. Always call this first.
- Commit to a specific root cause. Match symptoms to diagnostic patterns:
  - Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
  - State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
  - State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add `partitionIdleTimeout` to Kafka `$source` (e.g., `{"size": 30, "unit": "second"}`)
  - State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
  - High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
  - DLQ count increasing → per-document errors; use MongoDB `find` on the DLQ collection. See references/output-diagnostics.md for the full pattern table.
- Classify processor type before interpreting output volume (alert vs transformation vs filter).
- Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
- If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
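For the idle-partition case, the suggested fix makes the Kafka `$source` look roughly like this sketch (connection and topic names are placeholders, and the top-level placement of `partitionIdleTimeout` is an assumption worth confirming via `search-knowledge`):

```json
{
  "$source": {
    "connectionName": "myKafka",
    "topic": "orders-events",
    "partitionIdleTimeout": { "size": 30, "unit": "second" }
  }
}
```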
### Chained processors (multi-sink pattern)

CRITICAL: A single pipeline can only have ONE terminal sink (`$merge` or `$emit`). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
## Pre-Deploy & Post-Deploy Checklists

See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
## Tier Sizing & Performance

See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
## Troubleshooting

See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
## Billing & Cost

Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.

- Charges are per-hour, calculated per-second, only while the processor is running
- `stop-processor` stops billing; stopped processors retain state for 45 days at no charge
- For prototyping without billing: use `sp.process()` in mongosh — runs pipelines ephemerally without deploying a processor
- See references/sizing-and-parallelism.md for tier pricing and cost optimization strategies
## Safety Rules

- `atlas-streams-teardown` and `atlas-streams-manage` require user confirmation — do not bypass
- BEFORE calling `atlas-streams-teardown` for a workspace, you MUST first inspect the workspace with `atlas-streams-discover` to count connections and processors, then present this information to the user before requesting confirmation
- BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
- Deleting a workspace removes ALL connections and processors permanently
- After stopping a processor, state is preserved 45 days — then checkpoints are discarded
- `resumeFromCheckpoint: false` drops all window state — warn user first
- Moving processors between workspaces is not supported (must recreate)
- Dry-run / simulation is not supported — explain what you would do and ask for confirmation
- Always warn users about billing before starting processors
- Store API authentication credentials in connection settings, never hardcode in processor pipelines
## Reference Files

| File | Read when... |
|---|---|
| references/pipeline-patterns.md | Building or modifying processor pipelines |
| references/connection-configs.md | Creating connections (type-specific schemas) |
| references/development-workflow.md | Following lifecycle management or debugging decision trees |
| references/output-diagnostics.md | Processor output is unexpected (zero, low, or wrong) |
| references/sizing-and-parallelism.md | Choosing tiers, tuning parallelism, or optimizing cost |