# DM Limits and Best Practices
Reference skill for CDF Data Modeling API best practices. Covers concurrency limits (avoiding 429s), pagination patterns for instances.list and instances.query, batching write operations, search vs filter guidance, and the QueuedTaskRunner (Semaphore) utility for controlling concurrent requests. Triggers: DMS limits, 429 error, rate limit, pagination, cursor, nextCursor, batching, semaphore, QueuedTaskRunner, cdfTaskRunner, instances.search, instances.list, instances.query, instances.upsert, concurrency, deadlock.
npx skill4agent add cognitedata/dune-skills dm-limits-and-best-practices

Covered APIs: `instances.list`, `instances.query`, `instances.upsert`, `instances.search`.

## `instances.search` — `operator: 'AND' | 'OR'`

// Narrow search: find a specific cell by name (AND — all terms must match)
// AND search: every term must match, so results are narrow and specific.
const exactResults = await client.instances.search({
  view: { type: 'view', ...PROCESS_CELL_VIEW },
  query: 'reactor tank A',
  properties: ['name'],
  operator: 'AND',
  limit: 10,
});
// Broad search: typeahead/autocomplete (OR — any term can match)
const broadResults = await client.instances.search({
view: { type: 'view', ...BATCH_VIEW },
query: 'BUDE completed',
properties: ['name', 'description', 'batchStatus'],
operator: 'OR',
limit: 10,
});

## Combining `search` with `filter`

// Text search + exact filter: search for "pump" but only in active nodes
const filtered = await client.instances.search({
view: { type: 'view', ...PROCESS_CELL_VIEW },
query: 'pump',
properties: ['name', 'description'],
filter: {
equals: {
property: getContainerProperty(MY_CONTAINER, 'status'),
value: 'active',
},
},
limit: 20,
});

## `instances.list` / `instances.query` with `filter`

// Exact match: get all completed batches
const completedBatches = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...BATCH_VIEW } }],
filter: {
equals: {
property: getContainerProperty(BATCH_CONTAINER, 'batchStatus'),
value: 'completed',
},
},
limit: 1000,
});

| Need | Use |
|---|---|
| User typing in a search box | `instances.search` with `operator: 'OR'` |
| Find a specific item by name | `instances.search` with `operator: 'AND'` |
| Filter by status, date range, enums | `instances.list` / `instances.query` with `filter` |
| Text search + exact constraints | `instances.search` with `filter` |

## `cdfTaskRunner` (`src/shared/utils/semaphore.ts`)

/**
 * AbortError thrown when a queued task is cancelled
 */
export class AbortError extends Error {
  public constructor(message = 'Aborted') {
    super(message);
    // Distinct name lets callers tell cancellation apart from ordinary errors.
    this.name = 'AbortError';
  }
}
// A queued unit of work plus the promise callbacks used to settle it.
type PendingTask<TFn, TResult> = {
  /** Settles the caller's promise with the task's result. */
  resolve: (result: TResult) => void;
  /** Settles the caller's promise with an error (e.g. AbortError on key dedup). */
  reject: (error: unknown) => void;
  /** The deferred async function; not invoked until a concurrency slot is free. */
  fn: TFn;
  /** Optional deduplication key — see QueuedTaskRunner.schedule. */
  key?: string;
};

// Default concurrency ceiling when a runner is constructed without one.
const DEFAULT_MAX_CONCURRENT_TASKS = 15;
/**
 * QueuedTaskRunner for controlling concurrent operations
 * Used to limit concurrent CDF API requests to avoid rate limiting and deadlocks
 * Essentially a semaphore that allows a limited number of tasks to run at once.
 */
export default class QueuedTaskRunner<
  AsyncFn extends () => Promise<AsyncFnResult>,
  AsyncFnResult = Awaited<ReturnType<AsyncFn>>,
> {
  /** Tasks waiting for a free slot, in FIFO order. */
  private pendingTasks: PendingTask<AsyncFn, AsyncFnResult>[] = [];
  /** Number of tasks currently executing (i.e. slots in use). */
  private currentPendingTasks: number = 0;
  /** Maximum number of tasks allowed to run simultaneously. */
  private readonly maxConcurrentTasks: number;

  public constructor(
    maxConcurrentTasks: number = DEFAULT_MAX_CONCURRENT_TASKS
  ) {
    this.maxConcurrentTasks = maxConcurrentTasks;
  }

  /**
   * Queue `fn` and run it once a concurrency slot is free.
   *
   * If `options.key` is provided, any still-queued task with the same key is
   * cancelled first (its promise rejects with AbortError) — the newest call
   * with a given key wins. Tasks that have already started are unaffected.
   *
   * @param fn - Deferred async work; invoked only when a slot opens up.
   * @param options - Optional `key` for deduplication of queued tasks.
   * @returns A promise settling with `fn`'s result or rejection.
   */
  public schedule(
    fn: AsyncFn,
    options: { key?: string } = {}
  ): Promise<AsyncFnResult> {
    this.startTrackingTime();
    return new Promise((resolve, reject) => {
      if (options.key !== undefined) {
        // Cancel existing queued tasks with the same key (deduplication).
        this.pendingTasks
          .filter((task) => task.key === options.key)
          .forEach((task) => task.reject(new AbortError()));
        this.pendingTasks = this.pendingTasks.filter(
          (task) => task.key !== options.key
        );
      }
      this.pendingTasks.push({
        resolve,
        reject,
        fn,
        key: options.key,
      });
      // Fire-and-forget: errors are routed to the task's reject callback.
      void this.attemptConsumingNextTask();
    });
  }

  /**
   * Run the next queued task if a slot is free. Normally invoked internally
   * by schedule() and on task completion; kept public for external pokes.
   */
  public async attemptConsumingNextTask(): Promise<void> {
    if (this.pendingTasks.length === 0) return;
    if (this.currentPendingTasks >= this.maxConcurrentTasks) return;
    const pendingTask = this.pendingTasks.shift();
    if (pendingTask === undefined) {
      throw new Error('pendingTask is undefined, this should never happen');
    }
    this.currentPendingTasks++;
    const { fn, resolve, reject } = pendingTask;
    try {
      const result = await fn();
      resolve(result);
    } catch (e) {
      reject(e);
    } finally {
      // Release the slot, then try to start the next queued task.
      this.currentPendingTasks--;
      this.tick();
      void this.attemptConsumingNextTask();
    }
  }

  /**
   * Drop all queued (not yet started) tasks.
   *
   * Each cleared task is rejected with AbortError so that callers awaiting
   * schedule() observe the cancellation instead of hanging on a promise that
   * would otherwise never settle (the previous behavior silently leaked the
   * pending promises). Consistent with the key-deduplication path.
   */
  public clearQueue = (): void => {
    const cleared = this.pendingTasks;
    this.pendingTasks = [];
    cleared.forEach((task) => task.reject(new AbortError()));
  };

  /** Wall-clock start of the current busy period (null when idle). */
  private startTime: number | null = null;

  private startTrackingTime = (): void => {
    if (this.startTime === null) {
      this.startTime = performance.now();
    }
  };

  /** Reset the busy-period timer once the queue fully drains. */
  private tick = (): void => {
    if (this.pendingTasks.length === 0) {
      this.startTime = null;
    }
  };
}
/**
* Global task runner for CDF API requests
* Limits concurrent requests to avoid 429 rate limiting and deadlocks
*/
export const cdfTaskRunner = new QueuedTaskRunner(DEFAULT_MAX_CONCURRENT_TASKS);

## Using `cdfTaskRunner.schedule()`

import { cdfTaskRunner } from '../../../../shared/utils/semaphore';
// Single query
// Example: a single DMS query routed through the shared semaphore.
export async function fetchBatches(client: CogniteClient): Promise<CDFBatch[]> {
  return cdfTaskRunner.schedule(async () => {
    const { items } = await client.instances.query({
      with: { /* ... */ },
      select: { /* ... */ },
    });
    return items?.nodes || [];
  });
}
// Multiple parallel queries (safe — the semaphore limits concurrency)
// Example: several parallel queries — safe because each helper routes its
// request through cdfTaskRunner, so total concurrency stays capped.
export async function enrichBatch(
  client: CogniteClient,
  batch: CDFBatch
): Promise<BatchEnrichment> {
  const { space, externalId } = batch;
  const [currentOp, lastOp, cells, material] = await Promise.all([
    fetchCurrentOperation(client, space, externalId),
    fetchLastCompletedOperation(client, space, externalId),
    fetchProcessCells(client, space, externalId),
    fetchMaterial(client, space, externalId),
  ]);
  return { currentOp, lastOp, cells, material };
}
// Each of the above functions internally uses cdfTaskRunner.schedule(),
// so Promise.all is safe — the semaphore prevents exceeding concurrency limits

## Deduplication with `key`

const result = await cdfTaskRunner.schedule(
async () => client.instances.query({ /* ... */ }),
{ key: `batch-flow-${batchId}` }
);
// If another call with the same key arrives before this completes,
// the previous pending call is rejected with AbortError

## Pagination: `instances.list` uses `limit` + `nextCursor`; `instances.query` uses `cursors`

async function fetchAllNodes(client: CogniteClient): Promise<CDFNodeResponse[]> {
const allItems: CDFNodeResponse[] = [];
let cursor: string | undefined = undefined;
do {
const response = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...MY_VIEW } }],
filter: {
equals: {
property: getContainerProperty(MY_CONTAINER, 'status'),
value: 'active',
},
},
limit: 1000,
cursor,
});
allItems.push(...response.items);
cursor = response.nextCursor;
} while (cursor);
return allItems;
}

## `instances.query` pagination: `nextCursor` is a `Record<string, string>` passed back as `cursors`

import { isEmpty } from 'lodash';
async function fetchAllResults(
client: CogniteClient
): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> {
const QUERY_LIMIT = 10_000;
const fetchPage = async (
nextCursors?: Record<string, string>
): Promise<{ results: CDFResult[]; edges: EdgeDefinition[] }> => {
const { items, nextCursor } = await client.instances.query({
with: {
results: {
limit: QUERY_LIMIT,
nodes: {
filter: {
hasData: [{ type: 'view', ...RESULT_VIEW }],
},
},
},
relatedEdges: {
limit: QUERY_LIMIT,
edges: {
from: 'results' as const,
maxDistance: 1,
direction: 'outwards' as const,
filter: {
equals: {
property: ['edge', 'type'],
value: MY_EDGE_TYPE,
},
},
},
},
},
cursors: nextCursors, // Pass cursors from previous page
select: {
results: {
sources: [
{ source: { type: 'view', ...RESULT_VIEW }, properties: ['*'] },
],
},
relatedEdges: {},
},
});
const results = (items?.results || []) as CDFResult[];
const edges = (items?.relatedEdges || []).filter(
(e) => e.instanceType === 'edge'
);
// Recurse if more pages exist
if (!isEmpty(nextCursor)) {
const next = await fetchPage(nextCursor);
return {
results: [...results, ...next.results],
edges: [...edges, ...next.edges],
};
}
return { results, edges };
};
return fetchPage();
}

export async function fetchAllWithPagination(
client: CogniteClient
): Promise<CDFNodeResponse[]> {
return cdfTaskRunner.schedule(async () => {
const allItems: CDFNodeResponse[] = [];
let cursor: string | undefined = undefined;
do {
const response = await client.instances.list({
instanceType: 'node',
sources: [{ source: { type: 'view', ...MY_VIEW } }],
filter: { /* ... */ },
limit: 1000,
cursor,
});
allItems.push(...response.items);
cursor = response.nextCursor;
// Optional: break early if you have enough data
if (allItems.length >= 500) break;
} while (cursor);
return allItems;
});
});

## Batching `instances.upsert`

function chunk<T>(arr: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < arr.length; i += size) {
chunks.push(arr.slice(i, i + size));
}
return chunks;
}

const UPSERT_BATCH_SIZE = 1000;
async function batchUpsertNodes(
client: CogniteClient,
nodes: NodeOrEdgeCreate[]
): Promise<void> {
const chunks = chunk(nodes, UPSERT_BATCH_SIZE);
// Process chunks through the semaphore — safe even with Promise.all
await Promise.all(
chunks.map((batch) =>
cdfTaskRunner.schedule(async () => {
await client.instances.upsert({
items: batch,
});
})
)
);
}

import QueuedTaskRunner from '../../../../shared/utils/semaphore';
// Dedicated runner for deletes (stricter concurrency — check docs for current limit)
const deleteTaskRunner = new QueuedTaskRunner(2);
async function batchDeleteNodes(
client: CogniteClient,
nodeIds: { space: string; externalId: string }[]
): Promise<void> {
const chunks = chunk(nodeIds, 1000);
for (const batch of chunks) {
await deleteTaskRunner.schedule(async () => {
await client.instances.delete(
batch.map((id) => ({
instanceType: 'node' as const,
...id,
}))
);
});
}
}

// BAD: Nested semaphore — can deadlock
async function fetchAndEnrich(client: CogniteClient) {
  // Anti-pattern: the outer schedule() holds a semaphore slot while awaiting
  // an inner schedule() call made by fetchBatches — a classic self-deadlock.
  return cdfTaskRunner.schedule(async () => {
    const batches = await fetchBatches(client); // This also calls cdfTaskRunner.schedule!
    // If all slots are held by fetchAndEnrich callers, fetchBatches will never run
  });
}
// GOOD: Let inner functions own the semaphore
async function fetchAndEnrich(client: CogniteClient) {
const batches = await fetchBatches(client); // Has its own semaphore call
const enriched = await Promise.all(
batches.map((b) => enrichBatch(client, b)) // Each has its own semaphore call
);
return enriched;
}

## Always paginate: respect `limit` and `nextCursor`

// BAD: May miss data
const response = await client.instances.list({ limit: 1000, /* ... */ });
const items = response.items; // Could be incomplete!
// GOOD: Paginate
const allItems = [];
let cursor;
do {
const response = await client.instances.list({ limit: 1000, cursor, /* ... */ });
allItems.push(...response.items);
cursor = response.nextCursor;
} while (cursor);

// BAD: Too many simultaneous requests
await Promise.all(batchIds.map((id) => client.instances.query({ /* ... */ })));
// GOOD: Each call goes through the semaphore
await Promise.all(
batchIds.map((id) =>
cdfTaskRunner.schedule(() => client.instances.query({ /* ... */ }))
)
);

Summary: paginate `instances.list` with `cursor`/`nextCursor` and `instances.query` with `limit` + `cursors`; route every request through `cdfTaskRunner.schedule()` before using `Promise.all`; batch `instances.upsert` writes; prefer `instances.search` for free text and `filter` for exact constraints.