Process Substitution & FIFOs (2025)


Overview


Master advanced inter-process communication patterns in bash using process substitution, named pipes (FIFOs), and efficient data streaming techniques. These patterns enable powerful data pipelines without temporary files.

Process Substitution Basics


Input Process Substitution
<(command)


bash
#!/usr/bin/env bash
set -euo pipefail

Compare two command outputs


diff <(sort file1.txt) <(sort file2.txt)
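A self-contained sketch of the same idea, using inline printf data instead of files (the sample values are illustrative):

```shell
#!/usr/bin/env bash
set -euo pipefail

# Each <(...) becomes a readable /dev/fd path, so diff can compare two
# command outputs with no temporary files.
if diff <(printf '1\n2\n3\n' | sort) <(printf '3\n1\n2\n' | sort) > /dev/null; then
    same="yes"
else
    same="no"
fi
echo "streams match: $same"
```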

Compare remote and local files


diff <(ssh server 'cat /etc/config') /etc/config

Merge sorted files


sort -m <(sort file1.txt) <(sort file2.txt) <(sort file3.txt)

Read from multiple sources simultaneously


paste <(cut -f1 data.tsv) <(cut -f3 data.tsv)

Feed command output to programs expecting files


Many programs require filename arguments, not stdin


wc -l <(grep "error" *.log)
wc -l <(grep "error" *.log)

Process API response with tool expecting file


jq '.items[]' <(curl -s "https://api.example.com/data")

Source environment from command output


source <(aws configure export-credentials --format env)

Feed to while loop without subshell issues


count=0
while IFS= read -r line; do
    ((++count))   # pre-increment: never returns 1 under set -e
    process "$line"
done < <(find . -name "*.txt")
echo "Processed $count files"  # Variable survives!
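A runnable demonstration of why the `< <(...)` form matters, with inline sample data:

```shell
#!/usr/bin/env bash
set -euo pipefail

# A pipe runs the while loop in a subshell, so its counter is lost;
# feeding the loop from <(...) keeps it in the current shell.
count_pipe=0
printf 'a\nb\nc\n' | while IFS= read -r line; do
    count_pipe=$((count_pipe + 1))
done
echo "after pipe: $count_pipe"            # still 0

count_subst=0
while IFS= read -r line; do
    count_subst=$((count_subst + 1))
done < <(printf 'a\nb\nc\n')
echo "after substitution: $count_subst"   # 3
```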

Output Process Substitution
>(command)


bash
#!/usr/bin/env bash
set -euo pipefail

Write to multiple destinations simultaneously (tee alternative)


echo "Log message" | tee >(logger -t myapp) >(mail -s "Alert" admin@example.com)
echo "日志消息" | tee >(logger -t myapp) >(mail -s "告警" admin@example.com)

Compress and checksum in one pass


tar cf - /data | tee >(gzip > backup.tar.gz) >(sha256sum > backup.sha256)

Send output to multiple processors


generate_data | tee >(processor1 > result1.txt) >(processor2 > result2.txt) > /dev/null

Log and process simultaneously


./build.sh 2>&1 | tee >(grep -i error > errors.log) >(grep -i warning > warnings.log)

Real-time filtering with multiple outputs


tail -f /var/log/syslog | tee \
    >(grep --line-buffered "ERROR" >> errors.log) \
    >(grep --line-buffered "WARNING" >> warnings.log) \
    >(grep --line-buffered "CRITICAL" | mail -s "Critical Alert" admin@example.com)
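A minimal sketch of one-pass fan-out with `tee >(...)`, using `seq` as stand-in data and a temp dir for output (the filenames are illustrative). Note the `sleep`: the `>(...)` consumers run asynchronously, so a real script must give them time to finish or synchronize explicitly:

```shell
#!/usr/bin/env bash
set -euo pipefail

# One pass over the data feeds two independent consumers via >(...).
tmpdir=$(mktemp -d)
seq 1 10 | tee >(wc -l > "$tmpdir/count.txt") \
               >(grep -c '^5$' > "$tmpdir/fives.txt") > /dev/null
sleep 1   # >(...) consumers are asynchronous; give them time to flush

lines=$(cat "$tmpdir/count.txt")
fives=$(cat "$tmpdir/fives.txt")
rm -rf "$tmpdir"
echo "lines: $lines, fives: $fives"
```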

Combining Input and Output Substitution


bash
#!/usr/bin/env bash
set -euo pipefail

Transform and compare


diff <(sort input.txt | uniq) <(sort reference.txt | uniq)

Pipeline with multiple branches


cat data.csv | tee \
    >(awk -F, '{print $1}' > column1.txt) \
    >(awk -F, '{print $2}' > column2.txt) |
    wc -l

Complex data flow


process_data() { local input="$1"
# Read from process substitution, write to multiple outputs
while IFS= read -r line; do
    echo "$line" | tee \
        >(echo "LOG: $line" >> "$log_file") \
        >(process_line "$line" >> results.txt)
done < <(cat "$input" | filter_input)
}
undefined
process_data() { local input="$1"
# 从进程替换读取数据,写入多个输出
while IFS= read -r line; do
    echo "$line" | tee \
        >(echo "LOG: $line" >> "$log_file") \
        >(process_line "$line" >> results.txt)
done < <(cat "$input" | filter_input)
}
undefined

Named Pipes (FIFOs)


Creating and Using FIFOs


bash
#!/usr/bin/env bash
set -euo pipefail

Create FIFO


mkfifo my_pipe

Clean up on exit


trap 'rm -f my_pipe' EXIT

Writer (in background or separate terminal)


echo "Hello from writer" > my_pipe &
echo "来自写入端的消息" > my_pipe &

Reader (blocks until data available)


cat < my_pipe

With timeout (using read)


if read -t 5 -r line < my_pipe; then
    echo "Received: $line"
else
    echo "Timeout waiting for data"
fi
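Putting the pieces together as one runnable script (the FIFO lives in a throwaway temp dir here rather than the working directory):

```shell
#!/usr/bin/env bash
set -euo pipefail

# Writer and reader must overlap: the background writer blocks on open
# until the reader opens the FIFO, then both proceed.
workdir=$(mktemp -d)
fifo="$workdir/demo_pipe"
mkfifo "$fifo"

echo "hello" > "$fifo" &
if read -t 5 -r msg < "$fifo"; then
    echo "Received: $msg"
fi
wait
rm -rf "$workdir"
```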

Bidirectional Communication


bash
#!/usr/bin/env bash
set -euo pipefail

Create two FIFOs for bidirectional communication


REQUEST_PIPE="/tmp/request_$$" RESPONSE_PIPE="/tmp/response_$$"
mkfifo "$REQUEST_PIPE" "$RESPONSE_PIPE" trap 'rm -f "$REQUEST_PIPE" "$RESPONSE_PIPE"' EXIT
REQUEST_PIPE="/tmp/request_$$" RESPONSE_PIPE="/tmp/response_$$"
mkfifo "$REQUEST_PIPE" "$RESPONSE_PIPE" trap 'rm -f "$REQUEST_PIPE" "$RESPONSE_PIPE"' EXIT

Server process


server() {
while true; do
    if read -r request < "$REQUEST_PIPE"; then
        case "$request" in
            "QUIT") echo "BYE" > "$RESPONSE_PIPE"; break ;;
            "TIME") date > "$RESPONSE_PIPE" ;;
            "UPTIME") uptime > "$RESPONSE_PIPE" ;;
            *) echo "UNKNOWN: $request" > "$RESPONSE_PIPE" ;;
        esac
    fi
done
}

Client function


send_request() { local request="$1" echo "$request" > "$REQUEST_PIPE" cat < "$RESPONSE_PIPE" }
send_request() { local request="$1" echo "$request" > "$REQUEST_PIPE" cat < "$RESPONSE_PIPE" }

Start server in background


server &
SERVER_PID=$!

Send requests


send_request "TIME" send_request "UPTIME" send_request "QUIT"
wait "$SERVER_PID"
undefined
send_request "TIME" send_request "UPTIME" send_request "QUIT"
wait "$SERVER_PID"
undefined
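A stripped-down, runnable version of the request/response wiring: a one-shot toy server that answers a single request in upper case (the `req`/`resp` names and the upper-casing "protocol" are illustrative):

```shell
#!/usr/bin/env bash
set -euo pipefail

# One-shot toy server: reads one request, replies with it upper-cased.
workdir=$(mktemp -d)
req="$workdir/req"
resp="$workdir/resp"
mkfifo "$req" "$resp"

( read -r r < "$req"; printf '%s\n' "${r^^}" > "$resp" ) &

echo "ping" > "$req"     # blocks until the server opens the request FIFO
read -r reply < "$resp"  # blocks until the server writes the response
echo "reply: $reply"
wait
rm -rf "$workdir"
```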

Producer-Consumer Pattern


bash
#!/usr/bin/env bash
set -euo pipefail

WORK_QUEUE="/tmp/work_queue_$$"
mkfifo "$WORK_QUEUE"
trap 'rm -f "$WORK_QUEUE"' EXIT

Producer


producer() {
local item
for item in {1..100}; do
    echo "TASK:$item"
done
echo "DONE"
}

Consumer (can have multiple)


consumer() { local id="$1" while read -r item; do [[ "$item" == "DONE" ]] && break echo "Consumer $id processing: $item" sleep 0.1 # Simulate work done }
consumer() { local id="$1" while read -r item; do [[ "$item" == "DONE" ]] && break echo "消费者 $id 正在处理: $item" sleep 0.1 # 模拟任务处理 done }

Start consumers (they'll block waiting for data)


consumer 1 < "$WORK_QUEUE" & consumer 2 < "$WORK_QUEUE" & consumer 3 < "$WORK_QUEUE" &
consumer 1 < "$WORK_QUEUE" & consumer 2 < "$WORK_QUEUE" & consumer 3 < "$WORK_QUEUE" &

Start producer


producer > "$WORK_QUEUE"
wait echo "All work complete"
undefined
producer > "$WORK_QUEUE"
wait echo "所有任务处理完成"
undefined
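A deterministic, runnable reduction of the pattern with a single consumer (multiple consumers interleave nondeterministically); the consumer reports its count through a file because it runs in a background process:

```shell
#!/usr/bin/env bash
set -euo pipefail

workdir=$(mktemp -d)
queue="$workdir/queue"
mkfifo "$queue"

consumer() {
    local n=0 task
    while read -r task; do
        [[ "$task" == "DONE" ]] && break
        n=$((n + 1))
    done
    echo "$n" > "$workdir/processed"   # report back to the parent shell
}

consumer < "$queue" &
{ for i in 1 2 3 4 5; do echo "TASK:$i"; done; echo "DONE"; } > "$queue"
wait
processed=$(cat "$workdir/processed")
echo "processed $processed tasks"
rm -rf "$workdir"
```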

FIFO with File Descriptors


bash
#!/usr/bin/env bash
set -euo pipefail

FIFO="/tmp/fd_fifo_$$"
mkfifo "$FIFO"
trap 'rm -f "$FIFO"' EXIT

Open FIFO for read/write on FD 3


Opening for both prevents blocking on open


exec 3<>"$FIFO"
exec 3<>"$FIFO"

Write to FIFO via FD


echo "Message 1" >&3 echo "Message 2" >&3
echo "消息1" >&3 echo "消息2" >&3

Read from FIFO via FD


read -r msg1 <&3
read -r msg2 <&3
echo "Got: $msg1, $msg2"

Close FD


exec 3>&-
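The same technique as one runnable script; small writes are held in the FIFO's kernel buffer until read back, which is what makes the single read/write descriptor usable in one process:

```shell
#!/usr/bin/env bash
set -euo pipefail

workdir=$(mktemp -d)
fifo="$workdir/fd_fifo"
mkfifo "$fifo"

exec 3<>"$fifo"   # read/write open succeeds immediately, no peer needed
echo "first" >&3
echo "second" >&3
read -r a <&3
read -r b <&3
exec 3>&-
echo "$a $b"
rm -rf "$workdir"
```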

Coprocess (Bash 4+)


Basic Coprocess Usage


bash
#!/usr/bin/env bash
set -euo pipefail

Start coprocess (bidirectional pipe)


coproc BC { bc -l; }

Send data to coprocess


echo "scale=10; 355/113" >&"${BC[1]}"
echo "scale=10; 355/113" >&"${BC[1]}"

Read result


read -r result <&"${BC[0]}" echo "Pi approximation: $result"
read -r result <&"${BC[0]}" echo "π的近似值: $result"

More calculations


echo "sqrt(2)" >&"${BC[1]}" read -r sqrt2 <&"${BC[0]}" echo "Square root of 2: $sqrt2"
echo "sqrt(2)" >&"${BC[1]}" read -r sqrt2 <&"${BC[0]}" echo "2的平方根: $sqrt2"

Close write end to signal EOF


exec {BC[1]}>&-

Wait for coprocess to finish


wait "$BC_PID"
undefined
wait "$BC_PID"
undefined

Named Coprocess


bash
#!/usr/bin/env bash
set -euo pipefail

Named coprocess for Python interpreter


coproc PYTHON {
    python3 -u -c "
import sys
for line in sys.stdin:
    exec(line.strip())
"
}

Send Python commands


echo "print('Hello from Python')" >&"${PYTHON[1]}"
read -r output <&"${PYTHON[0]}"
echo "Python said: $output"

echo "print(2**100)" >&"${PYTHON[1]}"
read -r big_num <&"${PYTHON[0]}"
echo "2^100 = $big_num"

Cleanup


exec {PYTHON[1]}>&-
wait "$PYTHON_PID" 2>/dev/null || true

Coprocess Pool Pattern


bash
#!/usr/bin/env bash
set -euo pipefail

Create pool of worker coprocesses


declare -A WORKERS
declare -A WORKER_PIDS

start_workers() {
local count="$1"
local i
for ((i=0; i<count; i++)); do
    # Each worker runs a processing loop
    coproc "WORKER_$i" {
        while IFS= read -r task; do
            [[ "$task" == "QUIT" ]] && exit 0
            # Simulate work
            sleep 0.1
            echo "DONE:$task"
        done
    }

    # Store FDs dynamically
    local -n write_fd="WORKER_${i}[1]"
    local -n read_fd="WORKER_${i}[0]"
    local -n pid="WORKER_${i}_PID"

    WORKERS["$i,in"]="$write_fd"
    WORKERS["$i,out"]="$read_fd"
    WORKER_PIDS["$i"]="$pid"
done
}

Note: Coprocess pool management is complex


Consider GNU Parallel for production workloads



Advanced Patterns


Progress Monitoring with FIFO


bash
#!/usr/bin/env bash
set -euo pipefail

PROGRESS_PIPE="/tmp/progress_$$"
mkfifo "$PROGRESS_PIPE"
trap 'rm -f "$PROGRESS_PIPE"' EXIT

Progress monitor


monitor_progress() { local total="$1" local current=0
while read -r update; do
    ((current++))
    local pct=$((current * 100 / total))
    printf "\rProgress: [%-50s] %d%%" \
        "$(printf '#%.0s' $(seq 1 $((pct/2))))" "$pct"
done < "$PROGRESS_PIPE"
echo
}

Worker that reports progress


do_work() { local items=("$@") local item
for item in "${items[@]}"; do
    process_item "$item"
    echo "done" > "$PROGRESS_PIPE"
done
}

Usage


items=(item1 item2 item3 ... item100)
monitor_progress "${#items[@]}" &
MONITOR_PID=$!
do_work "${items[@]}"
exec 3>"$PROGRESS_PIPE"  # Keep pipe open
exec 3>&-                # Close to signal completion
wait "$MONITOR_PID"

Log Aggregator with Multiple FIFOs


bash
#!/usr/bin/env bash
set -euo pipefail

LOG_DIR="/tmp/logs_$$"
mkdir -p "$LOG_DIR"

Create FIFOs for each log level


for level in DEBUG INFO WARN ERROR; do
    mkfifo "$LOG_DIR/$level"
done
trap 'rm -rf "$LOG_DIR"' EXIT

Aggregator process


aggregate_logs() { local output_file="$1"
# Open all FIFOs for reading
exec 3<"$LOG_DIR/DEBUG"
exec 4<"$LOG_DIR/INFO"
exec 5<"$LOG_DIR/WARN"
exec 6<"$LOG_DIR/ERROR"

while true; do
    # Use select-like behavior with read timeout
    read -t 0.1 -r msg <&3 && echo "[DEBUG] $(date '+%H:%M:%S') $msg" >> "$output_file"
    read -t 0.1 -r msg <&4 && echo "[INFO]  $(date '+%H:%M:%S') $msg" >> "$output_file"
    read -t 0.1 -r msg <&5 && echo "[WARN]  $(date '+%H:%M:%S') $msg" >> "$output_file"
    read -t 0.1 -r msg <&6 && echo "[ERROR] $(date '+%H:%M:%S') $msg" >> "$output_file"
done
}

Logging functions


log_debug() { echo "$" > "$LOG_DIR/DEBUG"; } log_info() { echo "$" > "$LOG_DIR/INFO"; } log_warn() { echo "$" > "$LOG_DIR/WARN"; } log_error() { echo "$" > "$LOG_DIR/ERROR"; }
log_debug() { echo "$" > "$LOG_DIR/DEBUG"; } log_info() { echo "$" > "$LOG_DIR/INFO"; } log_warn() { echo "$" > "$LOG_DIR/WARN"; } log_error() { echo "$" > "$LOG_DIR/ERROR"; }

Start aggregator


aggregate_logs "/var/log/app.log" & AGGREGATOR_PID=$!
aggregate_logs "/var/log/app.log" & AGGREGATOR_PID=$!

Application code uses logging functions


log_info "Application started" log_debug "Processing item" log_warn "Resource running low" log_error "Critical failure"
log_info "应用启动" log_debug "处理任务" log_warn "资源不足" log_error "严重故障"

Cleanup


kill "$AGGREGATOR_PID" 2>/dev/null
undefined
kill "$AGGREGATOR_PID" 2>/dev/null
undefined

Data Pipeline with Buffering


bash
#!/usr/bin/env bash
set -euo pipefail

Buffered pipeline stage


buffered_stage() { local name="$1" local buffer_size="${2:-100}" local buffer=()
while IFS= read -r line || [[ ${#buffer[@]} -gt 0 ]]; do
    if [[ -n "$line" ]]; then
        buffer+=("$line")
    fi

    # Flush when buffer full or EOF
    if [[ ${#buffer[@]} -ge $buffer_size ]] || [[ -z "$line" && ${#buffer[@]} -gt 0 ]]; then
        printf '%s\n' "${buffer[@]}" | process_batch
        buffer=()
    fi
done
}

Parallel pipeline with process substitution


run_parallel_pipeline() { local input="$1"
cat "$input" | \
    tee >(filter_a | transform_a > output_a.txt) \
        >(filter_b | transform_b > output_b.txt) \
        >(filter_c | transform_c > output_c.txt) \
    > /dev/null

# Wait for all background processes
wait
}

Streaming JSON Processing


bash
#!/usr/bin/env bash
set -euo pipefail

Stream JSON array elements


stream_json_array() { local url="$1"
# Use jq to stream array elements one per line
curl -s "$url" | jq -c '.items[]' | while IFS= read -r item; do
    process_json_item "$item"
done
}

Parallel JSON processing with process substitution


parallel_json_process() { local input="$1" local workers=4
# Split input across workers
jq -c '.[]' "$input" | \
    parallel --pipe -N100 --jobs "$workers" '
        while IFS= read -r item; do
            echo "$item" | jq ".processed = true"
        done
    ' | jq -s '.'
}

Transform JSON stream


transform_json_stream() {
jq -c '.' | while IFS= read -r obj; do
    # Process with bash
    local id
    id=$(echo "$obj" | jq -r '.id')
    # Enrich and output
    echo "$obj" | jq --arg ts "$(date -Iseconds)" '. + {timestamp: $ts}'
done
}

Bash 5.3 In-Shell Substitution

Bash 5.3 进程内替换

No-Fork Command Substitution

无fork命令替换

bash
#!/usr/bin/env bash

Requires Bash 5.3+


set -euo pipefail

Traditional: forks subshell


result=$(echo "hello")
result=$(echo "hello")

Bash 5.3: No fork, runs in current shell


result=${ echo "hello"; }
result=${ echo "hello"; }

Significant for variable modifications


counter=0

Traditional - counter stays 0 (subshell)


result=$(counter=$((counter + 1)); echo "$counter")
echo "Counter: $counter"  # Still 0

Bash 5.3 - counter is modified (same shell)


result=${ counter=$((counter + 1)); echo "$counter"; }
echo "Counter: $counter"  # Now 1

REPLY variable syntax (even more concise)


${ REPLY="computed value"; } echo "$REPLY"
${ REPLY="计算结果"; } echo "$REPLY"

Or using ${| } syntax


result=${| REPLY=$(expensive_computation); }  # expansion is REPLY's value
echo "Result: $result"

Performance-Critical Pipelines


bash
#!/usr/bin/env bash

Requires Bash 5.3+


set -euo pipefail

Build result without forks


build_path() { local parts=("$@") local result=""
for part in "${parts[@]}"; do
    # No fork for each concatenation
    result=${ printf '%s/%s' "$result" "$part"; }
done

echo "${result#/}"
}

Accumulate values efficiently


accumulate() { local -n arr="$1" local sum=0
for val in "${arr[@]}"; do
    # In-shell arithmetic capture
    sum=${ echo $((sum + val)); }
done

echo "$sum"
}

Error Handling in Pipelines


Pipeline Error Detection


bash
#!/usr/bin/env bash
set -euo pipefail

Check all pipeline stages


run_pipeline() {
local result
# pipefail ensures we catch errors in any stage
if ! result=$(stage1 | stage2 | stage3); then
    echo "Pipeline failed" >&2
    return 1
fi

echo "$result"
}

PIPESTATUS for detailed error info


run_with_status() {
cmd1 | cmd2 | cmd3
local -a status=("${PIPESTATUS[@]}")

for i in "${!status[@]}"; do
    if [[ "${status[$i]}" -ne 0 ]]; then
        echo "Stage $i failed with status ${status[$i]}" >&2
    fi
done

# Return highest exit status
local max=0
for s in "${status[@]}"; do
    ((s > max)) && max="$s"
done
return "$max"
}
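A runnable demonstration of `PIPESTATUS` with a deliberately failing middle stage (note `set -e` is left off here so the failure can be inspected rather than aborting the script):

```shell
#!/usr/bin/env bash
set -uo pipefail   # no -e: we want to inspect failures, not abort on them

# PIPESTATUS snapshots the exit status of every stage of the last pipeline,
# so it must be copied immediately, before any other command overwrites it.
true | false | true
status=("${PIPESTATUS[@]}")
echo "stages: ${status[*]}"

max=0
for s in "${status[@]}"; do
    (( s > max )) && max=$s
done
echo "worst status: $max"
```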

Cleanup on Pipeline Failure


bash
#!/usr/bin/env bash
set -euo pipefail

Track resources for cleanup


declare -a CLEANUP_PIDS=()
declare -a CLEANUP_FILES=()

cleanup() {
local pid file
for pid in "${CLEANUP_PIDS[@]}"; do
    kill "$pid" 2>/dev/null || true
done

for file in "${CLEANUP_FILES[@]}"; do
    rm -f "$file" 2>/dev/null || true
done
}
trap cleanup EXIT

Register cleanup


register_pid() { CLEANUP_PIDS+=("$1"); }
register_file() { CLEANUP_FILES+=("$1"); }

Example usage


run_safe_pipeline() { local fifo="/tmp/pipeline_$$" mkfifo "$fifo" register_file "$fifo"
producer > "$fifo" &
register_pid "$!"

consumer < "$fifo" &
register_pid "$!"

wait
}

Best Practices


FIFO Naming Convention


bash
#!/usr/bin/env bash
set -euo pipefail

Include PID and descriptive name


create_fifo() { local name="$1" local fifo="/tmp/${name}$$$(date +%s)" mkfifo -m 600 "$fifo" # Restrictive permissions echo "$fifo" }
create_fifo() { local name="$1" local fifo="/tmp/${name}$$$(date +%s)" mkfifo -m 600 "$fifo" # 限制权限 echo "$fifo" }

Use tmpdir for security


create_secure_fifo() { local name="$1" local tmpdir tmpdir=$(mktemp -d) local fifo="$tmpdir/$name" mkfifo -m 600 "$fifo" echo "$fifo" }
undefined
create_secure_fifo() { local name="$1" local tmpdir tmpdir=$(mktemp -d) local fifo="$tmpdir/$name" mkfifo -m 600 "$fifo" echo "$fifo" }
undefined

Preventing Deadlocks


bash
#!/usr/bin/env bash
set -euo pipefail

✗ DEADLOCK - writer blocks, reader never starts


mkfifo pipe


echo "data" > pipe # Blocks forever

echo "data" > pipe # 永久阻塞

✓ SAFE - open both ends or use background


mkfifo pipe
trap 'rm -f pipe' EXIT

Option 1: Background writer


echo "data" > pipe & cat < pipe
echo "data" > pipe & cat < pipe

Option 2: Open for read/write


exec 3<>pipe echo "data" >&3 read -r data <&3 exec 3>&-
exec 3<>pipe echo "data" >&3 read -r data <&3 exec 3>&-

Option 3: Non-blocking open (requires careful handling)


exec 3<>pipe   # hold a read/write descriptor so neither open below blocks
exec 4>pipe
echo "data" >&4
read -r data <&3

Timeout Patterns


bash
#!/usr/bin/env bash
set -euo pipefail

Read with timeout


read_with_timeout() { local fifo="$1" local timeout="$2" local result
if read -t "$timeout" -r result < "$fifo"; then
    echo "$result"
    return 0
else
    echo "Timeout after ${timeout}s" >&2
    return 1
fi
}

Write with timeout (using timeout command)


write_with_timeout() { local fifo="$1" local timeout="$2" local data="$3"
if timeout "$timeout" bash -c "echo '$data' > '$fifo'"; then
    return 0
else
    echo "Write timeout after ${timeout}s" >&2
    return 1
fi
}
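The read-timeout path as a runnable script: with no writer on the FIFO, `read -t` gives up instead of blocking forever (the read/write descriptor lets the read begin immediately):

```shell
#!/usr/bin/env bash
set -euo pipefail

workdir=$(mktemp -d)
fifo="$workdir/pipe"
mkfifo "$fifo"

exec 3<>"$fifo"   # read/write open never blocks
if read -t 1 -r data <&3; then
    outcome="got:$data"
else
    outcome="timeout"   # read -t returns non-zero after 1s of silence
fi
exec 3>&-
echo "$outcome"
rm -rf "$workdir"
```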



Master process substitution and FIFOs for efficient inter-process communication without temporary files.
