ingestion-pipeline-doctor-nodejs
Ingestion pipeline architecture overview and convention reference. Use when you need a quick orientation to the pipeline framework or want to know which doctor agent to use for a specific concern.
Source: posthog/posthog
NPX install: `npx skill4agent add posthog/posthog ingestion-pipeline-doctor-nodejs`
Pipeline Doctor
Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.
Architecture overview
The ingestion pipeline processes events through a typed, composable step chain:
```text
Kafka message
→ messageAware()
→ parse headers/body
→ sequentially() for preprocessing
→ filterMap() to enrich context (e.g., team lookup)
→ teamAware()
→ groupBy(token:distinctId)
→ concurrently() for per-entity processing
→ gather()
→ pipeBatch() for batch operations
→ handleIngestionWarnings()
→ handleResults()
→ handleSideEffects()
→ build()
```
See `nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts` for the real implementation.
Key file locations
| What | Where |
|---|---|
| Step type | |
| Result types | |
| Doc-test chapters | |
| Joined pipeline | |
| Doctor agents | |
| Test helpers | |
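
Results in this framework are built with the `ok()`, `dlq()`, `drop()`, and `redirect()` constructors. Side effects and ingestion warnings travel with `ok`. The actual definitions are in the result types file referenced in the table above. What follows is only a hypothetical TypeScript union in that spirit, not PostHog's code. Several parts are assumptions: the field names (`reason`, `topic`, `sideEffects`, `warnings`), the `IngestionWarning` shape, the `isOkResult` guard, and the `resolveTeam` example.

```typescript
// Hypothetical sketch: variant names mirror ok()/dlq()/drop()/redirect(),
// but every field name and shape here is an assumption.

interface IngestionWarning {
    type: string
    details: Record<string, unknown>
}

type PipelineResult<T> =
    | { type: 'ok'; value: T; sideEffects: Promise<unknown>[]; warnings: IngestionWarning[] }
    | { type: 'dlq'; reason: string; error?: unknown }
    | { type: 'drop'; reason: string }
    | { type: 'redirect'; reason: string; topic: string }

// Side effects ride along as promises; warnings are the third parameter.
function ok<T>(
    value: T,
    sideEffects: Promise<unknown>[] = [],
    warnings: IngestionWarning[] = []
): PipelineResult<T> {
    return { type: 'ok', value, sideEffects, warnings }
}

function dlq<T>(reason: string, error?: unknown): PipelineResult<T> {
    return { type: 'dlq', reason, error }
}

function drop<T>(reason: string): PipelineResult<T> {
    return { type: 'drop', reason }
}

function redirect<T>(reason: string, topic: string): PipelineResult<T> {
    return { type: 'redirect', reason, topic }
}

// Type guard: narrows to the ok variant so callers can read `value` without casts.
function isOkResult<T>(
    result: PipelineResult<T>
): result is Extract<PipelineResult<T>, { type: 'ok' }> {
    return result.type === 'ok'
}

// Hypothetical enrichment: drop events whose token maps to no team.
type TeamContext = { teamId: number }

function resolveTeam(token: string, teams: Map<string, number>): PipelineResult<TeamContext> {
    const teamId = teams.get(token)
    return teamId === undefined ? drop<TeamContext>('team_not_found') : ok({ teamId })
}
```

With a discriminated union like this, later stages and tests can narrow on `type` instead of casting. That fits the "No `any`" convention.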
Which agent to use
| Concern | Agent | When to use |
|---|---|---|
| Step structure | | Factory pattern, type extension, config injection, naming |
| Result handling | | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | | Builder chain, concurrency, grouping, branching, retries |
| Testing | | Test helpers, assertions, fake timers, doc-test style |
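
The Composition concern covers grouping and concurrency. In the architecture chain, `groupBy(token:distinctId)` feeds `concurrently()` and then `gather()`. Here is a minimal sketch of those semantics, not the framework's implementation. It assumes that events sharing a `token:distinctId` key must be handled in order, while different keys may run in parallel. `processPerEntity` and `KeyedEvent` are hypothetical names.

```typescript
// Hypothetical sketch of per-entity concurrency; NOT PostHog's groupBy(),
// concurrently(), or gather(). Assumes same-key events must stay ordered.

interface KeyedEvent {
    token: string
    distinctId: string
}

async function processPerEntity<T extends KeyedEvent, R>(
    batch: T[],
    processOne: (event: T) => Promise<R>
): Promise<R[]> {
    // "groupBy": remember which batch indices belong to each entity key.
    const groups = new Map<string, number[]>()
    batch.forEach((event, index) => {
        const key = `${event.token}:${event.distinctId}`
        const indices = groups.get(key) ?? []
        indices.push(index)
        groups.set(key, indices)
    })

    const results = new Array<R>(batch.length)
    // "concurrently": groups run in parallel; events within one group run sequentially.
    await Promise.all(
        Array.from(groups.values()).map(async (indices) => {
            for (const index of indices) {
                results[index] = await processOne(batch[index])
            }
        })
    )
    // "gather": one result per input event, back in original batch order.
    return results
}
```

Under that assumption, two events for the same person cannot race each other. Unrelated people still share the batch's concurrency.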
Quick convention reference
Steps: Factory function returning a named inner function. Generic `<T extends Input>` for type extension. No `any`. Config via closure.
Results: Use the `ok()`, `dlq()`, `drop()`, and `redirect()` constructors. Side effects as promises in `ok(value, [effects])`. Warnings as third parameter.
Composition: `messageAware` wraps the pipeline. `handleResults` inside `messageAware`. `handleSideEffects` after. `groupBy` + `concurrently` for per-entity work. `gather` before batch steps.
Testing: Step tests call the factory directly. Use `consumeAll()`/`collectBatches()` helpers. Fake timers for async. Type guards for result assertions. No `any`.
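The following hypothetical step illustrates the step and testing conventions. A factory takes config and captures it in a closure, then returns a named inner function that is generic over `T extends EventInput`. All of these names are illustrative assumptions: `createNormalizeEventNameStep`, `EventInput`, `NormalizeConfig`, `WithNormalizedName`, and the two-variant `StepResult`. The framework's own results also include `dlq` and `redirect`.

```typescript
// Hypothetical example step; all names and types here are illustrative assumptions.

// Minimal result type for this sketch only (the framework also has dlq and redirect).
type StepResult<T> = { type: 'ok'; value: T } | { type: 'drop'; reason: string }

// A step maps one input to a result, asynchronously.
type Step<TIn, TOut> = (input: TIn) => Promise<StepResult<TOut>>

interface EventInput {
    event: { event: string; distinct_id: string }
}

interface NormalizeConfig {
    maxEventNameLength: number
}

// Output type: whatever came in, plus the one field this step adds.
type WithNormalizedName<T> = T & { normalizedEventName: string }

// Factory: config is injected once and captured by the closure.
// The generic T extends EventInput keeps any extra context fields flowing
// through instead of narrowing the input down to EventInput.
function createNormalizeEventNameStep<T extends EventInput>(
    config: NormalizeConfig
): Step<T, WithNormalizedName<T>> {
    // Named inner function, so the step is identifiable in stack traces.
    return async function normalizeEventNameStep(
        input: T
    ): Promise<StepResult<WithNormalizedName<T>>> {
        const normalizedEventName = input.event.event.trim()
        if (normalizedEventName.length > config.maxEventNameLength) {
            return { type: 'drop', reason: 'event_name_too_long' }
        }
        return { type: 'ok', value: { ...input, normalizedEventName } }
    }
}
```

Following the testing convention, a test calls the factory directly and awaits the returned step on a hand-built input. It then narrows on `result.type` before asserting on `value`, with no pipeline builder involved.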
Running all doctors
Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.