dm-limits-and-best-practices

CDF Data Modeling: Limits, Concurrency & Best Practices

This is a reference skill. When writing or reviewing code that calls CDF Data Modeling APIs, apply the patterns below.

DMS Limits Reference

For the latest concurrency limits, resource limits, and property value limits, see the official documentation: https://docs.cognite.com/cdf/dm/dm_reference/dm_limits_and_restrictions
Key things to be aware of:
  • Instance apply, delete, and query operations each have their own concurrent request limits
  • Exceeding these limits returns `429 Too Many Requests`
  • Transformations consume a large portion of the concurrency budget, leaving less for other clients
  • `instances.list` has a max page size (use pagination for complete results)
  • `instances.query` table expressions each have their own item limit
  • `instances.upsert` accepts up to 1000 items per call
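
When a 429 does slip through (for example, because other clients share the project's concurrency budget), backing off and retrying is the usual remedy. A minimal sketch of such a helper; `withRetryOn429` is a hypothetical name, not part of the Cognite SDK:

```typescript
// Hypothetical helper (not part of the Cognite SDK): retry a call that
// fails with HTTP 429, backing off exponentially between attempts.
type MaybeHttpError = { status?: number };

export async function withRetryOn429<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseDelayMs: number = 100
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (e) {
      const status = (e as MaybeHttpError)?.status;
      // Only retry rate-limit errors, and only up to maxRetries times
      if (status !== 429 || attempt >= maxRetries) throw e;
      // Exponential backoff: baseDelayMs, then 2x, 4x, ...
      await new Promise((resolve) =>
        setTimeout(resolve, baseDelayMs * 2 ** attempt)
      );
    }
  }
}
```

Retrying complements, rather than replaces, the semaphore pattern described below: the semaphore keeps you under the limit, and the retry handles the occasional slip.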

Search vs Filter: When to Use Which

`instances.search` — Free-text search on text properties

Use `instances.search` when you need fuzzy/text matching on string fields (names, descriptions, etc.). It supports an `operator` parameter:
  • `AND` (default) — Narrow search. All terms must match. Use when the user provides a specific query.
  • `OR` — Broad "shotgun" search. Any term can match. Use for exploratory/typeahead search where you want maximum recall.

```typescript
// Narrow search: find a specific cell by name (AND — all terms must match)
const exactResults = await client.instances.search({
  view: { type: 'view', ...PROCESS_CELL_VIEW },
  query: 'reactor tank A',
  properties: ['name'],
  operator: 'AND',
  limit: 10,
});

// Broad search: typeahead/autocomplete (OR — any term can match)
const broadResults = await client.instances.search({
  view: { type: 'view', ...BATCH_VIEW },
  query: 'BUDE completed',
  properties: ['name', 'description', 'batchStatus'],
  operator: 'OR',
  limit: 10,
});
```
You can combine `search` with `filter` to further constrain results with exact-match conditions:

```typescript
// Text search + exact filter: search for "pump" but only in active nodes
const filtered = await client.instances.search({
  view: { type: 'view', ...PROCESS_CELL_VIEW },
  query: 'pump',
  properties: ['name', 'description'],
  filter: {
    equals: {
      property: getContainerProperty(MY_CONTAINER, 'status'),
      value: 'active',
    },
  },
  limit: 20,
});
```

`instances.list` / `instances.query` with `filter` — Exact-match filtering

Use `filter` when you need precise, deterministic matching (`equals`, `range`, `in`, `hasData`, etc.). No fuzzy matching — values must match exactly.

```typescript
// Exact match: get all completed batches
const completedBatches = await client.instances.list({
  instanceType: 'node',
  sources: [{ source: { type: 'view', ...BATCH_VIEW } }],
  filter: {
    equals: {
      property: getContainerProperty(BATCH_CONTAINER, 'batchStatus'),
      value: 'completed',
    },
  },
  limit: 1000,
});
```

Decision Guide

| Need | Use |
| --- | --- |
| User typing in a search box | `instances.search` with `OR` |
| Find a specific item by name | `instances.search` with `AND` |
| Filter by status, date range, enums | `filter` on `list`/`query` |
| Text search + exact constraints | `instances.search` + `filter` |

QueuedTaskRunner (Semaphore)

**Always use the global `cdfTaskRunner`** to wrap CDF API calls. It limits concurrent requests and prevents 429 errors and deadlocks.

Source Code

If the project does not already have a semaphore utility, create `src/shared/utils/semaphore.ts` with this implementation:

```typescript
/**
 * AbortError thrown when a queued task is cancelled
 */
export class AbortError extends Error {
  public constructor(message: string = 'Aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

type PendingTask<AsyncFn, AsyncFnResult> = {
  resolve: (result: AsyncFnResult) => void;
  reject: (error: unknown) => void;
  fn: AsyncFn;
  key?: string;
};

const DEFAULT_MAX_CONCURRENT_TASKS = 15;

/**
 * QueuedTaskRunner for controlling concurrent operations
 * Used to limit concurrent CDF API requests to avoid rate limiting and deadlocks
 * Essentially a semaphore that allows a limited number of tasks to run at once.
 */
export default class QueuedTaskRunner<
  AsyncFn extends () => Promise<AsyncFnResult>,
  AsyncFnResult = Awaited<ReturnType<AsyncFn>>,
> {
  private pendingTasks: PendingTask<AsyncFn, AsyncFnResult>[] = [];
  private currentPendingTasks: number = 0;
  private readonly maxConcurrentTasks: number = 1;

  public constructor(
    maxConcurrentTasks: number = DEFAULT_MAX_CONCURRENT_TASKS
  ) {
    this.maxConcurrentTasks = maxConcurrentTasks;
  }

  public schedule(
    fn: AsyncFn,
    options: { key?: string } = {}
  ): Promise<AsyncFnResult> {
    this.startTrackingTime();

    return new Promise((resolve, reject) => {
      if (options.key !== undefined) {
        // Cancel existing tasks with the same key (deduplication)
        this.pendingTasks
          .filter((task) => task.key === options.key)
          .forEach((task) => task.reject(new AbortError()));

        this.pendingTasks = this.pendingTasks.filter(
          (task) => task.key !== options.key
        );
      }

      this.pendingTasks.push({
        resolve,
        reject,
        fn,
        key: options.key,
      });

      this.attemptConsumingNextTask();
    });
  }

  public async attemptConsumingNextTask(): Promise<void> {
    if (this.pendingTasks.length === 0) return;
    if (this.currentPendingTasks >= this.maxConcurrentTasks) return;

    const pendingTask = this.pendingTasks.shift();
    if (pendingTask === undefined) {
      throw new Error('pendingTask is undefined, this should never happen');
    }

    this.currentPendingTasks++;
    const { fn, resolve, reject } = pendingTask;

    try {
      const result = await fn();
      resolve(result);
    } catch (e) {
      reject(e);
    } finally {
      this.currentPendingTasks--;
      this.tick();
      this.attemptConsumingNextTask();
    }
  }

  public clearQueue = (): void => {
    this.pendingTasks = [];
  };

  private startTime: number | null = null;

  private startTrackingTime = (): void => {
    if (this.startTime === null) {
      this.startTime = performance.now();
    }
  };

  private tick = (): void => {
    if (this.pendingTasks.length === 0) {
      this.startTime = null;
    }
  };
}

/**
 * Global task runner for CDF API requests
 * Limits concurrent requests to avoid 429 rate limiting and deadlocks
 */
export const cdfTaskRunner = new QueuedTaskRunner(DEFAULT_MAX_CONCURRENT_TASKS);
```

Usage Pattern

Always wrap CDF calls with `cdfTaskRunner.schedule()`:

```typescript
import { cdfTaskRunner } from '../../../../shared/utils/semaphore';

// Single query
export async function fetchBatches(client: CogniteClient): Promise<CDFBatch[]> {
  return cdfTaskRunner.schedule(async () => {
    const response = await client.instances.query({
      with: { /* ... */ },
      select: { /* ... */ },
    });
    return response.items?.nodes || [];
  });
}

// Multiple parallel queries (safe — the semaphore limits concurrency)
export async function enrichBatch(
  client: CogniteClient,
  batch: CDFBatch
): Promise<BatchEnrichment> {
  const [currentOp, lastOp, cells, material] = await Promise.all([
    fetchCurrentOperation(client, batch.space, batch.externalId),
    fetchLastCompletedOperation(client, batch.space, batch.externalId),
    fetchProcessCells(client, batch.space, batch.externalId),
    fetchMaterial(client, batch.space, batch.externalId),
  ]);
  return { currentOp, lastOp, cells, material };
}

// Each of the above functions internally uses cdfTaskRunner.schedule(),
// so Promise.all is safe — the semaphore prevents exceeding concurrency limits
```

Deduplication with Keys

Use the `key` option to cancel stale requests when the same query is triggered again (e.g., user changes filters quickly):

```typescript
const result = await cdfTaskRunner.schedule(
  async () => client.instances.query({ /* ... */ }),
  { key: `batch-flow-${batchId}` }
);
// If another call with the same key arrives before this completes,
// the previous pending call is rejected with AbortError
```
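
Callers that use `key` should treat the resulting `AbortError` as "superseded" rather than as a failure. A minimal handling pattern, with `AbortError` redefined here so the snippet stands alone (in the project, import it from the semaphore utility instead):

```typescript
// Same shape as the AbortError exported by the semaphore utility
class AbortError extends Error {
  constructor(message: string = 'Aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

// Run a task and swallow cancellations: a newer call with the same key
// superseded this one, so "no result" is the correct outcome.
async function ignoringAborts<T>(
  run: () => Promise<T>
): Promise<T | undefined> {
  try {
    return await run();
  } catch (e) {
    if (e instanceof AbortError) return undefined;
    throw e; // Real errors still propagate
  }
}
```

This keeps typeahead-style UI code from surfacing spurious error toasts every time the user types faster than the backend responds.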

Pagination

DMS `instances.list` returns at most `limit` items and a `nextCursor` for the next page. DMS `instances.query` uses a `cursors` object keyed by table expression name.

instances.list Pagination

```typescript
async function fetchAllNodes(client: CogniteClient): Promise<CDFNodeResponse[]> {
  const allItems: CDFNodeResponse[] = [];
  let cursor: string | undefined = undefined;

  do {
    const response = await client.instances.list({
      instanceType: 'node',
      sources: [{ source: { type: 'view', ...MY_VIEW } }],
      filter: {
        equals: {
          property: getContainerProperty(MY_CONTAINER, 'status'),
          value: 'active',
        },
      },
      limit: 1000,
      cursor,
    });

    allItems.push(...response.items);
    cursor = response.nextCursor;
  } while (cursor);

  return allItems;
}
```

instances.query Pagination

The `query` endpoint returns `nextCursor` as a `Record<string, string>` (one cursor per table expression). Use it via the `cursors` parameter:

```typescript
import { isEmpty } from 'lodash';

async function fetchAllResults(
  client: CogniteClient
): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> {
  const QUERY_LIMIT = 10_000;

  const fetchPage = async (
    nextCursors?: Record<string, string>
  ): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> => {
    const { items, nextCursor } = await client.instances.query({
      with: {
        results: {
          limit: QUERY_LIMIT,
          nodes: {
            filter: {
              hasData: [{ type: 'view', ...RESULT_VIEW }],
            },
          },
        },
        relatedEdges: {
          limit: QUERY_LIMIT,
          edges: {
            from: 'results' as const,
            maxDistance: 1,
            direction: 'outwards' as const,
            filter: {
              equals: {
                property: ['edge', 'type'],
                value: MY_EDGE_TYPE,
              },
            },
          },
        },
      },
      cursors: nextCursors, // Pass cursors from previous page
      select: {
        results: {
          sources: [
            { source: { type: 'view', ...RESULT_VIEW }, properties: ['*'] },
          ],
        },
        relatedEdges: {},
      },
    });

    const results = (items?.results || []) as CDFResult[];
    const edges = (items?.relatedEdges || []).filter(
      (e) => e.instanceType === 'edge'
    );

    // Recurse if more pages exist
    if (!isEmpty(nextCursor)) {
      const next = await fetchPage(nextCursor);
      return {
        results: [...results, ...next.results],
        edges: [...edges, ...next.edges],
      };
    }

    return { results, edges };
  };

  return fetchPage();
}
```

Pagination + QueuedTaskRunner Combined

Always wrap paginated fetches with the semaphore to avoid saturating the concurrency budget:

```typescript
export async function fetchAllWithPagination(
  client: CogniteClient
): Promise<CDFNodeResponse[]> {
  return cdfTaskRunner.schedule(async () => {
    const allItems: CDFNodeResponse[] = [];
    let cursor: string | undefined = undefined;

    do {
      const response = await client.instances.list({
        instanceType: 'node',
        sources: [{ source: { type: 'view', ...MY_VIEW } }],
        filter: { /* ... */ },
        limit: 1000,
        cursor,
      });

      allItems.push(...response.items);
      cursor = response.nextCursor;

      // Optional: break early if you have enough data
      if (allItems.length >= 500) break;
    } while (cursor);

    return allItems;
  });
}
```

Batching Write Operations

When upserting many instances, chunk them to stay under the apply concurrency limit. Each `instances.upsert` call accepts up to 1000 items.

Chunking Utility

```typescript
function chunk<T>(arr: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < arr.length; i += size) {
    chunks.push(arr.slice(i, i + size));
  }
  return chunks;
}
```
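
For example, 2,500 ids split at a batch size of 1000 yield chunks of 1000, 1000, and 500. A quick standalone check (the helper is repeated so the snippet runs on its own):

```typescript
function chunk<T>(arr: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < arr.length; i += size) {
    chunks.push(arr.slice(i, i + size));
  }
  return chunks;
}

// 2,500 ids split into upsert-sized batches of at most 1000
const ids = Array.from({ length: 2500 }, (_, i) => `node-${i}`);
const batches = chunk(ids, 1000);
// batches.map((b) => b.length) → [1000, 1000, 500]
```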

Batched Upsert with QueuedTaskRunner

```typescript
const UPSERT_BATCH_SIZE = 1000;

async function batchUpsertNodes(
  client: CogniteClient,
  nodes: NodeOrEdgeCreate[]
): Promise<void> {
  const chunks = chunk(nodes, UPSERT_BATCH_SIZE);

  // Process chunks through the semaphore — safe even with Promise.all
  await Promise.all(
    chunks.map((batch) =>
      cdfTaskRunner.schedule(async () => {
        await client.instances.upsert({
          items: batch,
        });
      })
    )
  );
}
```

Batched Delete with QueuedTaskRunner

Instance deletes have an even stricter concurrency limit. Use a separate, more restrictive task runner:

```typescript
import QueuedTaskRunner from '../../../../shared/utils/semaphore';

// Dedicated runner for deletes (stricter concurrency — check docs for current limit)
const deleteTaskRunner = new QueuedTaskRunner(2);

async function batchDeleteNodes(
  client: CogniteClient,
  nodeIds: { space: string; externalId: string }[]
): Promise<void> {
  const chunks = chunk(nodeIds, 1000);

  for (const batch of chunks) {
    await deleteTaskRunner.schedule(async () => {
      await client.instances.delete(
        batch.map((id) => ({
          instanceType: 'node' as const,
          ...id,
        }))
      );
    });
  }
}
```

Common Pitfalls

1. Deadlocks from Nested Semaphore Calls

If function A holds a semaphore slot and calls function B which also needs a slot, you can deadlock if all slots are occupied. Keep the semaphore at the outermost call level, or ensure inner calls don't go through the same semaphore.
```typescript
// BAD: Nested semaphore — can deadlock
async function fetchAndEnrich(client: CogniteClient) {
  return cdfTaskRunner.schedule(async () => {
    const batches = await fetchBatches(client); // This also calls cdfTaskRunner.schedule!
    // If all slots are held by fetchAndEnrich callers, fetchBatches will never run
  });
}

// GOOD: Let inner functions own the semaphore
async function fetchAndEnrich(client: CogniteClient) {
  const batches = await fetchBatches(client); // Has its own semaphore call
  const enriched = await Promise.all(
    batches.map((b) => enrichBatch(client, b)) // Each has its own semaphore call
  );
  return enriched;
}
```

2. Forgetting Pagination

DMS returns at most `limit` items. If you don't paginate, you silently lose data. Always check `nextCursor`:

```typescript
// BAD: May miss data
const response = await client.instances.list({ limit: 1000, /* ... */ });
const items = response.items; // Could be incomplete!

// GOOD: Paginate
const allItems = [];
let cursor;
do {
  const response = await client.instances.list({ limit: 1000, cursor, /* ... */ });
  allItems.push(...response.items);
  cursor = response.nextCursor;
} while (cursor);
```

3. Unbounded Promise.all Without Semaphore

Firing many parallel API calls will hit the 429 limit immediately:
```typescript
// BAD: Too many simultaneous requests
await Promise.all(batchIds.map((id) => client.instances.query({ /* ... */ })));

// GOOD: Each call goes through the semaphore
await Promise.all(
  batchIds.map((id) =>
    cdfTaskRunner.schedule(() => client.instances.query({ /* ... */ }))
  )
);
```

4. Query Limit per Table Expression

Each table expression in `instances.query` has its own `limit`. If your traversal might return more items than the limit in a single expression, you must paginate using the `cursors` parameter.
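
A small helper makes the "is there another page?" check explicit without pulling in lodash. `hasMorePages` is an illustrative name; it assumes `nextCursor` is the `Record<string, string>` the endpoint returns:

```typescript
// The query endpoint returns one cursor per table expression that still has
// data; an absent or empty object means every expression is exhausted.
function hasMorePages(nextCursor?: Record<string, string>): boolean {
  return nextCursor !== undefined && Object.keys(nextCursor).length > 0;
}
```

This is equivalent to the `!isEmpty(nextCursor)` check used in the pagination example above, with the same semantics for `undefined` and `{}`.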

Summary Checklist

  • Wrap all CDF API calls with `cdfTaskRunner.schedule()`
  • Paginate `instances.list` calls using `cursor`/`nextCursor`
  • Paginate `instances.query` calls using `cursors`/`nextCursor` when data may exceed limits
  • Chunk write operations to 1000 items per `instances.upsert` call
  • Use a separate, stricter task runner for deletes
  • Avoid nesting `cdfTaskRunner.schedule()` calls to prevent deadlocks
  • Use `Promise.all` with semaphore-wrapped functions, never with raw API calls
  • Use `instances.search` for text matching, `filter` for exact-match queries
  • Refer to https://docs.cognite.com/cdf/dm/dm_reference/dm_limits_and_restrictions for current limits