ingestion-pipeline-doctor-nodejs
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePipeline Doctor
Pipeline Doctor
Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.
PostHog摄入管道框架及其规范检查agents的快速参考指南。
Architecture overview
架构概述
The ingestion pipeline processes events through a typed, composable step chain:
text
Kafka message
→ messageAware()
→ parse headers/body
→ sequentially() for preprocessing
→ filterMap() to enrich context (e.g., team lookup)
→ teamAware()
→ groupBy(token:distinctId)
→ concurrently() for per-entity processing
→ gather()
→ pipeBatch() for batch operations
→ handleIngestionWarnings()
→ handleResults()
→ handleSideEffects()
→ build()See for the real implementation.
nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts摄入管道通过一个类型化、可组合的步骤链处理事件:
text
Kafka message
→ messageAware()
→ parse headers/body
→ sequentially() for preprocessing
→ filterMap() to enrich context (e.g., team lookup)
→ teamAware()
→ groupBy(token:distinctId)
→ concurrently() for per-entity processing
→ gather()
→ pipeBatch() for batch operations
→ handleIngestionWarnings()
→ handleResults()
→ handleSideEffects()
→ build()查看 获取实际实现代码。
nodejs/src/ingestion/analytics/joined-ingestion-pipeline.tsKey file locations
关键文件位置
| What | Where |
|---|---|
| Step type | |
| Result types | |
| Doc-test chapters | |
| Joined pipeline | |
| Doctor agents | |
| Test helpers | |
| 内容 | 位置 |
|---|---|
| 步骤类型 | |
| 结果类型 | |
| 文档测试章节 | |
| 联合管道 | |
| Doctor agents | |
| 测试辅助工具 | |
Which agent to use
选择合适的Agent
| Concern | Agent | When to use |
|---|---|---|
| Step structure | | Factory pattern, type extension, config injection, naming |
| Result handling | | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | | Builder chain, concurrency, grouping, branching, retries |
| Testing | | Test helpers, assertions, fake timers, doc-test style |
| 关注场景 | Agent | 适用场景 |
|---|---|---|
| 步骤结构 | | 工厂模式、类型扩展、配置注入、命名规范 |
| 结果处理 | | ok/dlq/drop/redirect、副作用处理、摄入警告 |
| 组合方式 | | 构建器链、并发处理、分组、分支、重试 |
| 测试 | | 测试辅助工具、断言、模拟定时器、文档测试风格 |
Quick convention reference
快速规范参考
Steps: Factory function returning a named inner function. Generic for type extension. No . Config via closure.
<T extends Input>anyResults: Use , , , constructors. Side effects as promises in . Warnings as third parameter.
ok()dlq()drop()redirect()ok(value, [effects])Composition: wraps the pipeline. inside . after. + for per-entity work. before batch steps.
messageAwarehandleResultsmessageAwarehandleSideEffectsgroupByconcurrentlygatherTesting: Step tests call factory directly. Use / helpers. Fake timers for async. Type guards for result assertions. No .
consumeAll()collectBatches()any步骤:工厂函数返回一个命名内部函数。使用泛型实现类型扩展。禁止使用类型。通过闭包注入配置。
<T extends Input>any结果:使用、、、构造函数。副作用以Promise形式传入。警告作为第三个参数传入。
ok()dlq()drop()redirect()ok(value, [effects])组合:包裹整个管道。置于内部。置于其后。使用 + 处理每个实体的任务。批量步骤前使用。
messageAwarehandleResultsmessageAwarehandleSideEffectsgroupByconcurrentlygather测试:步骤测试直接调用工厂函数。使用/辅助工具。为异步操作使用模拟定时器。使用类型守卫进行结果断言。禁止使用类型。
consumeAll()collectBatches()anyRunning all doctors
运行所有诊断工具
Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.
让Claude执行“对我最近的变更运行所有pipeline doctors”,即可获取涵盖全部4个关注领域的全面评审。