dart-concurrency-isolates

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

dart-isolates-concurrency

Dart Isolates 并发处理

Goal

目标

Implements concurrent execution in Dart using isolates to offload heavy computations from the main thread. Manages both short-lived background tasks and long-lived bidirectional worker isolates while ensuring memory safety, proper port management, and strict resource cleanup. Assumes a Dart Native environment (isolates are not supported on Dart Web).
在Dart中使用Isolate实现并发执行,将繁重的计算任务从主线程转移出去。管理短期后台任务和长期双向工作者Isolate,同时确保内存安全、正确的端口管理和严格的资源清理。本文假设运行环境为Dart Native(Dart Web不支持Isolate)。

Instructions

操作步骤

1. Determine Isolate Strategy (Decision Logic)

1. 确定Isolate策略(决策逻辑)

Evaluate the user's concurrency requirements using the following decision tree:
  • Condition A: Is the task a single, one-off computation (e.g., parsing a single large JSON file)?
    • Action: Use
      Isolate.run()
      .
  • Condition B: Does the task require repeated executions, maintaining state, or streaming multiple messages over time?
    • Action: Use
      Isolate.spawn()
      with manual
      ReceivePort
      and
      SendPort
      management.
STOP AND ASK THE USER: "Are you executing a one-off background task, or do you need a long-running worker that handles multiple messages over time?"
使用以下决策树评估用户的并发需求:
  • 条件A: 任务是否为单次、一次性计算(例如,解析单个大型JSON文件)?
    • 操作: 使用
      Isolate.run()
  • 条件B: 任务是否需要重复执行、保持状态,或者随时间流式传输多条消息?
    • 操作: 使用
      Isolate.spawn()
      并手动管理
      ReceivePort
      SendPort
请询问用户: “您要执行的是一次性后台任务,还是需要一个可长期运行、能处理多条消息的工作者?”

2. Implement Short-Lived Isolates (
Isolate.run
)

2. 实现短期Isolate(
Isolate.run

For simple background tasks, use
Isolate.run()
to automatically handle spawning, message transfer, error handling, and termination.
dart
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';

Future<Map<String, dynamic>> parseLargeJson(String filePath) async {
  // Isolate.run spawns the isolate, runs the closure, returns the result, and exits.
  return await Isolate.run(() async {
    final fileData = await File(filePath).readAsString();
    return jsonDecode(fileData) as Map<String, dynamic>;
  });
}
对于简单的后台任务,使用
Isolate.run()
自动处理Isolate的启动、消息传递、错误处理和终止。
dart
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';

Future<Map<String, dynamic>> parseLargeJson(String filePath) async {
  // Isolate.run spawns the isolate, runs the closure, returns the result, and exits.
  return await Isolate.run(() async {
    final fileData = await File(filePath).readAsString();
    return jsonDecode(fileData) as Map<String, dynamic>;
  });
}

3. Implement Long-Lived Isolates (
Isolate.spawn
)

3. 实现长期Isolate(
Isolate.spawn

For complex workers, establish a robust 2-way communication channel. You must implement message sequencing (using IDs), error handling (
RemoteError
), and lifecycle management.
Step 3a: Define the Worker Class and Spawning Logic Use a
RawReceivePort
to separate startup logic from message handling.
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

class BackgroundWorker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  BackgroundWorker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  static Future<BackgroundWorker> spawn() async {
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    try {
      await Isolate.spawn(_startRemoteIsolate, initPort.sendPort);
    } catch (e) {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) = await connection.future;
    return BackgroundWorker._(receivePort, sendPort);
  }
Step 3b: Implement the Remote Isolate Entrypoint Define the static method that runs on the spawned isolate.
dart
  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      
      final (int id, String payload) = message as (int, String);
      try {
        // Perform heavy computation here
        final result = jsonDecode(payload);
        sendPort.send((id, result));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }
Step 3c: Implement Message Passing and Response Handling Map outgoing messages to
Completer
instances using unique IDs.
dart
  Future<Object?> executeTask(String payload) async {
    if (_closed) throw StateError('Worker is closed');
    
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    
    _commands.send((id, payload));
    return await completer.future;
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id);
    
    if (completer == null) return;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) {
      _responses.close();
    }
  }
Step 3d: Implement Graceful Shutdown Ensure ports are closed to prevent memory leaks.
dart
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) {
        _responses.close();
      }
    }
  }
}
对于复杂的工作者,建立可靠的双向通信通道。您必须实现消息排序(使用ID)、错误处理(
RemoteError
)和生命周期管理。
步骤3a:定义工作者类和启动逻辑 使用
RawReceivePort
将启动逻辑与消息处理分离。
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

class BackgroundWorker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  BackgroundWorker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  static Future<BackgroundWorker> spawn() async {
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    try {
      await Isolate.spawn(_startRemoteIsolate, initPort.sendPort);
    } catch (e) {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) = await connection.future;
    return BackgroundWorker._(receivePort, sendPort);
  }
步骤3b:实现远程Isolate入口点 定义在生成的Isolate上运行的静态方法。
dart
  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      
      final (int id, String payload) = message as (int, String);
      try {
        // Perform heavy computation here
        final result = jsonDecode(payload);
        sendPort.send((id, result));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }
步骤3c:实现消息传递和响应处理 使用唯一ID将传出消息映射到
Completer
实例。
dart
  Future<Object?> executeTask(String payload) async {
    if (_closed) throw StateError('Worker is closed');
    
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    
    _commands.send((id, payload));
    return await completer.future;
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id);
    
    if (completer == null) return;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) {
      _responses.close();
    }
  }
步骤3d:实现优雅关闭 确保关闭端口以防止内存泄漏。
dart
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) {
        _responses.close();
      }
    }
  }
}

4. Validate and Fix

4. 验证与修复

After generating isolate code, perform the following validation loop:
  1. Check Port Closure: Verify that
    ReceivePort.close()
    is called in both the main isolate and the worker isolate during shutdown.
  2. Check Payload Types: Ensure the data being sent through
    SendPort.send()
    does not contain native resources (e.g.,
    Socket
    ,
    Pointer
    ,
    ReceivePort
    ).
  3. Fix: If native resources are being passed, refactor the payload to extract and send only primitive data types or serializable maps/records.
生成Isolate代码后,执行以下验证循环:
  1. 检查端口关闭: 验证在关闭过程中,主线程Isolate和工作者Isolate都调用了
    ReceivePort.close()
  2. 检查负载类型: 确保通过
    SendPort.send()
    发送的数据不包含原生资源(例如
    Socket
    Pointer
    ReceivePort
    )。
  3. 修复: 如果传递了原生资源,重构负载以提取并仅发送原始数据类型或可序列化的映射/记录。

Constraints

约束条件

  • DO use
    Isolate.run()
    for simple one-off background tasks.
  • DO manually manage
    SendPort
    and
    ReceivePort
    for complex, long-running background workers.
  • AVOID passing large mutable objects between isolates; prefer simple data types and records.
  • DO ensure isolates are terminated when no longer needed to free resources.
  • DO NOT attempt to use isolates on the Dart Web platform (use web workers instead).
  • DO NOT use shared-state concurrency patterns (e.g., mutexes); isolates communicate strictly via message passing.
  • Related Skills:
    dart-async-programming
    .
  • 对于简单的一次性后台任务,请使用
    Isolate.run()
  • 对于复杂的长期后台工作者,请手动管理
    SendPort
    ReceivePort
  • 避免在Isolate之间传递大型可变对象;优先使用简单数据类型和记录。
  • 请确保在不再需要Isolate时终止它们以释放资源。
  • 请勿尝试在Dart Web平台上使用Isolate(请改用Web Worker)。
  • 请勿使用共享状态并发模式(例如互斥锁);Isolate严格通过消息传递进行通信。
  • 相关技能:
    dart-async-programming