Loading...
Loading...
Deploy and operate SecurityClaw, an autonomous SOC agent with RAG-based threat detection, LLM-powered anomaly analysis, and skill-based security automation
npx skill4agent add aradotso/security-skills securityclaw-autonomous-soc-agentSkill by ara.so — Security Skills collection.
# Python 3.11+ required
python --version
# Install Ollama for LLM provider
curl -fsSL https://ollama.com/install.sh | sh
ollama serve
# Pull recommended models
ollama pull qwen2.5:7b-instruct-q4_K_M
ollama pull nomic-embed-text:latest# Clone repository
git clone https://github.com/SecurityClaw/SecurityClaw.git
cd SecurityClaw
# Create virtual environment
python3.11 -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Run interactive onboarding wizard
python main.py onboardconfig.yaml.env# Database configuration
database:
provider: opensearch # or elasticsearch
host: localhost
port: 9200
use_ssl: true
verify_certs: false
username: admin
password_env: OPENSEARCH_PASSWORD # Reads from .env
# LLM provider
llm:
provider: ollama
base_url: http://localhost:11434
model: qwen2.5:7b-instruct-q4_K_M
temperature: 0.7
max_tokens: 16384
# RAG engine
rag:
index_name: securityclaw_baselines
embedding_model: nomic-embed-text:latest
embedding_dimension: 768
top_k: 5
# API server
api:
host: 0.0.0.0
port: 7799
enable_cors: true# Database credentials
OPENSEARCH_PASSWORD=your_password_here
# Optional external APIs
ABUSEIPDB_API_KEY=${ABUSEIPDB_API_KEY}
VIRUSTOTAL_API_KEY=${VIRUSTOTAL_API_KEY}
MAXMIND_LICENSE_KEY=${MAXMIND_LICENSE_KEY}
# Skill-specific variables (discovered by onboard command)
ANOMALY_TRIAGE_THRESHOLD=0.7# Start full service (scheduler + web UI + API)
python main.py service
# Access web UI at http://localhost:5173
# API at http://localhost:7799
# Start API only (no background scheduler)
SECURITYCLAW_API_ONLY=1 python main.py service
# Start scheduler loop only (no web interface)
python main.py run
# Web development mode (frontend with hot reload)
python main.py web-dev# List all loaded skills and their schedules
python main.py list-skills
# Manually dispatch a skill once
python main.py dispatch network_baseliner
python main.py dispatch threat_analyst
# Interactive chat interface (CLI)
python main.py chat
# View agent memory snapshot
python main.py status# Re-run onboarding wizard
python main.py onboard
# Validate current configuration
python main.py validate-configskills/---
skill_id: my_skill
display_name: My Custom Skill
version: 1.0.0
schedule_interval_seconds: 3600 # Optional: for scheduled execution
capabilities:
- custom_analysis
prerequisites:
- network_data
required_entities:
- ip_address
artifacts_produced:
- analysis_report
---
# System Prompt for My Skill
You are a security analyst performing custom analysis.
## Task
Analyze network data and produce findings.
## Output Format
Return JSON with "findings" array.from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
def execute(
db_connector,
llm_provider,
rag_engine,
config: Dict[str, Any],
memory: Dict[str, Any],
**kwargs
) -> Dict[str, Any]:
"""
Skill entrypoint.
Args:
db_connector: OpenSearch/ES client
llm_provider: LLM client
rag_engine: RAG context retrieval
config: Skill-specific config from instruction.md
memory: Shared agent memory (read/write)
**kwargs: Additional context (user_query, conversation_id, etc.)
Returns:
Dict with success status and results
"""
logger.info("Executing my_skill")
# Query database
query = {
"size": 100,
"query": {"match_all": {}},
"sort": [{"@timestamp": "desc"}]
}
results = db_connector.search(index="network-*", body=query)
# Retrieve RAG context
context = rag_engine.retrieve("recent network behavior", top_k=3)
# Call LLM with context
prompt = f"""Analyze these network events:
{results['hits']['hits'][:5]}
Baseline context:
{context}
Identify anomalies."""
response = llm_provider.chat([
{"role": "system", "content": config.get("system_prompt", "")},
{"role": "user", "content": prompt}
])
# Update shared memory
memory.setdefault("my_skill_runs", []).append({
"timestamp": "2026-05-19T10:00:00Z",
"findings_count": len(results['hits']['hits'])
})
return {
"success": True,
"findings": response["content"],
"context_used": len(context)
}schedule_interval_secondsinstruction.md# Triggered automatically every 6 hours
# Aggregates normal traffic patterns into RAG vectors
# Used by threat_analyst for context
# Manual dispatch:
python main.py dispatch network_baseliner# Currently manual dispatch:
python main.py dispatch anomaly_triage
# To enable 1-minute polling, add to skills/anomaly_triage/instruction.md:
# schedule_interval_seconds: 60# Manual threat analysis:
python main.py dispatch threat_analyst
# Returns verdict with LLM reasoning:
# {
# "verdict": "malicious",
# "confidence": 0.85,
# "reasoning": "Unusual port scan pattern...",
# "context_sources": ["baseline_2026-05-15", ...]
# }# Via chat interface:
# "Query OpenSearch for failed logins in the last hour"
# Skill constructs and executes:
# GET /auth-logs-*/_search
# {
# "query": {
# "bool": {
# "must": [
# {"match": {"event.outcome": "failure"}},
# {"range": {"@timestamp": {"gte": "now-1h"}}}
# ]
# }
# }
# }# Automatically updates GeoIP databases
# Requires MAXMIND_LICENSE_KEY in .env
# Manual update:
python main.py dispatch geoip_lookupimport requests
import json
url = "http://localhost:7799/chat"
payload = {
"message": "Analyze recent anomalies and check if 192.168.1.100 is malicious",
"conversation_id": "investigation_001" # Optional: for multi-turn context
}
# Server-Sent Events stream
response = requests.post(url, json=payload, stream=True)
for line in response.iter_lines():
if line.startswith(b"data: "):
data = json.loads(line[6:])
if data["type"] == "reasoning":
print(f"[THINK] {data['content']}")
elif data["type"] == "skill_call":
print(f"[SKILL] {data['skill_name']}: {data['reasoning']}")
elif data["type"] == "skill_result":
print(f"[RESULT] {data['summary']}")
elif data["type"] == "final":
print(f"[ANSWER] {data['content']}")import requests
response = requests.post(
"http://localhost:7799/dispatch",
json={"skill_name": "threat_analyst"}
)
result = response.json()
# {
# "success": true,
# "skill": "threat_analyst",
# "result": {...},
# "execution_time": 2.34
# }response = requests.get("http://localhost:7799/memory")
memory = response.json()
# {
# "escalated_findings": [...],
# "last_baseline_run": "2026-05-19T04:00:00Z",
# "anomaly_triage_cursor": "1234567890",
# "conversation_count": 5
# }# core/chat_router/graph.py structure
from langgraph.graph import StateGraph
from langgraph.checkpoint.sqlite import SqliteSaver
class ChatState(TypedDict):
messages: List[Dict]
user_query: str
plan: str
skill_results: List[Dict]
final_answer: str
retry_count: int
def decide_node(state):
"""Supervisor plans which skills to invoke"""
# Analyzes query against skill manifests
# Returns plan with skill sequence
pass
def execute_node(state):
"""Executes planned skills"""
# Dispatches skills with context
# Collects results
pass
def evaluate_node(state):
"""Checks if answer is complete"""
# Validates against user query
# Triggers retry if insufficient
pass
# Graph construction
workflow = StateGraph(ChatState)
workflow.add_node("decide", decide_node)
workflow.add_node("execute", execute_node)
workflow.add_node("evaluate", evaluate_node)
workflow.set_entry_point("decide")
workflow.add_edge("decide", "execute")
workflow.add_conditional_edges(
"evaluate",
should_continue,
{"continue": "decide", "end": END}
)
# Checkpoint to SQLite
memory = SqliteSaver.from_conn_string("data/conversations.db")
app = workflow.compile(checkpointer=memory)# skills/custom_detector/logic.py
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
# 1. Query recent events
events = db_connector.search(
index="network-*",
body={
"size": 1000,
"query": {
"range": {"@timestamp": {"gte": "now-1h"}}
}
}
)
# 2. Retrieve behavioral baseline
baseline = rag_engine.retrieve(
query="normal traffic patterns last 24h",
top_k=5
)
# 3. LLM analysis with context
threats = []
for hit in events['hits']['hits']:
event = hit['_source']
prompt = f"""Event: {event}
Baseline: {baseline}
Is this anomalous? Respond JSON: {{"anomalous": bool, "reason": str}}"""
response = llm_provider.chat([
{"role": "user", "content": prompt}
])
analysis = json.loads(response['content'])
if analysis['anomalous']:
threats.append({
"event": event,
"reason": analysis['reason']
})
# 4. Store findings in memory
memory.setdefault("custom_threats", []).extend(threats)
return {
"success": True,
"threats_found": len(threats),
"details": threats
}# skills/ip_enricher/logic.py
import os
import requests
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
suspicious_ips = kwargs.get("ip_addresses", [])
enriched = []
for ip in suspicious_ips:
# AbuseIPDB lookup
headers = {"Key": os.getenv("ABUSEIPDB_API_KEY")}
response = requests.get(
f"https://api.abuseipdb.com/api/v2/check",
params={"ipAddress": ip, "maxAgeInDays": 90},
headers=headers
)
data = response.json()
enriched.append({
"ip": ip,
"abuse_score": data.get("data", {}).get("abuseConfidenceScore", 0),
"reports": data.get("data", {}).get("totalReports", 0)
})
return {
"success": True,
"enriched_ips": enriched
}# Via chat interface or API:
# User: "Investigate source IP 10.0.0.50 - check logs, enrich with threat intel, analyze behavior"
# LangGraph supervisor plans:
# 1. opensearch_querier: fetch logs for 10.0.0.50
# 2. ip_enricher: check external reputation
# 3. baseline_querier: retrieve normal behavior for this IP
# 4. threat_analyst: final verdict with all context
# Automatic skill chaining based on manifests:
# - opensearch_querier provides "query_results" artifact
# - ip_enricher requires "ip_address" entity (extracted from results)
# - threat_analyst consumes all previous artifacts# Test OpenSearch connection
curl -k -u admin:password https://localhost:9200
# Test Ollama
curl http://localhost:11434/api/tags
# Validate config
python main.py validate-config# Check skill discovery
python main.py list-skills
# Verify instruction.md has valid YAML frontmatter
# Required fields: skill_id, display_name, version
# Check logic.py has execute() function:
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
pass# Verify embeddings index exists
from core.db_connector import get_db_connector
db = get_db_connector()
indices = db.cat_indices()
# Should show: securityclaw_baselines
# Rebuild baseline if empty
python main.py dispatch network_baseliner
# Check embedding model is running
ollama list # Should show nomic-embed-text:latest# Reset conversation memory (keeps runtime memory)
rm data/conversations.db
# Reset all memory (caution: loses baselines)
rm data/conversations.db data/runtime_memory.db
# View memory structure
python -c "
from core.memory import AgentMemory
memory = AgentMemory()
print(memory.get_summary())
"# Increase token budget in config.yaml
llm:
max_tokens: 32768 # Default: 16384
# Reduce context injection in prompts
# Edit core/memory.py max_context_chars (default: 4000)# Build frontend if dist/ missing
cd web
npm install
npm run build
# Check API server logs
python main.py service
# Should show: "API server started on http://0.0.0.0:7799"
# Verify CORS enabled in config.yaml
api:
enable_cors: true# Run test suite with mock providers
pytest tests/ -v
# Coverage report
pytest tests/ --cov=core --cov=skills --cov-report=html
# Test specific skill
pytest tests/test_threat_analyst.py -v
# Use mock OpenSearch (no real database needed)
# tests/conftest.py provides mock_db_connector fixturedata/conversations.dbweb/api/server.py.env