
Document RAG Pipeline Skill


Overview


This skill creates a complete Retrieval-Augmented Generation (RAG) system from a folder of documents. It handles:
  • Regular PDF text extraction
  • OCR for scanned/image-based PDFs
  • DRM-protected file detection
  • Text chunking with overlap
  • Vector embedding generation
  • SQLite storage with full-text search
  • Semantic similarity search

Quick Start


```bash
# Install dependencies
pip install PyMuPDF pytesseract Pillow sentence-transformers numpy tqdm

# Build knowledge base
python build_knowledge_base.py /path/to/documents --embed

# Search documents
python build_knowledge_base.py /path/to/documents --search "your query"
```

When to Use


  • Building searchable knowledge bases from document folders
  • Processing technical standards libraries (API, ISO, ASME, etc.)
  • Creating semantic search over engineering documents
  • OCR processing of scanned historical documents
  • Any collection of PDFs needing intelligent search

Architecture


```
Document Folder
┌─────────────────────┐
│ 1. Build Inventory  │  SQLite catalog of all files
└──────────┬──────────┘
┌─────────────────────┐
│ 2. Extract Text     │  PyMuPDF for regular PDFs
└──────────┬──────────┘
┌─────────────────────┐
│ 3. OCR Scanned PDFs │  Tesseract + pytesseract
└──────────┬──────────┘
┌─────────────────────┐
│ 4. Chunk Text       │  1000 chars, 200 overlap
└──────────┬──────────┘
┌─────────────────────┐
│ 5. Generate Embeds  │  sentence-transformers
└──────────┬──────────┘
┌─────────────────────┐
│ 6. Semantic Search  │  Cosine similarity
└─────────────────────┘
```

Prerequisites


System Dependencies


```bash
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install -y tesseract-ocr tesseract-ocr-eng poppler-utils

# macOS
brew install tesseract poppler

# Verify Tesseract
tesseract --version  # Should show 5.x
```

Python Dependencies


```bash
pip install PyMuPDF pytesseract Pillow sentence-transformers numpy tqdm
```

Or with uv:

```bash
uv pip install PyMuPDF pytesseract Pillow sentence-transformers numpy tqdm
```

Implementation


Step 1: Database Schema


```python
import sqlite3
from pathlib import Path
from datetime import datetime

def create_database(db_path):
    """Create SQLite database with full schema."""
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()

    # Documents table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            filepath TEXT UNIQUE NOT NULL,
            file_size INTEGER,
            file_type TEXT,
            page_count INTEGER,
            extraction_method TEXT,  -- 'text', 'ocr', 'failed', 'drm_protected'
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # Text chunks table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS text_chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            document_id INTEGER NOT NULL,
            chunk_num INTEGER NOT NULL,
            chunk_text TEXT NOT NULL,
            char_count INTEGER,
            embedding BLOB,
            embedding_model TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (document_id) REFERENCES documents(id),
            UNIQUE(document_id, chunk_num)
        )
    ''')

    # Create indexes
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON text_chunks(document_id)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_docs_filepath ON documents(filepath)')

    conn.commit()
    return conn
```
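The `UNIQUE(document_id, chunk_num)` constraint is what makes re-runs idempotent later in the pipeline, because chunks are written with `INSERT OR IGNORE`. A quick in-memory smoke test of that behavior, with the schema trimmed to just the columns the demo touches:

```python
import sqlite3

# Trimmed-down version of the schema above, kept in memory for the demo.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE documents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    filename TEXT NOT NULL,
    filepath TEXT UNIQUE NOT NULL
);
CREATE TABLE text_chunks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    document_id INTEGER NOT NULL,
    chunk_num INTEGER NOT NULL,
    chunk_text TEXT NOT NULL,
    FOREIGN KEY (document_id) REFERENCES documents(id),
    UNIQUE(document_id, chunk_num)
);
""")

conn.execute("INSERT INTO documents (filename, filepath) VALUES (?, ?)",
             ("spec.pdf", "/docs/spec.pdf"))
doc_id = conn.execute("SELECT id FROM documents").fetchone()[0]

# INSERT OR IGNORE: the second row violates UNIQUE(document_id, chunk_num)
# and is silently skipped instead of raising, so re-processing is safe.
conn.execute("INSERT OR IGNORE INTO text_chunks (document_id, chunk_num, chunk_text) "
             "VALUES (?, 0, 'first pass')", (doc_id,))
conn.execute("INSERT OR IGNORE INTO text_chunks (document_id, chunk_num, chunk_text) "
             "VALUES (?, 0, 'second pass')", (doc_id,))

count = conn.execute("SELECT COUNT(*) FROM text_chunks").fetchone()[0]
print(count)  # 1
```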

Step 2: PDF Text Extraction


```python
import fitz  # PyMuPDF

def extract_pdf_text(pdf_path):
    """Extract text from PDF using PyMuPDF."""
    try:
        doc = fitz.open(pdf_path)
        text_parts = []

        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()
            if text.strip():
                text_parts.append(text)

        doc.close()
        full_text = "\n".join(text_parts)

        # Check if meaningful text extracted
        if len(full_text.strip()) < 100:
            return None, "no_text"

        return full_text, "text"

    except Exception as e:
        if "encrypted" in str(e).lower() or "drm" in str(e).lower():
            return None, "drm_protected"
        return None, f"error: {str(e)}"
```
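The status strings returned here drive the rest of the pipeline: only `"no_text"` triggers an OCR attempt, while DRM and other errors skip the file. A pure-Python sketch of that dispatch (the helper name `choose_next_step` is illustrative, not part of the pipeline script):

```python
def choose_next_step(text, method, use_ocr=True):
    """Mirror the fallback logic used later in process_documents."""
    if text is not None:
        return "chunk"          # extraction succeeded, go straight to chunking
    if use_ocr and method == "no_text":
        return "ocr"            # PDF opened fine but had no text layer
    return "skip"               # DRM-protected or hard failure

print(choose_next_step("Lorem ipsum...", "text"))   # chunk
print(choose_next_step(None, "no_text"))            # ocr
print(choose_next_step(None, "drm_protected"))      # skip
```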

Step 3: OCR for Scanned PDFs


```python
import fitz
import pytesseract
from PIL import Image
import io

def ocr_pdf(pdf_path, dpi=200):
    """OCR scanned PDF using Tesseract."""
    try:
        doc = fitz.open(pdf_path)
        text_parts = []

        for page_num in range(len(doc)):
            page = doc[page_num]

            # Convert page to image
            mat = fitz.Matrix(dpi/72, dpi/72)
            pix = page.get_pixmap(matrix=mat)

            # Convert to PIL Image
            img_data = pix.tobytes("png")
            img = Image.open(io.BytesIO(img_data))

            # OCR with Tesseract
            text = pytesseract.image_to_string(img, lang='eng')
            if text.strip():
                text_parts.append(text)

        doc.close()
        full_text = "\n".join(text_parts)

        if len(full_text.strip()) < 100:
            return None, "ocr_failed"

        return full_text, "ocr"

    except Exception as e:
        return None, f"ocr_error: {str(e)}"
```

Step 4: Text Chunking


```python
def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    text_len = len(text)

    while start < text_len:
        end = start + chunk_size
        chunk = text[start:end]

        # Try to break at sentence boundary
        if end < text_len:
            last_period = chunk.rfind('.')
            last_newline = chunk.rfind('\n')
            break_point = max(last_period, last_newline)

            if break_point > chunk_size * 0.7:
                chunk = text[start:start + break_point + 1]
                end = start + break_point + 1

        chunks.append(chunk.strip())
        start = end - overlap

        if start >= text_len:
            break

    return chunks
```
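To see the sliding window in action, here is the core arithmetic run on a synthetic string; the sentence-boundary branch is omitted because the sample contains no `.` or `\n`, so it would never fire:

```python
# Worked example of the windowing arithmetic in chunk_text above.
chunk_size, overlap = 20, 5
text = "abcdefghij" * 5          # 50 chars, no sentence boundaries

chunks, start = [], 0
while start < len(text):
    end = start + chunk_size
    chunks.append(text[start:end])
    start = end - overlap        # step back by `overlap` so windows share context
    if start >= len(text):
        break

print([len(c) for c in chunks])                      # [20, 20, 20, 5]
print(chunks[1][:overlap] == chunks[0][-overlap:])   # True: adjacent chunks overlap
```

Each chunk repeats the last `overlap` characters of its predecessor, so a sentence cut at a window edge is still seen whole in the next chunk.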

Step 5: Embedding Generation


```python
import os
import pickle
import sqlite3

import numpy as np
from sentence_transformers import SentenceTransformer

# Force CPU mode (for CUDA compatibility issues)
os.environ["CUDA_VISIBLE_DEVICES"] = ""

def create_embeddings(db_path, model_name='all-MiniLM-L6-v2', batch_size=100):
    """Generate embeddings for all chunks without embeddings."""
    model = SentenceTransformer(model_name)
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()

    # Get chunks needing embeddings
    cursor.execute('''
        SELECT id, chunk_text FROM text_chunks
        WHERE embedding IS NULL
    ''')
    chunks = cursor.fetchall()

    print(f"Generating embeddings for {len(chunks)} chunks...")

    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        ids = [c[0] for c in batch]
        texts = [c[1] for c in batch]

        # Generate embeddings
        embeddings = model.encode(texts, normalize_embeddings=True)

        # Store as pickled numpy arrays
        for chunk_id, emb in zip(ids, embeddings):
            emb_blob = pickle.dumps(emb.astype(np.float32))
            cursor.execute('''
                UPDATE text_chunks
                SET embedding = ?, embedding_model = ?
                WHERE id = ?
            ''', (emb_blob, model_name, chunk_id))

        conn.commit()
        print(f"  Embedded {min(i+batch_size, len(chunks))}/{len(chunks)}")

    conn.close()
    print("Embedding complete!")
```
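Because embeddings are stored as pickled float32 arrays, search must round-trip the BLOB exactly. A minimal sketch of that round trip with a stand-in vector (in the pipeline the vector would come from `model.encode(...)`):

```python
import pickle
import numpy as np

# Stand-in embedding; model.encode(..., normalize_embeddings=True) already
# returns unit-length vectors, so we normalize the stand-in the same way.
emb = np.array([3.0, 4.0], dtype=np.float32)
emb /= np.linalg.norm(emb)

# Round-trip through the BLOB representation used in the schema above.
blob = pickle.dumps(emb.astype(np.float32))
restored = pickle.loads(blob)

print(restored.dtype)                # float32
print(float(np.dot(emb, restored)))  # ~1.0: cosine similarity with itself
```

Storing float32 (rather than the default float64) halves the database size; on unit vectors a plain dot product is the cosine similarity, which is what Step 6 relies on.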

Step 6: Semantic Search


```python
def semantic_search(db_path, query, top_k=10, sample_size=50000):
    """Search for similar chunks using cosine similarity."""

    # Force CPU mode
    os.environ["CUDA_VISIBLE_DEVICES"] = ""

    model = SentenceTransformer('all-MiniLM-L6-v2')
    query_emb = model.encode(query, normalize_embeddings=True)

    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()

    # Get chunks with embeddings (sample if large)
    cursor.execute('SELECT COUNT(*) FROM text_chunks WHERE embedding IS NOT NULL')
    total = cursor.fetchone()[0]

    if total > sample_size:
        # Random sample for large databases
        cursor.execute(f'''
            SELECT tc.id, tc.chunk_text, tc.embedding, d.filename
            FROM text_chunks tc
            JOIN documents d ON tc.document_id = d.id
            WHERE tc.embedding IS NOT NULL
            ORDER BY RANDOM()
            LIMIT {sample_size}
        ''')
    else:
        cursor.execute('''
            SELECT tc.id, tc.chunk_text, tc.embedding, d.filename
            FROM text_chunks tc
            JOIN documents d ON tc.document_id = d.id
            WHERE tc.embedding IS NOT NULL
        ''')

    results = []
    for chunk_id, text, emb_blob, filename in cursor.fetchall():
        emb = pickle.loads(emb_blob)

        # Cosine similarity (embeddings are normalized)
        similarity = np.dot(query_emb, emb)

        results.append({
            'id': chunk_id,
            'text': text[:500],  # Truncate for display
            'filename': filename,
            'score': float(similarity)
        })

    conn.close()

    # Sort by similarity
    results.sort(key=lambda x: x['score'], reverse=True)
    return results[:top_k]
```
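The per-chunk Python loop above is easy to follow, but for large samples the same cosine scoring can be done in one matrix product. A sketch on synthetic unit vectors (real embeddings would come from the database; the seed and sizes are arbitrary):

```python
import numpy as np

# Synthetic stand-ins for chunk embeddings, unit-normalized like real ones.
rng = np.random.default_rng(0)
chunk_embs = rng.normal(size=(4, 8)).astype(np.float32)
chunk_embs /= np.linalg.norm(chunk_embs, axis=1, keepdims=True)

query_emb = chunk_embs[2].copy()   # make chunk 2 an exact match

# One matrix-vector product scores every chunk at once; on unit vectors
# the dot product is the cosine similarity, so argmax gives the best chunk.
scores = chunk_embs @ query_emb
best = int(np.argmax(scores))
print(best)  # 2
```

Stacking the unpickled embeddings into one `(n, d)` array and scoring this way removes the Python-level loop; the sampling and sorting logic stays the same.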

Complete Pipeline Script


```python
#!/usr/bin/env python3
"""
Document RAG Pipeline - Build searchable knowledge base from PDF folder.

Usage:
    python build_knowledge_base.py /path/to/documents --db inventory.db
    python build_knowledge_base.py /path/to/documents --search "query text"
"""

import argparse
import os
import sqlite3
from pathlib import Path
from tqdm import tqdm

# Assumes create_database, extract_pdf_text, ocr_pdf, chunk_text,
# create_embeddings, and semantic_search (Steps 1-6) are defined in this file.

def build_inventory(folder_path, db_path):
    """Build document inventory from folder."""
    conn = create_database(db_path)
    cursor = conn.cursor()

    pdf_files = list(Path(folder_path).rglob("*.pdf"))
    print(f"Found {len(pdf_files)} PDF files")

    for pdf_path in tqdm(pdf_files, desc="Building inventory"):
        # Check if already processed
        cursor.execute('SELECT id FROM documents WHERE filepath = ?',
                       (str(pdf_path),))
        if cursor.fetchone():
            continue

        file_size = pdf_path.stat().st_size

        cursor.execute('''
            INSERT INTO documents (filename, filepath, file_size, file_type)
            VALUES (?, ?, ?, 'pdf')
        ''', (pdf_path.name, str(pdf_path), file_size))

    conn.commit()
    conn.close()

def process_documents(db_path, use_ocr=True):
    """Extract text from all unprocessed documents."""
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()

    # Get unprocessed documents
    cursor.execute('''
        SELECT id, filepath FROM documents
        WHERE extraction_method IS NULL
    ''')
    docs = cursor.fetchall()

    stats = {'text': 0, 'ocr': 0, 'failed': 0, 'drm': 0}

    for doc_id, filepath in tqdm(docs, desc="Extracting text"):
        # Try regular extraction first
        text, method = extract_pdf_text(filepath)

        # Try OCR if no text and OCR enabled
        if text is None and use_ocr and method == "no_text":
            text, method = ocr_pdf(filepath)

        if text:
            # Chunk and store
            chunks = chunk_text(text)
            for i, chunk in enumerate(chunks):
                cursor.execute('''
                    INSERT OR IGNORE INTO text_chunks
                    (document_id, chunk_num, chunk_text, char_count)
                    VALUES (?, ?, ?, ?)
                ''', (doc_id, i, chunk, len(chunk)))

            stats['text' if method == 'text' else 'ocr'] += 1
        else:
            if 'drm' in method:
                stats['drm'] += 1
            else:
                stats['failed'] += 1

        # Update document status
        cursor.execute('''
            UPDATE documents SET extraction_method = ? WHERE id = ?
        ''', (method, doc_id))

        conn.commit()

    conn.close()
    return stats

def main():
    parser = argparse.ArgumentParser(description='Document RAG Pipeline')
    parser.add_argument('folder', help='Folder containing documents')
    parser.add_argument('--db', default='_inventory.db', help='Database path')
    parser.add_argument('--no-ocr', action='store_true', help='Skip OCR')
    parser.add_argument('--embed', action='store_true', help='Generate embeddings')
    parser.add_argument('--search', help='Search query')
    parser.add_argument('--top-k', type=int, default=10, help='Number of results')

    args = parser.parse_args()

    db_path = Path(args.folder) / args.db

    if args.search:
        # Search mode
        results = semantic_search(str(db_path), args.search, args.top_k)
        print(f"\nTop {len(results)} results for: '{args.search}'\n")
        for i, r in enumerate(results, 1):
            print(f"{i}. [{r['score']:.3f}] {r['filename']}")
            print(f"   {r['text'][:200]}...\n")
    else:
        # Build mode
        print("Step 1: Building inventory...")
        build_inventory(args.folder, str(db_path))

        print("\nStep 2: Extracting text...")
        stats = process_documents(str(db_path), use_ocr=not args.no_ocr)
        print(f"Results: {stats}")

        if args.embed:
            print("\nStep 3: Generating embeddings...")
            create_embeddings(str(db_path))

if __name__ == '__main__':
    main()
```

Usage Examples


Build Knowledge Base


```bash
# Full pipeline with OCR and embeddings
python build_knowledge_base.py /path/to/documents --embed

# Skip OCR (faster, text PDFs only)
python build_knowledge_base.py /path/to/documents --no-ocr --embed

# Just build inventory (no extraction)
python build_knowledge_base.py /path/to/documents
```

Search Documents


```bash
# Semantic search
python build_knowledge_base.py /path/to/documents --search "subsea wellhead design"

# More results
python build_knowledge_base.py /path/to/documents --search "fatigue analysis" --top-k 20
```

Quick Search Script


```bash
#!/bin/bash
# search_docs.sh - Quick semantic search

DB_PATH="${1:-/path/to/_inventory.db}"
QUERY="$2"

CUDA_VISIBLE_DEVICES="" python3 -c "
import sqlite3, pickle
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')
query_emb = model.encode('$QUERY', normalize_embeddings=True)

conn = sqlite3.connect('$DB_PATH')
cursor = conn.cursor()
cursor.execute('''
    SELECT tc.chunk_text, tc.embedding, d.filename
    FROM text_chunks tc
    JOIN documents d ON tc.document_id = d.id
    WHERE tc.embedding IS NOT NULL
    ORDER BY RANDOM()
    LIMIT 50000
''')

results = []
for text, emb_blob, filename in cursor.fetchall():
    emb = pickle.loads(emb_blob)
    sim = float(np.dot(query_emb, emb))
    results.append((sim, filename, text[:200]))

for score, fname, text in sorted(results, reverse=True)[:10]:
    print(f'[{score:.3f}] {fname}')
    print(f'  {text}...\n')
"
```

Execution Checklist


  • Install system dependencies (Tesseract, Poppler)
  • Install Python dependencies
  • Verify document folder exists
  • Run inventory to catalog documents
  • Extract text (with or without OCR)
  • Generate embeddings
  • Test semantic search
  • Monitor for DRM-protected files

Error Handling


Common Errors


Error: CUDA not available
  • Cause: CUDA driver issues or incompatible GPU
  • Solution: Force CPU mode with `CUDA_VISIBLE_DEVICES=""`

Error: Tesseract not found
  • Cause: Tesseract OCR not installed
  • Solution: Install with `apt-get install tesseract-ocr` (Linux) or `brew install tesseract` (macOS)

Error: DRM-protected files
  • Cause: FileOpen or other DRM encryption
  • Solution: Skip these files; list them with `extraction_method = 'drm_protected'`

Error: SQLite database locked
  • Cause: Concurrent access without timeout
  • Solution: Use `timeout=30` in `sqlite3.connect()`

Error: Out of memory
  • Cause: Large batch sizes or too many embeddings
  • Solution: Reduce `batch_size`; use sampling for search
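The "database locked" case above can be reproduced and fixed in a few lines. A self-contained sketch (`isolation_level=None` puts the connections in autocommit mode so the explicit `BEGIN EXCLUSIVE` is visible; paths and timeouts are illustrative):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# One connection takes the write lock and holds it.
writer = sqlite3.connect(path, isolation_level=None)
writer.execute("CREATE TABLE t (x)")
writer.execute("BEGIN EXCLUSIVE")

# A second connection with timeout=0 gives up immediately: "database is locked".
reader = sqlite3.connect(path, timeout=0, isolation_level=None)
try:
    reader.execute("BEGIN EXCLUSIVE")
    locked = False
except sqlite3.OperationalError:
    locked = True
print(locked)  # True

writer.execute("ROLLBACK")  # release the lock

# The fix used throughout this skill: a busy timeout, so the connection
# retries for up to 30 s before raising instead of failing instantly.
patient = sqlite3.connect(path, timeout=30, isolation_level=None)
patient.execute("BEGIN EXCLUSIVE")
patient.execute("ROLLBACK")
```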

Metrics


| Metric | Typical Value |
| --- | --- |
| Text extraction | ~50 pages/second |
| OCR processing | ~2-5 pages/minute |
| Embedding generation | ~100 chunks/second (CPU) |
| Search latency | <2 seconds (50K chunks) |
| Memory usage | ~2GB for embeddings |

Performance Metrics (Real-World)


From O&G Standards processing (957 documents):

| Metric | Value |
| --- | --- |
| Total documents | 957 |
| Text extraction | 811 PDFs |
| OCR processed | 96 PDFs |
| DRM protected | 50 PDFs |
| Total chunks | 1,043,616 |
| Embedding time | ~4 hours (CPU) |
| Search latency | <2 seconds |

Related Skills


  • pdf-text-extractor - Just text extraction
  • semantic-search-setup - Just embeddings/search
  • rag-system-builder - Add LLM Q&A layer
  • knowledge-base-builder - Simpler document catalog

Version History


  • 1.1.0 (2026-01-02): Added Quick Start, Execution Checklist, Error Handling, Metrics sections; updated frontmatter with version, category, related_skills
  • 1.0.0 (2024-10-15): Initial release with OCR support, chunking, vector embeddings, semantic search