Distributed Job Safety
Patterns and anti-patterns for concurrent job management with pueue + mise + systemd-run, learned from production failures in distributed data pipeline orchestration.
Scope: Universal principles for any pueue + mise workflow with concurrent parameterized jobs. Examples use illustrative names but the principles apply to any domain.
Prerequisite skills: `devops-tools:pueue-job-orchestration`
The Eight Invariants
Non-negotiable rules for concurrent job safety. Violating any one causes silent data corruption or job failure.
Full formal specifications: references/concurrency-invariants.md
1. Filename Uniqueness by ALL Job Parameters
Every file path shared between concurrent jobs MUST include ALL parameters that differentiate those jobs.
WRONG: {symbol}_{start}_{end}.json # Two thresholds collide
RIGHT: {symbol}_{threshold}_{start}_{end}.json # Each job gets its own file
Test: If two pueue jobs can run simultaneously with different parameter values, those values MUST appear in every shared filename, temp directory, and lock file.
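As a minimal sketch of this rule, the helper below builds a checkpoint path from every job-differentiating parameter. The function name `checkpoint_path`, the base directory, and the sample values are hypothetical; only the `{symbol}_{threshold}_{start}_{end}.json` pattern comes from the text above.

```python
from pathlib import Path


def checkpoint_path(base_dir: Path, symbol: str, threshold: int, start: str, end: str) -> Path:
    """Build a checkpoint path that embeds every job-differentiating parameter."""
    # Hypothetical helper: mirrors the {symbol}_{threshold}_{start}_{end}.json pattern.
    return Path(base_dir) / f"{symbol}_{threshold}_{start}_{end}.json"


# Two concurrent jobs that differ only in threshold get distinct files.
a = checkpoint_path(Path("/tmp/ckpt"), "BTCUSDT", 250, "2024-01-01", "2024-12-31")
b = checkpoint_path(Path("/tmp/ckpt"), "BTCUSDT", 500, "2024-01-01", "2024-12-31")
```

Routing every temp directory and lock file name through the same helper keeps the parameter list in one place, so a newly added parameter cannot be forgotten in one of them.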
2. Verify Before Mutate (No Blind Queueing)
Before queueing jobs, check what is already running. Before deleting state, check who owns it.
WRONG: Blind queue
for item in "${ITEMS[@]}"; do
    pueue add --group mygroup -- run_job "$item" "$param"
done
RIGHT: Check first
running=$(pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
echo "SKIP: ${item}@${param} already running"
continue
fi
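The same check can be sketched in Python. This assumes `pueue status --json` returns a `tasks` object whose entries carry a `label` and a `status` keyed by state name (the shape the `keys[0] == "Running"` filter implies); the function names are hypothetical. Unlike a substring `grep`, the label comparison here is exact, so `BTCUSDT@25` is not mistaken for `BTCUSDT@250`.

```python
import json
import subprocess


def running_labels(status: dict) -> set:
    """Return the labels of all tasks whose state is Running."""
    labels = set()
    for task in status.get("tasks", {}).values():
        state = task.get("status")
        # Assumed shape: {"Running": {...}}; a bare string is tolerated as a fallback.
        name = next(iter(state)) if isinstance(state, dict) and state else state
        if name == "Running" and task.get("label"):
            labels.add(task["label"])
    return labels


def fetch_status() -> dict:
    """Ask the local pueue daemon for its state as JSON."""
    out = subprocess.run(
        ["pueue", "status", "--json"], capture_output=True, text=True, check=True
    )
    return json.loads(out.stdout)


def should_queue(status: dict, item: str, param: str) -> bool:
    """True only if no running task already carries the exact label item@param."""
    return f"{item}@{param}" not in running_labels(status)
```

A submission loop would call `should_queue(fetch_status(), item, param)` before each `pueue add` and skip the item when it returns `False`.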
3. Idempotent File Operations (missing_ok=True)
All file deletion in concurrent contexts MUST tolerate the file already being gone.
WRONG: TOCTOU race
if path.exists():
    path.unlink()  # Crashes if another job deleted between check and unlink
RIGHT: Idempotent
path.unlink(missing_ok=True)
4. Atomic Writes for Shared State
Checkpoint files must never be partially written. Use the tempfile-fsync-rename pattern.
python
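import json
import os
import tempfile

# `path` (a pathlib.Path) and `data` (JSON-serializable) come from the caller.
fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
    f.write(json.dumps(data))
    f.flush()
    os.fsync(f.fileno())  # force the bytes to disk before the rename
os.replace(temp_path, path)  # POSIX atomic rename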
Bash equivalent (for NDJSON telemetry appends):
Atomic multi-line append via flock + temp file
... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"
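For Python writers, a rough equivalent of the locked append might look like the sketch below. It assumes a POSIX host (the `fcntl` module) and reuses the `<log>.lock` naming from the bash version; `append_ndjson` is a hypothetical name, not part of any existing module.

```python
import fcntl
import json
import os
from pathlib import Path


def append_ndjson(log_file: Path, records: list) -> None:
    """Append all records as one block while holding an exclusive lock."""
    payload = "".join(json.dumps(r) + "\n" for r in records)
    lock_path = Path(str(log_file) + ".lock")  # same lock-file naming as the bash version
    with open(lock_path, "w") as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)  # blocks until other writers release the lock
        try:
            with open(log_file, "a") as out:
                out.write(payload)
                out.flush()
                os.fsync(out.fileno())
        finally:
            fcntl.flock(lock, fcntl.LOCK_UN)
```

Building the whole payload before taking the lock keeps the critical section short, which matters when many jobs append to the same telemetry file.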
5. Config File Is SSoT
The `[env]` section of `.mise.toml` is the single source of truth for environment defaults. Per-job `env` overrides bypass the SSoT and allow arbitrary values with no review gate.
WRONG: Per-job override bypasses mise SSoT
pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py
RIGHT: Set the correct value in .mise.toml, no per-job override needed
pueue add -- uv run python script.py
**Controlled exception**: `pueue env set <id> KEY VALUE` is acceptable for one-off overrides on stashed/queued tasks (e.g., hyperparameter sweeps). The key distinction: mise `[env]` is SSoT for **defaults** that apply to all runs; `pueue env set` is for **one-time parameterization** of a specific task without modifying the config file. See `devops-tools:pueue-job-orchestration` Per-Task Environment Override section.
6. Maximize Parallelism Within Safe Margins
Always probe host resources and scale parallelism to use available capacity. Conservative defaults waste hours of idle compute.
Probe host resources
ssh host 'nproc && free -h && uptime'
Sizing formula (leave 20% margin for OS + DB + overhead)
max_jobs = min(
    (available_memory_gb * 0.8) / per_job_memory_gb,
    (total_cores * 0.8) / per_job_cpu_cores
)
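The sizing formula can be worked through in a few lines of Python. This is a sketch under stated assumptions: the host figures in the example are illustrative, and flooring the result with a minimum of one job is a choice made here, not something the formula specifies.

```python
import math


def max_jobs(available_memory_gb, per_job_memory_gb, total_cores, per_job_cpu_cores, usable=0.8):
    """Apply the sizing formula: keep 20% headroom, then take the tighter bound."""
    by_memory = (available_memory_gb * usable) / per_job_memory_gb
    by_cpu = (total_cores * usable) / per_job_cpu_cores
    # Assumption: round down to whole jobs and never return fewer than one.
    return max(1, math.floor(min(by_memory, by_cpu)))


# Illustrative host: 61 GB available, 32 cores; each job peaks at 5 GB and uses 1 core.
print(max_jobs(61, 5, 32, 1))  # memory-bound: floor(48.8 / 5) = 9
```

In this example memory is the binding constraint (9 jobs) even though the CPU bound alone would allow 25, which is why both terms need to be probed.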
**For ClickHouse workloads**: The bottleneck is often ClickHouse's `concurrent_threads_soft_limit` (default: 2 × nproc), not pueue's parallelism. Each query requests `max_threads` threads (default: nproc). Right-size `--max_threads` per query to match the effective thread count (soft_limit / pueue_slots), then increase pueue slots. Pueue parallelism can be adjusted live without restarting running jobs.
**Post-bump monitoring** (mandatory for 5 minutes after any parallelism change):
- `uptime` — load average should stay below 0.9 × nproc
- `vmstat 1 5` — si/so columns must remain 0 (no active swapping)
- ClickHouse errors: `SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing'` — must be 0
**Cross-reference**: See `devops-tools:pueue-job-orchestration` ClickHouse Parallelism Tuning section for the full decision matrix.
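The thread right-sizing rule (soft_limit / pueue_slots) is simple integer arithmetic. The sketch below takes the 2 × nproc default quoted above as given; the function name and the sample host are assumptions.

```python
def threads_per_query(nproc: int, pueue_slots: int, soft_limit_factor: int = 2) -> int:
    """Split the concurrent-thread soft limit evenly across pueue slots."""
    soft_limit = soft_limit_factor * nproc  # 2 x nproc, per the default quoted above
    return max(1, soft_limit // pueue_slots)  # never ask for fewer than one thread


# Illustrative host: 32 cores, 16 pueue slots.
# Unsized, 16 queries x 32 threads would request 512 threads against a limit of 64.
print(threads_per_query(32, 16))  # 4
```

The result is the value to pass as `--max_threads` per query before raising the pueue slot count.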
7. Per-Job Memory Caps via systemd-run
On Linux with cgroups v2, wrap each job with `systemd-run` to enforce hard memory limits.
bash
systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
uv run python scripts/process.py --symbol BTCUSDT --threshold 250
Critical: `MemorySwapMax=0` is mandatory. Without it, the process escapes into swap and the memory limit is effectively meaningless.
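When jobs are launched from Python, the wrapper can be assembled as an argument list. This is a sketch: `with_memory_cap` is a hypothetical helper, and the fallback to the uncapped command on non-Linux hosts is an assumption based on `systemd-run` being Linux-only.

```python
import sys


def with_memory_cap(command: list, memory_max: str = "8G", platform: str = sys.platform) -> list:
    """Prefix a command with systemd-run so it runs under a hard memory cap."""
    if not platform.startswith("linux"):
        return list(command)  # systemd-run is Linux-only; run uncapped elsewhere
    return [
        "systemd-run", "--user", "--scope",
        "-p", f"MemoryMax={memory_max}",
        "-p", "MemorySwapMax=0",  # without this the job can spill into swap
        *command,
    ]


capped = with_memory_cap(
    ["uv", "run", "python", "scripts/process.py", "--symbol", "BTCUSDT", "--threshold", "250"],
    platform="linux",
)
```

The resulting list can be appended after `pueue add --` so every queued job carries its own cap.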
8. Monitor by Stable Identifiers, Not Ephemeral IDs
Pueue job IDs are ephemeral — they shift when jobs are removed, re-queued, or split. Use group names and label patterns for monitoring.
WRONG: Hardcoded job IDs
if pueue status --json | jq -e '.tasks."14"' >/dev/null; then ...
RIGHT: Query by group/label
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'
Full specification: [references/concurrency-invariants.md](./references/concurrency-invariants.md#inv-8)
---
Anti-Patterns (Learned from Production)
AP-1: Redeploying Without Checking Running Jobs
Symptom: Killed running jobs, requeued new ones. Old checkpoint files from killed jobs persisted, causing collisions with new jobs.
Fix: Always run state audit before redeployment:
bash
pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running")] | length'
If > 0, decide: wait, kill gracefully, or abort
See: [references/deployment-checklist.md](./references/deployment-checklist.md)
AP-2: Checkpoint Filename Missing Job Parameters
Symptom: `FileNotFoundError` on checkpoint delete -- Job A deleted Job B's checkpoint.
Root cause: Filename `{item}_{start}_{end}.json` lacked a differentiating parameter. Two jobs for the same item at different configurations shared the file.
Fix: Include ALL differentiating parameters: `{item}_{config}_{start}_{end}.json`
Symptom: `pueue log <id>` shows old error after `pueue restart <id>`, appearing as if the restart failed.
Root cause: Pueue appends output to existing log. After restart, the log contains BOTH the old failed run and the new attempt.
Fix: Check timestamps in the log, or add a new fresh job instead of restarting:
More reliable than restart
pueue add --group mygroup --label "BTCUSDT@750-retry" -- <same command>
AP-4: Assuming PyPI Propagation Is Instant
Symptom: `uv pip install pkg==X.Y.Z` fails with "no version found" immediately after publishing.
Root cause: PyPI CDN propagation takes 30-120 seconds.
Fix: Use the `--refresh` flag to bust the cache:
bash
uv pip install --refresh --index-url https://pypi.org/simple/ mypkg==<version>
AP-5: Confusing Editable Source vs. Installed Wheel
Symptom: Updated pip package to latest, but `uv run` still uses old code.
Root cause: `uv.lock` has `source = { editable = "." }`, so `uv run` reads Python files from the git working tree, not from the installed wheel.
Fix: On remote hosts, `git pull` updates the source that `uv run` reads. Pip install only matters for non-editable environments.
AP-6: Sequential Phase Assumption
Symptom: Phase 2 jobs started while Phase 1 was still running for the same item, creating contention.
Root cause: All phases queued simultaneously.
Fix: Either use pueue dependencies (`--after`) or queue phases sequentially after verification:
Queue Phase 1, wait for completion, then Phase 2
pueue add --label "phase1" -- run_phase_1
... wait and verify ...
pueue add --label "phase2" -- run_phase_2
AP-7: Manual Post-Processing Steps
Symptom: Queue batch jobs, print "run optimize after they finish."
```bash
postprocess_all() {
    queue_batch_jobs
    echo "Run 'pueue wait' then manually run optimize and validate"  # NO!
}
```
**Fix**: Wire post-processing as pueue `--after` dependent jobs:
```bash
postprocess_all() {
    JOB_IDS=()
    for param in 250 500 750 1000; do
        job_id=$(pueue add --print-task-id --group mygroup \
            --label "ITEM@${param}" -- uv run python process.py --param "$param")
        JOB_IDS+=("$job_id")
    done
    # Chain optimize after ALL batch jobs
    optimize_id=$(pueue add --print-task-id --after "${JOB_IDS[@]}" \
        -- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")
    # Chain validation after optimize
    pueue add --after "$optimize_id" -- uv run python scripts/validate.py
}
```
**Cross-reference**: See `devops-tools:pueue-job-orchestration` Dependency Chaining section for full `--after` patterns.
AP-8: Hardcoded Job IDs in Pipeline Monitors
Symptom: Background monitor crashes with empty variable or wrong comparison after jobs are removed, re-queued, or split into per-year jobs.
Root cause: The monitor greps `pueue status` output to find specific job IDs. When those IDs no longer exist (killed, removed, replaced by per-year splits), the grep returns empty and downstream comparisons fail.
Fix: Detect phase transitions by group completion patterns, not by tracking individual job IDs. Use a group-scoped status check to determine whether all jobs in a pueue group have finished.
Principle: Pueue group names and job labels are stable identifiers. Job IDs are ephemeral.
Cross-reference: See `devops-tools:pueue-job-orchestration` Pipeline Monitoring section for the full implementation and integrity check patterns.
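A group-level completion check might look like the sketch below. It is not the implementation from the referenced skill; it assumes each task in `pueue status --json` exposes a `group` field and a `status` keyed by state name, and the function names are hypothetical.

```python
def _state_name(task: dict):
    """Return the task's state name, e.g. "Running", "Queued" or "Done"."""
    state = task.get("status")
    if isinstance(state, dict) and state:
        return next(iter(state))  # assumed shape: {"Done": {...}}
    return state


def group_finished(status: dict, group: str) -> bool:
    """True when the group has tasks and every one of them is Done."""
    tasks = [t for t in status.get("tasks", {}).values() if t.get("group") == group]
    if not tasks:
        return False  # design choice: an empty group is "not finished", never success
    return all(_state_name(t) == "Done" for t in tasks)
```

Because the check never mentions a task ID, it keeps working after jobs are removed, re-queued, or split. Note that a group emptied by `pueue clean` reports `False` here, so a monitor should evaluate completion before cleaning.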
AP-9: Sequential Processing When Epoch Resets Enable Parallelism
Symptom: A multi-year job runs for days single-threaded while 25+ cores sit idle. ETA: 1,700 hours.
Root cause: Pipeline processor resets state at epoch boundaries (yearly, monthly) — each epoch is already independent. But the job was queued as one monolithic range.
Fix: Split into per-epoch pueue jobs running concurrently:
WRONG: Single monolithic job, wastes idle cores
pueue add -- process --start 2019-01-01 --end 2026-12-31 # 1,700 hours single-threaded
RIGHT: Per-year splits, 5x+ speedup on multi-core
for year in 2019 2020 2021 2022 2023 2024 2025 2026; do
    pueue add --group item-yearly --label "ITEM@250:${year}" \
        -- process --start "${year}-01-01" --end "${year}-12-31"
done
**When this applies**: Any pipeline where the processor explicitly resets state at time boundaries (ouroboros pattern, rolling windows, annual rebalancing). If the processor carries state across boundaries, per-epoch splitting is NOT safe.
**Cross-reference**: See `devops-tools:pueue-job-orchestration` Per-Year Parallelization section for full patterns.
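If the split is generated programmatically, the per-year ranges and `pueue add` argument lists can be built as below. This is a sketch: `process`, the `item-yearly` group, and the `ITEM@250:<year>` label format are taken from the example above, while the helper names are hypothetical.

```python
from datetime import date


def yearly_ranges(first_year: int, last_year: int) -> list:
    """Return (start, end) ISO date pairs, one per calendar year, inclusive."""
    return [
        (date(y, 1, 1).isoformat(), date(y, 12, 31).isoformat())
        for y in range(first_year, last_year + 1)
    ]


def pueue_add_args(item: str, param: int, first_year: int, last_year: int) -> list:
    """Build one `pueue add` argument list per year, labelled item@param:year."""
    commands = []
    for start, end in yearly_ranges(first_year, last_year):
        year = start[:4]
        commands.append([
            "pueue", "add", "--group", "item-yearly",
            "--label", f"{item}@{param}:{year}",
            "--", "process", "--start", start, "--end", end,
        ])
    return commands


jobs = pueue_add_args("ITEM", 250, 2019, 2026)
```

Each entry in `jobs` can be passed to `subprocess.run`. The label carries item, parameter, and year, which keeps it consistent with the filename uniqueness invariant.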
AP-10: State File Bloat Causing Silent Performance Regression
Symptom: Job submission that used to take 10 minutes now takes 6+ hours. No errors — just slow. Pipeline appears healthy but execution slots sit idle waiting for new jobs to be queued.
Root cause: Pueue's `state.json` grows with every completed task. At 50K+ completed tasks (80-100MB state file), each `pueue add` takes 1-2 seconds instead of <100ms. This is invisible: no errors, no warnings, just gradually degrading throughput.
Why it's dangerous: The regression is proportional to total completed tasks across the daemon's lifetime. A sweep that runs 10K jobs/day hits the problem by day 5. The first day runs fine, creating a false sense of security.
Fix: Treat `state.json` as infrastructure that requires periodic maintenance:
Before bulk submission: always clean
pueue clean -g mygroup 2>/dev/null || true
During long sweeps: clean between batches
(See pueue-job-orchestration skill for full batch pattern)
Monitor state size as part of health checks
STATE_FILE="$HOME/.local/share/pueue/state.json"
ls -lh "$STATE_FILE" # Should be <10MB for healthy operation
**Invariant**: `state.json` size should stay below 50MB during active sweeps. Above 50MB, `pueue add` latency exceeds 500ms and parallel submission gains vanish.
**Cross-reference**: See `devops-tools:pueue-job-orchestration` State File Management section for benchmarks and the periodic clean pattern.
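The two thresholds above (under 10MB healthy, 50MB as the hard ceiling) can be folded into a health check. The sketch below is illustrative: the function names and the three status strings are assumptions, and the state file path should be whatever the host actually uses.

```python
from pathlib import Path

MB = 1024 * 1024


def classify_state_size(size_bytes: int) -> str:
    """Map a state file size onto the thresholds described above."""
    if size_bytes < 10 * MB:
        return "healthy"
    if size_bytes < 50 * MB:
        return "clean-soon"  # still under the invariant, but latency is growing
    return "clean-now"  # above 50MB, submission latency dominates


def state_file_health(state_file: Path) -> str:
    """Classify the state file on disk, or report that it is missing."""
    if not state_file.exists():
        return "missing"
    return classify_state_size(state_file.stat().st_size)
```

A monitor could call `state_file_health(Path.home() / ".local/share/pueue/state.json")` and trigger `pueue clean` when the result is anything other than `healthy`.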
AP-11: Wrong Working Directory in Remote Pueue Jobs
Symptom: Jobs fail immediately (exit code 2) with `can't open file 'scripts/process.py': [Errno 2] No such file or directory`.
Root cause: `ssh host "pueue add -- uv run python scripts/process.py"` queues the job with the SSH session's cwd (typically `$HOME`), not the project directory. The script path is relative, so the job looks for `~/scripts/process.py` instead of `~/project/scripts/process.py`.
Fix: Use `-w` (preferred) or `cd ... &&` to set the working directory:
WRONG: pueue inherits SSH cwd ($HOME)
ssh host "pueue add --group mygroup -- uv run python scripts/process.py"
RIGHT (preferred): -w flag sets working directory explicitly
ssh host "pueue add -w ~/project --group mygroup -- uv run python scripts/process.py"
RIGHT (alternative): cd first, then pueue add inherits project cwd
ssh host "cd ~/project && pueue add --group mygroup -- uv run python scripts/process.py"
**Note**: Pueue v4 **does** have `-w` / `--working-directory`. Use it as the primary approach. Fall back to `cd &&` for SSH-piped commands where `-w` path expansion may differ. On macOS, `-w /tmp` resolves to `/private/tmp` (symlink).
**Test**: After queuing, verify the Path column in `pueue status` shows the project directory, not `$HOME`.
AP-12: Per-File SSH for Bulk Job Submission
Symptom: Submitting 300K jobs takes days because each `pueue add` requires a separate SSH round-trip from the local machine to the remote host.
Root cause: The submission script runs locally and calls `ssh host "pueue add ..."` per job. Each SSH connection has ~50-100ms overhead. At 300K jobs: 300K × 75ms = 6.25 hours just for SSH, before any submission latency.
Fix: Generate a commands file locally, rsync it to the remote host, then run `xargs -P` on the remote host to eliminate SSH overhead entirely:
Step 1 (local): Generate commands file
bash gen_commands.sh > /tmp/commands.txt
Step 2 (local): Transfer to remote
rsync /tmp/commands.txt host:/tmp/commands.txt
Step 3 (remote): Feed via xargs -P (no SSH per-job)
ssh host "xargs -P16 -I{} bash -c '{}' < /tmp/commands.txt"
**Invariant**: Bulk submission should run ON the same host as pueue. The only SSH call should be to start the feeder process, not per-job.
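The SSH overhead quoted in the root cause can be reproduced with a short calculation. The function name is hypothetical; the inputs are the job count and the per-connection latencies stated above.

```python
def ssh_overhead_hours(job_count: int, per_call_ms: float) -> float:
    """Total time spent purely on SSH round-trips, in hours."""
    return job_count * per_call_ms / 1000 / 3600


print(ssh_overhead_hours(300_000, 75))  # 6.25
```

At the 50ms and 100ms ends of the quoted range the same formula gives roughly 4.2 and 8.3 hours, so the cost stays in the multi-hour range regardless of where in that range a given link falls.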
AP-13: SIGPIPE Under set -euo pipefail
Symptom: Script exits with code 141 (128 + SIGPIPE=13) on harmless pipe operations.
Root cause: In `ls /tmp/sql/*.sql | head -10`, `head` reads 10 lines and then exits, closing its end of the pipe. `ls` gets SIGPIPE writing to the closed pipe. Under `set -o pipefail`, this propagates as exit 141.
Fix: Avoid piping to `head` in strict-mode scripts:
WRONG (exit 141)
ls /tmp/sql/*.sql | head -10
RIGHT (temp file)
ls /tmp/sql/*.sql > /tmp/filelist.txt
head -10 /tmp/filelist.txt
AP-14: False Data Loss From Variable-Width NDJSON Output
Symptom: `wc -l` on the NDJSON output shows fewer lines than expected. Appears as 3-6% "data loss".
Root cause: Configs with 0 signals after feature filtering produce 1 NDJSON line (skipped entry), not N barrier lines. Example: 95 normal × 3 + 5 skipped × 1 = 290 (not 300).
Fix: Account for variable output width in line count validation:
expected = N_normal * barriers_per_query + N_skipped * 1 + N_error * 1
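The expected-count formula can be checked against the example in the root cause. A minimal sketch, with a hypothetical function name:

```python
def expected_lines(n_normal: int, barriers_per_query: int, n_skipped: int = 0, n_error: int = 0) -> int:
    """Expected NDJSON line count when skipped and errored configs emit one line each."""
    return n_normal * barriers_per_query + n_skipped * 1 + n_error * 1


# The example above: 95 normal configs x 3 barriers + 5 skipped configs.
print(expected_lines(95, 3, n_skipped=5))  # 290
```

Comparing the observed line count against this value, rather than against `N * barriers_per_query`, separates genuine data loss from configs that legitimately produced a single line.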
The Mise + Pueue + systemd-run Stack
mise (environment + task discovery)
|-- .mise.toml [env] -> SSoT for defaults
|-- .mise/tasks/jobs.toml -> task definitions
| |-- mise run jobs:submit-all
| | |-- submit-all.sh (orchestrator)
| | |-- pueue add (per-unit, NOT per-query)
| | |-- submit_unit.sh (per unit)
| | |-- xargs -P16 (parallel queries)
| | |-- wrapper.sh (per query)
| | |-- clickhouse-client < sql_file
| | |-- flock + append NDJSON
| |
| |-- mise run jobs:process-all (Python pipeline variant)
| | |-- job-runner.sh (orchestrator)
| | |-- pueue add (per-job)
| | |-- systemd-run --scope -p MemoryMax=XG -p MemorySwapMax=0
| | |-- uv run python scripts/process.py
| | |-- run_resumable_job()
| | |-- get_checkpoint_path() -> param-aware
| | |-- checkpoint.save() -> atomic write
| | |-- checkpoint.unlink() -> missing_ok=True
| |
| |-- mise run jobs:autoscale-loop
| |-- autoscaler.sh --loop (60s interval)
| |-- reads: free -m, uptime, pueue status --json
| |-- adjusts: pueue parallel N --group <group>
Responsibility boundaries:
| Layer | Responsibility |
|---|---|
| mise | Environment variables, tool versions, task discovery |
| pueue | Daemon persistence, parallelism limits, restart, dependency management |
| systemd-run | Per-job cgroup memory caps (Linux only, no-op on macOS) |
| autoscaler | Dynamic parallelism tuning based on host resources |
| Python/app | Domain logic, checkpoint management, data integrity |
Remote Deployment Protocol
When deploying a fix to a running host:
1. AUDIT: ssh host 'pueue status --json' -> count running/queued/failed
2. DECIDE: Wait for running jobs? Kill? Let them finish with old code?
3. PULL: ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'
4. VERIFY: ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'
5. UPGRADE: ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'
6. RESTART: ssh host 'pueue restart <failed_id>' OR add fresh jobs
7. MONITOR: ssh host 'pueue status --group mygroup'
Critical: Step 1 (AUDIT) is mandatory. Skipping it is the root cause of cascade failures.
See: references/deployment-checklist.md for full protocol.
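The AUDIT step can be scripted as a tally over the status JSON. The sketch below rests on assumptions about the JSON shape: each task's `status` is an object keyed by state name, and a finished task carries a `result` that is either the string `"Success"` or an object such as `{"Failed": 1}`. Verify this against the pueue version in use; the function names are hypothetical.

```python
from collections import Counter


def _outcome(result) -> str:
    """Name a finished task's result: a plain string, or the first key of an object."""
    if isinstance(result, str):
        return result
    if isinstance(result, dict) and result:
        return next(iter(result))
    return "Unknown"


def audit(status: dict) -> Counter:
    """Count tasks per state; finished tasks are split by result."""
    counts = Counter()
    for task in status.get("tasks", {}).values():
        state = task.get("status")
        if isinstance(state, dict) and state:
            name, detail = next(iter(state.items()))
        else:
            name, detail = str(state), None
        if name == "Done" and isinstance(detail, dict):
            name = "Done/" + _outcome(detail.get("result"))
        counts[name] += 1
    return counts
```

Feeding it the parsed output of `ssh host 'pueue status --json'` gives the running/queued/failed counts needed for the DECIDE step. A non-zero `Running` count means the deployment should pause until a decision is made.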
Concurrency Safety Decision Tree
Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
| |-- YES -> Add to checkpoint filename
| | Add to pueue job label
| | Add to remote checkpoint key
| |-- NO -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
| |-- YES -> Use missing_ok=True
| | Use atomic write for creates
| |-- NO -> Standard operation
|
|-- Does the function write to shared storage?
|-- YES -> Force deduplication after write
| Use UPSERT semantics where possible
|-- NO -> Standard operation
Pueue has no resource awareness. The autoscaler complements it with dynamic parallelism tuning.
How it works: Reads CPU load + available memory, then adjusts `pueue parallel N` per group.
CPU < 40% AND MEM < 60% -> SCALE UP (+1 per group)
CPU > 80% OR MEM > 80% -> SCALE DOWN (-1 per group)
Otherwise -> HOLD
Incremental scaling protocol -- don't jump to max capacity. Ramp up in steps and verify stability at each level:
Step 1: Start with conservative defaults (e.g., group1=2, group2=3)
Step 2: After jobs stabilize (~5 min), probe: uptime + free -h + ps aux
Step 3: If load < 40% cores AND memory < 60% available:
Bump by +1-2 jobs per group
Step 4: Wait ~5 min for new jobs to reach peak memory
Step 5: Probe again. If still within 80% margin, bump again
Step 6: Repeat until load ~50% cores OR memory ~70% available
Why incremental: Job memory footprint grows over time (a job may start at ~500 MB and peak at 5+ GB). Jumping straight to max parallelism risks OOM when all jobs hit peak simultaneously.
Safety bounds: Each group should have min/max limits the autoscaler won't exceed. It should also check per-job memory estimates before scaling up (don't add a 5 GB job if only 3 GB available).
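The scaling rule and the safety bounds can be combined into one decision function. This is a sketch, not the autoscaler's actual code: the function names are hypothetical, and the CPU and memory percentages are assumed to be computed by the caller (for example, load average divided by core count, and used memory over total).

```python
def scale_step(cpu_pct: float, mem_pct: float) -> int:
    """Return +1 (scale up), -1 (scale down) or 0 (hold) per the thresholds above."""
    if cpu_pct > 80 or mem_pct > 80:
        return -1
    if cpu_pct < 40 and mem_pct < 60:
        return 1
    return 0


def next_parallel(current: int, cpu_pct: float, mem_pct: float,
                  min_slots: int, max_slots: int,
                  free_gb: float, per_job_gb: float) -> int:
    """Apply one scaling step, respecting group bounds and per-job memory."""
    step = scale_step(cpu_pct, mem_pct)
    if step > 0 and free_gb < per_job_gb:
        step = 0  # do not add a job that cannot fit in free memory
    return max(min_slots, min(max_slots, current + step))
```

The returned value would be passed to `pueue parallel N --group <group>`. Moving by at most one slot per call matches the incremental protocol, since each new job needs time to reach peak memory before the next probe.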
Dynamic adjustment (pueue supports live tuning without restarting jobs):
```bash
# Scale up when resources are available
pueue parallel 4 --group group1
pueue parallel 5 --group group2
# Scale down if memory pressure detected
pueue parallel 2 --group group1
```
**Per-symbol/per-family groups**: When jobs have vastly different resource profiles, give each family its own pueue group. This prevents a single high-memory job type from starving lighter jobs:
```bash
# Example: high-volume symbols need fewer concurrent jobs (5 GB each)
pueue group add highvol-yearly --parallel 2
# Low-volume symbols can run more concurrently (1 GB each)
pueue group add lowvol-yearly --parallel 6
```
Project-Specific Extensions
This skill provides universal patterns that apply to any distributed job pipeline. Projects should create a local extension skill (e.g., `myproject-job-safety`) in their `.claude/skills/` directory that provides:
| Local Extension Provides | Example |
|---|---|
| Concrete function names | -> |
| Application-specific env vars | , |
| Memory profiles per job type | "250 dbps peaks at 5 GB, use MemoryMax=8G" |
| Database-specific audit queries | `SELECT ... FROM mydb.mytable ... countIf(x < 0)` |
| Issue provenance tracking | "Checkpoint race: GH-84" |
| Host-specific configuration | "bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4" |
Two-layer invocation pattern: When this skill is triggered, also check for and invoke any local `*-job-safety` skill in the project's `.claude/skills/` directory for project-specific configuration.
devops-tools:distributed-job-safety (universal patterns - this skill)
+ .claude/skills/myproject-job-safety (project-specific config)
= Complete operational knowledge
- Concurrency Invariants -- Formal invariant specifications (INV-1 through INV-8)
- Deployment Checklist -- Step-by-step remote deployment protocol
- Environment Gotchas -- Host-specific pitfalls (G-1 through G-14)
- Cross-reference: `devops-tools:pueue-job-orchestration` -- Pueue basics, dependency chaining, installation