distributed-job-safety


# Distributed Job Safety

Patterns and anti-patterns for concurrent job management with pueue + mise + systemd-run, learned from production failures in distributed data pipeline orchestration.

**Scope**: Universal principles for any pueue + mise workflow with concurrent parameterized jobs. Examples use illustrative names, but the principles apply to any domain.

**Prerequisite skills**: `devops-tools:pueue-job-orchestration`, `itp:mise-tasks`, `itp:mise-configuration`

## The Eight Invariants

Non-negotiable rules for concurrent job safety. Violating any one causes silent data corruption or job failure.

Full formal specifications: [references/concurrency-invariants.md](./references/concurrency-invariants.md)

### 1. Filename Uniqueness by ALL Job Parameters

Every file path shared between concurrent jobs MUST include ALL parameters that differentiate those jobs.

```
WRONG:  {symbol}_{start}_{end}.json              # Two thresholds collide
RIGHT:  {symbol}_{threshold}_{start}_{end}.json  # Each job gets its own file
```

**Test**: If two pueue jobs can run simultaneously with different parameter values, those values MUST appear in every shared filename, temp directory, and lock file.
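The invariant can be made mechanical with a path-builder helper that takes every job-differentiating parameter as a required argument (a sketch; the function and parameter names are illustrative):

```python
from pathlib import Path

def checkpoint_path(base: Path, symbol: str, threshold: int,
                    start: str, end: str) -> Path:
    """Build a checkpoint path that embeds every job-differentiating parameter."""
    return base / f"{symbol}_{threshold}_{start}_{end}.json"

# Two jobs differing only in threshold never collide:
a = checkpoint_path(Path("/tmp/ckpt"), "BTCUSDT", 250, "2024-01-01", "2024-12-31")
b = checkpoint_path(Path("/tmp/ckpt"), "BTCUSDT", 500, "2024-01-01", "2024-12-31")
```

Because every parameter is a required positional, adding a new job-differentiating parameter forces a signature change at every call site — exactly the review gate the invariant asks for.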

### 2. Verify Before Mutate (No Blind Queueing)

Before queueing jobs, check what is already running. Before deleting state, check who owns it.

```bash
# WRONG: Blind queue
for item in "${ITEMS[@]}"; do
    pueue add --group mygroup -- run_job "$item" "$param"
done

# RIGHT: Check first
running=$(pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
    echo "SKIP: ${item}@${param} already running"
    continue
fi
```

### 3. Idempotent File Operations (missing_ok=True)

All file deletion in concurrent contexts MUST tolerate the file already being gone.

```python
# WRONG: TOCTOU race
if path.exists():
    path.unlink()  # Crashes if another job deleted between check and unlink

# RIGHT: Idempotent
path.unlink(missing_ok=True)
```

### 4. Atomic Writes for Shared State

Checkpoint files must never be partially written. Use the tempfile-fsync-rename pattern.

```python
import json
import os
import tempfile

fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
    f.write(json.dumps(data))
    f.flush()
    os.fsync(f.fileno())
os.replace(temp_path, path)  # POSIX atomic rename
```
Bash equivalent (for NDJSON telemetry appends):

```bash
# Atomic multi-line append via flock + temp file
TMPOUT=$(mktemp)
# ... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"
```

### 5. Config File Is SSoT

The `.mise.toml` `[env]` section is the single source of truth for environment defaults. Per-job `env` overrides bypass the SSoT and allow arbitrary values with no review gate.

```bash
# WRONG: Per-job override bypasses mise SSoT
pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py

# RIGHT: Set the correct value in .mise.toml, no per-job override needed
pueue add -- uv run python script.py
```
**Controlled exception**: `pueue env set <id> KEY VALUE` is acceptable for one-off overrides on stashed/queued tasks (e.g., hyperparameter sweeps). The key distinction: mise `[env]` is SSoT for **defaults** that apply to all runs; `pueue env set` is for **one-time parameterization** of a specific task without modifying the config file. See `devops-tools:pueue-job-orchestration` Per-Task Environment Override section.

### 6. Maximize Parallelism Within Safe Margins

Always probe host resources and scale parallelism to use available capacity. Conservative defaults waste hours of idle compute.

```bash
# Probe host resources
ssh host 'nproc && free -h && uptime'
```

Sizing formula (leave a 20% margin for OS + DB + overhead):

```
max_jobs = min(
    (available_memory_gb * 0.8) / per_job_memory_gb,
    (total_cores * 0.8) / per_job_cpu_cores
)
```
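The sizing formula translates directly into a small helper (a sketch; the 20% margin comes from the formula above, and the example numbers are illustrative):

```python
import math

def max_jobs(available_memory_gb: float, per_job_memory_gb: float,
             total_cores: int, per_job_cpu_cores: float,
             margin: float = 0.8) -> int:
    """Cap parallelism by whichever resource binds first, leaving a safety margin."""
    by_memory = (available_memory_gb * margin) / per_job_memory_gb
    by_cpu = (total_cores * margin) / per_job_cpu_cores
    return max(1, math.floor(min(by_memory, by_cpu)))

print(max_jobs(61, 5, 32, 2))  # memory-bound: floor(min(9.76, 12.8)) = 9
```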


**For ClickHouse workloads**: The bottleneck is often ClickHouse's `concurrent_threads_soft_limit` (default: 2 × nproc), not pueue's parallelism. Each query requests `max_threads` threads (default: nproc). Right-size `--max_threads` per query to match the effective thread count (soft_limit / pueue_slots), then increase pueue slots. Pueue parallelism can be adjusted live without restarting running jobs.

**Post-bump monitoring** (mandatory for 5 minutes after any parallelism change):

- `uptime` — load average should stay below 0.9 × nproc
- `vmstat 1 5` — si/so columns must remain 0 (no active swapping)
- ClickHouse errors: `SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing'` — must be 0

**Cross-reference**: See `devops-tools:pueue-job-orchestration` ClickHouse Parallelism Tuning section for the full decision matrix.


### 7. Per-Job Memory Caps via systemd-run

On Linux with cgroups v2, wrap each job with `systemd-run` to enforce hard memory limits.

```bash
systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
    uv run python scripts/process.py --symbol BTCUSDT --threshold 250
```

**Critical**: `MemorySwapMax=0` is mandatory. Without it, the process escapes into swap and the memory limit is effectively meaningless.

### 8. Monitor by Stable Identifiers, Not Ephemeral IDs

Pueue job IDs are ephemeral -- they shift when jobs are removed, re-queued, or split. Use group names and label patterns for monitoring.

```bash
# WRONG: Hardcoded job IDs
if pueue status --json | jq -e '.tasks."14"' >/dev/null; then ...

# RIGHT: Query by group/label
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'
```

Full specification: [references/concurrency-invariants.md](./references/concurrency-invariants.md#inv-8)

---

## Anti-Patterns (Learned from Production)

### AP-1: Redeploying Without Checking Running Jobs

**Symptom**: Killed running jobs, requeued new ones. Old checkpoint files from the killed jobs persisted, causing collisions with the new jobs.

**Fix**: Always run a state audit before redeployment:

```bash
pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running")] | length'
# If > 0, decide: wait, kill gracefully, or abort
```


See: [references/deployment-checklist.md](./references/deployment-checklist.md)


### AP-2: Checkpoint Filename Missing Job Parameters

**Symptom**: `FileNotFoundError` on checkpoint delete -- Job A deleted Job B's checkpoint.

**Root cause**: The filename `{item}_{start}_{end}.json` lacked a differentiating parameter. Two jobs for the same item at different configurations shared the file.

**Fix**: Include ALL differentiating parameters: `{item}_{config}_{start}_{end}.json`

### AP-3: Trusting `pueue restart` Logs

**Symptom**: `pueue log <id>` shows the old error after `pueue restart`, appearing as if the restart failed.

**Root cause**: Pueue appends output to the existing log. After a restart, the log contains BOTH the old failed run and the new attempt.

**Fix**: Check timestamps in the log, or add a fresh job instead of restarting:

```bash
# More reliable than restart
pueue add --group mygroup --label "BTCUSDT@750-retry" -- <same command>
```

### AP-4: Assuming PyPI Propagation Is Instant

**Symptom**: `uv pip install pkg==X.Y.Z` fails with "no version found" immediately after publishing.

**Root cause**: PyPI CDN propagation takes 30-120 seconds.

**Fix**: Use the `--refresh` flag to bust the cache:

```bash
uv pip install --refresh --index-url https://pypi.org/simple/ mypkg==<version>
```

### AP-5: Confusing Editable Source vs. Installed Wheel

**Symptom**: Updated the pip package to latest, but `uv run` still uses old code.

**Root cause**: `uv.lock` has `source = { editable = "." }` -- `uv run` reads Python files from the git working tree, not from the installed wheel.

**Fix**: On remote hosts, `git pull` updates the source that `uv run` reads. Pip install only matters for non-editable environments.

### AP-6: Sequential Phase Assumption

**Symptom**: Phase 2 jobs started while Phase 1 was still running for the same item, creating contention.

**Root cause**: All phases were queued simultaneously.

**Fix**: Either use pueue dependencies (`--after <id>`) or queue phases sequentially after verification:

```bash
# Queue Phase 1, wait for completion, then Phase 2
pueue add --label "phase1" -- run_phase_1
# ... wait and verify ...
pueue add --label "phase2" -- run_phase_2
```

### AP-7: Manual Post-Processing Steps

**Symptom**: Queue batch jobs, then print "run optimize after they finish."

```bash
# WRONG
postprocess_all() {
    queue_batch_jobs
    echo "Run 'pueue wait' then manually run optimize and validate"  # NO!
}
```

**Fix**: Wire post-processing as pueue `--after` dependent jobs:

```bash
# RIGHT
postprocess_all() {
    JOB_IDS=()
    for param in 250 500 750 1000; do
        job_id=$(pueue add --print-task-id --group mygroup \
            --label "ITEM@${param}" -- uv run python process.py --param "$param")
        JOB_IDS+=("$job_id")
    done
    # Chain optimize after ALL batch jobs
    optimize_id=$(pueue add --print-task-id --after "${JOB_IDS[@]}" \
        -- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")
    # Chain validation after optimize
    pueue add --after "$optimize_id" -- uv run python scripts/validate.py
}
```

**Cross-reference**: See `devops-tools:pueue-job-orchestration` Dependency Chaining section for full `--after` patterns.

### AP-8: Hardcoded Job IDs in Pipeline Monitors

**Symptom**: A background monitor crashes with an empty variable or a wrong comparison after jobs are removed, re-queued, or split into per-year jobs.

**Root cause**: The monitor uses `grep "^14|"` to find specific job IDs. When those IDs no longer exist (killed, removed, replaced by per-year splits), the grep returns empty and downstream comparisons fail.

**Fix**: Detect phase transitions by group completion patterns, not by tracking individual job IDs. Use `group_all_done()` to check whether all jobs in a pueue group have finished.

**Principle**: Pueue group names and job labels are stable identifiers. Job IDs are ephemeral.

**Cross-reference**: See `devops-tools:pueue-job-orchestration` Pipeline Monitoring section for the full `group_all_done()` implementation and integrity check patterns.
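A minimal `group_all_done()` sketch, parsing `pueue status --json` the same way the jq snippets in this document do (it assumes the object-shaped `status` field those snippets rely on; the full implementation in the referenced skill may differ):

```python
import json

# Statuses that count as "not finished yet"
ACTIVE = {"Running", "Queued", "Stashed"}

def group_all_done(status_json: str, group: str) -> bool:
    """True when no task in the pueue group is still queued, stashed, or running."""
    tasks = json.loads(status_json)["tasks"].values()
    return not any(
        t["group"] == group and next(iter(t["status"])) in ACTIVE
        for t in tasks
    )

# Feed it the daemon's view, e.g.:
# status_json = subprocess.check_output(["pueue", "status", "--json"], text=True)
```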

### AP-9: Sequential Processing When Epoch Resets Enable Parallelism

**Symptom**: A multi-year job runs for days single-threaded while 25+ cores sit idle. ETA: 1,700 hours.

**Root cause**: The pipeline processor resets state at epoch boundaries (yearly, monthly) -- each epoch is already independent. But the job was queued as one monolithic range.

**Fix**: Split into per-epoch pueue jobs running concurrently:

```bash
# WRONG: Single monolithic job, wastes idle cores
pueue add -- process --start 2019-01-01 --end 2026-12-31  # 1,700 hours single-threaded

# RIGHT: Per-year splits, 5x+ speedup on multi-core
for year in 2019 2020 2021 2022 2023 2024 2025 2026; do
    pueue add --group item-yearly --label "ITEM@250:${year}" \
        -- process --start "${year}-01-01" --end "${year}-12-31"
done
```

**When this applies**: Any pipeline where the processor explicitly resets state at time boundaries (ouroboros pattern, rolling windows, annual rebalancing). If the processor carries state across boundaries, per-epoch splitting is NOT safe.

**Cross-reference**: See `devops-tools:pueue-job-orchestration` Per-Year Parallelization section for full patterns.

### AP-10: State File Bloat Causing Silent Performance Regression

**Symptom**: Job submission that used to take 10 minutes now takes 6+ hours. No errors -- just slow. The pipeline appears healthy but execution slots sit idle waiting for new jobs to be queued.

**Root cause**: Pueue's `state.json` grows with every completed task. At 50K+ completed tasks (an 80-100 MB state file), each `pueue add` takes 1-2 seconds instead of <100 ms. This is invisible -- no errors, no warnings, just gradually degrading throughput.

**Why it's dangerous**: The regression is proportional to total completed tasks across the daemon's lifetime. A sweep that runs 10K jobs/day hits the problem by day 5. The first day runs fine, creating a false sense of security.

**Fix**: Treat `state.json` as infrastructure that requires periodic maintenance:

```bash
# Before bulk submission: always clean
pueue clean -g mygroup 2>/dev/null || true

# During long sweeps: clean between batches
# (See the pueue-job-orchestration skill for the full batch pattern)

# Monitor state size as part of health checks
STATE_FILE="$HOME/.local/share/pueue/state.json"
ls -lh "$STATE_FILE"  # Should be <10MB for healthy operation
```

**Invariant**: `state.json` size should stay below 50MB during active sweeps. Above 50MB, `pueue add` latency exceeds 500ms and parallel submission gains vanish.

**Cross-reference**: See `devops-tools:pueue-job-orchestration` State File Management section for benchmarks and the periodic clean pattern.
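The size invariant above can be folded into an automated health check (a sketch; the 50 MB limit comes from this section, and the commented default path assumes a standard Linux pueue install):

```python
from pathlib import Path

def state_file_healthy(path: Path, limit_mb: int = 50) -> bool:
    """True while state.json is small enough that `pueue add` stays fast."""
    return not path.exists() or path.stat().st_size < limit_mb * 1024 * 1024

# state = Path.home() / ".local/share/pueue/state.json"
# if not state_file_healthy(state): run `pueue clean` before bulk submission
```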

### AP-11: Wrong Working Directory in Remote Pueue Jobs

**Symptom**: Jobs fail immediately (exit code 2) with `can't open file 'scripts/populate.py': [Errno 2] No such file or directory`.

**Root cause**: `ssh host "pueue add -- uv run python scripts/process.py"` queues the job with the SSH session's cwd (typically `$HOME`), not the project directory. The script path is relative, so pueue looks for `~/scripts/process.py` instead of `~/project/scripts/process.py`.

**Fix**: Use `-w` (preferred) or `cd &&` to set the working directory:

```bash
# WRONG: pueue inherits SSH cwd ($HOME)
ssh host "pueue add --group mygroup -- uv run python scripts/process.py"

# RIGHT (preferred): -w flag sets working directory explicitly
ssh host "pueue add -w ~/project --group mygroup -- uv run python scripts/process.py"

# RIGHT (alternative): cd first, then pueue add inherits project cwd
ssh host "cd ~/project && pueue add --group mygroup -- uv run python scripts/process.py"
```

**Note**: Pueue v4 **does** have `-w` / `--working-directory`. Use it as the primary approach. Fall back to `cd &&` for SSH-piped commands where `-w` path expansion may differ. On macOS, `-w /tmp` resolves to `/private/tmp` (symlink).

**Test**: After queuing, verify the Path column in `pueue status` shows the project directory, not `$HOME`.

### AP-12: Per-File SSH for Bulk Job Submission

**Symptom**: Submitting 300K jobs takes days because each `pueue add` requires a separate SSH round-trip from the local machine to the remote host.

**Root cause**: The submission script runs locally and calls `ssh host "pueue add ..."` per job. Each SSH connection has ~50-100 ms overhead. At 300K jobs: 300K × 75 ms ≈ 6.25 hours just for SSH, before any submission latency.

**Fix**: Generate a commands file locally, rsync it to the remote host, then run `xargs -P` on the remote host to eliminate SSH overhead entirely:

```bash
# Step 1 (local): Generate commands file
bash gen_commands.sh > /tmp/commands.txt

# Step 2 (local): Transfer to remote
rsync /tmp/commands.txt host:/tmp/commands.txt

# Step 3 (remote): Feed via xargs -P (no SSH per-job)
ssh host "xargs -P16 -I{} bash -c '{}' < /tmp/commands.txt"
```

**Invariant**: Bulk submission should run ON the same host as pueue. The only SSH call should be to start the feeder process, not per-job.
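Step 1's generator can be any script that emits one `pueue add` line per job; a sketch of a parameter-sweep generator (the group, label scheme, and `process` command are illustrative):

```python
from pathlib import Path

def write_commands(path: Path, params, years) -> int:
    """Write one `pueue add` line per (param, year) job; returns the job count."""
    lines = [
        f"pueue add --group mygroup --label 'ITEM@{p}:{y}' -- "
        f"process --param {p} --start {y}-01-01 --end {y}-12-31"
        for p in params
        for y in years
    ]
    path.write_text("\n".join(lines) + "\n")
    return len(lines)

# write_commands(Path("/tmp/commands.txt"), [250, 500, 750, 1000], range(2019, 2027))
```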

### AP-13: SIGPIPE Under set -euo pipefail

**Symptom**: Script exits with code 141 (128 + SIGPIPE=13) on harmless pipe operations.

**Root cause**: In `ls *.sql | head -10`, `head` reads 10 lines then closes stdin. `ls` gets SIGPIPE writing to the closed pipe. Under `set -o pipefail`, this propagates as exit 141.

**Fix**: Avoid piping to `head` in strict-mode scripts:

```bash
# WRONG (exit 141)
ls /tmp/sql/*.sql | head -10

# RIGHT (temp file)
ls /tmp/sql/*.sql > /tmp/filelist.txt
head -10 /tmp/filelist.txt
```

### AP-14: False Data Loss From Variable-Width NDJSON Output

**Symptom**: `wc -l` shows fewer lines than expected. Appears as 3-6% "data loss".

**Root cause**: Configs with 0 signals after feature filtering produce 1 NDJSON line (a skipped entry), not N barrier lines. Example: 95 normal × 3 + 5 skipped × 1 = 290 (not 300).

**Fix**: Account for variable output width in line count validation:
`expected = N_normal * barriers_per_query + N_skipped * 1 + N_error * 1`
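The validation formula as code, using the worked example from above:

```python
def expected_lines(n_normal: int, barriers_per_query: int,
                   n_skipped: int, n_error: int) -> int:
    """Skipped and errored configs emit one NDJSON line each, not N barrier lines."""
    return n_normal * barriers_per_query + n_skipped + n_error

print(expected_lines(95, 3, 5, 0))  # 290 -- not 300, and not data loss
```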

## The Mise + Pueue + systemd-run Stack

```
mise (environment + task discovery)
  |-- .mise.toml [env] -> SSoT for defaults
  |-- .mise/tasks/jobs.toml -> task definitions
  |     |-- mise run jobs:submit-all
  |     |     |-- submit-all.sh (orchestrator)
  |     |           |-- pueue add (per-unit, NOT per-query)
  |     |                 |-- submit_unit.sh (per unit)
  |     |                       |-- xargs -P16 (parallel queries)
  |     |                             |-- wrapper.sh (per query)
  |     |                                   |-- clickhouse-client < sql_file
  |     |                                   |-- flock + append NDJSON
  |     |
  |     |-- mise run jobs:process-all (Python pipeline variant)
  |     |     |-- job-runner.sh (orchestrator)
  |     |           |-- pueue add (per-job)
  |     |                 |-- systemd-run --scope -p MemoryMax=XG -p MemorySwapMax=0
  |     |                       |-- uv run python scripts/process.py
  |     |                             |-- run_resumable_job()
  |     |                                   |-- get_checkpoint_path() -> param-aware
  |     |                                   |-- checkpoint.save() -> atomic write
  |     |                                   |-- checkpoint.unlink() -> missing_ok=True
  |     |
  |     |-- mise run jobs:autoscale-loop
  |           |-- autoscaler.sh --loop (60s interval)
  |                 |-- reads: free -m, uptime, pueue status --json
  |                 |-- adjusts: pueue parallel N --group <group>
```

Responsibility boundaries:

| Layer | Responsibility |
| --- | --- |
| mise | Environment variables, tool versions, task discovery |
| pueue | Daemon persistence, parallelism limits, restart, `--after` dependencies |
| systemd-run | Per-job cgroup memory caps (Linux only, no-op on macOS) |
| autoscaler | Dynamic parallelism tuning based on host resources |
| Python/app | Domain logic, checkpoint management, data integrity |


## Remote Deployment Protocol

When deploying a fix to a running host:

```
1. AUDIT:   ssh host 'pueue status --json' -> count running/queued/failed
2. DECIDE:  Wait for running jobs? Kill? Let them finish with old code?
3. PULL:    ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'
4. VERIFY:  ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'
5. UPGRADE: ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'
6. RESTART: ssh host 'pueue restart <failed_id>' OR add fresh jobs
7. MONITOR: ssh host 'pueue status --group mygroup'
```

**Critical**: Step 1 (AUDIT) is mandatory. Skipping it is the root cause of cascade failures.

See: references/deployment-checklist.md for the full protocol.
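The AUDIT step can be scripted against `pueue status --json` (a sketch, assuming the object-shaped `status` field that the jq snippets in this document rely on):

```python
import json
from collections import Counter

def audit_tasks(status_json: str) -> Counter:
    """Tally tasks by status (Running/Queued/Done/...) before mutating the host."""
    tasks = json.loads(status_json)["tasks"].values()
    return Counter(next(iter(t["status"])) for t in tasks)

# status_json = subprocess.check_output(["ssh", "host", "pueue status --json"], text=True)
# counts = audit_tasks(status_json)
# if counts["Running"]: decide (wait / kill / let finish) before deploying
```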


## Concurrency Safety Decision Tree

```
Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
|   |-- YES -> Add to checkpoint filename
|   |          Add to pueue job label
|   |          Add to remote checkpoint key
|   |-- NO  -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
|   |-- YES -> Use missing_ok=True
|   |          Use atomic write for creates
|   |-- NO  -> Standard operation
|
|-- Does the function write to shared storage?
    |-- YES -> Force deduplication after write
    |          Use UPSERT semantics where possible
    |-- NO  -> Standard operation
```

## Autoscaler

Pueue has no resource awareness. The autoscaler complements it with dynamic parallelism tuning.

**How it works**: It reads CPU load + available memory, then adjusts `pueue parallel N` per group.

```
CPU < 40% AND MEM < 60%  ->  SCALE UP (+1 per group)
CPU > 80% OR  MEM > 80%  ->  SCALE DOWN (-1 per group)
Otherwise                ->  HOLD
```
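The scale rules above, expressed as a pure decision function (a sketch; the autoscaler would apply the result via `pueue parallel`):

```python
def autoscale_decision(cpu_pct: float, mem_pct: float) -> str:
    """Map host utilization to a scaling action per the thresholds above."""
    if cpu_pct > 80 or mem_pct > 80:
        return "SCALE_DOWN"  # pueue parallel N-1 --group <group>
    if cpu_pct < 40 and mem_pct < 60:
        return "SCALE_UP"    # pueue parallel N+1 --group <group>
    return "HOLD"

print(autoscale_decision(30, 50))  # SCALE_UP
```

Checking the scale-down condition first means memory pressure wins even when CPU is idle.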
**Incremental scaling protocol** -- don't jump to max capacity. Ramp up in steps and verify stability at each level:

```
Step 1: Start with conservative defaults (e.g., group1=2, group2=3)
Step 2: After jobs stabilize (~5 min), probe: uptime + free -h + ps aux
Step 3: If load < 40% cores AND memory < 60% available:
        Bump by +1-2 jobs per group
Step 4: Wait ~5 min for new jobs to reach peak memory
Step 5: Probe again. If still within 80% margin, bump again
Step 6: Repeat until load ~50% cores OR memory ~70% available
```
**Why incremental**: Job memory footprint grows over time (a job may start at ~500 MB and peak at 5+ GB). Jumping straight to max parallelism risks OOM when all jobs hit peak simultaneously.

**Safety bounds**: Each group should have min/max limits the autoscaler won't exceed. It should also check per-job memory estimates before scaling up (don't add a 5 GB job if only 3 GB is available).
**Dynamic adjustment** (pueue supports live tuning without restarting jobs):

```bash
# Scale up when resources are available
pueue parallel 4 --group group1
pueue parallel 5 --group group2

# Scale down if memory pressure detected
pueue parallel 2 --group group1
```

**Per-symbol/per-family groups**: When jobs have vastly different resource profiles, give each family its own pueue group. This prevents a single high-memory job type from starving lighter jobs:

```bash
# Example: high-volume symbols need fewer concurrent jobs (5 GB each)
pueue group add highvol-yearly --parallel 2

# Low-volume symbols can run more concurrently (1 GB each)
pueue group add lowvol-yearly --parallel 6
```

---

## Project-Specific Extensions

This skill provides universal patterns that apply to any distributed job pipeline. Projects should create a local extension skill (e.g., `myproject-job-safety`) in their `.claude/skills/` directory that provides:

| Local Extension Provides | Example |
| --- | --- |
| Concrete function names | `run_resumable_job()` -> `myapp_populate_cache()` |
| Application-specific env vars | `MY_APP_MIN_THRESHOLD`, `MY_APP_CH_HOSTS` |
| Memory profiles per job type | "250 dbps peaks at 5 GB, use MemoryMax=8G" |
| Database-specific audit queries | `SELECT ... FROM mydb.mytable ... countIf(x < 0)` |
| Issue provenance tracking | "Checkpoint race: GH-84" |
| Host-specific configuration | "bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4" |

**Two-layer invocation pattern**: When this skill is triggered, also check for and invoke any local `*-job-safety` skill in the project's `.claude/skills/` directory for project-specific configuration.

```
devops-tools:distributed-job-safety      (universal patterns - this skill)
  + .claude/skills/myproject-job-safety  (project-specific config)
  = Complete operational knowledge
```


## References

- **Concurrency Invariants** -- Formal invariant specifications (INV-1 through INV-8)
- **Deployment Checklist** -- Step-by-step remote deployment protocol
- **Environment Gotchas** -- Host-specific pitfalls (G-1 through G-14)
- **Cross-reference**: `devops-tools:pueue-job-orchestration` -- Pueue basics, dependency chaining, installation