batch-processor


Batch Processor Skill

Overview

This skill enables efficient bulk processing of documents - convert, transform, extract, or analyze hundreds of files with parallel execution and progress tracking.

How to Use

  1. Describe what you want to accomplish
  2. Provide any required input data or files
  3. I'll execute the appropriate operations

Example prompts:
  • "Convert 100 PDFs to Word documents"
  • "Extract text from all images in a folder"
  • "Batch rename and organize files"
  • "Mass update document headers/footers"

Domain Knowledge

Batch Processing Patterns

Input: [file1, file2, ..., fileN]
    ┌─────────────┐
    │  Parallel   │  ← Process multiple files concurrently
    │  Workers    │
    └─────────────┘
Output: [result1, result2, ..., resultN]

Python Implementation

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm

def process_file(file_path: Path) -> dict:
    """Process a single file."""
    # Your processing logic here
    return {"path": str(file_path), "status": "success"}

def batch_process(input_dir: str, pattern: str = "*.*", max_workers: int = 4):
    """Process all matching files in a directory."""
    files = list(Path(input_dir).glob(pattern))
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_file, f): f for f in files}

        for future in tqdm(as_completed(futures), total=len(files)):
            file = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                results.append({"path": str(file), "error": str(e)})

    return results
```

Usage

```python
results = batch_process("/documents/invoices", "*.pdf", max_workers=8)
print(f"Processed {len(results)} files")
```

Error Handling & Resume

```python
import json
from pathlib import Path

class BatchProcessor:
    def __init__(self, checkpoint_file: str = "checkpoint.json"):
        self.checkpoint_file = Path(checkpoint_file)
        self.processed = self._load_checkpoint()

    def _load_checkpoint(self) -> dict:
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file) as f:
                return json.load(f)
        return {}

    def _save_checkpoint(self):
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.processed, f)

    def process(self, files: list, processor_func):
        for file in files:
            if str(file) in self.processed:
                continue  # Skip already processed

            try:
                result = processor_func(file)
                self.processed[str(file)] = {"status": "success", **result}
            except Exception as e:
                self.processed[str(file)] = {"status": "error", "error": str(e)}

            self._save_checkpoint()  # Resume-safe
```
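A hedged usage sketch of the checkpoint/resume behavior: the temp directory, sample files, and `count_lines` helper are illustrative, and the class is inlined in condensed form so the snippet runs on its own.

```python
import json
import tempfile
from pathlib import Path

# Condensed copy of the BatchProcessor above, so this sketch is standalone.
class BatchProcessor:
    def __init__(self, checkpoint_file: str = "checkpoint.json"):
        self.checkpoint_file = Path(checkpoint_file)
        self.processed = (
            json.loads(self.checkpoint_file.read_text())
            if self.checkpoint_file.exists()
            else {}
        )

    def process(self, files, processor_func):
        for file in files:
            if str(file) in self.processed:
                continue  # skip files recorded in the checkpoint
            try:
                self.processed[str(file)] = {"status": "success", **processor_func(file)}
            except Exception as e:
                self.processed[str(file)] = {"status": "error", "error": str(e)}
            self.checkpoint_file.write_text(json.dumps(self.processed))

# Create two throwaway input files in a temp directory.
workdir = Path(tempfile.mkdtemp())
for name in ("a.txt", "b.txt"):
    (workdir / name).write_text("hello\nworld\n")

def count_lines(path: Path) -> dict:
    """Hypothetical processor: count lines in a text file."""
    return {"lines": len(path.read_text().splitlines())}

bp = BatchProcessor(str(workdir / "checkpoint.json"))
bp.process(sorted(workdir.glob("*.txt")), count_lines)

# A fresh instance pointed at the same checkpoint resumes: both files are skipped.
bp2 = BatchProcessor(str(workdir / "checkpoint.json"))
bp2.process(sorted(workdir.glob("*.txt")), count_lines)
```

Because the checkpoint is rewritten after every file, a crashed run can be restarted with the same checkpoint file and will only touch the remaining files.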

Best Practices

  1. Use progress bars (tqdm) for user feedback
  2. Implement checkpointing for long jobs
  3. Set reasonable worker counts (CPU cores)
  4. Log failures for later review

Installation

Install the required dependencies (the code above also needs tqdm):

```bash
pip install tqdm python-docx openpyxl python-pptx reportlab jinja2
```

Resources