mongodb-atlas-stream-processing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMongoDB Atlas Streams
MongoDB Atlas Streams
Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
使用MongoDB MCP Server提供的四款MCP工具构建、运维和调试Atlas Stream Processing (ASP)管道。
Prerequisites
前置条件
This skill requires the MongoDB MCP Server connected with:
- Atlas API credentials (and
apiClientId)apiClientSecret
The 4 tools: , , , .
atlas-streams-discoveratlas-streams-buildatlas-streams-manageatlas-streams-teardownAll operations require an Atlas project ID. If unknown, call first to find your project ID.
atlas-list-projects本技能需要连接以下配置的MongoDB MCP Server:
- Atlas API凭证(和
apiClientId)apiClientSecret
四款工具:、、、。
atlas-streams-discoveratlas-streams-buildatlas-streams-manageatlas-streams-teardown所有操作都需要Atlas项目ID。如果不知道项目ID,请先调用查找。
atlas-list-projectsIf MCP tools are unavailable
若MCP工具不可用
If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
如果MongoDB MCP Server未连接或流处理工具缺失,请查看references/mcp-troubleshooting.md获取诊断步骤和备选方案。
Tool Selection Matrix
工具选择矩阵
atlas-streams-discover — ALL read operations
atlas-streams-discover — 所有读取操作
| Action | Use when |
|---|---|
| See all workspaces in a project |
| Review workspace config, state, region |
| See all connections in a workspace |
| Check connection state, config, health |
| See all processors in a workspace |
| Check processor state, pipeline, config |
| Full health report: state, stats, errors |
| PrivateLink and VPC peering details. Optional: |
Pagination (all list actions): (1-100, default 20), (default 1).
Response format: — (default for list actions) or (default for inspect/diagnose).
limitpageNumresponseFormat"concise""detailed"| 操作 | 使用场景 |
|---|---|
| 查看项目中的所有工作区 |
| 查看工作区配置、状态、区域信息 |
| 查看工作区中的所有连接 |
| 检查连接状态、配置、健康状况 |
| 查看工作区中的所有处理器 |
| 检查处理器状态、管道、配置 |
| 完整健康报告:状态、统计数据、错误信息 |
| PrivateLink和VPC对等连接详情。可选参数: |
分页(所有列表操作):(取值1-100,默认20)、(默认1)。
响应格式: — (列表操作默认格式)或(检查/诊断操作默认格式)。
limitpageNumresponseFormat"concise""detailed"atlas-streams-build — ALL create operations
atlas-streams-build — 所有创建操作
| Resource | Key parameters |
|---|---|
| |
| |
| |
| |
Field mapping — only fill fields for the selected resource type:
- resource = "workspace": Fill: ,
projectId,workspaceName,cloudProvider,region,tier. Leave empty: all connection and processor fields.includeSampleData - resource = "connection": Fill: ,
projectId,workspaceName,connectionName,connectionType. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)connectionConfig - resource = "processor": Fill: ,
projectId,workspaceName,processorName,pipeline(recommended),dlq(optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)autoStart - resource = "privatelink": Fill: ,
projectId. Note: PrivateLink is project-level, not workspace-level.privateLinkConfigis not required — omit it. Leave empty: all connection and processor fields.workspaceName
| 资源 | 关键参数 |
|---|---|
| |
| |
| |
| |
字段映射 — 仅填写所选资源类型的字段:
- resource = "workspace": 填写:、
projectId、workspaceName、cloudProvider、region、tier。留空:所有连接和处理器字段。includeSampleData - resource = "connection": 填写:、
projectId、workspaceName、connectionName、connectionType。留空:所有工作区和处理器字段。(查看references/connection-configs.md获取特定类型的配置 schema。)connectionConfig - resource = "processor": 填写:、
projectId、workspaceName、processorName、pipeline(推荐配置)、dlq(可选)。留空:所有工作区和连接字段。(查看references/pipeline-patterns.md获取管道示例。)autoStart - resource = "privatelink": 填写:、
projectId。注意:PrivateLink是项目级配置,而非工作区级。无需填写privateLinkConfig— 直接省略。留空:所有连接和处理器字段。workspaceName
atlas-streams-manage — ALL update/state operations
atlas-streams-manage — 所有更新/状态操作
| Action | Notes |
|---|---|
| Begins billing. Optional |
| Stops billing. Retains state 45 days |
| Processor must be stopped first. Change pipeline, DLQ, or name |
| Change tier or region |
| Update config (networking is immutable — must delete and recreate) |
| VPC peering management |
Field mapping — always fill , , then by action:
projectIdworkspaceName- →
"start-processor". Optional:resourceName,tier,resumeFromCheckpoint(ISO 8601 timestamp to resume from a specific point)startAtOperationTime - →
"stop-processor"resourceName - →
"modify-processor". At least one of:resourceName,pipeline,dlqnewName - →
"update-workspace"ornewRegionnewTier - →
"update-connection",resourceName. Exception: networking config (e.g., PrivateLink) cannot be modified after creation — delete and recreate.connectionConfig - →
"accept-peering",peeringId,requesterAccountIdrequesterVpcId - →
"reject-peering"peeringId
State pre-checks:
- → errors if processor is already STARTED
start-processor - → no-ops if already STOPPED or CREATED (not an error)
stop-processor - → errors if processor is STARTED (must stop first)
modify-processor
Processor states: → (via start) → (via stop). Can also enter on runtime errors. Modify requires STOPPED or CREATED state.
CREATEDSTARTEDSTOPPEDFAILEDTeardown safety checks:
- Processor deletion → auto-stops before deleting (no need to stop manually first)
- Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
- Workspace deletion → See detailed workflow below (lines 108-111).
| 操作 | 说明 |
|---|---|
| 开始计费。可选参数: |
| 停止计费。状态会保留45天 |
| 处理器必须先停止。可修改管道、DLQ或名称 |
| 修改层级或区域 |
| 更新配置(网络配置不可修改 — 必须删除后重新创建) |
| VPC对等连接管理 |
字段映射 — 始终填写、,然后根据操作填写:
projectIdworkspaceName- →
"start-processor"。可选参数:resourceName、tier、resumeFromCheckpoint(ISO 8601时间戳,用于从特定时间点恢复)startAtOperationTime - →
"stop-processor"resourceName - →
"modify-processor"。至少填写以下之一:resourceName、pipeline、dlqnewName - →
"update-workspace"或newRegionnewTier - →
"update-connection"、resourceName。例外:网络配置(如PrivateLink)创建后无法修改 — 需删除后重新创建。connectionConfig - →
"accept-peering"、peeringId、requesterAccountIdrequesterVpcId - →
"reject-peering"peeringId
状态预检查:
- → 如果处理器已处于STARTED状态则报错
start-processor - → 如果处理器已处于STOPPED或CREATED状态则无操作(不报错)
stop-processor - → 如果处理器处于STARTED状态则报错(必须先停止)
modify-processor
处理器状态: → (通过start操作)→ (通过stop操作)。运行时出错也可能进入状态。修改操作要求处理器处于STOPPED或CREATED状态。
CREATEDSTARTEDSTOPPEDFAILED销毁安全检查:
- 处理器删除 → 删除前自动停止(无需手动先停止)
- 连接删除 → 如果有运行中的处理器引用该连接则阻止删除。需先停止/删除引用该连接的处理器。
- 工作区删除 → 查看下文详细流程(第108-111行)。
atlas-streams-teardown — ALL delete operations
atlas-streams-teardown — 所有删除操作
| Resource | Safety behavior |
|---|---|
| Auto-stops before deleting |
| Blocks if referenced by running processor |
| Cascading delete of all connections and processors |
| Remove networking resources |
Field mapping — always fill , , then:
projectIdresource- →
resource: "workspace"workspaceName - or
resource: "connection"→"processor",workspaceNameresourceName - or
resource: "privatelink"→"peering"(the ID). These are project-level resources, not tied to a specific workspace.resourceName
Before deleting a workspace, inspect it first:
- →
atlas-streams-discover— get connection/processor countsinspect-workspace - Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
- Wait for confirmation before calling
atlas-streams-teardown
| 资源 | 安全行为 |
|---|---|
| 删除前自动停止 |
| 如果被运行中的处理器引用则阻止删除 |
| 级联删除所有连接和处理器 |
| 删除网络资源 |
字段映射 — 始终填写、,然后:
projectIdresource- →
resource: "workspace"workspaceName - 或
resource: "connection"→"processor"、workspaceNameresourceName - 或
resource: "privatelink"→"peering"(ID)。这些是项目级资源,不绑定特定工作区。resourceName
删除工作区前,先检查工作区:
- →
atlas-streams-discover— 获取连接/处理器数量inspect-workspace - 告知用户:"工作区X包含N个连接和M个处理器。删除将永久移除所有资源。是否继续?"
- 等待用户确认后再调用
atlas-streams-teardown
CRITICAL: Validate Before Creating Processors
重要提示:创建处理器前必须验证
You MUST call before composing any processor pipeline. This is not optional.
search-knowledge- Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like vs
prefixfor S3path.$emit - Pattern examples: Query with for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
dataSources: [{"name": "devcenter"}]
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with for the full pattern catalog.
example_processors/README.mdKey quickstarts:
| Quickstart | Pattern |
|---|---|
| Inline |
| Change stream → tumbling window → |
| Kafka source → tumbling window rollup → |
| Chained processors: rollup → archive to separate collection |
| Real-time Kafka topic monitoring (sinkless, like |
在编写任何处理器管道前,必须调用。这是强制要求。
search-knowledge- 字段验证: 根据接收器/源类型查询,例如"Atlas Stream Processing $emit S3 fields"或"Atlas Stream Processing Kafka $source configuration"。这可以捕获诸如S3 中
$emit与prefix混淆的错误。path - 模式示例: 使用查询可用管道,例如"Atlas Stream Processing tumbling window example"。
dataSources: [{"name": "devcenter"}]
构建非基础处理器时,还需从官方ASP示例仓库获取示例:**https://github.com/mongodb/ASP_example**(快速入门、示例处理器、Terraform示例)。先查看`example_processors/README.md`获取完整模式目录。
关键快速入门示例:
| 快速入门 | 模式 |
|---|---|
| 内联 |
| 变更流 → 滚动窗口 → |
| Kafka源 → 滚动窗口聚合 → |
| 链式处理器:聚合 → 归档到单独集合 |
| 实时Kafka主题监控(无接收器,类似 |
Pipeline Rules & Warnings
管道规则与警告
Invalid constructs — these are NOT valid in streaming pipelines:
- ,
$$NOW,$$ROOT— NOT available in stream processing. NEVER use these. Use the document's own timestamp field or$$CURRENTmetadata for event time instead of_stream_meta.$$NOW - HTTPS connections as — HTTPS is for
$sourceenrichment or sink only, NOT as a data source$https - Kafka without
$source— topic field is requiredtopic - Pipelines without a sink — terminal stage (,
$merge,$emit, or$httpsasync) required for deployed processors (sinkless only works via$externalFunction)sp.process() - Lambda as target — Lambda uses
$emit(mid-pipeline enrichment), not$externalFunction$emit - with
$validate— crashes processor; usevalidationAction: "error"instead"dlq"
Required fields by stage:
- (change stream): include
$sourceto get the full document contentfullDocument: "updateLookup" - (Kinesis): use
$source(NOTstreamorstreamName)topic - (Kinesis): MUST include
$emitpartitionKey - (S3): use
$emit(NOTpath)prefix - : must include
$https,connectionName,path,method,asonError: "dlq" - : must include
$externalFunction,connectionName,functionName,execution,asonError: "dlq" - : must include
$validatewithvalidatorand$jsonSchemavalidationAction: "dlq" - : include
$lookupsetting (e.g.,parallelism) for concurrent I/Oparallelism: 2 - AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.
See references/pipeline-patterns.md for stage field examples with JSON syntax.
SchemaRegistry connection: must be (not ). Schema type values are case-sensitive (use lowercase , not ). See references/connection-configs.md for required fields and auth types.
connectionType"SchemaRegistry""Kafka"avroAVRO无效结构 — 这些在流式管道中不合法:
- 、
$$NOW、$$ROOT— 流处理中不可用。绝对不要使用。使用文档自身的时间戳字段或$$CURRENT元数据作为事件时间,替代_stream_meta。$$NOW - HTTPS连接作为— HTTPS仅用于
$source数据增强或接收器,不能作为数据源$https - Kafka 缺少
$source— 必须填写topic字段topic - 无接收器的管道 — 部署的处理器必须包含终端阶段(、
$merge、$emit或异步$https)(无接收器仅适用于$externalFunction)sp.process() - Lambda作为目标 — Lambda使用
$emit(管道中间的数据增强),而非$externalFunction$emit - 搭配
$validate— 会导致处理器崩溃;请使用validationAction: "error"替代"dlq"
各阶段必填字段:
- (变更流):需包含
$source以获取完整文档内容fullDocument: "updateLookup" - (Kinesis):使用
$source(而非stream或streamName)topic - (Kinesis):必须包含
$emitpartitionKey - (S3):使用
$emit(而非path)prefix - :必须包含
$https、connectionName、path、method、asonError: "dlq" - :必须包含
$externalFunction、connectionName、functionName、execution、asonError: "dlq" - :必须包含带有
$validate的$jsonSchema和validatorvalidationAction: "dlq" - :需包含
$lookup设置(例如parallelism)以支持并发I/Oparallelism: 2 - AWS连接(S3、Kinesis、Lambda):IAM角色ARN必须先通过Atlas云提供商访问注册。请务必与用户确认这一点。查看references/connection-configs.md获取详情。
查看references/pipeline-patterns.md获取带JSON语法的阶段字段示例。
SchemaRegistry连接: 必须为(而非)。Schema类型值区分大小写(使用小写,而非)。查看references/connection-configs.md获取必填字段和认证类型。
connectionType"SchemaRegistry""Kafka"avroAVROMCP Tool Behaviors
MCP工具行为
Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.
Auto-normalization:
- array → auto-converted to comma-separated string
bootstrapServers - string → auto-wrapped in array
schemaRegistryUrls - → defaults to
dbRoleToExecutefor Cluster connections{role: "readWriteAnyDatabase", type: "BUILT_IN"}
Workspace creation: defaults to , which auto-creates the connection.
includeSampleDatatruesample_stream_solarRegion naming: The field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic error.
regiondataProcessRegion| Provider | Cloud Region | Streams |
|---|---|---|
| AWS | us-east-1 | |
| AWS | us-east-2 | |
| AWS | eu-west-1 | |
| GCP | us-central1 | |
| GCP | europe-west1 | |
| Azure | eastus | |
| Azure | westeurope | |
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with → and check .
atlas-streams-discoverinspect-workspacedataProcessRegion.region信息收集: 创建连接时,构建工具会通过MCP自动收集缺失的敏感字段(密码、引导服务器)。请勿直接向用户索要这些信息 — 让工具自动收集。
自动标准化:
- 数组 → 自动转换为逗号分隔的字符串
bootstrapServers - 字符串 → 自动包装为数组
schemaRegistryUrls - → 对于Cluster连接,默认值为
dbRoleToExecute{role: "readWriteAnyDatabase", type: "BUILT_IN"}
工作区创建: 默认值为,会自动创建连接。
includeSampleDatatruesample_stream_solar区域命名: 字段使用Atlas特定的名称,不同云提供商的名称不同。使用错误格式会返回模糊的错误。
regiondataProcessRegion| 提供商 | 云区域 | Streams |
|---|---|---|
| AWS | us-east-1 | |
| AWS | us-east-2 | |
| AWS | eu-west-1 | |
| GCP | us-central1 | |
| GCP | europe-west1 | |
| Azure | eastus | |
| Azure | westeurope | |
查看references/connection-configs.md获取完整的区域映射表。如果不确定,使用 → 检查现有工作区,查看字段。
atlas-streams-discoverinspect-workspacedataProcessRegion.regionConnection Capabilities — Source/Sink Reference
连接能力 — 源/接收器参考
Know what each connection type can do before creating pipelines:
| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | ❌ | Source MUST include |
| Sample Stream | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | ❌ | Sink only - use |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | ❌ | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only - used by Kafka connections for Avro schemas |
Common connection usage mistakes to avoid:
- ❌ Using as sink with
$externalFunction→ Must useexecution: "sync"for sink stageexecution: "async" - ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
- ❌ Using with Kafka → Use
$mergefor Kafka sinks$emit
See references/connection-configs.md for detailed connection configuration schemas by type.
创建管道前,请了解每种连接类型的功能:
| 连接类型 | 作为源($source) | 作为接收器($merge / $emit) | 管道中间阶段 | 说明 |
|---|---|---|---|---|
| Cluster | ✅ 变更流 | ✅ $merge到集合 | ✅ $lookup | 变更流监控插入/更新/删除/替换操作 |
| Kafka | ✅ 主题消费者 | ✅ $emit到主题 | ❌ | 源必须包含 |
| Sample Stream | ✅ 示例数据 | ❌ 不支持 | ❌ | 仅用于测试/演示 |
| S3 | ❌ 不支持 | ✅ $emit到存储桶 | ❌ | 仅作为接收器 - 使用 |
| Https | ❌ 不支持 | ✅ $https作为接收器 | ✅ $https数据增强 | 可用于管道中间的数据增强,或作为最终接收器阶段 |
| AWSLambda | ❌ 不支持 | ✅ $externalFunction(仅异步) | ✅ $externalFunction(同步或异步) | 接收器: 必须设置 |
| AWS Kinesis | ✅ 流消费者 | ✅ $emit到流 | ❌ | 模式与Kafka类似 |
| SchemaRegistry | ❌ 不支持 | ❌ 不支持 | ✅ Schema解析 | 仅元数据 - 供Kafka连接用于Avro schema |
常见连接使用错误:
- ❌ 使用作为接收器时设置
$externalFunction→ 接收器阶段必须使用execution: "sync"execution: "async" - ❌ 忽略变更流的存在 → Atlas Cluster是强大的数据源,而非仅作为接收器
- ❌ 对Kafka使用→ Kafka接收器请使用
$merge$emit
查看references/connection-configs.md获取各类型连接的详细配置schema。
Core Workflows
核心工作流
Setup from scratch
从零开始搭建
- →
atlas-streams-discover(check existing)list-workspaces - →
atlas-streams-build(region near data, SP10 for dev)resource: "workspace" - →
atlas-streams-build(for each source/sink/enrichment)resource: "connection" - Validate connections: →
atlas-streams-discover+list-connectionsfor each — verify names match targets, present summary to userinspect-connection - Call to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
search-knowledge - →
atlas-streams-build(with DLQ configured)resource: "processor" - →
atlas-streams-manage(warn about billing)start-processor
- →
atlas-streams-discover(检查现有工作区)list-workspaces - →
atlas-streams-build(选择靠近数据的区域,开发环境使用SP10层级)resource: "workspace" - →
atlas-streams-build(为每个源/接收器/数据增强创建连接)resource: "connection" - 验证连接: →
atlas-streams-discover+list-connections检查每个连接 — 验证名称与目标匹配,并向用户展示汇总信息inspect-connection - 调用验证字段名称。从https://github.com/mongodb/ASP_example获取相关示例
search-knowledge - →
atlas-streams-build(配置DLQ)resource: "processor" - →
atlas-streams-manage(提醒用户计费规则)start-processor
Workflow Patterns
工作流模式
Incremental pipeline development (recommended):
See references/development-workflow.md for the full 5-phase lifecycle.
- Start with basic →
$sourcepipeline (validate connectivity)$merge - Add stages (validate filtering)
$match - Add /
$addFieldstransforms (validate reshaping)$project - Add windowing or enrichment (validate aggregation logic)
- Add error handling / DLQ configuration
Modify a processor pipeline:
- →
atlas-streams-manage— processor MUST be stopped firstaction: "stop-processor" - →
atlas-streams-manage— provide new pipelineaction: "modify-processor" - →
atlas-streams-manage— restartaction: "start-processor"
Debug a failing processor:
- →
atlas-streams-discover— one-shot health report. Always call this first.diagnose-processor - Commit to a specific root cause. Match symptoms to diagnostic patterns:
- Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
- State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
- State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add to Kafka
partitionIdleTimeout(e.g.,$source){"size": 30, "unit": "second"} - State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
- High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
- DLQ count increasing → per-document errors; use MongoDB on DLQ collection See references/output-diagnostics.md for the full pattern table.
find
- Classify processor type before interpreting output volume (alert vs transformation vs filter).
- Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
- If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
增量式管道开发(推荐):
查看references/development-workflow.md获取完整的5阶段生命周期。
- 从基础的→
$source管道开始(验证连通性)$merge - 添加阶段(验证过滤逻辑)
$match - 添加/
$addFields转换阶段(验证数据重塑)$project - 添加窗口或数据增强阶段(验证聚合逻辑)
- 添加错误处理/DLQ配置
修改处理器管道:
- →
atlas-streams-manage— 处理器必须先停止action: "stop-processor" - →
atlas-streams-manage— 提供新管道action: "modify-processor" - →
atlas-streams-manage— 重启处理器action: "start-processor"
调试故障处理器:
- →
atlas-streams-discover— 一键生成健康报告。请始终先执行此操作。diagnose-processor - 确定具体根本原因。将症状与诊断模式匹配:
- 错误419 + "no partitions found" → Kafka主题不存在或拼写错误
- 状态:FAILED + 多次重启 → 连接级错误(绕过DLQ),检查连接配置
- 状态:STARTED + 无输出 + 窗口化管道 → 可能是空闲的Kafka分区阻止窗口关闭;在Kafka 中添加
$source(例如partitionIdleTimeout){"size": 30, "unit": "second"} - 状态:STARTED + 无输出 + 非窗口化管道 → 检查源是否有数据;查看Kafka偏移量延迟
- memoryUsageBytes接近层级限制 → 存在内存不足风险;建议升级层级
- DLQ计数增加 → 单文档错误;使用MongoDB 查询DLQ集合 查看references/output-diagnostics.md获取完整的模式表。
find
- 解释输出量之前,先对处理器类型进行分类(告警型、转换型、过滤型)。
- 根据诊断出的根本原因,提供具体、有序的修复步骤。请勿提供假设性场景列表。
- 如果需要详细日志,请引导用户前往Atlas UI:Atlas → Stream Processing → Workspace → Processor → Logs标签页。
Chained processors (multi-sink pattern)
链式处理器(多接收器模式)
CRITICAL: A single pipeline can only have ONE terminal sink ( or ). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
$merge$emit重要提示:单个管道只能有一个终端接收器(或)。当用户要求多个输出目标(例如“写入Atlas同时发送到Kafka”)时,必须告知用户单接收器限制,并建议使用中间目标构建链式处理器。查看references/pipeline-patterns.md获取完整模式和示例。
$merge$emitPre-Deploy & Post-Deploy Checklists
部署前与部署后检查清单
See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
查看references/development-workflow.md获取完整的部署前质量检查清单(连接验证、管道验证)和部署后验证工作流。
Tier Sizing & Performance
层级规模与性能
See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
查看references/sizing-and-parallelism.md获取层级规格、并行度公式、复杂度评分和性能优化策略。
Troubleshooting
故障排除
See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
查看references/development-workflow.md获取完整的故障排除表,涵盖处理器故障、API错误、配置问题和性能问题。
Billing & Cost
计费与成本
Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
- Charges are per-hour, calculated per-second, only while the processor is running
- stops billing; stopped processors retain state for 45 days at no charge
stop-processor - For prototyping without billing: Use in mongosh — runs pipelines ephemerally without deploying a processor
sp.process() - See for tier pricing and cost optimization strategies
references/sizing-and-parallelism.md
Atlas Stream Processing无免费层级。所有已部署的处理器运行时会持续产生费用。
- 按小时计费,按秒结算,仅在处理器运行时收费
- 会停止计费;已停止的处理器会免费保留状态45天
stop-processor - 无计费原型开发: 在mongosh中使用— 临时运行管道,无需部署处理器
sp.process() - 查看获取层级定价和成本优化策略
references/sizing-and-parallelism.md
Safety Rules
安全规则
- and
atlas-streams-teardownrequire user confirmation — do not bypassatlas-streams-manage - BEFORE calling for a workspace, you MUST first inspect the workspace with
atlas-streams-teardownto count connections and processors, then present this information to the user before requesting confirmationatlas-streams-discover - BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
- Deleting a workspace removes ALL connections and processors permanently
- After stopping a processor, state is preserved 45 days — then checkpoints are discarded
- drops all window state — warn user first
resumeFromCheckpoint: false - Moving processors between workspaces is not supported (must recreate)
- Dry-run / simulation is not supported — explain what you would do and ask for confirmation
- Always warn users about billing before starting processors
- Store API authentication credentials in connection settings, never hardcode in processor pipelines
- 和
atlas-streams-teardown需要用户确认 — 请勿跳过atlas-streams-manage - 调用删除工作区前,必须先使用
atlas-streams-teardown检查工作区,统计连接和处理器数量,然后将此信息告知用户并请求确认atlas-streams-discover - 创建任何处理器前,必须根据references/development-workflow.md中的“部署前验证”部分验证所有连接
- 删除工作区会永久移除所有连接和处理器
- 停止处理器后,状态会保留45天 — 之后检查点会被丢弃
- 会丢弃所有窗口状态 — 请先提醒用户
resumeFromCheckpoint: false - 不支持在工作区之间迁移处理器(必须重新创建)
- 不支持试运行/模拟 — 请说明要执行的操作并请求用户确认
- 启动处理器前,务必提醒用户计费规则
- 将API认证凭证存储在连接设置中,切勿硬编码到处理器管道中
Reference Files
参考文件
| File | Read when... |
|---|---|
| Building or modifying processor pipelines |
| Creating connections (type-specific schemas) |
| Following lifecycle management or debugging decision trees |
| Processor output is unexpected (zero, low, or wrong) |
| Choosing tiers, tuning parallelism, or optimizing cost |
| 文件 | 适用场景 |
|---|---|
| 构建或修改处理器管道时 |
| 创建连接时(特定类型的schema) |
| 遵循生命周期管理或调试决策树时 |
| 处理器输出异常(无输出、输出量低或输出错误)时 |
| 选择层级、调整并行度或优化成本时 |