# Dask
## Overview
Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
- Larger-than-memory execution on single machines for data exceeding available RAM
- Parallel processing for improved computational speed across multiple cores
- Distributed computation supporting terabyte-scale datasets across multiple machines
Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.
## When to Use This Skill
Use this skill when you need to:
- Process datasets that exceed available RAM
- Scale pandas or NumPy operations to larger datasets
- Parallelize computations for performance improvements
- Process multiple files efficiently (CSVs, Parquet, JSON, text logs)
- Build custom parallel workflows with task dependencies
- Distribute workloads across multiple cores or machines
## Core Capabilities
Dask provides five main components, each suited to different use cases:
### 1. DataFrames - Parallel Pandas Operations
**Purpose**: Scale pandas operations to larger datasets through parallel processing.

**When to Use**:
- Tabular data exceeds available RAM
- Need to process multiple CSV/Parquet files together
- Pandas operations are slow and need parallelization
- Scaling from a pandas prototype to production

**Reference Documentation**: For comprehensive guidance on Dask DataFrames, refer to `references/dataframes.md`, which includes:
- Reading data (single files, multiple files, glob patterns)
- Common operations (filtering, groupby, joins, aggregations)
- Custom operations with `map_partitions`
- Performance optimization tips
- Common patterns (ETL, time series, multi-file processing)

**Quick Example**:

```python
import dask.dataframe as dd

# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')

# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
```

**Key Points**:
- Operations are lazy (they build a task graph) until `.compute()` is called
- Use `map_partitions` for efficient custom operations
- Convert to a DataFrame early when working with structured data from other sources
### 2. Arrays - Parallel NumPy Operations
**Purpose**: Extend NumPy capabilities to datasets larger than memory using blocked algorithms.

**When to Use**:
- Arrays exceed available RAM
- NumPy operations need parallelization
- Working with scientific datasets (HDF5, Zarr, NetCDF)
- Need parallel linear algebra or array operations

**Reference Documentation**: For comprehensive guidance on Dask Arrays, refer to `references/arrays.md`, which includes:
- Creating arrays (from NumPy, random, from disk)
- Chunking strategies and optimization
- Common operations (arithmetic, reductions, linear algebra)
- Custom operations with `map_blocks`
- Integration with HDF5, Zarr, and XArray

**Quick Example**:

```python
import dask.array as da

# Create a large array with chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))

# Operations are lazy
y = x + 100
z = y.mean(axis=0)

# Compute the result
result = z.compute()
```

**Key Points**:
- Chunk size is critical (aim for ~100 MB per chunk)
- Operations work on chunks in parallel
- Rechunk data when needed for efficient operations
- Use `map_blocks` for operations not available in Dask
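A minimal, runnable sketch of `map_blocks`; the array sizes are kept small so the example finishes quickly, whereas real chunks would be closer to ~100 MB:

```python
import numpy as np
import dask.array as da

x = da.random.random((1000, 1000), chunks=(250, 250))

# Each block arrives as a plain NumPy array; the function runs per chunk
def clip_block(block):
    return np.clip(block, 0.1, 0.9)

y = x.map_blocks(clip_block, dtype=x.dtype)
result = y.compute()
```

Passing `dtype` explicitly spares Dask from having to infer the output type by running the function on a dummy block.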
### 3. Bags - Parallel Processing of Unstructured Data
**Purpose**: Process unstructured or semi-structured data (text, JSON, logs) with functional operations.

**When to Use**:
- Processing text files, logs, or JSON records
- Data cleaning and ETL before structured analysis
- Working with Python objects that don't fit array/dataframe formats
- Need memory-efficient streaming processing

**Reference Documentation**: For comprehensive guidance on Dask Bags, refer to `references/bags.md`, which includes:
- Reading text and JSON files
- Functional operations (map, filter, fold, groupby)
- Converting to DataFrames
- Common patterns (log analysis, JSON processing, text processing)
- Performance considerations

**Quick Example**:

```python
import dask.bag as db
import json

# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)

# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})

# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
```

**Key Points**:
- Use for initial data cleaning, then convert to DataFrame/Array
- Use `foldby` instead of `groupby` for better performance
- Operations are streaming and memory-efficient
- Convert to structured formats (DataFrame) for complex operations
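The `foldby` recommendation above can be sketched with tiny in-memory records (the field names are hypothetical); `foldby` fuses grouping with a reduction, avoiding the full shuffle that `groupby` requires:

```python
import dask.bag as db

records = [
    {'category': 'a', 'value': 1},
    {'category': 'b', 'value': 2},
    {'category': 'a', 'value': 3},
]
bag = db.from_sequence(records, npartitions=2)

# Arguments: key function, per-record fold, initial value,
# combine for partial results, initial value for combine
totals = dict(
    bag.foldby(
        lambda r: r['category'],            # group key
        lambda total, r: total + r['value'],  # fold a record into a running total
        0,
        lambda x, y: x + y,                  # merge totals from different partitions
        0,
    ).compute()
)
```

Each partition is reduced locally first, so only small per-key totals move between workers.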
### 4. Futures - Task-Based Parallelization
**Purpose**: Build custom parallel workflows with fine-grained control over task execution and dependencies.

**When to Use**:
- Building dynamic, evolving workflows
- Need immediate task execution (not lazy)
- Computations depend on runtime conditions
- Implementing custom parallel algorithms
- Need stateful computations

**Reference Documentation**: For comprehensive guidance on Dask Futures, refer to `references/futures.md`, which includes:
- Setting up a distributed client
- Submitting tasks and working with futures
- Task dependencies and data movement
- Advanced coordination (queues, locks, events, actors)
- Common patterns (parameter sweeps, dynamic tasks, iterative algorithms)

**Quick Example**:

```python
from dask.distributed import Client

client = Client()  # Create a local cluster

# Submit tasks (executes immediately)
def process(x):
    return x ** 2

futures = client.map(process, range(100))

# Gather results
results = client.gather(futures)
client.close()
```

**Key Points**:
- Requires a distributed client (even on a single machine)
- Tasks execute immediately when submitted
- Pre-scatter large data to avoid repeated transfers
- ~1 ms overhead per task (not suitable for millions of tiny tasks)
- Use actors for stateful workflows
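One common pattern beyond the basics above is consuming results in finish order with `as_completed`; a minimal sketch, using an in-process cluster to keep it lightweight:

```python
from dask.distributed import Client, as_completed

client = Client(processes=False)  # in-process cluster for illustration

def square(x):
    return x ** 2

futures = [client.submit(square, i) for i in range(5)]

# Handle each result as soon as its task finishes,
# rather than blocking until every future is done
total = 0
for future in as_completed(futures):
    total += future.result()

client.close()
```

This is useful when downstream work (logging, submitting follow-up tasks) should start as soon as any individual result is available.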
### 5. Schedulers - Execution Backends
**Purpose**: Control how and where Dask tasks execute (threads, processes, distributed).

**When to Choose Each Scheduler**:
- **Threads** (default): NumPy/pandas operations, GIL-releasing libraries, benefits from shared memory
- **Processes**: Pure Python code, text processing, GIL-bound operations
- **Synchronous**: Debugging with pdb, profiling, understanding errors
- **Distributed**: Need the dashboard, multi-machine clusters, advanced features

**Reference Documentation**: For comprehensive guidance on Dask Schedulers, refer to `references/schedulers.md`, which includes:
- Detailed scheduler descriptions and characteristics
- Configuration methods (global, context manager, per-compute)
- Performance considerations and overhead
- Common patterns and troubleshooting
- Thread configuration for optimal performance

**Quick Example** (`python_function`, `problematic_computation`, and `computation` are placeholders):

```python
import dask
import dask.dataframe as dd

# Use threads for DataFrames (default, good for numeric work)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute()  # Uses threads

# Use processes for Python-heavy work
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')

# Use the synchronous scheduler for debugging
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute()  # Can use pdb

# Use distributed for monitoring and scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute()  # Uses distributed with dashboard
```

**Key Points**:
- Threads: lowest overhead (~10 µs/task), best for numeric work
- Processes: avoids the GIL (~10 ms/task), best for Python work
- Distributed: monitoring dashboard (~1 ms/task), scales to clusters
- Schedulers can be switched per computation or globally
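Scheduler switching can also be scoped with a context manager instead of a global `dask.config.set`; a runnable sketch:

```python
import dask
import dask.bag as db

bag = db.from_sequence(range(8), npartitions=4)

# Scope the scheduler choice to a single block
with dask.config.set(scheduler='synchronous'):
    result_sync = bag.map(lambda x: x + 1).sum().compute()

# Or override it for a single compute() call
result_threads = bag.map(lambda x: x + 1).sum().compute(scheduler='threads')
```

The context-manager form avoids accidentally leaving a debugging scheduler active for the rest of the session.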
## Best Practices
For comprehensive performance optimization guidance, memory management strategies, and common pitfalls to avoid, refer to `references/best-practices.md`. Key principles include:

### Start with Simpler Solutions

Before using Dask, explore:
- Better algorithms
- Efficient file formats (Parquet instead of CSV)
- Compiled code (Numba, Cython)
- Data sampling

### Critical Performance Rules

**1. Don't Load Data Locally and Then Hand It to Dask**

```python
# Wrong: loads all data into memory first
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)

# Correct: let Dask handle loading
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')
```

**2. Avoid Repeated compute() Calls**

```python
# Wrong: each compute is separate
for item in items:
    result = dask_computation(item).compute()

# Correct: a single compute for everything
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
```

**3. Don't Build Excessively Large Task Graphs**
- Increase chunk sizes if you have millions of tasks
- Use `map_partitions`/`map_blocks` to fuse operations
- Check task graph size: `len(ddf.__dask_graph__())`

**4. Choose Appropriate Chunk Sizes**
- Target: ~100 MB per chunk (or about 10 chunks per core in worker memory)
- Too large: memory overflow
- Too small: scheduling overhead

**5. Use the Dashboard**

```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link)  # Monitor performance, identify bottlenecks
```
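Rule 2 can be demonstrated end to end with `dask.delayed` standing in for the hypothetical `dask_computation`:

```python
import dask

@dask.delayed
def work(x):  # stands in for dask_computation
    return x * 2

# Collect the lazy results, then trigger one compute for the whole batch;
# dask.compute(*...) builds a single task graph and shares the scheduler
# startup cost across all items
computations = [work(i) for i in range(5)]
results = dask.compute(*computations)
```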
## Common Workflow Patterns
### ETL Pipeline
```python
import dask.dataframe as dd

# Extract: read data
ddf = dd.read_csv('raw_data/*.csv')

# Transform: clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])

# Load: aggregate and save
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
```
### Unstructured to Structured Pipeline
```python
import dask.bag as db
import json

# Start with a Bag for unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')

# Convert to DataFrame for structured analysis
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
```
### Large-Scale Array Computation
```python
import dask.array as da

# Load or create a large array
x = da.from_zarr('large_dataset.zarr')

# Process in chunks
normalized = (x - x.mean()) / x.std()

# Save the result
da.to_zarr(normalized, 'normalized.zarr')
```
### Custom Parallel Workflow
```python
from dask.distributed import Client
client = Client()

# Scatter the large dataset once
data = client.scatter(large_dataset)

# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)

# Gather results
results = client.gather(futures)
```
## Selecting the Right Component
Use this decision guide to choose the appropriate Dask component:

**Data Type**:
- Tabular data → DataFrames
- Numeric arrays → Arrays
- Text/JSON/logs → Bags (then convert to DataFrame)
- Custom Python objects → Bags or Futures

**Operation Type**:
- Standard pandas operations → DataFrames
- Standard NumPy operations → Arrays
- Custom parallel tasks → Futures
- Text processing/ETL → Bags

**Control Level**:
- High-level, automatic → DataFrames/Arrays
- Low-level, manual → Futures

**Workflow Type**:
- Static computation graph → DataFrames/Arrays/Bags
- Dynamic, evolving → Futures
## Integration Considerations
### File Formats
- **Efficient**: Parquet, HDF5, Zarr (columnar, compressed, parallel-friendly)
- **Compatible but slower**: CSV (use for initial ingestion only)
- **For Arrays**: HDF5, Zarr, NetCDF
### Conversion Between Collections
```python
# Bag → DataFrame
ddf = bag.to_dataframe()

# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)

# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
```
### With Other Libraries
- **XArray**: Wraps Dask arrays with labeled dimensions (geospatial, imaging)
- **Dask-ML**: Machine learning with scikit-learn compatible APIs
- **Distributed**: Advanced cluster management and monitoring
## Debugging and Development
### Iterative Development Workflow
1. **Test on small data with the synchronous scheduler**:

```python
dask.config.set(scheduler='synchronous')
result = computation.compute()  # Can use pdb, easy debugging
```

2. **Validate with threads on a sample**:

```python
sample = ddf.head(1000)  # Small sample
# Test logic, then scale to the full dataset
```

3. **Scale with distributed for monitoring**:

```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link)  # Monitor performance
result = computation.compute()
```
### Common Issues
**Memory Errors**:
- Decrease chunk sizes
- Use `persist()` strategically and delete results when done
- Check for memory leaks in custom functions

**Slow Start**:
- Task graph too large (increase chunk sizes)
- Use `map_partitions` or `map_blocks` to reduce the number of tasks

**Poor Parallelization**:
- Chunks too large (increase the number of partitions)
- Using threads with pure-Python code (switch to processes)
- Data dependencies preventing parallelism
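The `persist()` advice can be sketched as follows: persisted results are materialized once and reused by later computations, and dropping the reference lets the memory be reclaimed.

```python
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))

# Materialize the intermediate result once; the two computations
# below reuse it instead of recomputing x + 1 twice
y = (x + 1).persist()

m = y.mean().compute()  # ~1.5 for uniform(0, 1) + 1
s = y.std().compute()   # ~0.29

# Drop the reference when done so the memory can be freed
del y
```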
## Reference Files
All reference documentation files can be read as needed for detailed information:

- `references/dataframes.md` - Complete Dask DataFrame guide
- `references/arrays.md` - Complete Dask Array guide
- `references/bags.md` - Complete Dask Bag guide
- `references/futures.md` - Complete Dask Futures and distributed computing guide
- `references/schedulers.md` - Complete scheduler selection and configuration guide
- `references/best-practices.md` - Comprehensive performance optimization and troubleshooting
Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.