metatron-pentest-assistant

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

METATRON Penetration Testing Assistant

METATRON渗透测试助手

Skill by ara.so — Daily 2026 Skills collection.
METATRON is a CLI-based AI penetration testing assistant that runs entirely locally — no cloud, no API keys. It orchestrates recon tools (nmap, whois, whatweb, curl, dig, nikto), feeds results to a locally running fine-tuned LLM (
metatron-qwen
via Ollama), and stores all findings in MariaDB with full scan history, vulnerability tracking, and PDF/HTML export.

技能由ara.so提供 —— 2026年度技能合集。
METATRON是一款完全本地运行的CLI式AI渗透测试助手,无需云服务、无需API密钥。它可调度各类侦察工具(nmap、whois、whatweb、curl、dig、nikto),将结果输入到本地运行的微调LLM(通过Ollama部署的
metatron-qwen
),并将所有发现存储在MariaDB中,支持完整扫描历史记录、漏洞跟踪,以及PDF/HTML导出功能。

Architecture Overview

架构概览

metatron.py     ← CLI entry point, main menu, scan orchestration
db.py           ← MariaDB CRUD (history, vulns, fixes, exploits, summary)
tools.py        ← Recon tool runners (nmap, whois, whatweb, curl, dig, nikto)
llm.py          ← Ollama interface, agentic loop, AI tool dispatch
search.py       ← DuckDuckGo search + CVE lookup (no API key)
Modelfile       ← Custom metatron-qwen model config
Database spine: every scan creates a
sl_no
in
history
; all other tables link via
sl_no
.

metatron.py     ← CLI入口、主菜单、扫描调度模块
db.py           ← MariaDB增删改查逻辑(历史记录、漏洞、修复方案、漏洞利用、总结)
tools.py        ← 侦察工具执行器(nmap, whois, whatweb, curl, dig, nikto)
llm.py          ← Ollama接口、智能体循环、AI工具调度
search.py       ← DuckDuckGo搜索 + CVE查询(无需API密钥)
Modelfile       ← 自定义metatron-qwen模型配置
核心数据库逻辑: 每次扫描都会在
history
表中生成一个
sl_no
编号,所有其他表都通过
sl_no
关联。

Installation

安装步骤

1. Clone and set up Python environment

1. 克隆仓库并配置Python环境

bash
git clone https://github.com/sooryathejas/METATRON.git
cd METATRON
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
bash
git clone https://github.com/sooryathejas/METATRON.git
cd METATRON
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

2. Install system recon tools

2. 安装系统级侦察工具

bash
sudo apt install nmap whois whatweb curl dnsutils nikto
bash
sudo apt install nmap whois whatweb curl dnsutils nikto

3. Install Ollama and pull base model

3. 安装Ollama并拉取基础模型

bash
curl -fsSL https://ollama.com/install.sh | sh
bash
curl -fsSL https://ollama.com/install.sh | sh

8GB+ RAM:

内存8GB及以上设备使用:

ollama pull huihui_ai/qwen3.5-abliterated:9b
ollama pull huihui_ai/qwen3.5-abliterated:9b

<8GB RAM — use 4b and edit Modelfile FROM line accordingly:

内存小于8GB的设备使用4b版本,并对应修改Modelfile的FROM行:

ollama pull huihui_ai/qwen3.5-abliterated:4b
undefined
ollama pull huihui_ai/qwen3.5-abliterated:4b
undefined

4. Build the custom metatron-qwen model

4. 构建自定义metatron-qwen模型

bash
ollama create metatron-qwen -f Modelfile
ollama list   # verify metatron-qwen appears
Modelfile (the repo ships this — key parameters):
FROM huihui_ai/qwen3.5-abliterated:9b
PARAMETER num_ctx 16384
PARAMETER temperature 0.7
PARAMETER top_k 10
PARAMETER top_p 0.9
To use 4b instead, edit
Modelfile
:
FROM huihui_ai/qwen3.5-abliterated:4b
Then rebuild:
ollama create metatron-qwen -f Modelfile
bash
ollama create metatron-qwen -f Modelfile
ollama list   # 验证metatron-qwen是否出现在列表中
Modelfile(仓库自带该文件,核心参数如下):
FROM huihui_ai/qwen3.5-abliterated:9b
PARAMETER num_ctx 16384
PARAMETER temperature 0.7
PARAMETER top_k 10
PARAMETER top_p 0.9
如果要使用4b版本,修改
Modelfile
FROM huihui_ai/qwen3.5-abliterated:4b
然后重新构建:
ollama create metatron-qwen -f Modelfile

5. Set up MariaDB

5. 配置MariaDB

bash
sudo systemctl start mariadb
sudo systemctl enable mariadb
mysql -u root
sql
CREATE DATABASE metatron;
CREATE USER 'metatron'@'localhost' IDENTIFIED BY '123';
GRANT ALL PRIVILEGES ON metatron.* TO 'metatron'@'localhost';
FLUSH PRIVILEGES;
EXIT;
Create all tables:
bash
mysql -u metatron -p123 metatron < schema.sql
Or manually (paste from README schema block). The 5 tables:
  • history
    — one row per scan session (spine)
  • vulnerabilities
    — findings per session
  • fixes
    — remediation per vulnerability
  • exploits_attempted
    — exploit attempts per session
  • summary
    — raw scan + full AI analysis dump

bash
sudo systemctl start mariadb
sudo systemctl enable mariadb
mysql -u root
sql
CREATE DATABASE metatron;
CREATE USER 'metatron'@'localhost' IDENTIFIED BY '123';
GRANT ALL PRIVILEGES ON metatron.* TO 'metatron'@'localhost';
FLUSH PRIVILEGES;
EXIT;
创建所有表:
bash
mysql -u metatron -p123 metatron < schema.sql
或者手动创建(从README的schema块中粘贴SQL语句),共5张表:
  • history
    — 每个扫描会话对应一行记录(核心关联表)
  • vulnerabilities
    — 每个会话发现的漏洞
  • fixes
    — 每个漏洞对应的修复方案
  • exploits_attempted
    — 每个会话尝试的漏洞利用记录
  • summary
    — 原始扫描结果 + 完整AI分析导出

Running METATRON

运行METATRON

METATRON requires two terminals:
Terminal 1 — Load model into memory:
bash
ollama run metatron-qwen
METATRON需要两个终端窗口运行:
终端1 — 将模型加载到内存:
bash
ollama run metatron-qwen

Wait for >>> prompt before proceeding

等待出现>>>提示符后再进行下一步操作


**Terminal 2 — Launch the assistant:**
```bash
cd ~/METATRON
source venv/bin/activate
python metatron.py

**终端2 — 启动助手:**
```bash
cd ~/METATRON
source venv/bin/activate
python metatron.py

Main Menu Flow

主菜单流程

[1] New Scan      → enter target IP/domain → select tools → AI analyzes → saved to DB
[2] View History  → browse past scans → view/edit/delete/export
[3] Exit
[1] 新建扫描      → 输入目标IP/域名 → 选择工具 → AI分析 → 保存到数据库
[2] 查看历史记录  → 浏览过往扫描 → 查看/编辑/删除/导出
[3] 退出

New Scan — Tool Selection

新建扫描 — 工具选择

[1] nmap
[2] whois
[3] whatweb
[4] curl headers
[5] dig DNS
[6] nikto
[a] Run all (except nikto)
[n] Run all + nikto (slow, thorough)
[1] nmap
[2] whois
[3] whatweb
[4] curl headers
[5] dig DNS
[6] nikto
[a] 运行所有工具(除了nikto)
[n] 运行所有工具+nikto(速度慢,扫描更全面)

Exporting Reports

导出报告

From View History → select scan → export:
  • PDF
    — professional vulnerability report
  • HTML
    — browser-viewable report

查看历史 → 选择扫描 → 导出路径下可导出两种格式:
  • PDF
    — 专业漏洞报告
  • HTML
    — 可在浏览器查看的报告

Code Examples

代码示例

Programmatically run a scan and save to DB (
db.py
patterns)

程序化运行扫描并保存到数据库(
db.py
使用示例)

python
import mysql.connector

def get_db_connection():
    return mysql.connector.connect(
        host="localhost",
        user="metatron",
        password="123",
        database="metatron"
    )

def create_scan_session(target: str) -> int:
    """Create a new history entry, return sl_no."""
    from datetime import datetime
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO history (target, scan_date, status) VALUES (%s, %s, %s)",
        (target, datetime.now(), "active")
    )
    conn.commit()
    sl_no = cursor.lastrowid
    cursor.close()
    conn.close()
    return sl_no

def save_vulnerability(sl_no: int, vuln_name: str, severity: str,
                       port: str, service: str, description: str) -> int:
    """Save a vulnerability finding, return vuln id."""
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        """INSERT INTO vulnerabilities
           (sl_no, vuln_name, severity, port, service, description)
           VALUES (%s, %s, %s, %s, %s, %s)""",
        (sl_no, vuln_name, severity, port, service, description)
    )
    conn.commit()
    vuln_id = cursor.lastrowid
    cursor.close()
    conn.close()
    return vuln_id

def save_fix(sl_no: int, vuln_id: int, fix_text: str, source: str = "AI"):
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO fixes (sl_no, vuln_id, fix_text, source) VALUES (%s, %s, %s, %s)",
        (sl_no, vuln_id, fix_text, source)
    )
    conn.commit()
    cursor.close()
    conn.close()

def save_summary(sl_no: int, raw_scan: str, ai_analysis: str, risk_level: str):
    from datetime import datetime
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        """INSERT INTO summary (sl_no, raw_scan, ai_analysis, risk_level, generated_at)
           VALUES (%s, %s, %s, %s, %s)""",
        (sl_no, raw_scan, ai_analysis, risk_level, datetime.now())
    )
    conn.commit()
    cursor.close()
    conn.close()

def get_scan_history():
    """Retrieve all scan sessions."""
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM history ORDER BY scan_date DESC")
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

def get_vulnerabilities_for_scan(sl_no: int):
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    cursor.execute(
        "SELECT * FROM vulnerabilities WHERE sl_no = %s", (sl_no,)
    )
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows
python
import mysql.connector

def get_db_connection():
    return mysql.connector.connect(
        host="localhost",
        user="metatron",
        password="123",
        database="metatron"
    )

def create_scan_session(target: str) -> int:
    """创建新的历史记录条目,返回sl_no编号。"""
    from datetime import datetime
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO history (target, scan_date, status) VALUES (%s, %s, %s)",
        (target, datetime.now(), "active")
    )
    conn.commit()
    sl_no = cursor.lastrowid
    cursor.close()
    conn.close()
    return sl_no

def save_vulnerability(sl_no: int, vuln_name: str, severity: str,
                       port: str, service: str, description: str) -> int:
    """保存漏洞发现结果,返回漏洞id。"""
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        """INSERT INTO vulnerabilities
           (sl_no, vuln_name, severity, port, service, description)
           VALUES (%s, %s, %s, %s, %s, %s)""",
        (sl_no, vuln_name, severity, port, service, description)
    )
    conn.commit()
    vuln_id = cursor.lastrowid
    cursor.close()
    conn.close()
    return vuln_id

def save_fix(sl_no: int, vuln_id: int, fix_text: str, source: str = "AI"):
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO fixes (sl_no, vuln_id, fix_text, source) VALUES (%s, %s, %s, %s)",
        (sl_no, vuln_id, fix_text, source)
    )
    conn.commit()
    cursor.close()
    conn.close()

def save_summary(sl_no: int, raw_scan: str, ai_analysis: str, risk_level: str):
    from datetime import datetime
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(
        """INSERT INTO summary (sl_no, raw_scan, ai_analysis, risk_level, generated_at)
           VALUES (%s, %s, %s, %s, %s)""",
        (sl_no, raw_scan, ai_analysis, risk_level, datetime.now())
    )
    conn.commit()
    cursor.close()
    conn.close()

def get_scan_history():
    """获取所有扫描会话。"""
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM history ORDER BY scan_date DESC")
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

def get_vulnerabilities_for_scan(sl_no: int):
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    cursor.execute(
        "SELECT * FROM vulnerabilities WHERE sl_no = %s", (sl_no,)
    )
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

Running recon tools (
tools.py
patterns)

运行侦察工具(
tools.py
使用示例)

python
import subprocess

def run_nmap(target: str) -> str:
    """Run nmap service/version scan."""
    result = subprocess.run(
        ["nmap", "-sV", "-sC", "-T4", target],
        capture_output=True, text=True, timeout=120
    )
    return result.stdout + result.stderr

def run_whois(target: str) -> str:
    result = subprocess.run(
        ["whois", target],
        capture_output=True, text=True, timeout=30
    )
    return result.stdout

def run_whatweb(target: str) -> str:
    result = subprocess.run(
        ["whatweb", "-a", "3", target],
        capture_output=True, text=True, timeout=60
    )
    return result.stdout

def run_curl_headers(target: str) -> str:
    result = subprocess.run(
        ["curl", "-I", "-L", "--max-time", "15", target],
        capture_output=True, text=True, timeout=20
    )
    return result.stdout

def run_dig(target: str) -> str:
    result = subprocess.run(
        ["dig", target, "ANY"],
        capture_output=True, text=True, timeout=15
    )
    return result.stdout

def run_nikto(target: str) -> str:
    """Slow but thorough web scanner."""
    result = subprocess.run(
        ["nikto", "-h", target],
        capture_output=True, text=True, timeout=300
    )
    return result.stdout

def run_selected_tools(target: str, selections: list) -> dict:
    """
    selections: list of tool names, e.g. ['nmap', 'whois', 'dig']
    Returns dict of {tool_name: output}
    """
    tool_map = {
        'nmap': run_nmap,
        'whois': run_whois,
        'whatweb': run_whatweb,
        'curl': run_curl_headers,
        'dig': run_dig,
        'nikto': run_nikto,
    }
    results = {}
    for tool in selections:
        if tool in tool_map:
            print(f"[*] Running {tool} on {target}...")
            try:
                results[tool] = tool_map[tool](target)
            except subprocess.TimeoutExpired:
                results[tool] = f"[TIMEOUT] {tool} timed out"
            except Exception as e:
                results[tool] = f"[ERROR] {tool}: {e}"
    return results
python
import subprocess

def run_nmap(target: str) -> str:
    """运行nmap服务/版本扫描。"""
    result = subprocess.run(
        ["nmap", "-sV", "-sC", "-T4", target],
        capture_output=True, text=True, timeout=120
    )
    return result.stdout + result.stderr

def run_whois(target: str) -> str:
    result = subprocess.run(
        ["whois", target],
        capture_output=True, text=True, timeout=30
    )
    return result.stdout

def run_whatweb(target: str) -> str:
    result = subprocess.run(
        ["whatweb", "-a", "3", target],
        capture_output=True, text=True, timeout=60
    )
    return result.stdout

def run_curl_headers(target: str) -> str:
    result = subprocess.run(
        ["curl", "-I", "-L", "--max-time", "15", target],
        capture_output=True, text=True, timeout=20
    )
    return result.stdout

def run_dig(target: str) -> str:
    result = subprocess.run(
        ["dig", target, "ANY"],
        capture_output=True, text=True, timeout=15
    )
    return result.stdout

def run_nikto(target: str) -> str:
    """速度慢但扫描全面的Web扫描器。"""
    result = subprocess.run(
        ["nikto", "-h", target],
        capture_output=True, text=True, timeout=300
    )
    return result.stdout

def run_selected_tools(target: str, selections: list) -> dict:
    """
    selections: 工具名称列表,例如 ['nmap', 'whois', 'dig']
    返回 {工具名称: 输出结果} 的字典
    """
    tool_map = {
        'nmap': run_nmap,
        'whois': run_whois,
        'whatweb': run_whatweb,
        'curl': run_curl_headers,
        'dig': run_dig,
        'nikto': run_nikto,
    }
    results = {}
    for tool in selections:
        if tool in tool_map:
            print(f"[*] 正在对{target}运行{tool}...")
            try:
                results[tool] = tool_map[tool](target)
            except subprocess.TimeoutExpired:
                results[tool] = f"[超时] {tool}运行超时"
            except Exception as e:
                results[tool] = f"[错误] {tool}: {e}"
    return results

Querying Ollama LLM (
llm.py
patterns)

查询Ollama LLM(
llm.py
使用示例)

python
import requests
import json

OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "metatron-qwen"

def query_llm(prompt: str, stream: bool = True) -> str:
    """Send prompt to metatron-qwen, return full response."""
    payload = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "stream": stream
    }
    response = requests.post(OLLAMA_URL, json=payload, stream=stream)
    
    if not stream:
        return response.json().get("response", "")
    
    full_response = ""
    for line in response.iter_lines():
        if line:
            chunk = json.loads(line)
            token = chunk.get("response", "")
            print(token, end="", flush=True)
            full_response += token
            if chunk.get("done"):
                break
    print()
    return full_response

def build_pentest_prompt(target: str, scan_results: dict) -> str:
    """Build the analysis prompt from scan results."""
    combined = "\n\n".join(
        f"=== {tool.upper()} ===\n{output}"
        for tool, output in scan_results.items()
    )
    return f"""You are an expert penetration tester analyzing scan results for: {target}

SCAN RESULTS:
{combined}

Provide a structured analysis covering:
1. VULNERABILITIES FOUND — name, severity (Critical/High/Medium/Low), port, service, description
2. EXPLOIT SUGGESTIONS — specific tools or techniques for each vulnerability
3. RECOMMENDED FIXES — actionable remediation steps
4. OVERALL RISK LEVEL — Critical / High / Medium / Low

Format vulnerabilities as:
VULN: <name> | SEVERITY: <level> | PORT: <port> | SERVICE: <service>
DESC: <description>
FIX: <remediation>
"""

def analyze_target(target: str, scan_results: dict) -> str:
    prompt = build_pentest_prompt(target, scan_results)
    print("\n[🤖] metatron-qwen analyzing...\n")
    return query_llm(prompt)
python
import requests
import json

OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "metatron-qwen"

def query_llm(prompt: str, stream: bool = True) -> str:
    """向metatron-qwen发送prompt,返回完整响应。"""
    payload = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "stream": stream
    }
    response = requests.post(OLLAMA_URL, json=payload, stream=stream)
    
    if not stream:
        return response.json().get("response", "")
    
    full_response = ""
    for line in response.iter_lines():
        if line:
            chunk = json.loads(line)
            token = chunk.get("response", "")
            print(token, end="", flush=True)
            full_response += token
            if chunk.get("done"):
                break
    print()
    return full_response

def build_pentest_prompt(target: str, scan_results: dict) -> str:
    """基于扫描结果构建分析提示词。"""
    combined = "\n\n".join(
        f"=== {tool.upper()} ===\n{output}"
        for tool, output in scan_results.items()
    )
    return f"""你是一名专业渗透测试工程师,正在分析以下目标的扫描结果:{target}

扫描结果:
{combined}

请提供结构化分析,包含以下内容:
1. 发现的漏洞 — 名称、严重等级(Critical/High/Medium/Low)、端口、服务、描述
2. 漏洞利用建议 — 每个漏洞对应的具体工具或技术
3. 推荐修复方案 — 可落地的修复步骤
4. 整体风险等级 — Critical / High / Medium / Low

漏洞请按以下格式输出:
VULN: <漏洞名> | SEVERITY: <等级> | PORT: <端口> | SERVICE: <服务>
DESC: <描述>
FIX: <修复方案>
"""

def analyze_target(target: str, scan_results: dict) -> str:
    prompt = build_pentest_prompt(target, scan_results)
    print("\n[🤖] metatron-qwen正在分析...\n")
    return query_llm(prompt)

DuckDuckGo search and CVE lookup (
search.py
patterns)

DuckDuckGo搜索和CVE查询(
search.py
使用示例)

python
from duckduckgo_search import DDGS

def search_exploits(query: str, max_results: int = 5) -> list:
    """Search DuckDuckGo for exploit info — no API key needed."""
    with DDGS() as ddgs:
        results = list(ddgs.text(query, max_results=max_results))
    return results

def lookup_cve(cve_id: str) -> list:
    """Look up a CVE identifier."""
    query = f"{cve_id} vulnerability exploit details"
    return search_exploits(query)

def search_service_vulns(service: str, version: str) -> list:
    query = f"{service} {version} known vulnerabilities CVE exploit"
    return search_exploits(query)
python
from duckduckgo_search import DDGS

def search_exploits(query: str, max_results: int = 5) -> list:
    """在DuckDuckGo搜索漏洞利用信息 — 无需API密钥。"""
    with DDGS() as ddgs:
        results = list(ddgs.text(query, max_results=max_results))
    return results

def lookup_cve(cve_id: str) -> list:
    """查询CVE编号详情。"""
    query = f"{cve_id} vulnerability exploit details"
    return search_exploits(query)

def search_service_vulns(service: str, version: str) -> list:
    query = f"{service} {version} known vulnerabilities CVE exploit"
    return search_exploits(query)

Usage example:

使用示例:

results = lookup_cve("CVE-2021-44228")

results = lookup_cve("CVE-2021-44228")

results = search_service_vulns("Apache", "2.4.49")

results = search_service_vulns("Apache", "2.4.49")

undefined
undefined

Full scan pipeline (end-to-end)

完整扫描流程(端到端)

python
from tools import run_selected_tools
from llm import analyze_target
from db import (create_scan_session, save_vulnerability,
                save_fix, save_summary)

def run_full_scan(target: str, tools: list = None):
    if tools is None:
        tools = ['nmap', 'whois', 'whatweb', 'curl', 'dig']
    
    # 1. Create DB session
    sl_no = create_scan_session(target)
    print(f"[+] Scan session #{sl_no} created for {target}")
    
    # 2. Run recon
    scan_results = run_selected_tools(target, tools)
    raw_scan = "\n\n".join(f"{k}:\n{v}" for k, v in scan_results.items())
    
    # 3. AI analysis
    ai_output = analyze_target(target, scan_results)
    
    # 4. Parse and save (simplified — real parser in llm.py)
    # Save summary
    save_summary(sl_no, raw_scan, ai_output, risk_level="High")
    
    print(f"\n[✓] Results saved to database (sl_no={sl_no})")
    return sl_no, ai_output
python
from tools import run_selected_tools
from llm import analyze_target
from db import (create_scan_session, save_vulnerability,
                save_fix, save_summary)

def run_full_scan(target: str, tools: list = None):
    if tools is None:
        tools = ['nmap', 'whois', 'whatweb', 'curl', 'dig']
    
    # 1. 创建数据库会话
    sl_no = create_scan_session(target)
    print(f"[+] 已为{target}创建扫描会话#{sl_no}")
    
    # 2. 执行侦察
    scan_results = run_selected_tools(target, tools)
    raw_scan = "\n\n".join(f"{k}:\n{v}" for k, v in scan_results.items())
    
    # 3. AI分析
    ai_output = analyze_target(target, scan_results)
    
    # 4. 解析并保存(简化版,完整解析逻辑在llm.py中)
    # 保存总结
    save_summary(sl_no, raw_scan, ai_output, risk_level="High")
    
    print(f"\n[✓] 结果已保存到数据库(sl_no={sl_no})")
    return sl_no, ai_output

Run it:

运行示例:

sl_no, analysis = run_full_scan("192.168.1.1", ['nmap', 'whois'])

sl_no, analysis = run_full_scan("192.168.1.1", ['nmap', 'whois'])


---

---

Common Patterns

常用功能实现

Check if Ollama model is running before scan

扫描前检查Ollama模型是否运行

python
import requests

def check_ollama_ready(model: str = "metatron-qwen") -> bool:
    try:
        resp = requests.get("http://localhost:11434/api/tags", timeout=5)
        models = [m["name"] for m in resp.json().get("models", [])]
        return any(model in m for m in models)
    except Exception:
        return False

if not check_ollama_ready():
    print("[!] metatron-qwen not found. Run: ollama run metatron-qwen")
    exit(1)
python
import requests

def check_ollama_ready(model: str = "metatron-qwen") -> bool:
    try:
        resp = requests.get("http://localhost:11434/api/tags", timeout=5)
        models = [m["name"] for m in resp.json().get("models", [])]
        return any(model in m for m in models)
    except Exception:
        return False

if not check_ollama_ready():
    print("[!] 未找到metatron-qwen,请运行:ollama run metatron-qwen")
    exit(1)

Query scan history

查询扫描历史

python
from db import get_db_connection

def get_full_scan_report(sl_no: int) -> dict:
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    
    cursor.execute("SELECT * FROM history WHERE sl_no = %s", (sl_no,))
    history = cursor.fetchone()
    
    cursor.execute("SELECT * FROM vulnerabilities WHERE sl_no = %s", (sl_no,))
    vulns = cursor.fetchall()
    
    cursor.execute("SELECT * FROM summary WHERE sl_no = %s", (sl_no,))
    summary = cursor.fetchone()
    
    cursor.close()
    conn.close()
    
    return {"history": history, "vulnerabilities": vulns, "summary": summary}
python
from db import get_db_connection

def get_full_scan_report(sl_no: int) -> dict:
    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)
    
    cursor.execute("SELECT * FROM history WHERE sl_no = %s", (sl_no,))
    history = cursor.fetchone()
    
    cursor.execute("SELECT * FROM vulnerabilities WHERE sl_no = %s", (sl_no,))
    vulns = cursor.fetchall()
    
    cursor.execute("SELECT * FROM summary WHERE sl_no = %s", (sl_no,))
    summary = cursor.fetchone()
    
    cursor.close()
    conn.close()
    
    return {"history": history, "vulnerabilities": vulns, "summary": summary}

Add a custom recon tool

添加自定义侦察工具

python
undefined
python
undefined

In tools.py — add your tool function:

在tools.py中添加你的工具函数:

def run_gobuster(target: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> str: result = subprocess.run( ["gobuster", "dir", "-u", f"http://{target}", "-w", wordlist], capture_output=True, text=True, timeout=180 ) return result.stdout
def run_gobuster(target: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> str: result = subprocess.run( ["gobuster", "dir", "-u", f"http://{target}", "-w", wordlist], capture_output=True, text=True, timeout=180 ) return result.stdout

Register it in the tool_map in run_selected_tools():

在run_selected_tools()的tool_map中注册该工具:

tool_map['gobuster'] = run_gobuster

---
tool_map['gobuster'] = run_gobuster

---

Troubleshooting

故障排查

metatron-qwen
not found / connection refused

metatron-qwen
未找到 / 连接被拒绝

bash
undefined
bash
undefined

Terminal 1: ensure model is loaded

终端1:确认模型已加载

ollama run metatron-qwen
ollama run metatron-qwen

Should show >>> prompt

应该出现>>>提示符

Verify Ollama API is reachable

验证Ollama API可访问

Out of memory when running 9b model

运行9b模型时内存不足

bash
undefined
bash
undefined

Switch to 4b: edit Modelfile first line:

切换到4b版本:先修改Modelfile第一行:

FROM huihui_ai/qwen3.5-abliterated:4b

FROM huihui_ai/qwen3.5-abliterated:4b

ollama create metatron-qwen -f Modelfile
undefined
ollama create metatron-qwen -f Modelfile
undefined

MariaDB connection error

MariaDB连接错误

bash
sudo systemctl status mariadb
sudo systemctl start mariadb
bash
sudo systemctl status mariadb
sudo systemctl start mariadb

Verify credentials work:

验证凭证有效:

mysql -u metatron -p123 metatron -e "SHOW TABLES;"
undefined
mysql -u metatron -p123 metatron -e "SHOW TABLES;"
undefined

mysql.connector
not found

未找到
mysql.connector

bash
source venv/bin/activate
pip install mysql-connector-python
bash
source venv/bin/activate
pip install mysql-connector-python

nmap requires root for SYN scan

nmap需要root权限运行SYN扫描

bash
sudo nmap -sV -sC -T4 <target>
bash
sudo nmap -sV -sC -T4 <目标>

Or use TCP connect scan (no root needed):

或者使用无需root的TCP连接扫描:

nmap -sT -sV <target>
undefined
nmap -sT -sV <目标>
undefined

Nikto timeout

Nikto超时

Nikto is slow by design. Either use
[a]
(all without nikto) or increase the subprocess timeout in
tools.py
:
python
result = subprocess.run(["nikto", "-h", target],
                        capture_output=True, text=True,
                        timeout=600)  # 10 minutes
Nikto本身扫描速度较慢,你可以选择
[a]
选项(运行除nikto外的所有工具),或者在
tools.py
中增加subprocess的超时时间:
python
result = subprocess.run(["nikto", "-h", target],
                        capture_output=True, text=True,
                        timeout=600)  # 10分钟

Slow AI responses

AI响应缓慢

The 9b model needs time to load. If response is slow after the first query, it's still loading. Subsequent queries will be faster. Ensure no other GPU/memory-heavy processes are running.

9b模型需要时间加载,如果第一次查询后响应仍然很慢,说明模型还在加载中,后续查询速度会提升。请确保没有其他占用GPU/内存的进程正在运行。

Configuration Reference

配置参考

SettingLocationDefaultNotes
DB host
db.py
localhost
Change for remote DB
DB user
db.py
metatron
Match SQL user created
DB password
db.py
123
Change in production
DB name
db.py
metatron
Ollama URL
llm.py
http://localhost:11434
Model name
llm.py
metatron-qwen
Must match
ollama list
Context window
Modelfile
16384
Increase for large scans
Temperature
Modelfile
0.7
Lower = more deterministic
Security note: For production use, replace the hardcoded DB password with an environment variable:
os.environ.get("METATRON_DB_PASSWORD", "123")

设置项位置默认值说明
数据库主机
db.py
localhost
远程数据库请修改该值
数据库用户名
db.py
metatron
与你创建的SQL用户名保持一致
数据库密码
db.py
123
生产环境请修改
数据库名
db.py
metatron
Ollama URL
llm.py
http://localhost:11434
模型名称
llm.py
metatron-qwen
必须与
ollama list
的结果匹配
上下文窗口大小
Modelfile
16384
大型扫描可增大该值
Temperature参数
Modelfile
0.7
数值越低输出越确定
安全提示: 生产环境使用时,请将硬编码的数据库密码替换为环境变量:
os.environ.get("METATRON_DB_PASSWORD", "123")

Legal Disclaimer

法律免责声明

METATRON is for educational purposes and authorized penetration testing only. Only scan systems you own or have explicit written permission to test. Unauthorized scanning is illegal.
METATRON仅用于教育目的和授权渗透测试。请仅扫描你拥有或获得明确书面授权的系统,未经授权扫描属于违法行为。