streams
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseNode.js Streams Skill
Node.js Streams 技能指南
Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.
掌握Streams,实现大文件的内存高效处理、实时数据处理及可组合数据管道的构建。
Quick Start
快速入门
Streams in 4 types:
- Readable - Source of data (file, HTTP request)
- Writable - Destination (file, HTTP response)
- Transform - Modify data in transit
- Duplex - Both readable and writable
Streams分为4种类型:
- Readable - 数据源(文件、HTTP请求)
- Writable - 数据目的地(文件、HTTP响应)
- Transform - 在传输过程中修改数据
- Duplex - 同时支持可读和可写
Core Concepts
核心概念
Readable Stream
Readable 流
javascript
const fs = require('fs');
// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Event-based consumption
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readStream.on('end', () => {
console.log('Finished reading');
});
readStream.on('error', (err) => {
console.error('Read error:', err);
});javascript
const fs = require('fs');
// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Event-based consumption
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readStream.on('end', () => {
console.log('Finished reading');
});
readStream.on('error', (err) => {
console.error('Read error:', err);
});Writable Stream
Writable 流
javascript
const writeStream = fs.createWriteStream('output.txt');
// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end
// Handle backpressure
const ok = writeStream.write(data);
if (!ok) {
// Wait for drain event before writing more
writeStream.once('drain', () => {
continueWriting();
});
}javascript
const writeStream = fs.createWriteStream('output.txt');
// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end
// Handle backpressure
const ok = writeStream.write(data);
if (!ok) {
// Wait for drain event before writing more
writeStream.once('drain', () => {
continueWriting();
});
}Transform Stream
Transform 流
javascript
const { Transform } = require('stream');
// Custom transform: uppercase text
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Usage
fs.createReadStream('input.txt')
.pipe(upperCase)
.pipe(fs.createWriteStream('output.txt'));javascript
const { Transform } = require('stream');
// Custom transform: uppercase text
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Usage
fs.createReadStream('input.txt')
.pipe(upperCase)
.pipe(fs.createWriteStream('output.txt'));Learning Path
学习路径
Beginner (1-2 weeks)
入门阶段(1-2周)
- ✅ Understand stream types
- ✅ Read/write file streams
- ✅ Basic pipe operations
- ✅ Handle stream events
- ✅ 理解流的类型
- ✅ 读写文件流
- ✅ 基础管道操作
- ✅ 处理流事件
Intermediate (3-4 weeks)
进阶阶段(3-4周)
- ✅ Transform streams
- ✅ Backpressure handling
- ✅ Object mode streams
- ✅ Pipeline utility
- ✅ Transform流
- ✅ 背压处理
- ✅ 对象模式流
- ✅ Pipeline工具
Advanced (5-6 weeks)
高级阶段(5-6周)
- ✅ Custom stream implementation
- ✅ Async iterators
- ✅ Web Streams API
- ✅ Performance optimization
- ✅ 自定义流实现
- ✅ 异步迭代器
- ✅ Web Streams API
- ✅ 性能优化
Pipeline (Recommended)
Pipeline(推荐用法)
javascript
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Compose streams with error handling
async function compressFile(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Compression complete');
}
// With transform
await pipeline(
fs.createReadStream('data.csv'),
csvParser(),
transformRow(),
jsonStringify(),
fs.createWriteStream('data.json')
);javascript
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Compose streams with error handling
async function compressFile(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Compression complete');
}
// With transform
await pipeline(
fs.createReadStream('data.csv'),
csvParser(),
transformRow(),
jsonStringify(),
fs.createWriteStream('data.json')
);Pipeline with Error Handling
带错误处理的Pipeline
javascript
const { pipeline } = require('stream');
pipeline(
source,
transform1,
transform2,
destination,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);javascript
const { pipeline } = require('stream');
pipeline(
source,
transform1,
transform2,
destination,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);HTTP Streaming
HTTP 流处理
javascript
const http = require('http');
const fs = require('fs');
// Stream file as HTTP response
http.createServer((req, res) => {
const filePath = './video.mp4';
const stat = fs.statSync(filePath);
res.writeHead(200, {
'Content-Type': 'video/mp4',
'Content-Length': stat.size
});
// Stream instead of loading entire file
fs.createReadStream(filePath).pipe(res);
}).listen(3000);
// Stream HTTP request body
http.createServer((req, res) => {
const writeStream = fs.createWriteStream('./upload.bin');
req.pipe(writeStream);
req.on('end', () => {
res.end('Upload complete');
});
}).listen(3001);javascript
const http = require('http');
const fs = require('fs');
// Stream file as HTTP response
http.createServer((req, res) => {
const filePath = './video.mp4';
const stat = fs.statSync(filePath);
res.writeHead(200, {
'Content-Type': 'video/mp4',
'Content-Length': stat.size
});
// Stream instead of loading entire file
fs.createReadStream(filePath).pipe(res);
}).listen(3000);
// Stream HTTP request body
http.createServer((req, res) => {
const writeStream = fs.createWriteStream('./upload.bin');
req.pipe(writeStream);
req.on('end', () => {
res.end('Upload complete');
});
}).listen(3001);Object Mode Streams
对象模式流
javascript
const { Transform } = require('stream');
const jsonParser = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
const obj = JSON.parse(chunk);
this.push(obj);
callback();
} catch (err) {
callback(err);
}
}
});
// Process objects instead of buffers
const processRecords = new Transform({
objectMode: true,
transform(record, encoding, callback) {
record.processed = true;
record.timestamp = Date.now();
this.push(record);
callback();
}
});javascript
const { Transform } = require('stream');
const jsonParser = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
const obj = JSON.parse(chunk);
this.push(obj);
callback();
} catch (err) {
callback(err);
}
}
});
// Process objects instead of buffers
const processRecords = new Transform({
objectMode: true,
transform(record, encoding, callback) {
record.processed = true;
record.timestamp = Date.now();
this.push(record);
callback();
}
});Async Iterators
异步迭代器
javascript
const { Readable } = require('stream');
// Create from async iterator
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield { id: i, data: `item-${i}` };
}
}
const stream = Readable.from(generateData(), { objectMode: true });
// Consume with for-await
async function processStream(readable) {
for await (const chunk of readable) {
console.log('Processing:', chunk);
}
}javascript
const { Readable } = require('stream');
// Create from async iterator
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield { id: i, data: `item-${i}` };
}
}
const stream = Readable.from(generateData(), { objectMode: true });
// Consume with for-await
async function processStream(readable) {
for await (const chunk of readable) {
console.log('Processing:', chunk);
}
}Backpressure Handling
背压处理
javascript
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
// Check if writable can accept more data
const canContinue = writable.write(chunk);
if (!canContinue) {
// Pause reading until writable is ready
readable.pause();
writable.once('drain', () => {
readable.resume();
});
}
});
// Or use pipeline (handles automatically)
pipeline(readable, writable, (err) => {
if (err) console.error('Error:', err);
});javascript
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
// Check if writable can accept more data
const canContinue = writable.write(chunk);
if (!canContinue) {
// Pause reading until writable is ready
readable.pause();
writable.once('drain', () => {
readable.resume();
});
}
});
// Or use pipeline (handles automatically)
pipeline(readable, writable, (err) => {
if (err) console.error('Error:', err);
});Custom Readable Stream
自定义Readable流
javascript
const { Readable } = require('stream');
class DatabaseStream extends Readable {
constructor(query, options) {
super({ ...options, objectMode: true });
this.query = query;
this.cursor = null;
}
async _read() {
if (!this.cursor) {
this.cursor = await db.collection('items').find(this.query).cursor();
}
const doc = await this.cursor.next();
if (doc) {
this.push(doc);
} else {
this.push(null); // Signal end
}
}
}
// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
console.log(item);
}javascript
const { Readable } = require('stream');
class DatabaseStream extends Readable {
constructor(query, options) {
super({ ...options, objectMode: true });
this.query = query;
this.cursor = null;
}
async _read() {
if (!this.cursor) {
this.cursor = await db.collection('items').find(this.query).cursor();
}
const doc = await this.cursor.next();
if (doc) {
this.push(doc);
} else {
this.push(null); // Signal end
}
}
}
// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
console.log(item);
}Unit Test Template
单元测试模板
javascript
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');
describe('Stream Processing', () => {
it('should transform data correctly', async () => {
const input = Readable.from(['hello', 'world']);
const chunks = [];
const upperCase = new Transform({
transform(chunk, enc, cb) {
this.push(chunk.toString().toUpperCase());
cb();
}
});
await pipeline(
input,
upperCase,
async function* (source) {
for await (const chunk of source) {
chunks.push(chunk.toString());
}
}
);
expect(chunks).toEqual(['HELLO', 'WORLD']);
});
});javascript
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');
describe('Stream Processing', () => {
it('should transform data correctly', async () => {
const input = Readable.from(['hello', 'world']);
const chunks = [];
const upperCase = new Transform({
transform(chunk, enc, cb) {
this.push(chunk.toString().toUpperCase());
cb();
}
});
await pipeline(
input,
upperCase,
async function* (source) {
for await (const chunk of source) {
chunks.push(chunk.toString());
}
}
);
expect(chunks).toEqual(['HELLO', 'WORLD']);
});
});Troubleshooting
故障排除
| Problem | Cause | Solution |
|---|---|---|
| Memory grows infinitely | No backpressure | Use pipeline or handle drain |
| Data loss | Errors not caught | Use pipeline with error callback |
| Slow processing | Small chunk size | Increase highWaterMark |
| Stream hangs | Missing end() call | Call writable.end() |
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 内存无限增长 | 未处理背压 | 使用pipeline或处理drain事件 |
| 数据丢失 | 未捕获错误 | 使用带错误回调的pipeline |
| 处理速度慢 | 块大小过小 | 增大highWaterMark |
| 流挂起 | 未调用end() | 调用writable.end() |
When to Use
使用场景
Use streams when:
- Processing large files (GB+)
- Real-time data processing
- Memory-constrained environments
- Building data pipelines
- HTTP request/response handling
在以下场景使用Streams:
- 处理大文件(GB级以上)
- 实时数据处理
- 内存受限环境
- 构建数据管道
- HTTP请求/响应处理
Related Skills
相关技能
- Async Programming (async patterns)
- Performance Optimization (memory efficiency)
- Express REST API (streaming responses)
- 异步编程(异步模式)
- 性能优化(内存效率)
- Express REST API(流式响应)