parallel-file-processor


Parallel File Processor


Version: 1.1.0 Category: Development Last Updated: 2026-01-02
Process multiple files concurrently with intelligent batching, progress tracking, and result aggregation for significant performance improvements.

Quick Start


python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import pandas as pd

def process_csv(file_path: Path) -> dict:
    """Process a single CSV file."""
    df = pd.read_csv(file_path)
    return {'file': file_path.name, 'rows': len(df), 'columns': len(df.columns)}

# Get all CSV files
files = list(Path('data/raw/').glob('*.csv'))

# Process in parallel
results = []
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(process_csv, f): f for f in files}
    for future in as_completed(futures):
        results.append(future.result())

print(f"Processed {len(results)} files")

When to Use


  • Processing large numbers of files (100+ files)
  • Batch operations on directory contents
  • Extracting data from multiple ZIP archives
  • Aggregating results from parallel operations
  • CPU-bound file transformations
  • IO-bound file operations with proper concurrency
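The last two bullets hinge on workload type: thread pools suit IO-bound work (the GIL is released while waiting on the OS), while process pools help CPU-bound transforms. A minimal sketch of picking an executor on that basis (the `make_executor` helper is illustrative, not part of this skill):

```python
import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def make_executor(io_bound: bool, max_workers: int = None) -> Executor:
    """Choose an executor class and worker count for the workload type."""
    cpu = os.cpu_count() or 4
    if io_bound:
        # Threads are cheap and spend most of their time waiting on IO
        return ThreadPoolExecutor(max_workers=max_workers or cpu * 2)
    # Separate processes sidestep the GIL for CPU-bound work
    return ProcessPoolExecutor(max_workers=max_workers or cpu)

with make_executor(io_bound=True) as ex:
    lengths = list(ex.map(len, ["a", "bb", "ccc"]))
print(lengths)  # [1, 2, 3]
```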

Core Pattern


Directory Scan -> Filter -> Batch -> Parallel Process -> Aggregate -> Output
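The stages above can be walked end to end with just the standard library; this throwaway sketch (temp files and the word-count processor are illustrative) scans a directory, filters by extension, processes in parallel, and aggregates:

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def count_words(path: Path) -> int:
    """Toy per-file processor: count whitespace-separated words."""
    return len(path.read_text().split())

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_text("one two three")
    (root / "b.txt").write_text("four five")
    (root / "skip.log").write_text("ignored")

    # Directory Scan -> Filter
    files = sorted(p for p in root.rglob("*") if p.suffix == ".txt")
    # Parallel Process -> Aggregate
    with ThreadPoolExecutor(max_workers=4) as ex:
        total = sum(ex.map(count_words, files))

print(total)  # 5
```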

Implementation


Core Components


python
from dataclasses import dataclass, field
from pathlib import Path
from typing import (
    List, Dict, Any, Callable, Optional, Generator, TypeVar, Generic
)
from enum import Enum
import logging

logger = logging.getLogger(__name__)

T = TypeVar('T')
R = TypeVar('R')

class ProcessingMode(Enum):
    """Processing execution mode."""
    SEQUENTIAL = "sequential"
    THREAD_POOL = "thread_pool"
    PROCESS_POOL = "process_pool"
    ASYNC = "async"

@dataclass
class FileInfo:
    """File metadata container."""
    path: Path
    size_bytes: int
    modified_time: float
    extension: str
    relative_path: Optional[str] = None

    @classmethod
    def from_path(cls, path: Path, base_path: Path = None) -> 'FileInfo':
        """Create FileInfo from path."""
        stat = path.stat()
        relative = str(path.relative_to(base_path)) if base_path else None
        return cls(
            path=path,
            size_bytes=stat.st_size,
            modified_time=stat.st_mtime,
            extension=path.suffix.lower(),
            relative_path=relative
        )

@dataclass
class ProcessingResult(Generic[T]):
    """Result of processing a single file."""
    file_info: FileInfo
    success: bool
    result: Optional[T] = None
    error: Optional[str] = None
    duration_seconds: float = 0.0

@dataclass
class BatchResult(Generic[T]):
    """Aggregated results from batch processing."""
    total_files: int = 0
    successful: int = 0
    failed: int = 0
    results: List[ProcessingResult[T]] = field(default_factory=list)
    total_duration_seconds: float = 0.0
    errors: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Calculate success rate as percentage."""
        if self.total_files == 0:
            return 100.0
        return (self.successful / self.total_files) * 100

    def successful_results(self) -> List[T]:
        """Get list of successful results only."""
        return [r.result for r in self.results if r.success and r.result is not None]
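`ProcessingResult` and `BatchResult` use `typing.Generic` so the payload type follows the processor's return type. The same pattern in a pared-down, self-contained form (`Outcome` and `Batch` are stand-in names, not part of the skill):

```python
from dataclasses import dataclass, field
from typing import Generic, List, Optional, TypeVar

T = TypeVar('T')

@dataclass
class Outcome(Generic[T]):
    """Stand-in for ProcessingResult: a success flag plus a typed payload."""
    success: bool
    result: Optional[T] = None

@dataclass
class Batch(Generic[T]):
    """Stand-in for BatchResult with the same success_rate arithmetic."""
    results: List[Outcome[T]] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        if not self.results:
            return 100.0
        return 100.0 * sum(r.success for r in self.results) / len(self.results)

batch: Batch[int] = Batch([Outcome(True, 1), Outcome(True, 4), Outcome(True, 9), Outcome(False)])
print(batch.success_rate)  # 75.0
```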

File Scanner


python
import fnmatch
from typing import List, Optional, Set, Generator
from pathlib import Path

class FileScanner:
    """
    Scan directories for files matching patterns.

    Supports glob patterns, extension filtering, and size limits.
    """

    def __init__(self,
                 include_patterns: List[str] = None,
                 exclude_patterns: List[str] = None,
                 extensions: Set[str] = None,
                 min_size: int = 0,
                 max_size: int = None,
                 recursive: bool = True):
        """
        Initialize file scanner.

        Args:
            include_patterns: Glob patterns to include (e.g., ['*.csv', '*.xlsx'])
            exclude_patterns: Glob patterns to exclude (e.g., ['*_backup*'])
            extensions: File extensions to include (e.g., {'.csv', '.xlsx'})
            min_size: Minimum file size in bytes
            max_size: Maximum file size in bytes
            recursive: Scan subdirectories
        """
        self.include_patterns = include_patterns or ['*']
        self.exclude_patterns = exclude_patterns or []
        self.extensions = extensions
        self.min_size = min_size
        self.max_size = max_size
        self.recursive = recursive

    def scan(self, directory: Path) -> Generator[FileInfo, None, None]:
        """
        Scan directory and yield matching files.

        Args:
            directory: Directory to scan

        Yields:
            FileInfo for each matching file
        """
        directory = Path(directory)

        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")

        if not directory.is_dir():
            raise ValueError(f"Not a directory: {directory}")

        # Choose iteration method
        if self.recursive:
            files = directory.rglob('*')
        else:
            files = directory.glob('*')

        for path in files:
            if path.is_file() and self._matches(path):
                try:
                    yield FileInfo.from_path(path, directory)
                except Exception as e:
                    logger.warning(f"Could not get info for {path}: {e}")

    def _matches(self, path: Path) -> bool:
        """Check if file matches all criteria."""
        name = path.name

        # Check include patterns
        if not any(fnmatch.fnmatch(name, p) for p in self.include_patterns):
            return False

        # Check exclude patterns
        if any(fnmatch.fnmatch(name, p) for p in self.exclude_patterns):
            return False

        # Check extension
        if self.extensions and path.suffix.lower() not in self.extensions:
            return False

        # Check size
        try:
            size = path.stat().st_size
            if size < self.min_size:
                return False
            if self.max_size and size > self.max_size:
                return False
        except OSError:
            return False

        return True

    def count(self, directory: Path) -> int:
        """Count matching files without loading all info."""
        return sum(1 for _ in self.scan(directory))

    def list_files(self, directory: Path) -> List[FileInfo]:
        """Get all matching files as list."""
        return list(self.scan(directory))
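The include/exclude logic in `_matches` composes as "any include hit, no exclude hit"; checked standalone (the `matches` helper here is a sketch, not the class method):

```python
import fnmatch
from typing import List

def matches(name: str, include: List[str], exclude: List[str]) -> bool:
    """A name passes if it matches any include pattern and no exclude pattern."""
    if not any(fnmatch.fnmatch(name, p) for p in include):
        return False
    return not any(fnmatch.fnmatch(name, p) for p in exclude)

include = ['*.csv', '*.xlsx']
exclude = ['*_backup*']
names = ['sales.csv', 'sales_backup.csv', 'notes.txt', 'q1.xlsx']
kept = [n for n in names if matches(n, include, exclude)]
print(kept)  # ['sales.csv', 'q1.xlsx']
```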

Parallel Processor


python
import time
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor,
    as_completed
)
from typing import Callable, Generic, List, Optional, TypeVar
import asyncio

T = TypeVar('T')
R = TypeVar('R')

class ParallelProcessor(Generic[T, R]):
    """
    Process items in parallel with configurable execution modes.
    """

    def __init__(self,
                 processor: Callable[[T], R],
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: int = None,
                 batch_size: int = None,
                 timeout: float = None):
        """
        Initialize parallel processor.

        Args:
            processor: Function to process each item
            mode: Processing mode (thread, process, async)
            max_workers: Maximum concurrent workers
            batch_size: Items per batch (for memory management)
            timeout: Timeout per item in seconds
        """
        self.processor = processor
        self.mode = mode
        self.max_workers = max_workers or self._default_workers()
        self.batch_size = batch_size or 100
        self.timeout = timeout

        self._progress_callback: Optional[Callable[[int, int], None]] = None

    def _default_workers(self) -> int:
        """Get default worker count based on mode."""
        import os
        cpu_count = os.cpu_count() or 4

        if self.mode == ProcessingMode.PROCESS_POOL:
            return cpu_count
        elif self.mode == ProcessingMode.THREAD_POOL:
            return cpu_count * 2  # IO-bound benefits from more threads
        else:
            return cpu_count

    def on_progress(self, callback: Callable[[int, int], None]):
        """Set progress callback: callback(completed, total)."""
        self._progress_callback = callback
        return self

    def process(self, items: List[T]) -> BatchResult[R]:
        """
        Process all items and return aggregated results.

        Args:
            items: Items to process

        Returns:
            BatchResult with all results
        """
        start_time = time.time()
        total = len(items)

        if self.mode == ProcessingMode.SEQUENTIAL:
            result = self._process_sequential(items)
        elif self.mode == ProcessingMode.THREAD_POOL:
            result = self._process_threaded(items)
        elif self.mode == ProcessingMode.PROCESS_POOL:
            result = self._process_multiprocess(items)
        elif self.mode == ProcessingMode.ASYNC:
            result = asyncio.run(self._process_async(items))
        else:
            raise ValueError(f"Unknown mode: {self.mode}")

        result.total_duration_seconds = time.time() - start_time
        return result

    def _process_threaded(self, items: List[T]) -> BatchResult[R]:
        """Process items using thread pool."""
        result = BatchResult(total_files=len(items))
        completed = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_item = {
                executor.submit(self._process_single, item): item
                for item in items
            }

            # Collect results as they complete
            for future in as_completed(future_to_item):
                proc_result = future.result()
                result.results.append(proc_result)

                if proc_result.success:
                    result.successful += 1
                else:
                    result.failed += 1
                    if proc_result.error:
                        result.errors.append(proc_result.error)

                completed += 1
                if self._progress_callback:
                    self._progress_callback(completed, len(items))

        return result

    def _process_single(self, item: T) -> ProcessingResult[R]:
        """Process a single item with error handling."""
        start_time = time.time()

        # Create FileInfo if item is a Path or FileInfo
        if isinstance(item, Path):
            file_info = FileInfo.from_path(item)
        elif isinstance(item, FileInfo):
            file_info = item
        else:
            # Create dummy FileInfo for non-file items
            file_info = FileInfo(
                path=Path(""),
                size_bytes=0,
                modified_time=0,
                extension=""
            )

        try:
            result = self.processor(item)
            return ProcessingResult(
                file_info=file_info,
                success=True,
                result=result,
                duration_seconds=time.time() - start_time
            )
        except Exception as e:
            return ProcessingResult(
                file_info=file_info,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start_time
            )

    def _process_sequential(self, items: List[T]) -> BatchResult[R]:
        """Process items one at a time (no concurrency; useful for debugging)."""
        result = BatchResult(total_files=len(items))
        for completed, item in enumerate(items, start=1):
            proc_result = self._process_single(item)
            result.results.append(proc_result)
            if proc_result.success:
                result.successful += 1
            else:
                result.failed += 1
                if proc_result.error:
                    result.errors.append(proc_result.error)
            if self._progress_callback:
                self._progress_callback(completed, len(items))
        return result

    def _process_multiprocess(self, items: List[T]) -> BatchResult[R]:
        """Process items using a process pool.

        The processor must be picklable (a module-level function, not a
        lambda or closure), since work is shipped to child processes.
        """
        result = BatchResult(total_files=len(items))
        completed = 0

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_item = {
                executor.submit(self._process_single, item): item
                for item in items
            }
            for future in as_completed(future_to_item):
                proc_result = future.result()
                result.results.append(proc_result)
                if proc_result.success:
                    result.successful += 1
                else:
                    result.failed += 1
                    if proc_result.error:
                        result.errors.append(proc_result.error)
                completed += 1
                if self._progress_callback:
                    self._progress_callback(completed, len(items))

        return result

    async def _process_async(self, items: List[T]) -> BatchResult[R]:
        """Process items concurrently by off-loading the sync processor to threads."""
        result = BatchResult(total_files=len(items))
        semaphore = asyncio.Semaphore(self.max_workers)
        completed = 0

        async def run_one(item: T) -> ProcessingResult[R]:
            async with semaphore:
                return await asyncio.to_thread(self._process_single, item)

        for coro in asyncio.as_completed([run_one(item) for item in items]):
            proc_result = await coro
            result.results.append(proc_result)
            if proc_result.success:
                result.successful += 1
            else:
                result.failed += 1
                if proc_result.error:
                    result.errors.append(proc_result.error)
            completed += 1
            if self._progress_callback:
                self._progress_callback(completed, len(items))

        return result
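The per-item try/except in `_process_single` is what keeps one bad input from failing the whole batch; here is the same submit/as_completed/error-capture pattern in miniature (the `risky` function is a toy):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky(x: int) -> int:
    """Toy processor that fails on one input."""
    if x == 3:
        raise ValueError("bad item")
    return x * x

outcomes = []
with ThreadPoolExecutor(max_workers=4) as ex:
    futures = {ex.submit(risky, x): x for x in range(5)}
    for fut in as_completed(futures):
        x = futures[fut]
        try:
            outcomes.append((x, True, fut.result()))
        except Exception as e:  # record the failure and keep going
            outcomes.append((x, False, str(e)))

ok = sorted(v for _, success, v in outcomes if success)
print(ok)  # [0, 1, 4, 16]
```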

File Processor


python
class FileProcessor:
    """
    High-level file processing with parallel execution.

    Combines scanning, filtering, and parallel processing.
    """

    def __init__(self,
                 scanner: FileScanner = None,
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: int = None):
        self.scanner = scanner or FileScanner()
        self.mode = mode
        self.max_workers = max_workers

    def process_directory(self,
                         directory: Path,
                         processor: Callable[[FileInfo], Any],
                         progress_callback: Callable[[int, int], None] = None
                         ) -> BatchResult:
        """Process all matching files in a directory."""
        files = self.scanner.list_files(directory)
        logger.info(f"Found {len(files)} files to process")

        if not files:
            return BatchResult()

        parallel = ParallelProcessor(
            processor=processor,
            mode=self.mode,
            max_workers=self.max_workers
        )

        if progress_callback:
            parallel.on_progress(progress_callback)

        return parallel.process(files)

    def aggregate_csv(self,
                      directory: Path,
                      output_path: Path = None,
                      **read_kwargs) -> pd.DataFrame:
        """Read and aggregate all CSV files in directory."""
        self.scanner = FileScanner(extensions={'.csv'})

        def read_csv(file_info: FileInfo) -> pd.DataFrame:
            df = pd.read_csv(file_info.path, **read_kwargs)
            df['_source_file'] = file_info.path.name
            return df

        result = self.process_directory(directory, read_csv)
        dfs = result.successful_results()

        if not dfs:
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)

        if output_path:
            combined.to_csv(output_path, index=False)

        return combined

    def extract_all_zips(self,
                         directory: Path,
                         output_directory: Path
                         ) -> BatchResult:
        """Extract all ZIP files in directory."""
        import zipfile

        self.scanner = FileScanner(extensions={'.zip'})
        output_directory.mkdir(parents=True, exist_ok=True)

        def extract_zip(file_info: FileInfo) -> Dict:
            extract_dir = output_directory / file_info.path.stem
            extract_dir.mkdir(exist_ok=True)

            with zipfile.ZipFile(file_info.path, 'r') as zf:
                zf.extractall(extract_dir)
                return {
                    'source': str(file_info.path),
                    'destination': str(extract_dir),
                    'files_extracted': len(zf.namelist())
                }

        return self.process_directory(directory, extract_zip)
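`aggregate_csv` boils down to "read each file, tag its source, concatenate". A stdlib-only sketch of the same idea (using `csv` instead of pandas for brevity; file names are illustrative):

```python
import csv
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "feb.csv").write_text("a,b\n3,4\n")
    (root / "jan.csv").write_text("a,b\n1,2\n")

    rows = []
    for path in sorted(root.glob("*.csv")):
        with path.open(newline="") as f:
            for rec in csv.DictReader(f):
                rec["_source_file"] = path.name  # tag provenance, as aggregate_csv does
                rows.append(rec)

print(len(rows))                # 2
print(rows[0]["_source_file"])  # feb.csv (sorted order)
```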

Progress Tracker


python
from datetime import datetime, timedelta
import sys

class ProgressTracker:
    """Track and display processing progress."""

    def __init__(self,
                 total: int,
                 description: str = "Processing",
                 show_eta: bool = True,
                 bar_width: int = 40):
        self.total = total
        self.description = description
        self.show_eta = show_eta
        self.bar_width = bar_width
        self.completed = 0
        self.start_time: Optional[datetime] = None

    def start(self):
        """Start tracking."""
        self.start_time = datetime.now()
        self.completed = 0
        self._display()

    def update(self, completed: int, total: int):
        """Update progress."""
        self.completed = completed
        self.total = total
        self._display()

    def _display(self):
        """Display progress bar."""
        if self.total == 0:
            return

        pct = self.completed / self.total
        filled = int(self.bar_width * pct)
        bar = '#' * filled + '-' * (self.bar_width - filled)

        # Calculate ETA
        eta_str = ""
        if self.show_eta and self.start_time and self.completed > 0:
            elapsed = (datetime.now() - self.start_time).total_seconds()
            rate = self.completed / elapsed
            remaining = (self.total - self.completed) / rate if rate > 0 else 0
            eta_str = f" ETA: {timedelta(seconds=int(remaining))}"

        line = (f"\r{self.description}: |{bar}| "
                f"{self.completed}/{self.total} ({pct*100:.1f}%){eta_str}")

        sys.stdout.write(line)
        sys.stdout.flush()

        if self.completed == self.total:
            print()

    def finish(self):
        """Mark processing as complete."""
        self.completed = self.total
        self._display()
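The bar arithmetic in `_display` is easy to sanity-check in isolation (`render_bar` is a free-standing copy of that calculation, not part of the class):

```python
def render_bar(completed: int, total: int, width: int = 20) -> str:
    """Same fill arithmetic as ProgressTracker._display."""
    pct = completed / total
    filled = int(width * pct)
    return '#' * filled + '-' * (width - filled)

print(render_bar(5, 10))   # '##########----------'
print(render_bar(10, 10))  # '####################'
```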

Result Aggregator


python
import json

class ResultAggregator:
    """Aggregate and export batch processing results."""

    def __init__(self, batch_result: BatchResult):
        self.batch_result = batch_result

    def to_dataframe(self) -> pd.DataFrame:
        """Convert results to DataFrame."""
        data = []
        for r in self.batch_result.results:
            row = {
                'file_path': str(r.file_info.path),
                'file_name': r.file_info.path.name,
                'file_size': r.file_info.size_bytes,
                'success': r.success,
                'duration_seconds': r.duration_seconds,
                'error': r.error
            }

            if r.success and isinstance(r.result, dict):
                for k, v in r.result.items():
                    if not k.startswith('_'):
                        row[f'result_{k}'] = v

            data.append(row)

        return pd.DataFrame(data)

    def summary(self) -> Dict[str, Any]:
        """Generate summary statistics."""
        return {
            'total_files': self.batch_result.total_files,
            'successful': self.batch_result.successful,
            'failed': self.batch_result.failed,
            'success_rate_pct': self.batch_result.success_rate,
            'total_duration_seconds': self.batch_result.total_duration_seconds,
            'avg_duration_seconds': (
                self.batch_result.total_duration_seconds /
                self.batch_result.total_files
                if self.batch_result.total_files > 0 else 0
            ),
            'errors': self.batch_result.errors[:10]
        }

    def export_csv(self, path: Path):
        """Export results to CSV."""
        df = self.to_dataframe()
        df.to_csv(path, index=False)

    def export_json(self, path: Path):
        """Export summary to JSON."""
        summary = self.summary()
        with open(path, 'w') as f:
            json.dump(summary, f, indent=2)

    def combine_dataframes(self) -> pd.DataFrame:
        """Combine results that are DataFrames."""
        dfs = [r for r in self.batch_result.successful_results()
               if isinstance(r, pd.DataFrame)]

        if not dfs:
            return pd.DataFrame()

        return pd.concat(dfs, ignore_index=True)
python
import json

class ResultAggregator:
    """聚合并导出批处理结果。"""

    def __init__(self, batch_result: BatchResult):
        self.batch_result = batch_result

    def to_dataframe(self) -> pd.DataFrame:
        """将结果转换为DataFrame。"""
        data = []
        for r in self.batch_result.results:
            row = {
                'file_path': str(r.file_info.path),
                'file_name': r.file_info.path.name,
                'file_size': r.file_info.size_bytes,
                'success': r.success,
                'duration_seconds': r.duration_seconds,
                'error': r.error
            }

            if r.success and isinstance(r.result, dict):
                for k, v in r.result.items():
                    if not k.startswith('_'):
                        row[f'result_{k}'] = v

            data.append(row)

        return pd.DataFrame(data)

    def summary(self) -> Dict[str, Any]:
        """生成汇总统计信息。"""
        return {
            'total_files': self.batch_result.total_files,
            'successful': self.batch_result.successful,
            'failed': self.batch_result.failed,
            'success_rate_pct': self.batch_result.success_rate,
            'total_duration_seconds': self.batch_result.total_duration_seconds,
            'avg_duration_seconds': (
                self.batch_result.total_duration_seconds /
                self.batch_result.total_files
                if self.batch_result.total_files > 0 else 0
            ),
            'errors': self.batch_result.errors[:10]
        }

    def export_csv(self, path: Path):
        """将结果导出为CSV文件。"""
        df = self.to_dataframe()
        df.to_csv(path, index=False)

    def export_json(self, path: Path):
        """将汇总信息导出为JSON文件。"""
        summary = self.summary()
        with open(path, 'w') as f:
            json.dump(summary, f, indent=2)

    def combine_dataframes(self) -> pd.DataFrame:
        """合并结果中的DataFrame对象。"""
        dfs = [r for r in self.batch_result.successful_results()
               if isinstance(r, pd.DataFrame)]

        if not dfs:
            return pd.DataFrame()

        return pd.concat(dfs, ignore_index=True)
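
The headline numbers in `summary()` reduce to simple arithmetic over per-file outcomes. A self-contained sketch, using `(success, duration)` pairs as stand-ins for the skill's `FileResult` objects:

```python
def summarize(results):
    """Compute the same headline stats as ResultAggregator.summary().

    `results` is a list of (success: bool, duration_seconds: float) pairs.
    """
    total = len(results)
    successful = sum(1 for ok, _ in results if ok)
    total_duration = sum(d for _, d in results)
    return {
        'total_files': total,
        'successful': successful,
        'failed': total - successful,
        'success_rate_pct': (successful / total * 100) if total else 0.0,
        'avg_duration_seconds': (total_duration / total) if total else 0.0,
    }
```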

YAML Configuration

YAML配置

Basic Configuration

基础配置

yaml

yaml

config/parallel_processing.yaml

config/parallel_processing.yaml

scan:
  directory: "data/raw/"
  recursive: true
  include_patterns:
    - "*.csv"
    - "*.xlsx"
  exclude_patterns:
    - "*_backup*"
    - "~$*"
  extensions:
    - ".csv"
    - ".xlsx"
  size_limits:
    min_bytes: 100
    max_bytes: 104857600  # 100MB

processing:
  mode: thread_pool  # sequential, thread_pool, process_pool, async
  max_workers: 8
  batch_size: 100
  timeout_seconds: 30

output:
  results_csv: "data/results/processing_results.csv"
  summary_json: "data/results/summary.json"
  combined_output: "data/processed/combined.csv"

progress:
  enabled: true
  show_eta: true
scan:
  directory: "data/raw/"
  recursive: true
  include_patterns:
    - "*.csv"
    - "*.xlsx"
  exclude_patterns:
    - "*_backup*"
    - "~$*"
  extensions:
    - ".csv"
    - ".xlsx"
  size_limits:
    min_bytes: 100
    max_bytes: 104857600  # 100MB

processing:
  mode: thread_pool  # sequential, thread_pool, process_pool, async
  max_workers: 8
  batch_size: 100
  timeout_seconds: 30

output:
  results_csv: "data/results/processing_results.csv"
  summary_json: "data/results/summary.json"
  combined_output: "data/processed/combined.csv"

progress:
  enabled: true
  show_eta: true
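Once the YAML above is parsed (e.g. with `yaml.safe_load`), it is worth validating before starting a run. A minimal sketch; `validate_config` and its messages are illustrative, and the key names simply mirror the YAML above:

```python
VALID_MODES = {'sequential', 'thread_pool', 'process_pool', 'async'}

def validate_config(cfg: dict) -> list:
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    # Required top-level sections
    for section in ('scan', 'processing', 'output'):
        if section not in cfg:
            problems.append(f"missing section: {section}")
    proc = cfg.get('processing', {})
    # Mode must be one of the documented values
    if proc.get('mode') not in VALID_MODES:
        problems.append(f"unknown mode: {proc.get('mode')!r}")
    # Worker count must be a positive integer
    mw = proc.get('max_workers', 0)
    if not isinstance(mw, int) or mw < 1:
        problems.append("max_workers must be a positive integer")
    return problems
```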

Usage Examples

使用示例

Example 1: Process CSV Files

示例1: 处理CSV文件

python
from parallel_file_processor import (
    FileScanner, FileProcessor, ProcessingMode,
    ProgressTracker, ResultAggregator
)
from pathlib import Path
import pandas as pd
python
from parallel_file_processor import (
    FileScanner, FileProcessor, ProcessingMode,
    ProgressTracker, ResultAggregator
)
from pathlib import Path
import pandas as pd

Define processing function

定义处理函数

def process_csv(file_info):
    """Extract statistics from CSV file."""
    df = pd.read_csv(file_info.path)
    return {
        'rows': len(df),
        'columns': len(df.columns),
        'memory_mb': df.memory_usage(deep=True).sum() / 1e6,
        'numeric_columns': len(df.select_dtypes(include='number').columns)
    }
def process_csv(file_info):
    """从CSV文件中提取统计信息。"""
    df = pd.read_csv(file_info.path)
    return {
        'rows': len(df),
        'columns': len(df.columns),
        'memory_mb': df.memory_usage(deep=True).sum() / 1e6,
        'numeric_columns': len(df.select_dtypes(include='number').columns)
    }

Setup scanner and processor

设置扫描器和处理器

scanner = FileScanner(extensions={'.csv'})
processor = FileProcessor(
    scanner=scanner,
    mode=ProcessingMode.THREAD_POOL,
    max_workers=8
)
scanner = FileScanner(extensions={'.csv'})
processor = FileProcessor(
    scanner=scanner,
    mode=ProcessingMode.THREAD_POOL,
    max_workers=8
)

Create progress tracker

创建进度跟踪器

tracker = ProgressTracker(0, "Processing CSVs")
tracker.start()
tracker = ProgressTracker(0, "处理CSV文件")
tracker.start()

Process with progress

带进度跟踪的处理

result = processor.process_directory(
    Path("data/raw/"),
    process_csv,
    progress_callback=tracker.update
)
tracker.finish()
result = processor.process_directory(
    Path("data/raw/"),
    process_csv,
    progress_callback=tracker.update
)
tracker.finish()

Aggregate results

聚合结果

aggregator = ResultAggregator(result)
print(f"\nSummary: {aggregator.summary()}")
aggregator.export_csv(Path("data/results/csv_stats.csv"))
aggregator = ResultAggregator(result)
print(f"\n汇总信息: {aggregator.summary()}")
aggregator.export_csv(Path("data/results/csv_stats.csv"))

Example 2: Parallel ZIP Extraction

示例2: 并行提取ZIP文件

python

python

Extract all ZIPs in parallel

并行提取所有ZIP文件

processor = FileProcessor(mode=ProcessingMode.THREAD_POOL)

result = processor.extract_all_zips(
    directory=Path("data/archives/"),
    output_directory=Path("data/extracted/")
)

print(f"Extracted {result.successful} ZIP files")
print(f"Failed: {result.failed}")
processor = FileProcessor(mode=ProcessingMode.THREAD_POOL)

result = processor.extract_all_zips(
    directory=Path("data/archives/"),
    output_directory=Path("data/extracted/")
)

print(f"已提取 {result.successful} 个ZIP文件")
print(f"失败数量: {result.failed}")

Get extraction details

获取提取详情

aggregator = ResultAggregator(result)
df = aggregator.to_dataframe()
total_files = df['result_files_extracted'].sum()
print(f"Total files extracted: {total_files}")
aggregator = ResultAggregator(result)
df = aggregator.to_dataframe()
total_files = df['result_files_extracted'].sum()
print(f"提取的文件总数: {total_files}")

Example 3: Aggregate Data from Multiple Sources

示例3: 聚合多源数据

python

python

Aggregate CSV files with custom processing

带自定义处理的CSV文件聚合

def load_and_clean(file_info):
    """Load CSV and perform basic cleaning."""
    df = pd.read_csv(file_info.path)

    # Clean column names
    df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]

    # Add metadata
    df['_source'] = file_info.path.name
    df['_loaded_at'] = pd.Timestamp.now()

    return df

processor = FileProcessor(
    scanner=FileScanner(extensions={'.csv'}),
    mode=ProcessingMode.THREAD_POOL
)

result = processor.process_directory(
    Path("data/monthly_reports/"),
    load_and_clean
)
def load_and_clean(file_info):
    """加载CSV文件并执行基础清洗。"""
    df = pd.read_csv(file_info.path)

    # 清洗列名
    df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]

    # 添加元数据
    df['_source'] = file_info.path.name
    df['_loaded_at'] = pd.Timestamp.now()

    return df

processor = FileProcessor(
    scanner=FileScanner(extensions={'.csv'}),
    mode=ProcessingMode.THREAD_POOL
)

result = processor.process_directory(
    Path("data/monthly_reports/"),
    load_and_clean
)

Combine all DataFrames

合并所有DataFrame

aggregator = ResultAggregator(result)
combined_df = aggregator.combine_dataframes()

print(f"Combined {len(combined_df)} rows from {result.successful} files")
combined_df.to_csv("data/combined_reports.csv", index=False)
aggregator = ResultAggregator(result)
combined_df = aggregator.combine_dataframes()

print(f"从 {result.successful} 个文件中合并得到 {len(combined_df)} 行数据")
combined_df.to_csv("data/combined_reports.csv", index=False)

Example 4: Custom Batch Processing

示例4: 自定义批处理

python
from parallel_file_processor import ParallelProcessor, ProcessingMode
python
from parallel_file_processor import ParallelProcessor, ProcessingMode

Process list of items (not files)

处理非文件类型的项列表

items = list(range(1000))

def heavy_computation(item):
    """CPU-intensive calculation."""
    import math
    result = sum(math.sin(i * item) for i in range(10000))
    return {'item': item, 'result': result}
items = list(range(1000))

def heavy_computation(item):
    """CPU密集型计算。"""
    import math
    result = sum(math.sin(i * item) for i in range(10000))
    return {'item': item, 'result': result}

Use process pool for CPU-bound work

对CPU密集型任务使用进程池

processor = ParallelProcessor(
    processor=heavy_computation,
    mode=ProcessingMode.PROCESS_POOL,
    max_workers=4
)
processor = ParallelProcessor(
    processor=heavy_computation,
    mode=ProcessingMode.PROCESS_POOL,
    max_workers=4
)

Track progress

跟踪进度

def show_progress(completed, total):
    pct = (completed / total) * 100
    print(f"\rProgress: {pct:.1f}%", end='', flush=True)

processor.on_progress(show_progress)

result = processor.process(items)
print(f"\nCompleted {result.successful}/{result.total_files} items")
def show_progress(completed, total):
    pct = (completed / total) * 100
    print(f"\r进度: {pct:.1f}%", end='', flush=True)

processor.on_progress(show_progress)

result = processor.process(items)
print(f"\n已完成 {result.successful}/{result.total_files} 个项")

Performance Tips

性能优化建议

Mode Selection

模式选择

| Workload Type | Recommended Mode | Reason |
|---|---|---|
| File I/O | THREAD_POOL | IO-bound; threads avoid GIL issues |
| Data parsing | THREAD_POOL | Pandas releases the GIL during IO |
| CPU computation | PROCESS_POOL | Bypasses the GIL for true parallelism |
| Network requests | ASYNC | Best for many concurrent connections |
| Simple operations | SEQUENTIAL | Overhead may exceed benefit |
| 工作负载类型 | 推荐模式 | 原因 |
|---|---|---|
| 文件IO | THREAD_POOL | IO密集型任务,线程可避免GIL限制 |
| 数据解析 | THREAD_POOL | Pandas在IO操作期间会释放GIL |
| CPU计算 | PROCESS_POOL | 绕过GIL实现真正的并行 |
| 网络请求 | ASYNC | 最适合大量并发连接 |
| 简单操作 | SEQUENTIAL | 并行开销可能超过收益 |
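
The table maps naturally onto `concurrent.futures`. A sketch of the dispatch, where `Mode` is a stand-in for the skill's `ProcessingMode` and `None` signals a plain sequential loop:

```python
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from enum import Enum
from typing import Optional

class Mode(Enum):
    """Stand-in for the skill's ProcessingMode."""
    SEQUENTIAL = 'sequential'
    THREAD_POOL = 'thread_pool'
    PROCESS_POOL = 'process_pool'

def make_executor(mode: Mode, max_workers: int) -> Optional[Executor]:
    """Return an executor for the mode; None means run sequentially."""
    if mode is Mode.THREAD_POOL:
        return ThreadPoolExecutor(max_workers=max_workers)   # IO-bound work
    if mode is Mode.PROCESS_POOL:
        return ProcessPoolExecutor(max_workers=max_workers)  # CPU-bound work
    return None
```

(The `async` row has no `concurrent.futures` equivalent; it would be handled with `asyncio` instead.)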

Worker Count

工作线程数

python
import os
python
import os

IO-bound (reading files, network)

IO密集型(读取文件、网络)

io_workers = os.cpu_count() * 2
io_workers = os.cpu_count() * 2

CPU-bound (heavy computation)

CPU密集型(重度计算)

cpu_workers = os.cpu_count()
cpu_workers = os.cpu_count()

Memory-constrained (large files)

内存受限(大文件)

memory_workers = max(2, os.cpu_count() // 2)
memory_workers = max(2, os.cpu_count() // 2)
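The three heuristics can be collected into one helper, parameterized by CPU count so it is easy to test. `pick_workers` is an illustrative name, not part of the skill:

```python
def pick_workers(workload: str, cpu_count: int) -> int:
    """Worker-count heuristics from above; workload is 'io', 'cpu', or 'memory'."""
    if workload == 'io':
        return cpu_count * 2          # IO-bound: oversubscribe threads
    if workload == 'cpu':
        return cpu_count              # CPU-bound: one worker per core
    if workload == 'memory':
        return max(2, cpu_count // 2) # memory-constrained: fewer workers
    raise ValueError(f"unknown workload: {workload!r}")
```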

Batch Size

批处理大小

  • Small files (<1MB): Large batches (500-1000)
  • Medium files (1-100MB): Medium batches (50-100)
  • Large files (>100MB): Small batches (10-20) or one at a time
  • 小文件(<1MB): 大批次(500-1000)
  • 中等文件(1-100MB): 中批次(50-100)
  • 大文件(>100MB): 小批次(10-20)或逐个处理
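
A batching helper that follows these guidelines might look like this; `batched` and `batch_size_for` are illustrative sketches, with the thresholds taken from the bullets above:

```python
def batched(items: list, batch_size: int):
    """Yield successive fixed-size batches; the last batch may be smaller."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

def batch_size_for(avg_file_bytes: int) -> int:
    """Batch-size heuristic from the guidelines above."""
    if avg_file_bytes < 1_000_000:    # small files (<1MB): large batches
        return 500
    if avg_file_bytes < 100_000_000:  # medium files (1-100MB): medium batches
        return 50
    return 10                         # large files (>100MB): small batches
```

Processing one batch at a time (and discarding its results before the next) keeps peak memory bounded by the batch size rather than the full file set.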

Best Practices

最佳实践

Do

建议

  1. Choose correct processing mode for workload type
  2. Use progress callbacks for long operations
  3. Batch large file sets to manage memory
  4. Log individual failures for debugging
  5. Consider retry logic for transient errors
  6. Monitor memory usage with large DataFrames
  1. 根据工作负载类型选择正确的处理模式
  2. 对长时间运行的操作使用进度回调
  3. 对大型文件集使用批处理以管理内存
  4. 记录单个失败案例以便调试
  5. 考虑为临时错误添加重试逻辑
  6. 处理大型DataFrame时监控内存使用

Don't

避免

  1. Use process pool for IO-bound tasks
  2. Skip error handling in processor functions
  3. Load all results into memory at once
  4. Ignore batch result statistics
  5. Use too many workers for memory-constrained tasks
  1. 对IO密集型任务使用进程池
  2. 在处理函数中跳过错误处理
  3. 一次性将所有结果加载到内存
  4. 忽略批处理结果统计信息
  5. 对内存受限任务使用过多工作线程
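
Do item 5 above suggests retry logic for transient errors. A minimal sketch; real code would retry only specific exception types and back off between attempts, and `with_retries` is an illustrative helper:

```python
import time

def with_retries(fn, attempts: int = 3, delay: float = 0.0):
    """Call fn(); on exception, retry up to `attempts` times total."""
    last_exc = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            time.sleep(delay)  # fixed pause; use exponential backoff in practice
    raise last_exc
```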

Error Handling

错误处理

Common Errors

常见错误

| Error | Cause | Solution |
|---|---|---|
| MemoryError | Too many files loaded | Use batching or streaming |
| PermissionError | File access denied | Check file permissions |
| TimeoutError | Processing too slow | Increase timeout or optimize |
| OSError | Too many open files | Reduce max_workers |
| 错误 | 原因 | 解决方案 |
|---|---|---|
| MemoryError | 加载的文件过多 | 使用批处理或流式处理 |
| PermissionError | 文件访问被拒绝 | 检查文件权限 |
| TimeoutError | 处理速度过慢 | 增加超时时间或优化处理逻辑 |
| OSError | 打开的文件过多 | 减少最大工作线程数 |

Error Template

错误处理模板

python
def safe_process_directory(directory: Path, processor: Callable) -> dict:
    """Process directory with comprehensive error handling."""
    try:
        if not directory.exists():
            return {'status': 'error', 'message': 'Directory not found'}

        file_processor = FileProcessor()
        result = file_processor.process_directory(directory, processor)

        if result.failed > 0:
            return {
                'status': 'partial',
                'successful': result.successful,
                'failed': result.failed,
                'errors': result.errors[:10]
            }

        return {'status': 'success', 'processed': result.successful}

    except Exception as e:
        return {'status': 'error', 'message': str(e)}
python
def safe_process_directory(directory: Path, processor: Callable) -> dict:
    """包含全面错误处理的目录处理函数。"""
    try:
        if not directory.exists():
            return {'status': 'error', 'message': '目录未找到'}

        file_processor = FileProcessor()
        result = file_processor.process_directory(directory, processor)

        if result.failed > 0:
            return {
                'status': 'partial',
                'successful': result.successful,
                'failed': result.failed,
                'errors': result.errors[:10]
            }

        return {'status': 'success', 'processed': result.successful}

    except Exception as e:
        return {'status': 'error', 'message': str(e)}

Execution Checklist

执行检查清单

  • Processing mode matches workload type
  • Worker count appropriate for resources
  • Batch size prevents memory issues
  • Progress callback configured for feedback
  • Error handling in processor function
  • Results aggregated and exported
  • Summary statistics reviewed
  • Failed files identified and logged
  • 处理模式与工作负载类型匹配
  • 工作线程数与资源匹配
  • 批处理大小可避免内存问题
  • 已配置进度回调以获取反馈
  • 处理函数中包含错误处理
  • 结果已聚合并导出
  • 已查看汇总统计信息
  • 已识别并记录失败文件

Metrics

指标

| Metric | Target | Description |
|---|---|---|
| Throughput | 2-3x sequential | Parallel speedup factor |
| Success Rate | >99% | Percentage of files processed |
| Memory Usage | <4GB | Peak memory consumption |
| Error Rate | <1% | Processing failures |
| 指标 | 目标 | 描述 |
|---|---|---|
| 吞吐量 | 比串行快2-3倍 | 并行加速比 |
| 成功率 | >99% | 成功处理的文件百分比 |
| 内存使用 | <4GB | 峰值内存消耗 |
| 错误率 | <1% | 处理失败的比例 |
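
The Throughput and Error Rate targets are easy to check programmatically after a run. An illustrative sketch; the 2x and 1% thresholds come from the table above:

```python
def speedup(sequential_seconds: float, parallel_seconds: float) -> float:
    """Parallel speedup factor (the Throughput metric above)."""
    return sequential_seconds / parallel_seconds

def meets_targets(total: int, failed: int, seq_s: float, par_s: float) -> bool:
    """Check a run against the targets: at least 2x speedup, <1% error rate."""
    error_rate = failed / total if total else 0.0
    return speedup(seq_s, par_s) >= 2.0 and error_rate < 0.01
```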

Related Skills

相关技能

  • data-pipeline-processor - Data transformation
  • yaml-workflow-executor - Workflow automation
  • engineering-report-generator - Report generation

  • data-pipeline-processor - 数据转换
  • yaml-workflow-executor - 工作流自动化
  • engineering-report-generator - 报告生成

Version History

版本历史

  • 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples
  • 1.0.0 (2024-10-15): Initial release with FileScanner, ParallelProcessor, progress tracking, result aggregation
  • 1.1.0 (2026-01-02): 升级为SKILL_TEMPLATE_v2格式,新增快速开始、错误处理、指标、执行检查清单和更多示例
  • 1.0.0 (2024-10-15): 初始版本,包含FileScanner、ParallelProcessor、进度跟踪和结果聚合功能