dart-concurrency-isolates
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinesedart-isolates-concurrency
Dart Isolates 并发处理
Goal
目标
Implements concurrent execution in Dart using isolates to offload heavy computations from the main thread. Manages both short-lived background tasks and long-lived bidirectional worker isolates while ensuring memory safety, proper port management, and strict resource cleanup. Assumes a Dart Native environment (isolates are not supported on Dart Web).
在Dart中使用Isolate实现并发执行,将繁重的计算任务从主线程转移出去。管理短期后台任务和长期双向工作者Isolate,同时确保内存安全、正确的端口管理和严格的资源清理。本文假设运行环境为Dart Native(Dart Web不支持Isolate)。
Instructions
操作步骤
1. Determine Isolate Strategy (Decision Logic)
1. 确定Isolate策略(决策逻辑)
Evaluate the user's concurrency requirements using the following decision tree:
- Condition A: Is the task a single, one-off computation (e.g., parsing a single large JSON file)?
- Action: Use .
Isolate.run()
- Action: Use
- Condition B: Does the task require repeated executions, maintaining state, or streaming multiple messages over time?
- Action: Use with manual
Isolate.spawn()andReceivePortmanagement.SendPort
- Action: Use
STOP AND ASK THE USER: "Are you executing a one-off background task, or do you need a long-running worker that handles multiple messages over time?"
使用以下决策树评估用户的并发需求:
- 条件A: 任务是否为单次、一次性计算(例如,解析单个大型JSON文件)?
- 操作: 使用。
Isolate.run()
- 操作: 使用
- 条件B: 任务是否需要重复执行、保持状态,或者随时间流式传输多条消息?
- 操作: 使用并手动管理
Isolate.spawn()和ReceivePort。SendPort
- 操作: 使用
请询问用户: “您要执行的是一次性后台任务,还是需要一个可长期运行、能处理多条消息的工作者?”
2. Implement Short-Lived Isolates (Isolate.run
)
Isolate.run2. 实现短期Isolate(Isolate.run
)
Isolate.runFor simple background tasks, use to automatically handle spawning, message transfer, error handling, and termination.
Isolate.run()dart
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
Future<Map<String, dynamic>> parseLargeJson(String filePath) async {
// Isolate.run spawns the isolate, runs the closure, returns the result, and exits.
return await Isolate.run(() async {
final fileData = await File(filePath).readAsString();
return jsonDecode(fileData) as Map<String, dynamic>;
});
}对于简单的后台任务,使用自动处理Isolate的启动、消息传递、错误处理和终止。
Isolate.run()dart
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
Future<Map<String, dynamic>> parseLargeJson(String filePath) async {
// Isolate.run spawns the isolate, runs the closure, returns the result, and exits.
return await Isolate.run(() async {
final fileData = await File(filePath).readAsString();
return jsonDecode(fileData) as Map<String, dynamic>;
});
}3. Implement Long-Lived Isolates (Isolate.spawn
)
Isolate.spawn3. 实现长期Isolate(Isolate.spawn
)
Isolate.spawnFor complex workers, establish a robust 2-way communication channel. You must implement message sequencing (using IDs), error handling (), and lifecycle management.
RemoteErrorStep 3a: Define the Worker Class and Spawning Logic
Use a to separate startup logic from message handling.
RawReceivePortdart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
class BackgroundWorker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
BackgroundWorker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
static Future<BackgroundWorker> spawn() async {
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
try {
await Isolate.spawn(_startRemoteIsolate, initPort.sendPort);
} catch (e) {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) = await connection.future;
return BackgroundWorker._(receivePort, sendPort);
}Step 3b: Implement the Remote Isolate Entrypoint
Define the static method that runs on the spawned isolate.
dart
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String payload) = message as (int, String);
try {
// Perform heavy computation here
final result = jsonDecode(payload);
sendPort.send((id, result));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}Step 3c: Implement Message Passing and Response Handling
Map outgoing messages to instances using unique IDs.
Completerdart
Future<Object?> executeTask(String payload) async {
if (_closed) throw StateError('Worker is closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, payload));
return await completer.future;
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id);
if (completer == null) return;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) {
_responses.close();
}
}Step 3d: Implement Graceful Shutdown
Ensure ports are closed to prevent memory leaks.
dart
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) {
_responses.close();
}
}
}
}对于复杂的工作者,建立可靠的双向通信通道。您必须实现消息排序(使用ID)、错误处理()和生命周期管理。
RemoteError步骤3a:定义工作者类和启动逻辑
使用将启动逻辑与消息处理分离。
RawReceivePortdart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
class BackgroundWorker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
BackgroundWorker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
static Future<BackgroundWorker> spawn() async {
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
try {
await Isolate.spawn(_startRemoteIsolate, initPort.sendPort);
} catch (e) {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) = await connection.future;
return BackgroundWorker._(receivePort, sendPort);
}步骤3b:实现远程Isolate入口点
定义在生成的Isolate上运行的静态方法。
dart
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String payload) = message as (int, String);
try {
// Perform heavy computation here
final result = jsonDecode(payload);
sendPort.send((id, result));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}步骤3c:实现消息传递和响应处理
使用唯一ID将传出消息映射到实例。
Completerdart
Future<Object?> executeTask(String payload) async {
if (_closed) throw StateError('Worker is closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, payload));
return await completer.future;
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id);
if (completer == null) return;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) {
_responses.close();
}
}步骤3d:实现优雅关闭
确保关闭端口以防止内存泄漏。
dart
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) {
_responses.close();
}
}
}
}4. Validate and Fix
4. 验证与修复
After generating isolate code, perform the following validation loop:
- Check Port Closure: Verify that is called in both the main isolate and the worker isolate during shutdown.
ReceivePort.close() - Check Payload Types: Ensure the data being sent through does not contain native resources (e.g.,
SendPort.send(),Socket,Pointer).ReceivePort - Fix: If native resources are being passed, refactor the payload to extract and send only primitive data types or serializable maps/records.
生成Isolate代码后,执行以下验证循环:
- 检查端口关闭: 验证在关闭过程中,主线程Isolate和工作者Isolate都调用了。
ReceivePort.close() - 检查负载类型: 确保通过发送的数据不包含原生资源(例如
SendPort.send()、Socket、Pointer)。ReceivePort - 修复: 如果传递了原生资源,重构负载以提取并仅发送原始数据类型或可序列化的映射/记录。
Constraints
约束条件
- DO use for simple one-off background tasks.
Isolate.run() - DO manually manage and
SendPortfor complex, long-running background workers.ReceivePort - AVOID passing large mutable objects between isolates; prefer simple data types and records.
- DO ensure isolates are terminated when no longer needed to free resources.
- DO NOT attempt to use isolates on the Dart Web platform (use web workers instead).
- DO NOT use shared-state concurrency patterns (e.g., mutexes); isolates communicate strictly via message passing.
- Related Skills: .
dart-async-programming
- 对于简单的一次性后台任务,请使用。
Isolate.run() - 对于复杂的长期后台工作者,请手动管理和
SendPort。ReceivePort - 避免在Isolate之间传递大型可变对象;优先使用简单数据类型和记录。
- 请确保在不再需要Isolate时终止它们以释放资源。
- 请勿尝试在Dart Web平台上使用Isolate(请改用Web Worker)。
- 请勿使用共享状态并发模式(例如互斥锁);Isolate严格通过消息传递进行通信。
- 相关技能:。
dart-async-programming