# CDF Data Modeling: Limits, Concurrency & Best Practices
This is a reference skill. When writing or reviewing code that calls CDF Data Modeling APIs, apply the patterns below.
## DMS Limits Reference
For the latest concurrency limits, resource limits, and property value limits, see the official documentation:
https://docs.cognite.com/cdf/dm/dm_reference/dm_limits_and_restrictions
Key things to be aware of:

- Instance apply, delete, and query operations each have their own concurrent request limits
- Exceeding these limits returns 429 Too Many Requests
- Transformations consume a large portion of the concurrency budget, leaving less for other clients
- `instances.list` has a max page size (use pagination for complete results)
- `instances.query` table expressions each have their own item limit
- `instances.upsert` accepts up to 1000 items per call
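If a call does hit 429 despite the concurrency controls covered below, a small backoff wrapper keeps transient bursts from failing outright. A minimal sketch — `withRetryOn429` is an illustrative helper, not part of the CDF SDK, and it assumes the thrown error exposes a numeric `status` field:

```typescript
// Hypothetical retry helper for 429 responses (not part of the CDF SDK).
// Assumes thrown errors carry a numeric `status`; adjust the extraction
// to match how your SDK version surfaces HTTP status codes.
async function withRetryOn429<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseDelayMs: number = 500
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (e) {
      const status = (e as { status?: number }).status;
      // Only retry rate-limit errors, and only up to maxRetries times
      if (status !== 429 || attempt >= maxRetries) throw e;
      // Exponential backoff: baseDelayMs, 2x, 4x, ...
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}
```

Prefer the semaphore described later as the primary control; backoff is a last line of defense, not a substitute for limiting concurrency.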
## Search vs Filter: When to Use Which
### `instances.search` — Free-text search on text properties

Use when you need fuzzy/text matching on string fields (names, descriptions, etc.). It supports an `operator` parameter:

- `AND` (default) — Narrow search. All terms must match. Use when the user provides a specific query.
- `OR` — Broad "shotgun" search. Any term can match. Use for exploratory/typeahead search where you want maximum recall.
```typescript
// Narrow search: find a specific cell by name (AND — all terms must match)
const exactResults = await client.instances.search({
view: { type: 'view', ...PROCESS_CELL_VIEW },
query: 'reactor tank A',
properties: ['name'],
operator: 'AND',
limit: 10,
});
// Broad search: typeahead/autocomplete (OR — any term can match)
const broadResults = await client.instances.search({
view: { type: 'view', ...BATCH_VIEW },
query: 'BUDE completed',
properties: ['name', 'description', 'batchStatus'],
operator: 'OR',
limit: 10,
});
```

You can combine `search` with `filter` to further constrain results with exact-match conditions:

```typescript
// Text search + exact filter: search for "pump" but only in active nodes
const filtered = await client.instances.search({
view: { type: 'view', ...PROCESS_CELL_VIEW },
query: 'pump',
properties: ['name', 'description'],
filter: {
equals: {
property: getContainerProperty(MY_CONTAINER, 'status'),
value: 'active',
},
},
limit: 20,
});
```

### `instances.list` / `instances.query` with `filter` — Exact-match filtering

Use `filter` when you need precise, deterministic matching (equals, range, in, hasData, etc.). No fuzzy matching — values must match exactly.
```typescript
// Exact match: get all completed batches
const completedBatches = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...BATCH_VIEW } }],
filter: {
equals: {
property: getContainerProperty(BATCH_CONTAINER, 'batchStatus'),
value: 'completed',
},
},
limit: 1000,
});
```

### Decision Guide
| Need | Use |
|---|---|
| User typing in a search box | `instances.search` with `operator: 'OR'` |
| Find a specific item by name | `instances.search` with `operator: 'AND'` |
| Filter by status, date range, enums | `filter` on `instances.list` / `instances.query` |
| Text search + exact constraints | `instances.search` with a `filter` |
## QueuedTaskRunner (Semaphore)

**Always use the global `cdfTaskRunner` to wrap CDF API calls.** It limits concurrent requests and prevents 429 errors and deadlocks.
### Source Code

If the project does not already have a semaphore utility, create `src/shared/utils/semaphore.ts` with this implementation:

```typescript
/**
* AbortError thrown when a queued task is cancelled
*/
export class AbortError extends Error {
public constructor(message: string = 'Aborted') {
super(message);
this.name = 'AbortError';
}
}
type PendingTask<AsyncFn, AsyncFnResult> = {
resolve: (result: AsyncFnResult) => void;
reject: (error: unknown) => void;
fn: AsyncFn;
key?: string;
};
const DEFAULT_MAX_CONCURRENT_TASKS = 15;
/**
* QueuedTaskRunner for controlling concurrent operations
* Used to limit concurrent CDF API requests to avoid rate limiting and deadlocks
* Essentially a semaphore that allows a limited number of tasks to run at once.
*/
export default class QueuedTaskRunner<
AsyncFn extends () => Promise<AsyncFnResult>,
AsyncFnResult = Awaited<ReturnType<AsyncFn>>,
> {
private pendingTasks: PendingTask<AsyncFn, AsyncFnResult>[] = [];
private currentPendingTasks: number = 0;
private readonly maxConcurrentTasks: number = 1;
public constructor(
maxConcurrentTasks: number = DEFAULT_MAX_CONCURRENT_TASKS
) {
this.maxConcurrentTasks = maxConcurrentTasks;
}
public schedule(
fn: AsyncFn,
options: { key?: string } = {}
): Promise<AsyncFnResult> {
this.startTrackingTime();
return new Promise((resolve, reject) => {
if (options.key !== undefined) {
// Cancel existing tasks with the same key (deduplication)
this.pendingTasks
.filter((task) => task.key === options.key)
.forEach((task) => task.reject(new AbortError()));
this.pendingTasks = this.pendingTasks.filter(
(task) => task.key !== options.key
);
}
this.pendingTasks.push({
resolve,
reject,
fn,
key: options.key,
});
this.attemptConsumingNextTask();
});
}
public async attemptConsumingNextTask(): Promise<void> {
if (this.pendingTasks.length === 0) return;
if (this.currentPendingTasks >= this.maxConcurrentTasks) return;
const pendingTask = this.pendingTasks.shift();
if (pendingTask === undefined) {
throw new Error('pendingTask is undefined, this should never happen');
}
this.currentPendingTasks++;
const { fn, resolve, reject } = pendingTask;
try {
const result = await fn();
resolve(result);
} catch (e) {
reject(e);
} finally {
this.currentPendingTasks--;
this.tick();
this.attemptConsumingNextTask();
}
}
public clearQueue = (): void => {
this.pendingTasks = [];
};
private startTime: number | null = null;
private startTrackingTime = (): void => {
if (this.startTime === null) {
this.startTime = performance.now();
}
};
private tick = (): void => {
if (this.pendingTasks.length === 0) {
this.startTime = null;
}
};
}
/**
* Global task runner for CDF API requests
* Limits concurrent requests to avoid 429 rate limiting and deadlocks
*/
export const cdfTaskRunner = new QueuedTaskRunner(DEFAULT_MAX_CONCURRENT_TASKS);
```

### Usage Pattern
Always wrap CDF calls with `cdfTaskRunner.schedule()`:

```typescript
import { cdfTaskRunner } from '../../../../shared/utils/semaphore';
// Single query
export async function fetchBatches(client: CogniteClient): Promise<CDFBatch[]> {
return cdfTaskRunner.schedule(async () => {
const response = await client.instances.query({
with: { /* ... */ },
select: { /* ... */ },
});
return response.items?.nodes || [];
});
}
// Multiple parallel queries (safe — the semaphore limits concurrency)
export async function enrichBatch(
client: CogniteClient,
batch: CDFBatch
): Promise<BatchEnrichment> {
const [currentOp, lastOp, cells, material] = await Promise.all([
fetchCurrentOperation(client, batch.space, batch.externalId),
fetchLastCompletedOperation(client, batch.space, batch.externalId),
fetchProcessCells(client, batch.space, batch.externalId),
fetchMaterial(client, batch.space, batch.externalId),
]);
return { currentOp, lastOp, cells, material };
}
// Each of the above functions internally uses cdfTaskRunner.schedule(),
// so Promise.all is safe — the semaphore prevents exceeding concurrency limits
```

### Deduplication with Keys
Use the `key` option to cancel stale requests when the same query is triggered again (e.g., user changes filters quickly):

```typescript
const result = await cdfTaskRunner.schedule(
async () => client.instances.query({ /* ... */ }),
{ key: `batch-flow-${batchId}` }
);
// If another call with the same key arrives before this completes,
// the previous pending call is rejected with AbortError
```
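Callers that pass a `key` should treat `AbortError` as "do nothing" (the request was superseded) rather than as a failure. A minimal self-contained sketch — the `ignoreAborts` helper is illustrative, and `AbortError` is reproduced here from the semaphore module above:

```typescript
// AbortError as defined in the semaphore module (reproduced so this
// sketch is self-contained)
class AbortError extends Error {
  constructor(message: string = 'Aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

// Wrap a scheduled call so a superseded request resolves to null
// instead of surfacing an error (helper name is illustrative)
async function ignoreAborts<T>(promise: Promise<T>): Promise<T | null> {
  try {
    return await promise;
  } catch (e) {
    if (e instanceof AbortError) {
      return null; // a newer call with the same key took over; keep current state
    }
    throw e; // real errors still propagate
  }
}
```

In UI code, a `null` result typically means "leave the current view untouched" — the newer request will deliver the fresh data.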
## Pagination

`instances.list` returns at most `limit` items and a `nextCursor` for the next page.
`instances.query` uses a `cursors` object keyed by table expression name.

### `instances.list` Pagination
```typescript
async function fetchAllNodes(client: CogniteClient): Promise<CDFNodeResponse[]> {
const allItems: CDFNodeResponse[] = [];
let cursor: string | undefined = undefined;
do {
const response = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...MY_VIEW } }],
filter: {
equals: {
property: getContainerProperty(MY_CONTAINER, 'status'),
value: 'active',
},
},
limit: 1000,
cursor,
});
allItems.push(...response.items);
cursor = response.nextCursor;
} while (cursor);
return allItems;
}
```

### `instances.query` Pagination

The `query` endpoint returns `nextCursor` as a `Record<string, string>` (one cursor per table expression). Use it via the `cursors` parameter:

```typescript
import { isEmpty } from 'lodash';
async function fetchAllResults(
client: CogniteClient
): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> {
const QUERY_LIMIT = 10_000;
const fetchPage = async (
nextCursors?: Record<string, string>
): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> => {
const { items, nextCursor } = await client.instances.query({
with: {
results: {
limit: QUERY_LIMIT,
nodes: {
filter: {
hasData: [{ type: 'view', ...RESULT_VIEW }],
},
},
},
relatedEdges: {
limit: QUERY_LIMIT,
edges: {
from: 'results' as const,
maxDistance: 1,
direction: 'outwards' as const,
filter: {
equals: {
property: ['edge', 'type'],
value: MY_EDGE_TYPE,
},
},
},
},
},
cursors: nextCursors, // Pass cursors from previous page
select: {
results: {
sources: [
{ source: { type: 'view', ...RESULT_VIEW }, properties: ['*'] },
],
},
relatedEdges: {},
},
});
const results = (items?.results || []) as CDFResult[];
const edges = (items?.relatedEdges || []).filter(
(e) => e.instanceType === 'edge'
);
// Recurse if more pages exist
if (!isEmpty(nextCursor)) {
const next = await fetchPage(nextCursor);
return {
results: [...results, ...next.results],
edges: [...edges, ...next.edges],
};
}
return { results, edges };
};
return fetchPage();
}
```

### Pagination + QueuedTaskRunner Combined
Always wrap paginated fetches with the semaphore to avoid saturating the concurrency budget:
```typescript
export async function fetchAllWithPagination(
client: CogniteClient
): Promise<CDFNodeResponse[]> {
return cdfTaskRunner.schedule(async () => {
const allItems: CDFNodeResponse[] = [];
let cursor: string | undefined = undefined;
do {
const response = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...MY_VIEW } }],
filter: { /* ... */ },
limit: 1000,
cursor,
});
allItems.push(...response.items);
cursor = response.nextCursor;
// Optional: break early if you have enough data
if (allItems.length >= 500) break;
} while (cursor);
return allItems;
});
}
```

## Batching Write Operations

When upserting many instances, chunk them to stay under the apply concurrency limit. Each `instances.upsert` call accepts up to 1000 items.

### Chunking Utility

```typescript
function chunk<T>(arr: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < arr.length; i += size) {
chunks.push(arr.slice(i, i + size));
}
return chunks;
}
```

### Batched Upsert with QueuedTaskRunner

```typescript
const UPSERT_BATCH_SIZE = 1000;
async function batchUpsertNodes(
client: CogniteClient,
nodes: NodeOrEdgeCreate[]
): Promise<void> {
const chunks = chunk(nodes, UPSERT_BATCH_SIZE);
// Process chunks through the semaphore — safe even with Promise.all
await Promise.all(
chunks.map((batch) =>
cdfTaskRunner.schedule(async () => {
await client.instances.upsert({
items: batch,
});
})
)
);
}
```

### Batched Delete with QueuedTaskRunner

Instance deletes have an even stricter concurrency limit. Use a separate, more restrictive task runner:

```typescript
import QueuedTaskRunner from '../../../../shared/utils/semaphore';
// Dedicated runner for deletes (stricter concurrency — check docs for current limit)
const deleteTaskRunner = new QueuedTaskRunner(2);
async function batchDeleteNodes(
client: CogniteClient,
nodeIds: { space: string; externalId: string }[]
): Promise<void> {
const chunks = chunk(nodeIds, 1000);
for (const batch of chunks) {
await deleteTaskRunner.schedule(async () => {
await client.instances.delete(
batch.map((id) => ({
instanceType: 'node' as const,
...id,
}))
);
});
}
}
```

## Common Pitfalls

### 1. Deadlocks from Nested Semaphore Calls

If function A holds a semaphore slot and calls function B which also needs a slot, you can deadlock if all slots are occupied. Keep the semaphore at the outermost call level, or ensure inner calls don't go through the same semaphore.

```typescript
// BAD: Nested semaphore — can deadlock
async function fetchAndEnrich(client: CogniteClient) {
return cdfTaskRunner.schedule(async () => {
const batches = await fetchBatches(client); // This also calls cdfTaskRunner.schedule!
// If all slots are held by fetchAndEnrich callers, fetchBatches will never run
});
}
// GOOD: Let inner functions own the semaphore
async function fetchAndEnrich(client: CogniteClient) {
const batches = await fetchBatches(client); // Has its own semaphore call
const enriched = await Promise.all(
batches.map((b) => enrichBatch(client, b)) // Each has its own semaphore call
);
return enriched;
}
```

### 2. Forgetting Pagination

DMS returns at most `limit` items. If you don't paginate, you silently lose data. Always check `nextCursor`:

```typescript
// BAD: May miss data
const response = await client.instances.list({ limit: 1000, /* ... */ });
const items = response.items; // Could be incomplete!
// GOOD: Paginate
const allItems = [];
let cursor;
do {
const response = await client.instances.list({ limit: 1000, cursor, /* ... */ });
allItems.push(...response.items);
cursor = response.nextCursor;
} while (cursor);
```

### 3. Unbounded Promise.all Without Semaphore

Firing many parallel API calls will hit the 429 limit immediately:

```typescript
// BAD: Too many simultaneous requests
await Promise.all(batchIds.map((id) => client.instances.query({ /* ... */ })));
// GOOD: Each call goes through the semaphore
await Promise.all(
batchIds.map((id) =>
cdfTaskRunner.schedule(() => client.instances.query({ /* ... */ }))
)
);
```

### 4. Query Limit per Table Expression

Each table expression in `instances.query` has its own `limit`. If your traversal might return more items than the limit in a single expression, you must paginate using the `cursors` parameter.
## Summary Checklist
- Wrap all CDF API calls with `cdfTaskRunner.schedule()`
- Paginate `instances.list` calls using `cursor`/`nextCursor`
- Paginate `instances.query` calls using `cursors`/`nextCursor` when data may exceed limits
- Chunk `instances.upsert` write operations to 1000 items per call
- Use a separate, stricter task runner for deletes
- Avoid nesting `cdfTaskRunner.schedule()` calls to prevent deadlocks
- Use `Promise.all` with semaphore-wrapped functions, never with raw API calls
- Use `instances.search` for text matching, `filter` for exact-match queries
- Refer to https://docs.cognite.com/cdf/dm/dm_reference/dm_limits_and_restrictions for current limits