cupynumeric-parallel-data-load

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Parallel sharded data -> cupynumeric load

并行分片数据加载至cupynumeric

Why this skill exists. cupynumeric mirrors NumPy's array API, including
cupynumeric.load
for a single
.npy
file. Beyond that, file loading lives in Legate, not cupynumeric:
FormatBuilt-in loader
Single
.npy
cupynumeric.load(path)
(NumPy-API parity)
HDF5 (single file)
legate.io.hdf5.from_file
/
from_file_batched
Sharded multi-file (any format), Parquet/Arrow, raw binary, custom layoutsNo built-in loader — this skill.
This skill shows the canonical way to fill the gap in the last row: write a Legate Python task that calls the third-party reader the format needs (
h5py
,
pyarrow
,
np.memmap
, ...) inside the task body, and let Legate distribute the reads across GPUs / nodes. For the formats with a built-in loader, prefer it unless you need a custom in-task body (mmap-based loader, format-specific decoder, sidecar metadata, partial / sharded reads).
Canonical pattern: manual partition + manual task launch, sized to the machine, not the files. Only axis 0 is sharded; trailing axes ride along inside each tile. Per-shard row counts may differ across files (only
dtype
and trailing axes must match); the launch fills every available processor regardless of how many files there are.
.npy
is the worked example because the header carries shape and dtype on disk, but the skeleton applies to any format with cheap range/slice reads (raw binary, HDF5, Parquet/Arrow — see "Other formats" below). Reference implementation:
assets/examples/parallel_npy_load.py
.
本技能的存在意义。cupynumeric镜像了NumPy的数组API,包括用于单个.npy文件的
cupynumeric.load
。除此之外,文件加载功能属于Legate而非cupynumeric:
格式内置加载器
单个.npy文件
cupynumeric.load(path)
(与NumPy API兼容)
HDF5(单个文件)
legate.io.hdf5.from_file
/
from_file_batched
分片多文件(任意格式)、Parquet/Arrow、原始二进制、自定义布局无内置加载器 — 需使用本技能
本技能展示了填补最后一行空白的标准方法:编写一个Legate Python任务,在任务体内调用对应格式所需的第三方读取器(
h5py
pyarrow
np.memmap
等),并让Legate在GPU/节点间分布式执行读取操作。对于有内置加载器的格式,除非你需要自定义任务体(基于内存映射的加载器、格式特定解码器、辅助元数据、部分/分片读取),否则优先使用内置加载器。
标准模式:手动分区 + 手动任务启动,根据机器规模而非文件数量确定大小。仅对轴0进行分片;后续轴作为整体包含在每个分块中。各分片文件的行数可以不同(仅
dtype
和后续轴必须匹配);无论文件数量多少,启动操作都会利用所有可用处理器。
.npy是示例用例,因为其头部包含磁盘上的形状和dtype信息,但该框架适用于任何支持低成本范围/切片读取的格式(原始二进制、HDF5、Parquet/Arrow — 见下文“其他格式”)。参考实现:
assets/examples/parallel_npy_load.py

Data layout assumption

数据布局假设

This skill is purely about loading — it assumes the data is already laid out on a shared filesystem in some predictable, indexable way. Producing those files is out of scope (the example ships a
write
subcommand for convenience, but real users bring their own).
The worked example assumes one specific layout:
  • A directory containing files named
    shard_0000.npy
    ,
    shard_0001.npy
    , ... in a contiguous integer sequence (zero-padded width 4).
  • All shards share the same
    dtype
    and the same trailing axes (
    shape[1:]
    ); axis 0 (rows per shard) may differ across files — the recipe builds a cumulative row-offset table and reads each file's overlapping slice from inside the leaf task.
  • The directory is visible to every rank (shared filesystem for multi-node runs).
The example's
discover_layout()
prints what it found and hard-fails with a descriptive error when the layout is wrong (missing directory, no shards, mismatched
dtype
/ trailing axes, or a hole in the contiguous
shard_NNNN.npy
sequence).
If your data lives in a different layout — fixed-stride raw binary, an HDF5 file with one dataset per shard, a directory tree, ... — only the glob pattern, the per-file reader (step 4 below), and the metadata discovery (step 1 below) change. The partitioning and launch machinery is layout-agnostic.
本技能仅关注加载 — 假设数据已按可预测、可索引的方式存储在共享文件系统中。生成这些文件的过程不在本技能范围内(示例中为方便起见提供了
write
子命令,但实际用户需自行生成)。
示例采用以下特定布局:
  • 目录包含名为
    shard_0000.npy
    shard_0001.npy
    ……的文件,文件名按连续整数序列命名(补零至4位宽度)。
  • 所有分片共享相同的
    dtype
    和后续轴(
    shape[1:]
    );轴0(每个分片的行数)可因文件而异 — 本方案会构建累计行偏移表,并在叶子任务中读取每个文件的重叠切片。
  • 目录对所有rank可见(多节点运行时使用共享文件系统)。
示例中的
discover_layout()
会打印检测到的信息,若布局错误(目录缺失、无分片、
dtype
/后续轴不匹配、
shard_NNNN.npy
序列不连续),则会抛出明确的错误并终止。
若你的数据采用其他布局 — 固定步长原始二进制、每个分片对应一个数据集的HDF5文件、目录树等 — 仅需修改全局匹配模式、每个文件的读取器(下文步骤4)和元数据检测(下文步骤1)。分区和启动机制与布局无关。

When to use

使用场景

See the format table above for the routing decision (built-in loader vs. this skill). Beyond that, two additional cues that this skill is the right fit:
  • Replacing sequential
    np.concatenate([read(f) for f in files])
    with parallel per-GPU reads.
  • Demonstrating how a user-defined Legate Python task writes into a cupynumeric output array via a manual launch.
参考上方的格式表格来决定使用内置加载器还是本技能。除此之外,以下两个线索表明本技能是合适的选择:
  • 用并行的每GPU读取替代顺序执行的
    np.concatenate([read(f) for f in files])
  • 演示用户自定义Legate Python任务如何通过手动启动将数据写入cupynumeric输出数组。

Examples

示例

Paths below are written relative to this skill's directory (the script ships at
assets/examples/parallel_npy_load.py
). Adjust the prefix to match wherever your skill is installed (e.g.
skills/cupynumeric-parallel-data-load/assets/...
if the skill lives under a top-level
skills/
directory).
bash
undefined
以下路径均相对于本技能的目录(脚本位于
assets/examples/parallel_npy_load.py
)。请根据技能安装位置调整前缀(例如,若技能位于顶级
skills/
目录下,则路径为
skills/cupynumeric-parallel-data-load/assets/...
)。
bash
undefined

Single-node, 4 GPUs.

单节点,4块GPU

legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

```bash
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

```bash

Multi-node, 2 nodes x 4 GPUs (slurm), shared filesystem at --shard-dir.

多节点,2节点×4块GPU(使用slurm),共享文件系统路径为--shard-dir

Generate the shards once on rank 0, then re-run
read
at any scale.

先在rank 0上生成分片,然后以任意规模重新运行
read

legate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

No layout flags — the read driver walks every `.npy` header to recover
per-file row counts, the trailing shape, and the dtype, then derives
`tile_rows` from the available processor count.

`--min-gpu-chunk 1` is only needed when the per-tile element count is
below Legate's default minimum chunk size for GPU launches (e.g. the
worked example's defaults — total rows split across 4 GPUs at
`~1M` per tile — fall below the threshold and would otherwise be
folded onto a single GPU). For production-sized datasets (tens of
millions of elements per tile or larger) you can drop the flag and
let Legate use its default. Bumping it to a moderate value (e.g.
`--min-gpu-chunk 1024`) is fine when each tile is large enough that
per-task overhead matters more than getting *every* GPU a tile.
legate --launcher srun --nodes 2 --cpus 1
assets/examples/parallel_npy_load.py
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4
--gpus 4 --fbmem 4000 --min-gpu-chunk 1
assets/examples/parallel_npy_load.py
read --shard-dir /shared/scratch/demo

无需布局标志 — 读取驱动会遍历每个.npy头部以恢复每个文件的行数、后续形状和dtype,然后根据可用处理器数量推导`tile_rows`。

`--min-gpu-chunk 1`仅在每个分块的元素数量低于Legate GPU启动的默认最小分块大小时需要(例如,示例默认设置 — 总行数拆分到4块GPU,每个分块约1M元素 — 低于阈值,否则会合并到单个GPU上执行)。对于生产规模的数据集(每个分块包含数千万或更多元素),可以省略该标志,让Legate使用默认值。当每个分块足够大,任务开销比占用所有GPU更重要时,可将其设置为适中的值(例如`--min-gpu-chunk 1024`)。

Instructions

操作步骤

Five steps from a
.npy
worked example; only step 1 (parsing the format header) and step 4 (the per-file reader inside the task body) are format-specific. The other three (allocate destination, partition, fence) are reused unchanged across formats — see "Other formats" below for the swap-points.
基于.npy示例的五个步骤;仅步骤1(解析格式头部)和步骤4(任务体内的单文件读取器)与格式相关。其他三个步骤(分配目标存储、分区、栅栏)可在所有格式中重复使用 — 见下文“其他格式”中的替换点。

1. Read the metadata from every shard

1. 读取所有分片的元数据

Scan the directory and peek at every
.npy
header (
mmap_mode="r"
reads only the header). The header carries the per-shard shape and dtype, so the driver can recover total rows, trailing shape, and a cumulative row-offset table without ever loading the data:
python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))

per_file_rows = []                       # rows along axis 0 per file
trailing_shape = None                    # shape[1:], must match across files
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: trailing shape / dtype mismatch "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # length N+1
total_rows = int(cum_rows[-1])
The snippet above enforces matching
dtype
and
trailing_shape
(i.e.
shape[1:]
) across files. Per-shard row counts may differ — the cum-rows table handles that. Production code should also verify that names form a contiguous
shard_0000.npy ... shard_NNNN.npy
sequence (omitted from the snippet for brevity; see
discover_layout()
in the worked example). Discovery relies only on what the on-disk format itself exposes (the
.npy
header here,
.shape
/
.dtype
for HDF5, etc.); any sidecar (manifest, content hashes) is a separate verification step on top.
扫描目录并查看每个.npy的头部(
mmap_mode="r"
仅读取头部)。头部包含每个分片的形状和dtype,因此驱动无需加载数据即可恢复总行数、后续形状和累计行偏移表:
python
paths = sorted(SHARD_DIR.glob("shard_*.npy"))

per_file_rows = []                       # 每个文件沿轴0的行数
trailing_shape = None                    # shape[1:],所有文件必须匹配
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: 后续形状/dtype不匹配 "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # 长度为N+1
total_rows = int(cum_rows[-1])
上述代码片段强制要求所有文件的
dtype
trailing_shape
(即
shape[1:]
)匹配。每个分片的行数可以不同 — 累计行表会处理这种情况。生产代码还应验证文件名是否形成连续的
shard_0000.npy ... shard_NNNN.npy
序列(为简洁起见,片段中省略了该逻辑;见示例中的
discover_layout()
)。检测仅依赖磁盘格式本身暴露的信息(此处为.npy头部,HDF5为
.shape
/
.dtype
等);任何辅助文件(清单、内容哈希)都是额外的验证步骤。

2. Create the cupynumeric output store from the metadata

2. 根据元数据创建cupynumeric输出存储

The total array spans
total_rows
along axis 0; trailing axes come from
trailing_shape
unchanged. Use
cn.empty
— the task overwrites every cell, zero-init would be wasted.
python
import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)
总数组沿轴0的长度为
total_rows
;后续轴与
trailing_shape
一致。使用
cn.empty
— 任务会覆盖所有单元格,零初始化是浪费资源的操作。
python
import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)

3. Tile the store by processor count

3. 根据处理器数量划分存储分块

The launch shape is sized to the available processors, not to the file count. Pick
tile_rows = ceil(total_rows / num_processors)
and partition axis 0 by that tile size. Trailing axes are not partitioned (tile spans the full extent there). The last tile is allowed to be short — that's exactly what
partition_by_tiling
supports — so the recipe needs no divisibility constraint.
python
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
    machine.count(TaskTarget.GPU),
    machine.count(TaskTarget.OMP),
    machine.count(TaskTarget.CPU),
    1,
)

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)

num_tasks = (total_rows + tile_rows - 1) // tile_rows  # match partition tile count
启动形状由可用处理器数量而非文件数量决定。选择
tile_rows = ceil(total_rows / num_processors)
,并按该分块大小对轴0进行分区。后续轴不分区(分块覆盖其完整范围)。允许最后一个分块长度较短 — 这正是
partition_by_tiling
支持的功能 — 因此本方案无需满足可整除约束。
python
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
    machine.count(TaskTarget.GPU),
    machine.count(TaskTarget.OMP),
    machine.count(TaskTarget.CPU),
    1,
)

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)

num_tasks = (total_rows + tile_rows - 1) // tile_rows  # 与分区分块数量匹配

4. Define the leaf task and launch it manually

4. 定义叶子任务并手动启动

PATHS
and
CUM_ROWS
(the file paths and cumulative row-offset table from step 1) plus
TILE_ROWS
are populated as module globals by the driver before launching; control replication runs the driver on every rank, so every worker sees identical values.
Each task builds its consumer view first (cupy on GPU, numpy on CPU/OMP) and reads the tile's actual row count from
view.shape[0]
PhysicalStore
itself has no
.shape
attribute, so going through the view is required. It then computes its global row range from its launch coordinate and that row count, bisects
cum_rows
for the overlapping file(s), and copies each overlapping file slice into the matching destination slice. Register CPU, OMP, and GPU variants so the same launch runs unchanged anywhere; dispatch on
ctx.get_variant_kind()
picks the consumer matching where the
OutputStore
is resident (
cp.from_dlpack(dst)
for FBMEM,
np.asarray(dst)
for SYSMEM). cupy is imported inside the GPU branch only, so the task body loads on machines without cupy.
python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task

@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
    t = ctx.task_index[0]                              # tile index 0..num_tasks-1

    variant = ctx.get_variant_kind()
    if variant == VariantCode.GPU:
        import cupy as cp                              # lazy: only on GPU
        view = cp.from_dlpack(dst)
    else:
        view = np.asarray(dst)                         # zero-copy numpy view

    tile_rows_actual = view.shape[0]                   # short on the last tile
    row_start = t * TILE_ROWS                          # global axis-0 start
    row_end = row_start + tile_rows_actual

    # Find the half-open range of file indices that overlap [row_start, row_end).
    first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
    last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1

    for f in range(first_file, last_file + 1):
        # Intersection of tile [row_start, row_end) with file [cum[f], cum[f+1]).
        lo = max(row_start, int(CUM_ROWS[f]))
        hi = min(row_end, int(CUM_ROWS[f + 1]))
        file_lo = lo - int(CUM_ROWS[f])
        file_hi = hi - int(CUM_ROWS[f])
        dst_lo = lo - row_start
        dst_hi = hi - row_start
        chunk = np.ascontiguousarray(
            np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
        )
        if variant == VariantCode.GPU:
            view[dst_lo:dst_hi].set(chunk)             # cudaMemcpyAsync H2D
        else:
            view[dst_lo:dst_hi] = chunk                # zero-copy numpy write

manual_task = runtime.create_manual_task(
    load_tile.library,
    load_tile.task_id,
    (num_tasks,),                                      # launch domain == tile count
)
manual_task.add_output(partition)
manual_task.execute()
Both consumers go through
PhysicalStore
's native producers (
__dlpack__
for cupy,
__array_interface__
for
np.asarray
) — zero-copy views of the local tile. Bisect cost is
O(log num_shards)
and the inner loop typically iterates 1–2 times (tiles overlap at most a couple of files).
PATHS
CUM_ROWS
(步骤1中的文件路径和累计行偏移表)以及
TILE_ROWS
在启动前由驱动填充为模块全局变量;控制复制机制会在所有rank上运行驱动,因此所有工作节点都会看到相同的值。
每个任务首先创建其消费视图(GPU上使用cupy,CPU/OMP上使用numpy),并从
view.shape[0]
读取分块的实际行数 —
PhysicalStore
本身没有
.shape
属性,因此必须通过视图获取。然后根据启动坐标和行数计算全局行范围,在
cum_rows
中二分查找重叠的文件,并将每个重叠文件的切片复制到对应的目标切片中。注册CPU、OMP和GPU变体,以便同一启动操作可在任意环境中运行;通过
ctx.get_variant_kind()
进行调度,选择与
OutputStore
所在位置匹配的消费者(FBMEM使用
cp.from_dlpack(dst)
,SYSMEM使用
np.asarray(dst)
)。仅在GPU分支中导入cupy,因此任务体可在没有cupy的机器上运行。
python
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task

@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
    t = ctx.task_index[0]                              # 分块索引0..num_tasks-1

    variant = ctx.get_variant_kind()
    if variant == VariantCode.GPU:
        import cupy as cp                              # 延迟导入:仅在GPU上执行
        view = cp.from_dlpack(dst)
    else:
        view = np.asarray(dst)                         # 零拷贝numpy视图

    tile_rows_actual = view.shape[0]                   # 最后一个分块可能较短
    row_start = t * TILE_ROWS                          # 全局轴0起始位置
    row_end = row_start + tile_rows_actual

    # 查找与[row_start, row_end)重叠的文件索引的半开范围
    first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
    last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1

    for f in range(first_file, last_file + 1):
        # 分块[row_start, row_end)与文件[cum[f], cum[f+1])的交集
        lo = max(row_start, int(CUM_ROWS[f]))
        hi = min(row_end, int(CUM_ROWS[f + 1]))
        file_lo = lo - int(CUM_ROWS[f])
        file_hi = hi - int(CUM_ROWS[f])
        dst_lo = lo - row_start
        dst_hi = hi - row_start
        chunk = np.ascontiguousarray(
            np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
        )
        if variant == VariantCode.GPU:
            view[dst_lo:dst_hi].set(chunk)             # cudaMemcpyAsync H2D
        else:
            view[dst_lo:dst_hi] = chunk                # 零拷贝numpy写入

manual_task = runtime.create_manual_task(
    load_tile.library,
    load_tile.task_id,
    (num_tasks,),                                      # 启动域 == 分块数量
)
manual_task.add_output(partition)
manual_task.execute()
两种消费者均通过
PhysicalStore
的原生生产者(cupy使用
__dlpack__
np.asarray
使用
__array_interface__
)获取 — 本地分块的零拷贝视图。二分查找的时间复杂度为
O(log num_shards)
,内循环通常迭代1-2次(分块最多与几个文件重叠)。

5. Fence and verify

5. 栅栏与验证

python
get_legate_runtime().issue_execution_fence(block=True)
python
get_legate_runtime().issue_execution_fence(block=True)

Hard constraints

硬性约束

  1. All shards must share
    dtype
    and trailing axes (
    shape[1:]
    ).
    The recipe stacks shards along axis 0; the destination's trailing axes come from
    trailing_shape
    , which the discovery step locks to the value of the first file. Per-shard row counts (
    shape[0]
    ) may freely differ — the cumulative-offset table handles them. The example rejects any shard whose
    dtype
    or trailing shape differs from the first one with a descriptive error.
  2. Pick the consumer that matches the variant.
    cp.from_dlpack
    rejects SYSMEM-resident stores;
    np.asarray
    silently returns a host view of an FBMEM-resident store you can't actually write through. Dispatch on
    ctx.get_variant_kind()
    so each variant uses its own consumer — see step 4.
  3. mmap views aren't always C-contiguous — wrap each per-file slice with
    np.ascontiguousarray(arr[file_lo:file_hi])
    before
    .set()
    or the numpy in-place write.
  4. Multi-node:
    SHARD_DIR
    must be on a shared filesystem.
    Every worker (on every rank) opens shards by path; node-local
    /tmp
    paths only work for single-node demos.
  1. 所有分片必须共享
    dtype
    和后续轴(
    shape[1:]
    。本方案沿轴0堆叠分片;目标存储的后续轴来自
    trailing_shape
    ,检测步骤会将其锁定为第一个文件的值。每个分片的行数(
    shape[0]
    )可以自由不同 — 累计偏移表会处理这种情况。若某个分片的
    dtype
    或后续形状与第一个分片不同,示例会抛出明确的错误并终止。
  2. 选择与变体匹配的消费者
    cp.from_dlpack
    会拒绝驻留在SYSMEM的存储;
    np.asarray
    会静默返回FBMEM存储的主机视图,但无法对其执行写入操作。通过
    ctx.get_variant_kind()
    进行调度,让每个变体使用对应的消费者 — 见步骤4。
  3. 内存映射视图并非始终是C连续的 — 在
    .set()
    或numpy原地写入之前,需用
    np.ascontiguousarray(arr[file_lo:file_hi])
    包装每个文件切片。
  4. 多节点:
    SHARD_DIR
    必须位于共享文件系统
    。每个工作节点(所有rank)都通过路径打开分片;节点本地的
    /tmp
    路径仅适用于单节点演示。

Variants

变体

Uniform-shard fast path (one task per file)

统一分片快速路径(每个文件对应一个任务)

When every shard already has the same
(shape, dtype)
and you happen to have
num_shards
processors available, the cum-rows / bisect machinery is overhead. Set
tile_rows = shard_shape[0]
and
num_tasks = num_shards
; the partition then has one tile per file and each task reads exactly one file end-to-end (no bisect, no inner loop). The driver-side switch is a one-liner:
python
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
    tile_rows = per_file_rows[0]
else:
    tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
The same
load_tile
task body still works in either mode — the inner loop just happens to iterate exactly once per task. There's no need for a separate task body for the fast path.
当所有分片的
(shape, dtype)
都相同,且恰好有
num_shards
个可用处理器时,累计行/二分查找机制会产生额外开销。设置
tile_rows = shard_shape[0]
num_tasks = num_shards
;此时分区每个文件对应一个分块,每个任务完整读取一个文件(无需二分查找,无内循环)。驱动端只需一行代码即可切换:
python
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
    tile_rows = per_file_rows[0]
else:
    tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
同一个
load_tile
任务体在两种模式下均能正常工作 — 内循环恰好每个任务迭代一次。无需为快速路径编写单独的任务体。

Over-decompose for better load balancing

过度分解以实现更好的负载均衡

The default
tile_rows = ceil(total_rows / num_processors)
gives one tile per processor. To over-decompose by a factor
K
(smaller tiles, more point tasks, finer-grained queueing), divide by
K * num_processors
instead:
python
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))
num_tasks = ceil(total_rows / tile_rows)
then expands to roughly
K * num_processors
. The same task body still works — bisect just lands on more tasks per file.
默认的
tile_rows = ceil(total_rows / num_processors)
为每个处理器分配一个分块。若要按因子
K
进行过度分解(分块更小,点任务更多,队列粒度更细),则改为除以
K * num_processors
python
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))
此时
num_tasks = ceil(total_rows / tile_rows)
会扩展至约
K * num_processors
。同一个任务体仍能正常工作 — 二分查找会为每个文件分配更多任务。

Other formats

其他格式

Only the per-file reader inside
load_tile
changes. The reader's contract: given a file path and a half-open row range
[file_lo, file_hi)
along axis 0, return a numpy array of shape
(file_hi - file_lo,) + trailing_shape
that can be made C-contiguous. Cheap range/slice reads are required — formats that only support "read the whole file" defeat the partial-overlap case (a tile that covers only part of one file).
FormatReader inside the leaf task
.npy
(worked example)
host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi])
Raw binary (fixed-shape)
arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi])
HDF5
with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi])
Parquet / Arrow
tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values
(For built-in single-call loaders per format, see the "Why this skill exists" table at the top of this file.)
The discovery step (step 1) parses each format's metadata:
.npy
/ HDF5 / Parquet all carry per-file row count + dtype on disk. Raw binary doesn't — sidecar or derive from file size.
仅需修改
load_tile
内的单文件读取器。读取器的约定:给定文件路径和沿轴0的半开行范围
[file_lo, file_hi)
,返回形状为
(file_hi - file_lo,) + trailing_shape
的numpy数组,且该数组可转换为C连续格式。要求支持低成本范围/切片读取 — 仅支持“读取整个文件”的格式无法处理部分重叠场景(分块仅覆盖文件的一部分)。
格式叶子任务内的读取器
.npy(示例用例)
host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi])
原始二进制(固定形状)
arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi])
HDF5
with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi])
Parquet / Arrow
tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values
(各格式的内置单调用加载器见本文开头“本技能的存在意义”表格。)
检测步骤(步骤1)会解析每种格式的元数据:.npy/HDF5/Parquet均在磁盘上存储每个文件的行数和dtype。原始二进制则不存储 — 需通过辅助文件或文件大小推导。

Common pitfalls

常见陷阱

cn.asarray(dst)
is illegal in a leaf task

叶子任务中使用
cn.asarray(dst)
是非法的

Inside a
@task
body, any cupynumeric op that touches the top-level runtime —
cn.asarray(store)
, slice assignment
cn_dst[s] = host_np
— triggers
create_index_space
from the wrong context and Legion aborts:
LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space
Fix: consume the DLPack capsule with a third-party library (cupy / torch / numpy) inside leaf tasks.
cn.asarray
is fine in the driver, just not in leaf tasks. See
examples/dlpack/leaf_task_interop.py
for the torch-flavoured workaround.
@task
体内,任何触及顶级runtime的cupynumeric操作 —
cn.asarray(store)
、切片赋值
cn_dst[s] = host_np
— 都会在错误的上下文中触发
create_index_space
,导致Legion终止:
LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space
修复方法:在叶子任务中使用第三方库(cupy/torch/numpy)消费DLPack胶囊。
cn.asarray
在驱动中使用是可行的,但不能在叶子任务中使用。见
examples/dlpack/leaf_task_interop.py
中的torch版解决方案。

In-task
assert
aborts the runtime

任务内的
assert
会终止runtime

Legate treats unraised exceptions in a
@task
as a contract violation and aborts unless the task was registered with
throws_exception()
. Sanity-check on the host before launching.
Legate将
@task
中未捕获的异常视为违反约定,除非任务注册时指定了
throws_exception()
,否则会终止runtime。请在启动前在主机端进行完整性检查。

Launch domain must match the partition tile count

启动域必须与分区分块数量匹配

create_manual_task(launch_shape=...)
and
partition_by_tiling(...)
are independent — the runtime doesn't catch a mismatch. Larger launch domain → out-of-range tiles; smaller → unwritten tiles. Always derive both from the same
(total_rows, tile_rows)
via two separate
ceil
divisions (sizing the launch domain to
num_processors
directly would over-launch when
num_processors > total_rows
):
python
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))
create_manual_task(launch_shape=...)
partition_by_tiling(...)
是独立的 — runtime不会检测不匹配的情况。启动域过大 → 分块越界;启动域过小 → 部分分块未被写入。始终通过两个独立的向上取整除法从相同的
(total_rows, tile_rows)
推导两者(直接根据
num_processors
设置启动域会在
num_processors > total_rows
时过度启动):
python
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))