batch-processor
Batch Processor Skill
Overview
This skill enables efficient bulk processing of documents: convert, transform, extract, or analyze hundreds of files with parallel execution and progress tracking.
How to Use
- Describe what you want to accomplish
- Provide any required input data or files
- I'll execute the appropriate operations
Example prompts:
- "Convert 100 PDFs to Word documents"
- "Extract text from all images in a folder"
- "Batch rename and organize files"
- "Mass update document headers/footers"
Domain Knowledge
Batch Processing Patterns
Input: [file1, file2, ..., fileN]
│
▼
┌─────────────┐
│ Parallel │ ← Process multiple files concurrently
│ Workers │
└─────────────┘
│
▼
Output: [result1, result2, ..., resultN]

Python Implementation
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from tqdm import tqdm


def process_file(file_path: Path) -> dict:
    """Process a single file."""
    # Your processing logic here
    return {"path": str(file_path), "status": "success"}


def batch_process(input_dir: str, pattern: str = "*.*", max_workers: int = 4):
    """Process all matching files in a directory."""
    files = list(Path(input_dir).glob(pattern))
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in tqdm(as_completed(futures), total=len(files)):
            file = futures[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                results.append({"path": str(file), "error": str(e)})
    return results
```
Usage
```python
results = batch_process("/documents/invoices", "*.pdf", max_workers=8)
print(f"Processed {len(results)} files")
```
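After a run, the returned list can be triaged by outcome. A minimal sketch, assuming the result shape produced by batch_process above (the sample entries here are hypothetical):

```python
# Hypothetical result entries in the shape batch_process returns:
# successes carry a "status" key, failures carry an "error" key.
results = [
    {"path": "a.pdf", "status": "success"},
    {"path": "b.pdf", "error": "corrupt file"},
    {"path": "c.pdf", "status": "success"},
]

failed = [r for r in results if "error" in r]
print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")
for r in failed:
    print(f"  {r['path']}: {r['error']}")  # prints "  b.pdf: corrupt file"
```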
Error Handling & Resume
```python
import json
from pathlib import Path


class BatchProcessor:
    def __init__(self, checkpoint_file: str = "checkpoint.json"):
        self.checkpoint_file = checkpoint_file
        self.processed = self._load_checkpoint()

    def _load_checkpoint(self):
        if Path(self.checkpoint_file).exists():
            with open(self.checkpoint_file) as f:
                return json.load(f)
        return {}

    def _save_checkpoint(self):
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.processed, f)

    def process(self, files: list, processor_func):
        for file in files:
            if str(file) in self.processed:
                continue  # Skip already processed
            try:
                result = processor_func(file)
                self.processed[str(file)] = {"status": "success", **result}
            except Exception as e:
                self.processed[str(file)] = {"status": "error", "error": str(e)}
            self._save_checkpoint()  # Resume-safe
```
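The resume behavior boils down to a checkpoint round-trip: record each outcome to disk, then on the next run reload the file and skip what is already done. A self-contained sketch of just that mechanism (file names and contents are illustrative):

```python
import json
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoint.json"

    # First "run": a.txt was processed, then the job stopped before b.txt.
    processed = {"a.txt": {"status": "success"}}
    ckpt.write_text(json.dumps(processed))

    # Second "run": reload the checkpoint and skip completed files.
    resumed = json.loads(ckpt.read_text())
    remaining = [f for f in ["a.txt", "b.txt"] if f not in resumed]
    print(remaining)  # ['b.txt']
```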
Best Practices
- Use progress bars (tqdm) for user feedback
- Implement checkpointing for long jobs
- Set reasonable worker counts (CPU cores)
- Log failures for later review
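For the worker-count guideline, a common sketch derives a default from the machine's core count; leaving one core free for the rest of the system is a judgment call, not a rule from this skill:

```python
import os

# os.cpu_count() can return None, so fall back to 1 before subtracting.
max_workers = max(1, (os.cpu_count() or 1) - 1)  # leave one core free
print(max_workers)
```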
Installation
Install required dependencies (tqdm is used by the examples above):

```bash
pip install python-docx openpyxl python-pptx reportlab jinja2 tqdm
```