implementing-warehouse-sources
Implementing Data warehouse sources
Use this skill when building or updating Data warehouse sources in `posthog/temporal/data_imports/sources/`.
Read first
Before coding, read:
- `posthog/temporal/data_imports/sources/source.template`
- `posthog/temporal/data_imports/sources/README.md`
- 1 API source with `settings.py` + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with `type: "resolve"`), also read `posthog/temporal/data_imports/sources/common/rest_source/__init__.py` and `config_setup.py` (e.g. `process_parent_data_item`, `make_parent_key_name`).
Source architecture contract
For API-backed sources, use this split:
- `source.py`: source registration, source form fields, schema list, credential validation, and pipeline handoff.
- `settings.py`: endpoint catalog, incremental fields, primary key and partition defaults.
- `{source}.py`: API client/auth, paginator, request params, row normalization, and `SourceResponse`.
This keeps endpoint behavior declarative and easy to extend.
For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in `settings.py` and route in `{source}.py` with this priority:
- endpoint-specific custom iterators (only when required),
- generic fan-out helper path,
- top-level endpoint path.
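The routing priority above could be sketched roughly as follows. This is a minimal illustration rather than the actual source code: `EndpointConfig`, `CUSTOM_ITERATORS`, and `route_endpoint` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EndpointConfig:
    """Hypothetical endpoint metadata, as it might be declared in settings.py."""

    name: str
    path: str
    parent_name: Optional[str] = None  # set only for fan-out endpoints


# Hypothetical set of endpoints that genuinely need a hand-written iterator.
CUSTOM_ITERATORS = frozenset({"issue_events"})


def route_endpoint(config: EndpointConfig) -> str:
    """Pick the code path for an endpoint, checking the most specific case first."""
    if config.name in CUSTOM_ITERATORS:
        return "custom_iterator"
    if config.parent_name is not None:
        return "fan_out_helper"
    return "top_level"
```

Keeping the decision in one small function means a new endpoint usually only needs a new entry in the catalog, not a new branch in the transport module.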
Implementation checklist
Copy this and track progress:
```text
Source implementation:
- [ ] Define source fields in `get_source_config`
- [ ] Implement credential validation
- [ ] Define schemas in `get_schemas`
- [ ] Add/confirm endpoint settings (`settings.py`)
- [ ] Implement transport and paginator (`{source}.py`)
- [ ] Return correct `SourceResponse` (keys, partitioning, sort mode)
- [ ] Add non-retryable auth/permission errors
- [ ] Add source tests
- [ ] Add transport tests
- [ ] Add icon in `frontend/public/services/`
- [ ] Run `pnpm run generate:source-configs`
- [ ] Run `pnpm run schema:build`
- [ ] For Beta: set `betaSource=True` in `SourceConfig`; omit `unreleasedSource` (or set `False`) when releasing.
```
Required coding conventions
- Register with `@SourceRegistry.register`.
- Source class should inherit `SimpleSource[GeneratedConfig]` unless resumable/webhook behavior is required.
- API sources should usually return `table_format="delta"` in endpoint resources.
- Use `primary_keys` for incremental merge safety; they are endpoint-specific (declare in `settings.py`, not always `id`).
- Add partitioning for new sources where possible:
  - API sources: `partition_mode="datetime"` with stable datetime field when available.
- Add `get_non_retryable_errors()` for known permanent failures (401/403/invalid credentials).
- Keep comments minimal and only when intent is not obvious.
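As an illustration of the non-retryable error convention, the sketch below assumes the method returns a mapping from an error-message fragment to an optional user-facing explanation. That return shape, the message fragments, and the `is_non_retryable` helper are assumptions for this example; check the base class in the repository for the real contract.

```python
from typing import Optional

# Assumed shape: error-message fragment -> optional user-facing explanation.
NON_RETRYABLE_ERRORS: dict[str, Optional[str]] = {
    "401 Client Error": "Your API key is invalid or expired. Please reconnect the source.",
    "403 Client Error": "Your API key lacks the permissions this source needs.",
}


def is_non_retryable(error_message: str) -> bool:
    """Return True when the error text matches a known permanent failure."""
    return any(fragment in error_message for fragment in NON_RETRYABLE_ERRORS)
```

Matching on stable fragments rather than full messages keeps the list short while still stopping retries on credentials that will never succeed.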
Incremental sync guidance
- If API supports server-side time filtering, add it and map from `db_incremental_field_last_value`.
- If API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
- Set `sort_mode="desc"` only if the endpoint truly returns descending order and cannot return ascending.
- For descending sources, make sure behavior with `db_incremental_field_earliest_value` is considered.
- Default unknown endpoints to full refresh first; only enable incremental after confirming a stable filter field and API semantics.
- Prefer immutable partition keys (`created_at`, `dateCreated`, `firstSeen`) over mutable fields (`updated_at`, `lastSeen`) when both exist.
- Confirm partition keys against response schemas, not assumptions from endpoint names.
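The first point in the list above, mapping the last synced value onto a server-side time filter, might look like the sketch below. The function name, the `should_use_incremental_field` flag, and the `updated_since` query parameter are illustrative assumptions; the real parameter name depends on the API being integrated.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def build_request_params(
    should_use_incremental_field: bool,
    db_incremental_field_last_value: Optional[Any],
    filter_param: str = "updated_since",  # hypothetical API parameter name
) -> dict[str, str]:
    """Translate the last synced value into a server-side time filter."""
    if not should_use_incremental_field or db_incremental_field_last_value is None:
        # Full refresh or first sync: send no filter and fetch everything.
        return {}
    value = db_incremental_field_last_value
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes were stored as UTC.
            value = value.replace(tzinfo=timezone.utc)
        value = value.isoformat()
    return {filter_param: str(value)}
```

Returning an empty dict for the first sync keeps the full-refresh and incremental paths on the same request-building code.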
API behavior verification checklist
Before finalizing endpoint logic, verify these from docs (or reliable API examples):
- Response shape: list vs object vs wrapped data (`{"data": [...]}`).
- Pagination contract: Link header vs body cursor vs offset/page; next-page termination signal.
- Ordering guarantees: ascending/descending/undefined for key time fields.
- Rate limit headers and semantics (window reset timestamp, concurrent limits).
- Field stability: whether candidate incremental/partition fields can change over time.
If behavior is not documented, keep parsing/merge logic conservative and add a code comment documenting the uncertainty.
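When the response shape is not documented, a conservative row extractor along these lines can absorb the list, object, and wrapped variants. It is only a sketch: `extract_rows` and the default wrapper keys are assumptions for illustration, not part of the shared infrastructure.

```python
from typing import Any


def extract_rows(
    payload: Any,
    wrapper_keys: tuple[str, ...] = ("data", "items", "results"),
) -> list[dict[str, Any]]:
    """Normalize list, wrapped-list, and single-object payloads into rows."""
    if isinstance(payload, list):
        return [row for row in payload if isinstance(row, dict)]
    if isinstance(payload, dict):
        for key in wrapper_keys:
            wrapped = payload.get(key)
            if isinstance(wrapped, list):
                return [row for row in wrapped if isinstance(row, dict)]
        # Shape is undocumented: treat a bare object as a single row
        # instead of guessing at a nested structure.
        return [payload]
    # Anything else (None, string, number) yields no rows.
    return []
```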
Endpoint inventory workflow
- Build an endpoint inventory before expanding coverage:
  - endpoint path and auth scopes,
  - grain (org/project/child fan-out),
  - pagination style,
  - primary key shape (single/composite),
  - incremental candidate fields.
- Keep the inventory in source-local docs (for example `posthog/temporal/data_imports/sources/<source>/api_inventory.md`) so future endpoint additions stay consistent.
- Add endpoints in phases:
  - org-level list endpoints first,
  - then project-level fan-out,
  - then child/fan-out endpoints with bounded pagination.
Top-level endpoints (org/account level)
Top-level endpoints are list/read endpoints that do not require parent-row expansion.
- Declare endpoint metadata in `settings.py` (`path`, `primary_key`, `incremental_fields`, `partition_key`, `sort_mode`).
- Build them through a single resource config (`get_resource(...)` style helper) and keep transport branches minimal.
- Keep endpoint params declarative and stable (`limit`, required filters).
- Use merge write disposition only when incremental semantics are reliable; otherwise full replace is safer.
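A `get_resource(...)` style helper for top-level endpoints might be sketched as below. The catalog contents and the exact keys of the returned dict are assumptions for illustration; the real resource config shape is defined by the shared REST source code.

```python
from typing import Any

# Hypothetical endpoint catalog, as it might appear in settings.py.
ENDPOINTS: dict[str, dict[str, Any]] = {
    "projects": {
        "path": "/projects",
        "primary_key": "id",
        "incremental_fields": ["updated_at"],
        "partition_key": "created_at",
        "sort_mode": "asc",
    },
    "teams": {
        "path": "/teams",
        "primary_key": "id",
        "incremental_fields": [],
        "partition_key": None,
        "sort_mode": "asc",
    },
}


def get_resource(name: str, should_use_incremental_field: bool, page_size: int = 100) -> dict[str, Any]:
    """Build one resource config from declarative endpoint metadata."""
    endpoint = ENDPOINTS[name]
    # Merge only when the sync is incremental AND the endpoint declares a field.
    incremental = should_use_incremental_field and bool(endpoint["incremental_fields"])
    return {
        "name": name,
        "table_format": "delta",
        "primary_key": endpoint["primary_key"],
        "write_disposition": "merge" if incremental else "replace",
        "endpoint": {"path": endpoint["path"], "params": {"limit": page_size}},
    }
```

Because the write disposition is derived from the catalog, an endpoint without a reliable incremental field falls back to full replace without any endpoint-specific branch.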
Pagination tips
- Some APIs use cursor pagination in `Link` headers; check both `rel="next"` and any results flag the API may use.
- When following a full cursor URL from response headers, clear request params in paginator `update_request` to avoid duplicate query params.
- For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
- Emit structured logs when page caps are reached (include resource name and parent identifiers) so operators can tune limits safely.
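The tips above can be sketched with the standard library alone. The helper names are hypothetical, the `results="false"` flag is an assumed convention that only some APIs use, and the regex is a simplified parser that assumes attribute values contain no commas.

```python
import logging
import re
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Simplified Link header parser: "<url>; attr; attr, <url>; attr".
_LINK_PATTERN = re.compile(r"<([^>]+)>\s*;\s*([^,]*)")


def next_url_from_link_header(link_header: Optional[str]) -> Optional[str]:
    """Return the rel="next" URL, or None when there is no further page."""
    if not link_header:
        return None
    for url, attrs in _LINK_PATTERN.findall(link_header):
        if re.search(r'rel\s*=\s*"?next"?', attrs):
            # Assumed convention: some APIs keep rel="next" on the last
            # page and signal the end with results="false".
            if re.search(r'results\s*=\s*"?false"?', attrs):
                return None
            return url
    return None


def prepare_next_request(next_url: str) -> dict[str, Any]:
    """The cursor URL already carries its query string, so params are cleared."""
    return {"url": next_url, "params": {}}


def page_cap_reached(pages_fetched: int, max_pages: int, resource: str, parent_id: str) -> bool:
    """Stop at the cap and log enough context for operators to tune it."""
    if pages_fetched < max_pages:
        return False
    logger.warning(
        "page cap reached",
        extra={"resource": resource, "parent_id": parent_id, "max_pages": max_pages},
    )
    return True
```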
Retry and throttling strategy
- Use a retry framework (for example tenacity) instead of manual retry loops where possible.
- Retry transport failures and retryable status codes (`429`, transient `5xx`).
- Prefer server-provided rate-limit reset headers for wait calculation on `429`; fall back to exponential backoff when unavailable.
- Keep retries bounded and deterministic (`stop_after_attempt`), and preserve clear terminal behavior:
  - return final response for retried status responses when useful for downstream handling, or
  - raise final exception for transport failures.
- Keep timeout and retry settings near the top of the module for easy operator tuning.
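The wait calculation described above could look like the sketch below, written with the standard library so it runs on its own. The header name `X-RateLimit-Reset`, its Unix-timestamp semantics, and the constants are assumptions that vary by API. With tenacity, a function like this could back a custom wait callable, and `MAX_ATTEMPTS` could feed `stop_after_attempt`.

```python
import time
from typing import Mapping, Optional

# Tunables kept at the top of the module for easy operator adjustment.
MAX_ATTEMPTS = 5
BASE_BACKOFF_SECONDS = 1.0
MAX_WAIT_SECONDS = 60.0
RESET_HEADER = "X-RateLimit-Reset"  # assumed header name; check the API docs
RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})


def is_retryable_status(status_code: int) -> bool:
    """429 and transient 5xx are retried; auth failures are not."""
    return status_code in RETRYABLE_STATUS_CODES


def compute_wait_seconds(
    headers: Mapping[str, str],
    attempt: int,  # 1-based attempt number
    now: Optional[float] = None,
) -> float:
    """Prefer the server-provided reset time; else use exponential backoff."""
    current = time.time() if now is None else now
    reset = headers.get(RESET_HEADER)
    if reset is not None:
        try:
            # Assumption: the header holds a Unix timestamp in seconds.
            return min(max(float(reset) - current, 0.0), MAX_WAIT_SECONDS)
        except ValueError:
            pass  # malformed header: fall through to backoff
    return min(BASE_BACKOFF_SECONDS * 2 ** (attempt - 1), MAX_WAIT_SECONDS)
```

Capping both branches at `MAX_WAIT_SECONDS` keeps the total retry time bounded even if the server reports a reset far in the future.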
Fan-out endpoints
Fan-out means iterating a parent resource (for example projects) and then querying child endpoints per parent (for example project issues).
Prefer dependent resources when you have a single parent→child. Use `rest_api_resources` with a parent resource and a child that declares `type: "resolve"` for the parent field (e.g. parent slug or id). The shared infra (`rest_source/__init__.py`, `config_setup.process_parent_data_item`) paginates the parent and calls the child per parent row. Add `include_from_parent` so child rows get parent fields; they are injected as `_<parent>_<field>` via `make_parent_key_name`.
Make fan-out declarative in endpoint config. Add a fan-out config object in `settings.py` (for example `DependentEndpointConfig`) with:
- `parent_name`
- `resolve_param`
- `resolve_field`
- `include_from_parent`
- optional parent field renames (e.g. `id -> project_id`)
- optional parent endpoint params (for parent-specific defaults)
Then route all single-hop fan-out endpoints through a shared helper (for example `common/rest_source/fanout.py:build_dependent_resource`) so callers do not reimplement parent/child config assembly.
Parent field rename mapping belongs in the helper. If a helper supports declarative renames, apply the map there. Callers should not branch on whether renames exist.
Use per-endpoint pagination/selectors through fan-out helper overrides. `build_dependent_resource` supports optional endpoint overrides so you can keep single-hop fan-out declarative even when parent and child have different response shapes/pagination contracts:
- `parent_endpoint_extra` and `child_endpoint_extra`: pass endpoint-level `paginator` and `data_selector` (for wrapped payloads like `{"items": [...]}`).
- `page_size_param`: override default page-size query param (`limit`) for APIs that use a different name (for example `page_size`).
This means you can often avoid custom iterators for single-hop fan-out even when parent and child paginate differently (e.g. Typeform forms page-number + responses cursor token).
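To make the declarative fan-out config and the rename rule concrete, here is a small sketch. The dataclass fields follow the list given earlier in this section, but the class body, `parent_key_name`, and `apply_parent_renames` are stand-ins written for this example; they are not the shared helper's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class DependentEndpointConfig:
    """Illustrative fan-out config, as it might be declared in settings.py."""

    parent_name: str
    resolve_param: str
    resolve_field: str
    include_from_parent: tuple[str, ...] = ()
    parent_field_renames: dict[str, str] = field(default_factory=dict)


def parent_key_name(parent_name: str, field_name: str) -> str:
    """Stand-in for the `_<parent>_<field>` naming of injected parent fields."""
    return f"_{parent_name}_{field_name}"


def apply_parent_renames(row: dict[str, Any], config: DependentEndpointConfig) -> dict[str, Any]:
    """Rename injected parent fields in one place so callers never branch."""
    renamed = dict(row)
    for parent_field in config.include_from_parent:
        injected_key = parent_key_name(config.parent_name, parent_field)
        if injected_key not in renamed:
            continue
        # Without a declared rename the injected key is kept as-is.
        target = config.parent_field_renames.get(parent_field, injected_key)
        renamed[target] = renamed.pop(injected_key)
    return renamed
```

An empty rename map is a no-op here, which is what lets callers pass every fan-out row through the same function.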
Path pre-formatting: Child paths often have multiple placeholders (e.g. org and resource slug). `process_parent_data_item` only does `str.format()` with the resolved param. Pre-format any static placeholders with `.replace()` on the child path before passing to the resource config, so only the resolved placeholder remains and DLT does not raise `KeyError`.
When to keep a custom iterator: If fan-out requires two or more levels (e.g. parent → mid-level list → detail per mid-level), where an intermediate API call discovers values that become part of the URL, that cannot be expressed as a single parent→child in `rest_api_resources`. Implement a custom HTTP iterator for that endpoint only; reuse the same pagination/retry helpers as elsewhere.
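The path pre-formatting issue is easy to reproduce with plain string formatting. In the sketch below, the path template, the placeholder names, and `preformat_child_path` are made up for illustration; only the `str.format()` and `.replace()` behavior is standard Python.

```python
def preformat_child_path(path_template: str, static_values: dict[str, str]) -> str:
    """Fill static placeholders, leaving the resolved placeholder untouched."""
    path = path_template
    for name, value in static_values.items():
        path = path.replace("{" + name + "}", value)
    return path


template = "/organizations/{org_slug}/projects/{project_slug}/issues"

# Formatting with only the resolved param fails: {org_slug} is still present.
try:
    template.format(project_slug="web")
    raised_key_error = False
except KeyError:
    raised_key_error = True

# After pre-formatting, only the resolved placeholder remains.
child_path = preformat_child_path(template, {"org_slug": "acme"})
resolved_path = child_path.format(project_slug="web")
```

Using `.replace()` for the static part avoids `str.format()` entirely at that stage, so unrelated placeholders cannot trigger the error.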
Testing expectations
Add at least two test modules:
- `tests/test_<source>_source.py`:
  - `source_type`
  - `get_source_config` fields and labels
  - `get_schemas` outputs
  - `validate_credentials` success/failure
  - `source_for_pipeline` argument plumbing
- `tests/test_<source>.py`:
  - paginator behavior from API response headers/body
  - resource generation for incremental vs non-incremental
  - endpoint-specific primary key mapping
  - credential validation status mapping
  - mapper/filter helpers if present
  - fan-out endpoint row format assertions (dict shape + parent identifiers)
  - for dependent-resource fan-out: mock `rest_api_resources`, pass rows with `_<parent>_<field>` keys to exercise parent-field injection and rename behavior
  - expected return schema checks for each declared endpoint in `settings.py`
Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.
Use parameterized tests for status codes and edge cases.
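A parameterized behavior test for status-code mapping might look like the sketch below. It uses `unittest` with `subTest` purely to stay self-contained; the repository may prefer pytest parametrization or another helper. `map_validation_status` and its messages are hypothetical.

```python
import unittest
from typing import Optional


def map_validation_status(status_code: int) -> tuple[bool, Optional[str]]:
    """Hypothetical helper: translate an HTTP status into (is_valid, error)."""
    if 200 <= status_code < 300:
        return True, None
    if status_code == 401:
        return False, "Invalid API key"
    if status_code == 403:
        return False, "API key is missing required permissions"
    return False, f"Unexpected status code: {status_code}"


class TestValidationStatusMapping(unittest.TestCase):
    def test_status_mapping(self) -> None:
        cases = [
            (200, True, None),
            (401, False, "Invalid API key"),
            (403, False, "API key is missing required permissions"),
            (500, False, "Unexpected status code: 500"),
        ]
        for status_code, expected_valid, expected_error in cases:
            # subTest reports each failing case separately.
            with self.subTest(status_code=status_code):
                self.assertEqual(
                    map_validation_status(status_code),
                    (expected_valid, expected_error),
                )
```

The test asserts on the returned outcome rather than on any internal config structure, in line with the preference for behavior tests stated above.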
Validation and generation workflow
After changing source fields, run the generation commands from the checklist and targeted tests for the new source.
Common pitfalls
- Source not visible in wizard: source not registered/imported, or schema not rebuilt.
- Generated config class still empty: forgot `generate:source-configs` after updating fields.
- Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
- Endless retries for bad credentials: missing `get_non_retryable_errors`.
- Dependent resource path `KeyError`: pre-format static path placeholders (see Fan-out).
- Silent truncation risk: page caps hit without logs/metrics.
- Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
- Type drift in endpoint config dicts: use source typing aliases (`Endpoint`, `ClientConfig`, `IncrementalConfig`) to keep static checks precise.