Parallel File Processor
Version: 1.1.0 Category: Development Last Updated: 2026-01-02
Process multiple files concurrently with intelligent batching, progress tracking, and result aggregation for significant performance improvements.
Quick Start
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import pandas as pd

def process_csv(file_path: Path) -> dict:
    """Process a single CSV file."""
    df = pd.read_csv(file_path)
    return {'file': file_path.name, 'rows': len(df), 'columns': len(df.columns)}

# Get all CSV files
files = list(Path('data/raw/').glob('*.csv'))

# Process in parallel
results = []
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(process_csv, f): f for f in files}
    for future in as_completed(futures):
        results.append(future.result())

print(f"Processed {len(results)} files")
```
When to Use
- Processing large numbers of files (100+ files)
- Batch operations on directory contents
- Extracting data from multiple ZIP archives
- Aggregating results from parallel operations
- CPU-bound file transformations
- IO-bound file operations with proper concurrency
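The thread-vs-process choice implied by the last two bullets can be captured in a small heuristic. The helper below is illustrative only (`pick_executor_config` is not part of this library); the multipliers are common defaults, not tuned values:

```python
import os

def pick_executor_config(io_bound: bool, n_files: int) -> dict:
    """Sketch: threads for IO-bound work, processes for CPU-bound work."""
    cpus = os.cpu_count() or 4
    if io_bound:
        # IO-bound: threads are cheap and spend most of their time waiting
        return {"mode": "thread_pool", "max_workers": min(cpus * 2, n_files)}
    # CPU-bound: roughly one process per core avoids GIL contention
    return {"mode": "process_pool", "max_workers": min(cpus, n_files)}
```

The returned `mode` strings deliberately match the `ProcessingMode` values used below.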
Core Pattern
Directory Scan -> Filter -> Batch -> Parallel Process -> Aggregate -> Output

Implementation
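The stages above can be sketched end to end with nothing but the standard library. `run_pipeline` here is a hypothetical miniature, not the full processor built below:

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def run_pipeline(directory: Path, pattern: str = "*.txt") -> dict:
    """Scan -> filter -> parallel process -> aggregate, in miniature."""
    files = sorted(directory.glob(pattern))          # scan + filter
    with ThreadPoolExecutor(max_workers=4) as pool:  # parallel process
        sizes = list(pool.map(lambda p: p.stat().st_size, files))
    return {"files": len(files), "total_bytes": sum(sizes)}  # aggregate

# Demo against a throwaway directory with three small files
tmp = Path(tempfile.mkdtemp())
for i in range(3):
    (tmp / f"f{i}.txt").write_text("x" * (i + 1))
summary = run_pipeline(tmp)
```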
Core Components
```python
import logging
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import (
    List, Dict, Any, Callable, Optional, Generator, TypeVar, Generic
)

logger = logging.getLogger(__name__)

T = TypeVar('T')
R = TypeVar('R')

class ProcessingMode(Enum):
    """Processing execution mode."""
    SEQUENTIAL = "sequential"
    THREAD_POOL = "thread_pool"
    PROCESS_POOL = "process_pool"
    ASYNC = "async"

@dataclass
class FileInfo:
    """File metadata container."""
    path: Path
    size_bytes: int
    modified_time: float
    extension: str
    relative_path: Optional[str] = None

    @classmethod
    def from_path(cls, path: Path, base_path: Optional[Path] = None) -> 'FileInfo':
        """Create FileInfo from a path."""
        stat = path.stat()
        relative = str(path.relative_to(base_path)) if base_path else None
        return cls(
            path=path,
            size_bytes=stat.st_size,
            modified_time=stat.st_mtime,
            extension=path.suffix.lower(),
            relative_path=relative
        )

@dataclass
class ProcessingResult(Generic[T]):
    """Result of processing a single file."""
    file_info: FileInfo
    success: bool
    result: Optional[T] = None
    error: Optional[str] = None
    duration_seconds: float = 0.0

@dataclass
class BatchResult(Generic[T]):
    """Aggregated results from batch processing."""
    total_files: int = 0
    successful: int = 0
    failed: int = 0
    results: List[ProcessingResult[T]] = field(default_factory=list)
    total_duration_seconds: float = 0.0
    errors: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Success rate as a percentage."""
        if self.total_files == 0:
            return 100.0
        return (self.successful / self.total_files) * 100

    def successful_results(self) -> List[T]:
        """Return only the successful results."""
        return [r.result for r in self.results if r.success and r.result is not None]
```
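One detail worth noting in the dataclasses above: the list fields use `field(default_factory=list)` rather than a bare `= []`, because a mutable default would be shared across every instance. A minimal demonstration (the `Batch` class here is a throwaway example, not part of the library):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Batch:
    # A bare `results: List[str] = []` is rejected by dataclasses precisely
    # because one list object would be shared by all instances.
    results: List[str] = field(default_factory=list)

a, b = Batch(), Batch()
a.results.append("ok")  # mutates only a's list
```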
File Scanner
```python
import fnmatch
from pathlib import Path
from typing import Generator, List, Optional, Set

class FileScanner:
    """
    Scan directories for files matching patterns.

    Supports glob patterns, extension filtering, and size limits.
    """

    def __init__(self,
                 include_patterns: List[str] = None,
                 exclude_patterns: List[str] = None,
                 extensions: Set[str] = None,
                 min_size: int = 0,
                 max_size: int = None,
                 recursive: bool = True):
        """
        Initialize the file scanner.

        Args:
            include_patterns: Glob patterns to include (e.g., ['*.csv', '*.xlsx'])
            exclude_patterns: Glob patterns to exclude (e.g., ['*_backup*'])
            extensions: File extensions to include (e.g., {'.csv', '.xlsx'})
            min_size: Minimum file size in bytes
            max_size: Maximum file size in bytes
            recursive: Scan subdirectories
        """
        self.include_patterns = include_patterns or ['*']
        self.exclude_patterns = exclude_patterns or []
        self.extensions = extensions
        self.min_size = min_size
        self.max_size = max_size
        self.recursive = recursive

    def scan(self, directory: Path) -> Generator[FileInfo, None, None]:
        """
        Scan a directory and yield matching files.

        Args:
            directory: Directory to scan

        Yields:
            FileInfo for each matching file
        """
        directory = Path(directory)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise ValueError(f"Not a directory: {directory}")

        # Choose iteration method
        files = directory.rglob('*') if self.recursive else directory.glob('*')

        for path in files:
            if path.is_file() and self._matches(path):
                try:
                    yield FileInfo.from_path(path, directory)
                except Exception as e:
                    logger.warning(f"Could not get info for {path}: {e}")

    def _matches(self, path: Path) -> bool:
        """Check whether a file matches all criteria."""
        name = path.name

        # Check include patterns
        if not any(fnmatch.fnmatch(name, p) for p in self.include_patterns):
            return False

        # Check exclude patterns
        if any(fnmatch.fnmatch(name, p) for p in self.exclude_patterns):
            return False

        # Check extension
        if self.extensions and path.suffix.lower() not in self.extensions:
            return False

        # Check size
        try:
            size = path.stat().st_size
            if size < self.min_size:
                return False
            if self.max_size and size > self.max_size:
                return False
        except OSError:
            return False

        return True

    def count(self, directory: Path) -> int:
        """Count matching files without materializing the full list."""
        return sum(1 for _ in self.scan(directory))

    def list_files(self, directory: Path) -> List[FileInfo]:
        """Get all matching files as a list."""
        return list(self.scan(directory))
```
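The include/exclude logic in `_matches` boils down to two `fnmatch` passes: a file must match at least one include pattern and no exclude pattern. Isolated as a standalone sketch (the `matches` helper is illustrative, not part of the scanner):

```python
import fnmatch
from typing import List

def matches(name: str, include: List[str], exclude: List[str]) -> bool:
    """Include/exclude filtering, mirroring FileScanner._matches."""
    # Must match at least one include pattern...
    if not any(fnmatch.fnmatch(name, p) for p in include):
        return False
    # ...and no exclude pattern.
    if any(fnmatch.fnmatch(name, p) for p in exclude):
        return False
    return True
```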
Parallel Processor
```python
import asyncio
import time
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor,
    as_completed, Future
)
from functools import partial
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

class ParallelProcessor(Generic[T, R]):
    """
    Process items in parallel with configurable execution modes.
    """

    def __init__(self,
                 processor: Callable[[T], R],
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: int = None,
                 batch_size: int = None,
                 timeout: float = None):
        """
        Initialize the parallel processor.

        Args:
            processor: Function to process each item
            mode: Processing mode (sequential, thread, process, async)
            max_workers: Maximum concurrent workers
            batch_size: Items per batch (for memory management)
            timeout: Timeout per item in seconds
        """
        self.processor = processor
        self.mode = mode
        self.max_workers = max_workers or self._default_workers()
        self.batch_size = batch_size or 100
        self.timeout = timeout
        self._progress_callback: Optional[Callable[[int, int], None]] = None

    def _default_workers(self) -> int:
        """Get the default worker count based on mode."""
        import os
        cpu_count = os.cpu_count() or 4
        if self.mode == ProcessingMode.PROCESS_POOL:
            return cpu_count
        elif self.mode == ProcessingMode.THREAD_POOL:
            return cpu_count * 2  # IO-bound work benefits from extra threads
        else:
            return cpu_count

    def on_progress(self, callback: Callable[[int, int], None]):
        """Set a progress callback: callback(completed, total)."""
        self._progress_callback = callback
        return self

    def process(self, items: List[T]) -> BatchResult[R]:
        """
        Process all items and return aggregated results.

        Args:
            items: Items to process

        Returns:
            BatchResult with all results
        """
        start_time = time.time()
        if self.mode == ProcessingMode.SEQUENTIAL:
            result = self._process_sequential(items)
        elif self.mode == ProcessingMode.THREAD_POOL:
            result = self._process_threaded(items)
        elif self.mode == ProcessingMode.PROCESS_POOL:
            result = self._process_multiprocess(items)
        elif self.mode == ProcessingMode.ASYNC:
            result = asyncio.run(self._process_async(items))
        else:
            raise ValueError(f"Unknown mode: {self.mode}")
        result.total_duration_seconds = time.time() - start_time
        return result

    def _process_sequential(self, items: List[T]) -> BatchResult[R]:
        """Process items one at a time (a baseline, useful for debugging)."""
        result = BatchResult(total_files=len(items))
        for completed, item in enumerate(items, start=1):
            proc_result = self._process_single(item)
            result.results.append(proc_result)
            if proc_result.success:
                result.successful += 1
            else:
                result.failed += 1
                if proc_result.error:
                    result.errors.append(proc_result.error)
            if self._progress_callback:
                self._progress_callback(completed, len(items))
        return result

    # _process_multiprocess and _process_async follow the same
    # submit/collect shape as _process_threaded; they are not shown here.

    def _process_threaded(self, items: List[T]) -> BatchResult[R]:
        """Process items using a thread pool."""
        result = BatchResult(total_files=len(items))
        completed = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_item = {
                executor.submit(self._process_single, item): item
                for item in items
            }

            # Collect results as they complete
            for future in as_completed(future_to_item):
                proc_result = future.result()
                result.results.append(proc_result)
                if proc_result.success:
                    result.successful += 1
                else:
                    result.failed += 1
                    if proc_result.error:
                        result.errors.append(proc_result.error)

                completed += 1
                if self._progress_callback:
                    self._progress_callback(completed, len(items))

        return result

    def _process_single(self, item: T) -> ProcessingResult[R]:
        """Process a single item with error handling."""
        start_time = time.time()

        # Build FileInfo if the item is a Path or FileInfo
        if isinstance(item, Path):
            file_info = FileInfo.from_path(item)
        elif isinstance(item, FileInfo):
            file_info = item
        else:
            # Placeholder FileInfo for non-file items
            file_info = FileInfo(
                path=Path(""),
                size_bytes=0,
                modified_time=0,
                extension=""
            )

        try:
            result = self.processor(item)
            return ProcessingResult(
                file_info=file_info,
                success=True,
                result=result,
                duration_seconds=time.time() - start_time
            )
        except Exception as e:
            return ProcessingResult(
                file_info=file_info,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start_time
            )
```
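`as_completed` yields futures in completion order, not submission order — which is why `_process_threaded` keeps a `future -> item` mapping rather than relying on position. A minimal, self-contained demonstration (`slow_echo` is a throwaway example function):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(x: int) -> int:
    """Sleep longer for earlier items so later submissions finish first."""
    time.sleep(0.05 * (3 - x))
    return x

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {pool.submit(slow_echo, i): i for i in range(3)}
    # Completion order here is typically [2, 1, 0], not [0, 1, 2] --
    # but all submitted items are always accounted for.
    completion_order = [f.result() for f in as_completed(futures)]
```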
File Processor
```python
import zipfile

class FileProcessor:
    """
    High-level file processing with parallel execution.

    Combines scanning, filtering, and parallel processing.
    """

    def __init__(self,
                 scanner: FileScanner = None,
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: int = None):
        self.scanner = scanner or FileScanner()
        self.mode = mode
        self.max_workers = max_workers

    def process_directory(self,
                          directory: Path,
                          processor: Callable[[FileInfo], Any],
                          progress_callback: Callable[[int, int], None] = None
                          ) -> BatchResult:
        """Process all matching files in a directory."""
        files = self.scanner.list_files(directory)
        logger.info(f"Found {len(files)} files to process")
        if not files:
            return BatchResult()

        parallel = ParallelProcessor(
            processor=processor,
            mode=self.mode,
            max_workers=self.max_workers
        )
        if progress_callback:
            parallel.on_progress(progress_callback)
        return parallel.process(files)

    def aggregate_csv(self,
                      directory: Path,
                      output_path: Path = None,
                      **read_kwargs) -> pd.DataFrame:
        """Read and aggregate all CSV files in a directory."""
        self.scanner = FileScanner(extensions={'.csv'})

        def read_csv(file_info: FileInfo) -> pd.DataFrame:
            df = pd.read_csv(file_info.path, **read_kwargs)
            df['_source_file'] = file_info.path.name
            return df

        result = self.process_directory(directory, read_csv)
        dfs = result.successful_results()
        if not dfs:
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)
        if output_path:
            combined.to_csv(output_path, index=False)
        return combined

    def extract_all_zips(self,
                         directory: Path,
                         output_directory: Path
                         ) -> BatchResult:
        """Extract all ZIP files in a directory."""
        self.scanner = FileScanner(extensions={'.zip'})
        output_directory.mkdir(parents=True, exist_ok=True)

        def extract_zip(file_info: FileInfo) -> Dict:
            extract_dir = output_directory / file_info.path.stem
            extract_dir.mkdir(exist_ok=True)
            with zipfile.ZipFile(file_info.path, 'r') as zf:
                zf.extractall(extract_dir)
                extracted_count = len(zf.namelist())  # read while the archive is open
            return {
                'source': str(file_info.path),
                'destination': str(extract_dir),
                'files_extracted': extracted_count
            }

        return self.process_directory(directory, extract_zip)
```
Progress Tracking
```python
import sys
from datetime import datetime, timedelta

class ProgressTracker:
    """Track and display processing progress."""

    def __init__(self,
                 total: int,
                 description: str = "Processing",
                 show_eta: bool = True,
                 bar_width: int = 40):
        self.total = total
        self.description = description
        self.show_eta = show_eta
        self.bar_width = bar_width
        self.completed = 0
        self.start_time: Optional[datetime] = None

    def start(self):
        """Start tracking."""
        self.start_time = datetime.now()
        self.completed = 0
        self._display()

    def update(self, completed: int, total: int):
        """Update progress."""
        self.completed = completed
        self.total = total
        self._display()

    def _display(self):
        """Display the progress bar."""
        if self.total == 0:
            return

        pct = self.completed / self.total
        filled = int(self.bar_width * pct)
        bar = '#' * filled + '-' * (self.bar_width - filled)

        # Calculate ETA
        eta_str = ""
        if self.show_eta and self.start_time and self.completed > 0:
            elapsed = (datetime.now() - self.start_time).total_seconds()
            rate = self.completed / elapsed
            remaining = (self.total - self.completed) / rate if rate > 0 else 0
            eta_str = f" ETA: {timedelta(seconds=int(remaining))}"

        line = (f"\r{self.description}: |{bar}| "
                f"{self.completed}/{self.total} ({pct*100:.1f}%){eta_str}")
        sys.stdout.write(line)
        sys.stdout.flush()

        if self.completed == self.total:
            print()

    def finish(self):
        """Mark processing as complete."""
        self.completed = self.total
        self._display()
```
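The ETA shown by `_display` is a simple rate extrapolation: remaining work divided by the observed completion rate. Isolated as a function for clarity (`estimate_remaining` is a hypothetical helper mirroring the formula above, not part of the class):

```python
from datetime import timedelta

def estimate_remaining(completed: int, total: int, elapsed_s: float) -> timedelta:
    """ETA = (total - completed) / rate, where rate = completed / elapsed."""
    if completed == 0:
        # No rate information yet; return zero rather than divide by zero
        return timedelta(0)
    rate = completed / elapsed_s
    return timedelta(seconds=int((total - completed) / rate))
```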
Result Aggregator
```python
import json

class ResultAggregator:
    """Aggregate and export batch processing results."""

    def __init__(self, batch_result: BatchResult):
        self.batch_result = batch_result

    def to_dataframe(self) -> pd.DataFrame:
        """Convert results to a DataFrame."""
        data = []
        for r in self.batch_result.results:
            row = {
                'file_path': str(r.file_info.path),
                'file_name': r.file_info.path.name,
                'file_size': r.file_info.size_bytes,
                'success': r.success,
                'duration_seconds': r.duration_seconds,
                'error': r.error
            }
            if r.success and isinstance(r.result, dict):
                for k, v in r.result.items():
                    if not k.startswith('_'):
                        row[f'result_{k}'] = v
            data.append(row)
        return pd.DataFrame(data)

    def summary(self) -> Dict[str, Any]:
        """Generate summary statistics."""
        return {
            'total_files': self.batch_result.total_files,
            'successful': self.batch_result.successful,
            'failed': self.batch_result.failed,
            'success_rate_pct': self.batch_result.success_rate,
            'total_duration_seconds': self.batch_result.total_duration_seconds,
            'avg_duration_seconds': (
                self.batch_result.total_duration_seconds /
                self.batch_result.total_files
                if self.batch_result.total_files > 0 else 0
            ),
            'errors': self.batch_result.errors[:10]
        }

    def export_csv(self, path: Path):
        """Export results to CSV."""
        self.to_dataframe().to_csv(path, index=False)

    def export_json(self, path: Path):
        """Export the summary to JSON."""
        with open(path, 'w') as f:
            json.dump(self.summary(), f, indent=2)

    def combine_dataframes(self) -> pd.DataFrame:
        """Combine results that are DataFrames."""
        dfs = [r for r in self.batch_result.successful_results()
               if isinstance(r, pd.DataFrame)]
        if not dfs:
            return pd.DataFrame()
        return pd.concat(dfs, ignore_index=True)
```
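The tallying behind `summary()` needs nothing from pandas; `summarize` below is an illustrative pure-Python equivalent operating on plain dicts, including the same "empty batch counts as 100% success" convention:

```python
from typing import Dict, List

def summarize(results: List[Dict]) -> Dict:
    """Pure-Python sketch of the summary() tallying logic."""
    total = len(results)
    ok = sum(1 for r in results if r["success"])
    return {
        "total_files": total,
        "successful": ok,
        "failed": total - ok,
        # An empty batch is treated as fully successful, as in BatchResult
        "success_rate_pct": (ok / total * 100) if total else 100.0,
    }

demo = summarize([{"success": True}, {"success": True}, {"success": False}])
```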
YAML Configuration
Basic Configuration
```yaml
# config/parallel_processing.yaml
scan:
  directory: "data/raw/"
  recursive: true
  include_patterns:
    - "*.csv"
    - "*.xlsx"
  exclude_patterns:
    - "*_backup*"
    - "~$*"
  extensions:
    - ".csv"
    - ".xlsx"
  size_limits:
    min_bytes: 100
    max_bytes: 104857600  # 100MB

processing:
  mode: thread_pool  # sequential, thread_pool, process_pool, async
  max_workers: 8
  batch_size: 100
  timeout_seconds: 30

output:
  results_csv: "data/results/processing_results.csv"
  summary_json: "data/results/summary.json"
  combined_output: "data/processed/combined.csv"

progress:
  enabled: true
  show_eta: true
```
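Loading this file (for instance with `yaml.safe_load`) yields a nested dict. The glue below sketches one hypothetical mapping from config keys onto the constructors shown earlier; it uses an inline dict so it stands alone, and the variable names are illustrative:

```python
# Stand-in for the dict yaml.safe_load would return for the file above
config = {
    "scan": {"include_patterns": ["*.csv", "*.xlsx"], "recursive": True},
    "processing": {"mode": "thread_pool", "max_workers": 8},
}

# Keyword arguments for FileScanner(**scanner_kwargs)
scanner_kwargs = {
    "include_patterns": config["scan"]["include_patterns"],
    "recursive": config["scan"]["recursive"],
}

# Worker count for ParallelProcessor / FileProcessor
worker_count = config["processing"]["max_workers"]
```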
Usage Examples
Example 1: Process CSV Files
```python
from pathlib import Path

import pandas as pd

from parallel_file_processor import (
    FileScanner, FileProcessor, ProcessingMode,
    ProgressTracker, ResultAggregator
)

# Define processing function
def process_csv(file_info):
    """Extract statistics from a CSV file."""
    df = pd.read_csv(file_info.path)
    return {
        'rows': len(df),
        'columns': len(df.columns),
        'memory_mb': df.memory_usage(deep=True).sum() / 1e6,
        'numeric_columns': len(df.select_dtypes(include='number').columns)
    }

# Set up scanner and processor
scanner = FileScanner(extensions={'.csv'})
processor = FileProcessor(
    scanner=scanner,
    mode=ProcessingMode.THREAD_POOL,
    max_workers=8
)

# Create progress tracker
tracker = ProgressTracker(0, "Processing CSVs")
tracker.start()

# Process with progress reporting
result = processor.process_directory(
    Path("data/raw/"),
    process_csv,
    progress_callback=tracker.update
)
tracker.finish()

# Aggregate results
aggregator = ResultAggregator(result)
print(f"\nSummary: {aggregator.summary()}")
aggregator.export_csv(Path("data/results/csv_stats.csv"))
```
Example 2: Parallel ZIP Extraction
```python
# Extract all ZIPs in parallel
processor = FileProcessor(mode=ProcessingMode.THREAD_POOL)
result = processor.extract_all_zips(
    directory=Path("data/archives/"),
    output_directory=Path("data/extracted/")
)
print(f"Extracted {result.successful} ZIP files")
print(f"Failed: {result.failed}")

# Get extraction details
aggregator = ResultAggregator(result)
df = aggregator.to_dataframe()
total_files = df['result_files_extracted'].sum()
print(f"Total files extracted: {total_files}")
```
Example 3: Aggregate Data from Multiple Sources
```python
# Aggregate CSV files with custom processing
def load_and_clean(file_info):
    """Load a CSV file and perform basic cleaning."""
    df = pd.read_csv(file_info.path)
    # Clean column names
    df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]
    # Add metadata
    df['_source'] = file_info.path.name
    df['_loaded_at'] = pd.Timestamp.now()
    return df

processor = FileProcessor(
    scanner=FileScanner(extensions={'.csv'}),
    mode=ProcessingMode.THREAD_POOL
)
result = processor.process_directory(
    Path("data/monthly_reports/"),
    load_and_clean
)

# Combine all DataFrames
aggregator = ResultAggregator(result)
combined_df = aggregator.combine_dataframes()
print(f"Combined {len(combined_df)} rows from {result.successful} files")
combined_df.to_csv("data/combined_reports.csv", index=False)
```
Example 4: Custom Batch Processing
```python
from parallel_file_processor import ParallelProcessor, ProcessingMode

# Process a list of items (not files)
items = list(range(1000))

def heavy_computation(item):
    """CPU-intensive calculation."""
    import math
    result = sum(math.sin(i * item) for i in range(10000))
    return {'item': item, 'result': result}

# Use a process pool for CPU-bound work
processor = ParallelProcessor(
    processor=heavy_computation,
    mode=ProcessingMode.PROCESS_POOL,
    max_workers=4
)

# Track progress
def show_progress(completed, total):
    pct = (completed / total) * 100
    print(f"\rProgress: {pct:.1f}%", end='', flush=True)

processor.on_progress(show_progress)
result = processor.process(items)
print(f"\nCompleted {result.successful}/{result.total_files} items")
```
Performance Tips

Mode Selection
| Workload Type | Recommended Mode | Reason |
|---|---|---|
| File I/O | THREAD_POOL | IO-bound; threads avoid GIL issues |
| Data parsing | THREAD_POOL | pandas releases the GIL during IO |
| CPU computation | PROCESS_POOL | Bypasses the GIL for true parallelism |
| Network requests | THREAD_POOL | Best for many concurrent connections |
| Simple operations | Sequential | Overhead may exceed the benefit |
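In standard-library terms, the table mostly reduces to choosing between the two `concurrent.futures` executors. A minimal dispatcher sketch (the workload labels are illustrative, not part of this library's API):

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def executor_for(workload: str, max_workers=None) -> Executor:
    """Map a workload label from the table above to a stdlib executor."""
    if workload == 'cpu':
        # Separate processes bypass the GIL for true parallelism
        return ProcessPoolExecutor(max_workers=max_workers)
    # IO-bound work: threads are cheap and the GIL is released while waiting
    return ThreadPoolExecutor(max_workers=max_workers)
```

For trivial per-item work, skip the executor entirely: a plain loop avoids the scheduling overhead.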
Worker Count
```python
import os

# IO-bound (reading files, network)
io_workers = os.cpu_count() * 2

# CPU-bound (heavy computation)
cpu_workers = os.cpu_count()

# Memory-constrained (large files)
memory_workers = max(2, os.cpu_count() // 2)
```
Batch Size
- Small files (<1MB): Large batches (500-1000)
- Medium files (1-100MB): Medium batches (50-100)
- Large files (>100MB): Small batches (10-20) or one at a time
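These guidelines can be sketched as a pair of helpers; `batched` and `batch_size_for` are illustrative names, and the byte thresholds simply mirror the bullets above:

```python
from itertools import islice
from typing import Iterable, Iterator


def batched(items: Iterable, batch_size: int) -> Iterator[list]:
    """Yield successive fixed-size batches from any iterable."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch


def batch_size_for(avg_bytes: int) -> int:
    """Pick a batch size from the average file size (illustrative thresholds)."""
    if avg_bytes < 1_000_000:        # small files (<1MB)
        return 1000
    if avg_bytes < 100_000_000:      # medium files (1-100MB)
        return 100
    return 10                        # large files (>100MB)
```

Processing one batch at a time bounds peak memory to roughly `batch_size * avg_file_size` plus per-result overhead.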
Best Practices

Do
- Choose correct processing mode for workload type
- Use progress callbacks for long operations
- Batch large file sets to manage memory
- Log individual failures for debugging
- Consider retry logic for transient errors
- Monitor memory usage with large DataFrames
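The retry recommendation can be sketched as a wrapper around any processor function (`with_retry` is an illustrative helper, not part of the library); it retries only on error types that are plausibly transient:

```python
import time
from typing import Callable


def with_retry(processor: Callable, attempts: int = 3, delay: float = 0.5) -> Callable:
    """Wrap a processor so transient OS/IO failures are retried with backoff."""
    def wrapped(item):
        for attempt in range(1, attempts + 1):
            try:
                return processor(item)
            except OSError:
                if attempt == attempts:
                    raise  # out of attempts: surface the real error
                time.sleep(delay * attempt)  # linear backoff before retrying
    return wrapped
```

Usage: `result = processor.process_directory(path, with_retry(load_and_clean))` keeps the retry policy out of the processor itself.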
Don't
- Use process pool for IO-bound tasks
- Skip error handling in processor functions
- Load all results into memory at once
- Ignore batch result statistics
- Use too many workers for memory-constrained tasks
Error Handling

Common Errors
| Error | Cause | Solution |
|---|---|---|
| MemoryError | Too many files loaded | Use batching or streaming |
| PermissionError | File access denied | Check file permissions |
| TimeoutError | Processing too slow | Increase the timeout or optimize the processor |
| OSError | Too many open files | Reduce max_workers |
Error Template
```python
from pathlib import Path
from typing import Callable

def safe_process_directory(directory: Path, processor: Callable) -> dict:
    """Process a directory with comprehensive error handling."""
    try:
        if not directory.exists():
            return {'status': 'error', 'message': 'Directory not found'}
        file_processor = FileProcessor()
        result = file_processor.process_directory(directory, processor)
        if result.failed > 0:
            return {
                'status': 'partial',
                'successful': result.successful,
                'failed': result.failed,
                'errors': result.errors[:10]
            }
        return {'status': 'success', 'processed': result.successful}
    except Exception as e:
        return {'status': 'error', 'message': str(e)}
```

Execution Checklist
- Processing mode matches workload type
- Worker count appropriate for resources
- Batch size prevents memory issues
- Progress callback configured for feedback
- Error handling in processor function
- Results aggregated and exported
- Summary statistics reviewed
- Failed files identified and logged
Metrics
| Metric | Target | Description |
|---|---|---|
| Throughput | 2-3x sequential | Parallel speedup factor |
| Success Rate | >99% | Percentage of files processed |
| Memory Usage | <4GB | Peak memory consumption |
| Error Rate | <1% | Processing failures |
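The throughput target can be spot-checked with a small harness that times a sequential run against a threaded one (`measure_speedup` is an illustrative helper; the workload must be IO-bound for threads to deliver the 2-3x factor):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_speedup(processor, items, max_workers: int = 8) -> float:
    """Return sequential_time / parallel_time for the given workload."""
    items = list(items)

    # Baseline: process every item one at a time
    start = time.perf_counter()
    for item in items:
        processor(item)
    sequential = time.perf_counter() - start

    # Same work spread across a thread pool
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(processor, items))
    parallel = time.perf_counter() - start
    return sequential / parallel
```

A factor below ~1.5x on a real workload suggests the wrong mode (e.g. threads on CPU-bound work) or too few items to amortize pool startup.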
Related Skills
- data-pipeline-processor - Data transformation
- yaml-workflow-executor - Workflow automation
- engineering-report-generator - Report generation
Version History
- 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples
- 1.0.0 (2024-10-15): Initial release with FileScanner, ParallelProcessor, progress tracking, result aggregation