log-forensics
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseLog Forensics
日志取证
Comprehensive log forensics skill for analyzing various log sources to reconstruct events, detect anomalies, and identify indicators of compromise. Enables correlation across multiple log sources, timeline creation, and automated anomaly detection.
一款全面的日志取证工具,可分析各类日志来源以还原事件、检测异常并识别入侵指标(IOC)。支持跨多个日志源的关联分析、时间线创建以及自动化异常检测。
Capabilities
功能特性
- Windows Event Log Analysis: Parse and analyze EVTX files for security events
- Syslog Analysis: Parse Unix/Linux syslog and rsyslog formats
- Web Server Log Analysis: Analyze Apache, Nginx, IIS access and error logs
- Application Log Analysis: Parse application-specific log formats
- Log Correlation: Correlate events across multiple log sources
- Timeline Generation: Create chronological event timelines
- Anomaly Detection: Detect unusual patterns and outliers
- Authentication Analysis: Track login attempts, failures, and lateral movement
- IOC Extraction: Extract indicators of compromise from log entries
- Statistical Analysis: Perform statistical analysis on log patterns
- Windows事件日志分析:解析并分析EVTX文件中的安全事件
- Syslog分析:解析Unix/Linux syslog和rsyslog格式
- Web服务器日志分析:分析Apache、Nginx、IIS的访问日志和错误日志
- 应用程序日志分析:解析特定应用程序的日志格式
- 日志关联分析:跨多个日志源关联事件
- 时间线生成:创建按时间顺序排列的事件时间线
- 异常检测:检测异常模式和离群值
- 认证分析:跟踪登录尝试、失败记录和横向移动
- IOC提取:从日志条目中提取入侵指标
- 统计分析:对日志模式进行统计分析
Quick Start
快速开始
python
from log_forensics import LogAnalyzer, EventLogParser, LogCorrelatorpython
from log_forensics import LogAnalyzer, EventLogParser, LogCorrelatorParse Windows Event Logs
Parse Windows Event Logs
parser = EventLogParser("/evidence/Security.evtx")
events = parser.parse_all()
parser = EventLogParser("/evidence/Security.evtx")
events = parser.parse_all()
Analyze authentication events
Analyze authentication events
auth_events = parser.get_authentication_events()
auth_events = parser.get_authentication_events()
Create log correlator
Create log correlator
correlator = LogCorrelator()
correlator.add_source("windows", parser)
correlator.add_source("firewall", FirewallLogParser("/evidence/firewall.log"))
timeline = correlator.create_timeline()
undefinedcorrelator = LogCorrelator()
correlator.add_source("windows", parser)
correlator.add_source("firewall", FirewallLogParser("/evidence/firewall.log"))
timeline = correlator.create_timeline()
undefinedUsage
使用方法
Task 1: Windows Event Log Analysis
任务1:Windows事件日志分析
Input: Windows EVTX log files
Process:
- Load and parse EVTX files
- Filter by event IDs of interest
- Extract relevant fields
- Identify security-relevant events
- Generate analysis report
Output: Parsed events with security analysis
Example:
python
from log_forensics import EventLogParser输入:Windows EVTX日志文件
流程:
- 加载并解析EVTX文件
- 筛选感兴趣的事件ID
- 提取相关字段
- 识别与安全相关的事件
- 生成分析报告
输出:带有安全分析结果的解析后事件
示例:
python
from log_forensics import EventLogParserParse Security event log
Parse Security event log
parser = EventLogParser("/evidence/Security.evtx")
parser = EventLogParser("/evidence/Security.evtx")
Get all events
Get all events
events = parser.parse_all()
print(f"Total events: {len(events)}")
events = parser.parse_all()
print(f"Total events: {len(events)}")
Filter by event ID (4624 = successful login)
Filter by event ID (4624 = successful login)
logins = parser.filter_by_event_id([4624, 4625])
for login in logins:
print(f"[{login.timestamp}] Event {login.event_id}")
print(f" User: {login.get_field('TargetUserName')}")
print(f" Domain: {login.get_field('TargetDomainName')}")
print(f" Logon Type: {login.get_field('LogonType')}")
print(f" Source IP: {login.get_field('IpAddress')}")
logins = parser.filter_by_event_id([4624, 4625])
for login in logins:
print(f"[{login.timestamp}] Event {login.event_id}")
print(f" User: {login.get_field('TargetUserName')}")
print(f" Domain: {login.get_field('TargetDomainName')}")
print(f" Logon Type: {login.get_field('LogonType')}")
print(f" Source IP: {login.get_field('IpAddress')}")
Get authentication failures
Get authentication failures
failures = parser.get_failed_logins()
for f in failures:
print(f"Failed login: {f.username} from {f.source_ip}")
print(f" Failure reason: {f.failure_reason}")
failures = parser.get_failed_logins()
for f in failures:
print(f"Failed login: {f.username} from {f.source_ip}")
print(f" Failure reason: {f.failure_reason}")
Detect brute force attempts
Detect brute force attempts
brute_force = parser.detect_brute_force(
threshold=10,
time_window_minutes=5
)
for bf in brute_force:
print(f"Brute force: {bf.target_account}")
print(f" Attempts: {bf.attempt_count}")
print(f" Source IPs: {bf.source_ips}")
brute_force = parser.detect_brute_force(
threshold=10,
time_window_minutes=5
)
for bf in brute_force:
print(f"Brute force: {bf.target_account}")
print(f" Attempts: {bf.attempt_count}")
print(f" Source IPs: {bf.source_ips}")
Get process creation events (4688)
Get process creation events (4688)
processes = parser.get_process_creation_events()
for p in processes:
print(f"Process: {p.process_name}")
print(f" Command line: {p.command_line}")
print(f" Parent: {p.parent_process}")
processes = parser.get_process_creation_events()
for p in processes:
print(f"Process: {p.process_name}")
print(f" Command line: {p.command_line}")
print(f" Parent: {p.parent_process}")
Export to CSV
Export to CSV
parser.export_csv("/evidence/security_events.csv")
undefinedparser.export_csv("/evidence/security_events.csv")
undefinedTask 2: Security Event Detection
任务2:安全事件检测
Input: Windows Security Event Log
Process:
- Identify security-relevant event IDs
- Detect privilege escalation
- Identify lateral movement
- Detect persistence mechanisms
- Flag suspicious activities
Output: Security findings with severity ratings
Example:
python
from log_forensics import EventLogParser, SecurityDetector
parser = EventLogParser("/evidence/Security.evtx")
detector = SecurityDetector(parser)输入:Windows安全事件日志
流程:
- 识别与安全相关的事件ID
- 检测权限提升行为
- 识别横向移动
- 检测持久化机制
- 标记可疑活动
输出:带有严重程度评级的安全发现
示例:
python
from log_forensics import EventLogParser, SecurityDetector
parser = EventLogParser("/evidence/Security.evtx")
detector = SecurityDetector(parser)Detect privilege escalation
Detect privilege escalation
priv_esc = detector.detect_privilege_escalation()
for pe in priv_esc:
print(f"PRIV ESC: {pe.technique}")
print(f" User: {pe.user}")
print(f" Timestamp: {pe.timestamp}")
print(f" Details: {pe.details}")
priv_esc = detector.detect_privilege_escalation()
for pe in priv_esc:
print(f"PRIV ESC: {pe.technique}")
print(f" User: {pe.user}")
print(f" Timestamp: {pe.timestamp}")
print(f" Details: {pe.details}")
Detect lateral movement
Detect lateral movement
lateral = detector.detect_lateral_movement()
for lm in lateral:
print(f"Lateral Movement: {lm.source} -> {lm.destination}")
print(f" Technique: {lm.technique}")
print(f" Account: {lm.account}")
lateral = detector.detect_lateral_movement()
for lm in lateral:
print(f"Lateral Movement: {lm.source} -> {lm.destination}")
print(f" Technique: {lm.technique}")
print(f" Account: {lm.account}")
Detect account manipulation
Detect account manipulation
account_changes = detector.detect_account_changes()
for ac in account_changes:
print(f"Account Change: {ac.action}")
print(f" Target: {ac.target_account}")
print(f" By: {ac.actor}")
account_changes = detector.detect_account_changes()
for ac in account_changes:
print(f"Account Change: {ac.action}")
print(f" Target: {ac.target_account}")
print(f" By: {ac.actor}")
Detect service installations
Detect service installations
services = detector.detect_service_installations()
for s in services:
print(f"Service Installed: {s.service_name}")
print(f" Path: {s.service_path}")
print(f" Account: {s.service_account}")
services = detector.detect_service_installations()
for s in services:
print(f"Service Installed: {s.service_name}")
print(f" Path: {s.service_path}")
print(f" Account: {s.service_account}")
Detect scheduled tasks
Detect scheduled tasks
tasks = detector.detect_scheduled_tasks()
tasks = detector.detect_scheduled_tasks()
Detect log clearing
Detect log clearing
cleared = detector.detect_log_clearing()
for c in cleared:
print(f"LOG CLEARED: {c.log_name} at {c.timestamp}")
print(f" By: {c.actor}")
cleared = detector.detect_log_clearing()
for c in cleared:
print(f"LOG CLEARED: {c.log_name} at {c.timestamp}")
print(f" By: {c.actor}")
Generate security report
Generate security report
detector.generate_report("/evidence/security_findings.html")
undefineddetector.generate_report("/evidence/security_findings.html")
undefinedTask 3: Syslog Analysis
任务3:Syslog分析
Input: Unix/Linux syslog files
Process:
- Parse syslog format
- Categorize by facility and severity
- Identify authentication events
- Detect suspicious activities
- Create timeline
Output: Parsed syslog with analysis
Example:
python
from log_forensics import SyslogParser输入:Unix/Linux syslog文件
流程:
- 解析Syslog格式
- 按工具类型和严重程度分类
- 识别认证事件
- 检测可疑活动
- 创建时间线
输出:带有分析结果的解析后Syslog
示例:
python
from log_forensics import SyslogParserParse syslog
Parse syslog
parser = SyslogParser("/evidence/messages")
parser = SyslogParser("/evidence/messages")
Get all entries
Get all entries
entries = parser.parse_all()
print(f"Total entries: {len(entries)}")
entries = parser.parse_all()
print(f"Total entries: {len(entries)}")
Filter by severity
Filter by severity
errors = parser.filter_by_severity(["error", "crit", "alert", "emerg"])
for e in errors:
print(f"[{e.timestamp}] {e.facility}.{e.severity}: {e.message}")
errors = parser.filter_by_severity(["error", "crit", "alert", "emerg"])
for e in errors:
print(f"[{e.timestamp}] {e.facility}.{e.severity}: {e.message}")
Get authentication events
Get authentication events
auth = parser.get_auth_events()
for a in auth:
print(f"[{a.timestamp}] {a.event_type}: {a.user}")
print(f" Source: {a.source_ip}")
print(f" Success: {a.success}")
auth = parser.get_auth_events()
for a in auth:
print(f"[{a.timestamp}] {a.event_type}: {a.user}")
print(f" Source: {a.source_ip}")
print(f" Success: {a.success}")
Detect SSH brute force
Detect SSH brute force
ssh_attacks = parser.detect_ssh_brute_force()
for attack in ssh_attacks:
print(f"SSH Attack from {attack.source_ip}")
print(f" Attempts: {attack.count}")
print(f" Users tried: {attack.users}")
ssh_attacks = parser.detect_ssh_brute_force()
for attack in ssh_attacks:
print(f"SSH Attack from {attack.source_ip}")
print(f" Attempts: {attack.count}")
print(f" Users tried: {attack.users}")
Analyze sudo usage
Analyze sudo usage
sudo = parser.get_sudo_events()
for s in sudo:
print(f"Sudo: {s.user} -> {s.run_as}")
print(f" Command: {s.command}")
print(f" Allowed: {s.allowed}")
sudo = parser.get_sudo_events()
for s in sudo:
print(f"Sudo: {s.user} -> {s.run_as}")
print(f" Command: {s.command}")
print(f" Allowed: {s.allowed}")
Get cron job executions
Get cron job executions
cron = parser.get_cron_events()
cron = parser.get_cron_events()
Export timeline
Export timeline
parser.export_timeline("/evidence/syslog_timeline.csv")
undefinedparser.export_timeline("/evidence/syslog_timeline.csv")
undefinedTask 4: Web Server Log Analysis
任务4:Web服务器日志分析
Input: Apache/Nginx/IIS access logs
Process:
- Parse access log format
- Identify unique visitors
- Detect attack patterns
- Find suspicious requests
- Generate access statistics
Output: Web access analysis with attack detection
Example:
python
from log_forensics import WebLogParser输入:Apache/Nginx/IIS访问日志
流程:
- 解析访问日志格式
- 识别唯一访客
- 检测攻击模式
- 查找可疑请求
- 生成访问统计数据
输出:带有攻击检测结果的Web访问分析
示例:
python
from log_forensics import WebLogParserParse Apache access log
Parse Apache access log
parser = WebLogParser(
"/evidence/access.log",
log_format="apache_combined"
)
parser = WebLogParser(
"/evidence/access.log",
log_format="apache_combined"
)
Get all requests
Get all requests
requests = parser.parse_all()
print(f"Total requests: {len(requests)}")
requests = parser.parse_all()
print(f"Total requests: {len(requests)}")
Get unique visitors
Get unique visitors
visitors = parser.get_unique_visitors()
print(f"Unique IPs: {len(visitors)}")
visitors = parser.get_unique_visitors()
print(f"Unique IPs: {len(visitors)}")
Find suspicious requests
Find suspicious requests
suspicious = parser.find_suspicious_requests()
for s in suspicious:
print(f"SUSPICIOUS: {s.request}")
print(f" IP: {s.client_ip}")
print(f" Reason: {s.detection_reason}")
suspicious = parser.find_suspicious_requests()
for s in suspicious:
print(f"SUSPICIOUS: {s.request}")
print(f" IP: {s.client_ip}")
print(f" Reason: {s.detection_reason}")
Detect SQL injection attempts
Detect SQL injection attempts
sqli = parser.detect_sql_injection()
for attack in sqli:
print(f"SQLi: {attack.request}")
print(f" Parameter: {attack.parameter}")
print(f" IP: {attack.source_ip}")
sqli = parser.detect_sql_injection()
for attack in sqli:
print(f"SQLi: {attack.request}")
print(f" Parameter: {attack.parameter}")
print(f" IP: {attack.source_ip}")
Detect path traversal
Detect path traversal
traversal = parser.detect_path_traversal()
traversal = parser.detect_path_traversal()
Detect web shells
Detect web shells
webshells = parser.detect_webshell_access()
for ws in webshells:
print(f"Webshell: {ws.path}")
print(f" IP: {ws.client_ip}")
print(f" Commands: {ws.detected_commands}")
webshells = parser.detect_webshell_access()
for ws in webshells:
print(f"Webshell: {ws.path}")
print(f" IP: {ws.client_ip}")
print(f" Commands: {ws.detected_commands}")
Get response code distribution
Get response code distribution
codes = parser.get_status_code_distribution()
print(f"200 OK: {codes.get(200, 0)}")
print(f"404 Not Found: {codes.get(404, 0)}")
print(f"500 Error: {codes.get(500, 0)}")
codes = parser.get_status_code_distribution()
print(f"200 OK: {codes.get(200, 0)}")
print(f"404 Not Found: {codes.get(404, 0)}")
print(f"500 Error: {codes.get(500, 0)}")
Analyze by user agent
Analyze by user agent
user_agents = parser.analyze_user_agents()
for ua in user_agents.suspicious:
print(f"Suspicious UA: {ua.user_agent}")
print(f" Reason: {ua.reason}")
user_agents = parser.analyze_user_agents()
for ua in user_agents.suspicious:
print(f"Suspicious UA: {ua.user_agent}")
print(f" Reason: {ua.reason}")
Export to CSV
Export to CSV
parser.export_csv("/evidence/web_access.csv")
undefinedparser.export_csv("/evidence/web_access.csv")
undefinedTask 5: Log Correlation
任务5:日志关联分析
Input: Multiple log sources
Process:
- Normalize log formats
- Align timestamps
- Correlate related events
- Build unified timeline
- Identify attack chains
Output: Correlated timeline with attack sequences
Example:
python
from log_forensics import LogCorrelator, EventLogParser, SyslogParser, WebLogParser输入:多个日志源
流程:
- 标准化日志格式
- 对齐时间戳
- 关联相关事件
- 构建统一时间线
- 识别攻击链
输出:带有攻击序列的关联时间线
示例:
python
from log_forensics import LogCorrelator, EventLogParser, SyslogParser, WebLogParserInitialize correlator
Initialize correlator
correlator = LogCorrelator()
correlator = LogCorrelator()
Add log sources
Add log sources
correlator.add_source(
"windows",
EventLogParser("/evidence/Security.evtx")
)
correlator.add_source(
"linux",
SyslogParser("/evidence/auth.log")
)
correlator.add_source(
"webserver",
WebLogParser("/evidence/access.log")
)
correlator.add_source(
"windows",
EventLogParser("/evidence/Security.evtx")
)
correlator.add_source(
"linux",
SyslogParser("/evidence/auth.log")
)
correlator.add_source(
"webserver",
WebLogParser("/evidence/access.log")
)
Normalize timestamps to UTC
Normalize timestamps to UTC
correlator.normalize_timestamps(timezone="UTC")
correlator.normalize_timestamps(timezone="UTC")
Create unified timeline
Create unified timeline
timeline = correlator.create_timeline()
for event in timeline:
print(f"[{event.timestamp}] {event.source}: {event.summary}")
timeline = correlator.create_timeline()
for event in timeline:
print(f"[{event.timestamp}] {event.source}: {event.summary}")
Correlate by IP address
Correlate by IP address
ip_activity = correlator.correlate_by_ip("192.168.1.100")
print(f"Activity from 192.168.1.100:")
for event in ip_activity:
print(f" [{event.source}] {event.summary}")
ip_activity = correlator.correlate_by_ip("192.168.1.100")
print(f"Activity from 192.168.1.100:")
for event in ip_activity:
print(f" [{event.source}] {event.summary}")
Correlate by username
Correlate by username
user_activity = correlator.correlate_by_user("admin")
user_activity = correlator.correlate_by_user("admin")
Detect attack chains
Detect attack chains
chains = correlator.detect_attack_chains()
for chain in chains:
print(f"Attack Chain: {chain.name}")
print(f" Confidence: {chain.confidence}")
print(f" Events: {len(chain.events)}")
for event in chain.events:
print(f" - {event.timestamp}: {event.summary}")
chains = correlator.detect_attack_chains()
for chain in chains:
print(f"Attack Chain: {chain.name}")
print(f" Confidence: {chain.confidence}")
print(f" Events: {len(chain.events)}")
for event in chain.events:
print(f" - {event.timestamp}: {event.summary}")
Find temporal correlations
Find temporal correlations
correlations = correlator.find_temporal_correlations(
time_window_seconds=60
)
correlations = correlator.find_temporal_correlations(
time_window_seconds=60
)
Export correlated timeline
Export correlated timeline
correlator.export_timeline("/evidence/correlated_timeline.csv")
correlator.export_timeline_html("/evidence/timeline.html")
undefinedcorrelator.export_timeline("/evidence/correlated_timeline.csv")
correlator.export_timeline_html("/evidence/timeline.html")
undefinedTask 6: Authentication Analysis
任务6:认证分析
Input: Log files containing authentication events
Process:
- Extract all authentication events
- Analyze login patterns
- Detect anomalous logins
- Identify credential attacks
- Track session activity
Output: Authentication analysis report
Example:
python
from log_forensics import AuthenticationAnalyzer输入:包含认证事件的日志文件
流程:
- 提取所有认证事件
- 分析登录模式
- 检测异常登录
- 识别凭证攻击
- 跟踪会话活动
输出:认证分析报告
示例:
python
from log_forensics import AuthenticationAnalyzerInitialize with multiple sources
Initialize with multiple sources
analyzer = AuthenticationAnalyzer()
analyzer.add_windows_logs("/evidence/Security.evtx")
analyzer.add_linux_logs("/evidence/auth.log")
analyzer.add_vpn_logs("/evidence/vpn.log")
analyzer = AuthenticationAnalyzer()
analyzer.add_windows_logs("/evidence/Security.evtx")
analyzer.add_linux_logs("/evidence/auth.log")
analyzer.add_vpn_logs("/evidence/vpn.log")
Get all authentication events
Get all authentication events
auth_events = analyzer.get_all_events()
auth_events = analyzer.get_all_events()
Analyze login patterns per user
Analyze login patterns per user
patterns = analyzer.analyze_user_patterns("john.doe")
print(f"User: john.doe")
print(f" Usual login times: {patterns.usual_hours}")
print(f" Usual locations: {patterns.usual_locations}")
print(f" Failed attempts: {patterns.failed_count}")
patterns = analyzer.analyze_user_patterns("john.doe")
print(f"User: john.doe")
print(f" Usual login times: {patterns.usual_hours}")
print(f" Usual locations: {patterns.usual_locations}")
print(f" Failed attempts: {patterns.failed_count}")
Detect anomalous logins
Detect anomalous logins
anomalies = analyzer.detect_anomalies()
for a in anomalies:
print(f"ANOMALY: {a.user} at {a.timestamp}")
print(f" Reason: {a.reason}")
print(f" Details: {a.details}")
anomalies = analyzer.detect_anomalies()
for a in anomalies:
print(f"ANOMALY: {a.user} at {a.timestamp}")
print(f" Reason: {a.reason}")
print(f" Details: {a.details}")
Detect impossible travel
Detect impossible travel
travel = analyzer.detect_impossible_travel()
for t in travel:
print(f"Impossible Travel: {t.user}")
print(f" Location 1: {t.location1} at {t.time1}")
print(f" Location 2: {t.location2} at {t.time2}")
print(f" Distance: {t.distance_km}km in {t.time_diff_minutes}min")
travel = analyzer.detect_impossible_travel()
for t in travel:
print(f"Impossible Travel: {t.user}")
print(f" Location 1: {t.location1} at {t.time1}")
print(f" Location 2: {t.location2} at {t.time2}")
print(f" Distance: {t.distance_km}km in {t.time_diff_minutes}min")
Detect credential stuffing
Detect credential stuffing
stuffing = analyzer.detect_credential_stuffing()
stuffing = analyzer.detect_credential_stuffing()
Get failed login summary
Get failed login summary
failed = analyzer.get_failed_login_summary()
print(f"Total failed logins: {failed.total}")
print(f"Top targeted accounts: {failed.top_accounts}")
print(f"Top source IPs: {failed.top_sources}")
failed = analyzer.get_failed_login_summary()
print(f"Total failed logins: {failed.total}")
print(f"Top targeted accounts: {failed.top_accounts}")
print(f"Top source IPs: {failed.top_sources}")
Generate authentication report
Generate authentication report
analyzer.generate_report("/evidence/auth_analysis.html")
undefinedanalyzer.generate_report("/evidence/auth_analysis.html")
undefinedTask 7: PowerShell and Command Line Analysis
任务7:PowerShell与命令行分析
Input: Windows Event Logs with PowerShell/command logging
Process:
- Extract PowerShell events (4103, 4104)
- Decode encoded commands
- Detect malicious patterns
- Identify obfuscation techniques
- Extract IOCs from commands
Output: Command analysis with threat indicators
Example:
python
from log_forensics import PowerShellAnalyzer输入:包含PowerShell/命令行日志的Windows事件日志
流程:
- 提取PowerShell事件(4103、4104)
- 解码编码的命令
- 检测恶意模式
- 识别混淆技术
- 从命令中提取IOC
输出:带有威胁指标的命令分析结果
示例:
python
from log_forensics import PowerShellAnalyzerParse PowerShell logs
Parse PowerShell logs
analyzer = PowerShellAnalyzer()
analyzer.add_event_log("/evidence/Microsoft-Windows-PowerShell%4Operational.evtx")
analyzer.add_event_log("/evidence/Security.evtx")
analyzer = PowerShellAnalyzer()
analyzer.add_event_log("/evidence/Microsoft-Windows-PowerShell%4Operational.evtx")
analyzer.add_event_log("/evidence/Security.evtx")
Get all PowerShell events
Get all PowerShell events
events = analyzer.get_all_events()
events = analyzer.get_all_events()
Decode encoded commands
Decode encoded commands
decoded = analyzer.decode_encoded_commands()
for d in decoded:
print(f"Encoded command at {d.timestamp}:")
print(f" Original: {d.encoded[:50]}...")
print(f" Decoded: {d.decoded}")
decoded = analyzer.decode_encoded_commands()
for d in decoded:
print(f"Encoded command at {d.timestamp}:")
print(f" Original: {d.encoded[:50]}...")
print(f" Decoded: {d.decoded}")
Detect malicious patterns
Detect malicious patterns
malicious = analyzer.detect_malicious_patterns()
for m in malicious:
print(f"MALICIOUS: {m.pattern}")
print(f" Command: {m.command}")
print(f" Technique: {m.mitre_technique}")
malicious = analyzer.detect_malicious_patterns()
for m in malicious:
print(f"MALICIOUS: {m.pattern}")
print(f" Command: {m.command}")
print(f" Technique: {m.mitre_technique}")
Detect download cradles
Detect download cradles
cradles = analyzer.detect_download_cradles()
for c in cradles:
print(f"Download Cradle: {c.type}")
print(f" URL: {c.url}")
print(f" Command: {c.command}")
cradles = analyzer.detect_download_cradles()
for c in cradles:
print(f"Download Cradle: {c.type}")
print(f" URL: {c.url}")
print(f" Command: {c.command}")
Detect obfuscation
Detect obfuscation
obfuscated = analyzer.detect_obfuscation()
for o in obfuscated:
print(f"Obfuscation: {o.technique}")
print(f" Score: {o.obfuscation_score}")
obfuscated = analyzer.detect_obfuscation()
for o in obfuscated:
print(f"Obfuscation: {o.technique}")
print(f" Score: {o.obfuscation_score}")
Extract IOCs from commands
Extract IOCs from commands
iocs = analyzer.extract_iocs()
print(f"URLs found: {len(iocs.urls)}")
print(f"IPs found: {len(iocs.ips)}")
print(f"Domains found: {len(iocs.domains)}")
print(f"File paths: {len(iocs.file_paths)}")
iocs = analyzer.extract_iocs()
print(f"URLs found: {len(iocs.urls)}")
print(f"IPs found: {len(iocs.ips)}")
print(f"Domains found: {len(iocs.domains)}")
print(f"File paths: {len(iocs.file_paths)}")
Generate report
Generate report
analyzer.generate_report("/evidence/powershell_analysis.html")
undefinedanalyzer.generate_report("/evidence/powershell_analysis.html")
undefinedTask 8: Firewall and Network Log Analysis
任务8:防火墙与网络日志分析
Input: Firewall logs (various formats)
Process:
- Parse firewall log format
- Analyze allowed/denied traffic
- Detect port scans
- Identify suspicious patterns
- Generate traffic statistics
Output: Firewall log analysis
Example:
python
from log_forensics import FirewallLogParser输入:防火墙日志(多种格式)
流程:
- 解析防火墙日志格式
- 分析允许/拒绝的流量
- 检测端口扫描
- 识别可疑模式
- 生成流量统计数据
输出:防火墙日志分析结果
示例:
python
from log_forensics import FirewallLogParserParse firewall logs
Parse firewall logs
parser = FirewallLogParser(
"/evidence/firewall.log",
format="pfsense" # or "iptables", "windows_firewall", "cisco_asa"
)
parser = FirewallLogParser(
"/evidence/firewall.log",
format="pfsense" # or "iptables", "windows_firewall", "cisco_asa"
)
Get all events
Get all events
events = parser.parse_all()
events = parser.parse_all()
Get denied traffic
Get denied traffic
denied = parser.get_denied_traffic()
for d in denied:
print(f"DENIED: {d.src_ip}:{d.src_port} -> {d.dst_ip}:{d.dst_port}")
print(f" Protocol: {d.protocol}")
print(f" Rule: {d.rule_name}")
denied = parser.get_denied_traffic()
for d in denied:
print(f"DENIED: {d.src_ip}:{d.src_port} -> {d.dst_ip}:{d.dst_port}")
print(f" Protocol: {d.protocol}")
print(f" Rule: {d.rule_name}")
Detect port scans
Detect port scans
scans = parser.detect_port_scans()
for s in scans:
print(f"Port Scan: {s.source_ip}")
print(f" Target: {s.target_ip}")
print(f" Ports: {s.ports_scanned}")
print(f" Type: {s.scan_type}")
scans = parser.detect_port_scans()
for s in scans:
print(f"Port Scan: {s.source_ip}")
print(f" Target: {s.target_ip}")
print(f" Ports: {s.ports_scanned}")
print(f" Type: {s.scan_type}")
Detect potential C2
Detect potential C2
c2_indicators = parser.detect_c2_indicators()
for c2 in c2_indicators:
print(f"C2 Indicator: {c2.internal_ip} -> {c2.external_ip}")
print(f" Pattern: {c2.pattern}")
c2_indicators = parser.detect_c2_indicators()
for c2 in c2_indicators:
print(f"C2 Indicator: {c2.internal_ip} -> {c2.external_ip}")
print(f" Pattern: {c2.pattern}")
Get traffic summary
Get traffic summary
summary = parser.get_traffic_summary()
print(f"Total connections: {summary.total}")
print(f"Allowed: {summary.allowed}")
print(f"Denied: {summary.denied}")
print(f"Top talkers: {summary.top_sources}")
summary = parser.get_traffic_summary()
print(f"Total connections: {summary.total}")
print(f"Allowed: {summary.allowed}")
print(f"Denied: {summary.denied}")
print(f"Top talkers: {summary.top_sources}")
Analyze by destination port
Analyze by destination port
port_analysis = parser.analyze_by_port()
for port, stats in port_analysis.items():
print(f"Port {port}: {stats.connection_count} connections")
port_analysis = parser.analyze_by_port()
for port, stats in port_analysis.items():
print(f"Port {port}: {stats.connection_count} connections")
Export analysis
Export analysis
parser.export_csv("/evidence/firewall_events.csv")
undefinedparser.export_csv("/evidence/firewall_events.csv")
undefinedTask 9: Cloud Service Log Analysis
任务9:云服务日志分析
Input: Cloud platform logs (AWS, Azure, GCP)
Process:
- Parse cloud log format
- Identify management events
- Detect suspicious API calls
- Analyze IAM activities
- Check for data access
Output: Cloud activity analysis
Example:
python
from log_forensics import CloudLogAnalyzer输入:云平台日志(AWS、Azure、GCP)
流程:
- 解析云日志格式
- 识别管理事件
- 检测可疑API调用
- 分析IAM活动
- 检查数据访问情况
输出:云活动分析结果
示例:
python
from log_forensics import CloudLogAnalyzerAWS CloudTrail analysis
AWS CloudTrail analysis
aws_analyzer = CloudLogAnalyzer(
"/evidence/cloudtrail/",
platform="aws"
)
aws_analyzer = CloudLogAnalyzer(
"/evidence/cloudtrail/",
platform="aws"
)
Get all events
Get all events
events = aws_analyzer.parse_all()
events = aws_analyzer.parse_all()
Get IAM events
Get IAM events
iam_events = aws_analyzer.get_iam_events()
for e in iam_events:
print(f"[{e.timestamp}] {e.event_name}")
print(f" User: {e.user_identity}")
print(f" Source IP: {e.source_ip}")
iam_events = aws_analyzer.get_iam_events()
for e in iam_events:
print(f"[{e.timestamp}] {e.event_name}")
print(f" User: {e.user_identity}")
print(f" Source IP: {e.source_ip}")
Detect suspicious activities
Detect suspicious activities
suspicious = aws_analyzer.detect_suspicious_activities()
for s in suspicious:
print(f"SUSPICIOUS: {s.event_name}")
print(f" Reason: {s.reason}")
print(f" Risk: {s.risk_level}")
suspicious = aws_analyzer.detect_suspicious_activities()
for s in suspicious:
print(f"SUSPICIOUS: {s.event_name}")
print(f" Reason: {s.reason}")
print(f" Risk: {s.risk_level}")
Detect privilege escalation
Detect privilege escalation
priv_esc = aws_analyzer.detect_privilege_escalation()
priv_esc = aws_analyzer.detect_privilege_escalation()
Detect data exfiltration indicators
Detect data exfiltration indicators
exfil = aws_analyzer.detect_data_exfiltration()
for e in exfil:
print(f"Potential Exfil: {e.resource}")
print(f" Action: {e.action}")
print(f" By: {e.user}")
exfil = aws_analyzer.detect_data_exfiltration()
for e in exfil:
print(f"Potential Exfil: {e.resource}")
print(f" Action: {e.action}")
print(f" By: {e.user}")
Analyze S3 access
Analyze S3 access
s3_access = aws_analyzer.get_s3_access_events()
for access in s3_access:
print(f"S3: {access.action} on {access.bucket}")
print(f" Object: {access.object_key}")
print(f" User: {access.user}")
s3_access = aws_analyzer.get_s3_access_events()
for access in s3_access:
print(f"S3: {access.action} on {access.bucket}")
print(f" Object: {access.object_key}")
print(f" User: {access.user}")
Generate cloud security report
Generate cloud security report
aws_analyzer.generate_report("/evidence/cloud_analysis.html")
undefinedaws_analyzer.generate_report("/evidence/cloud_analysis.html")
undefinedTask 10: Log Anomaly Detection
任务10:日志异常检测
Input: Any log source
Process:
- Establish baseline patterns
- Apply statistical analysis
- Detect outliers
- Identify unusual sequences
- Flag anomalies
Output: Anomaly detection results
Example:
python
from log_forensics import AnomalyDetector输入:任何日志源
流程:
- 建立基准模式
- 应用统计分析
- 检测离群值
- 识别异常序列
- 标记异常
输出:异常检测结果
示例:
python
from log_forensics import AnomalyDetectorInitialize detector
Initialize detector
detector = AnomalyDetector()
detector = AnomalyDetector()
Add log sources
Add log sources
detector.add_logs("/evidence/Security.evtx")
detector.add_logs("/evidence/access.log")
detector.add_logs("/evidence/auth.log")
detector.add_logs("/evidence/Security.evtx")
detector.add_logs("/evidence/access.log")
detector.add_logs("/evidence/auth.log")
Build baseline (using first portion of logs)
Build baseline (using first portion of logs)
detector.build_baseline(training_percentage=0.7)
detector.build_baseline(training_percentage=0.7)
Detect volume anomalies
Detect volume anomalies
volume_anomalies = detector.detect_volume_anomalies()
for a in volume_anomalies:
print(f"Volume Anomaly at {a.timestamp}")
print(f" Expected: {a.expected_count}")
print(f" Actual: {a.actual_count}")
print(f" Deviation: {a.deviation}x")
volume_anomalies = detector.detect_volume_anomalies()
for a in volume_anomalies:
print(f"Volume Anomaly at {a.timestamp}")
print(f" Expected: {a.expected_count}")
print(f" Actual: {a.actual_count}")
print(f" Deviation: {a.deviation}x")
Detect timing anomalies
Detect timing anomalies
timing_anomalies = detector.detect_timing_anomalies()
for a in timing_anomalies:
print(f"Timing Anomaly: {a.description}")
print(f" Event: {a.event_type}")
print(f" Usual time: {a.usual_time}")
print(f" Occurred: {a.actual_time}")
timing_anomalies = detector.detect_timing_anomalies()
for a in timing_anomalies:
print(f"Timing Anomaly: {a.description}")
print(f" Event: {a.event_type}")
print(f" Usual time: {a.usual_time}")
print(f" Occurred: {a.actual_time}")
Detect sequence anomalies
Detect sequence anomalies
sequence_anomalies = detector.detect_sequence_anomalies()
for a in sequence_anomalies:
print(f"Unusual Sequence: {a.sequence}")
print(f" Probability: {a.probability}")
sequence_anomalies = detector.detect_sequence_anomalies()
for a in sequence_anomalies:
print(f"Unusual Sequence: {a.sequence}")
print(f" Probability: {a.probability}")
Detect rare events
Detect rare events
rare_events = detector.find_rare_events(threshold=0.01)
for e in rare_events:
print(f"Rare Event: {e.event_type}")
print(f" Frequency: {e.frequency}")
print(f" Count: {e.count}")
rare_events = detector.find_rare_events(threshold=0.01)
for e in rare_events:
print(f"Rare Event: {e.event_type}")
print(f" Frequency: {e.frequency}")
print(f" Count: {e.count}")
Generate anomaly report
Generate anomaly report
detector.generate_report("/evidence/anomaly_report.html")
undefineddetector.generate_report("/evidence/anomaly_report.html")
undefinedConfiguration
配置
Environment Variables
环境变量
| Variable | Description | Required | Default |
|---|---|---|---|
| Default timezone for log parsing | No | UTC |
| Path to EVTX parser binary | No | Built-in |
| Path to GeoIP database | No | None |
| Path to YARA rules for log analysis | No | None |
| 变量 | 描述 | 是否必填 | 默认值 |
|---|---|---|---|
| 日志解析的默认时区 | 否 | UTC |
| EVTX解析器二进制文件路径 | 否 | 内置 |
| GeoIP数据库路径 | 否 | None |
| 用于日志分析的YARA规则路径 | 否 | None |
Options
选项
| Option | Type | Description |
|---|---|---|
| boolean | Normalize all timestamps to UTC |
| boolean | Enable parallel log parsing |
| boolean | Cache parsed log entries |
| integer | Maximum memory for log processing |
| integer | Lines to process per chunk |
| 选项 | 类型 | 描述 |
|---|---|---|
| 布尔值 | 将所有时间戳标准化为UTC |
| 布尔值 | 启用并行日志解析 |
| 布尔值 | 缓存解析后的日志条目 |
| 整数 | 日志处理的最大内存限制 |
| 整数 | 每个处理块的行数 |
Examples
示例场景
Example 1: Investigating Unauthorized Access
示例1:调查未授权访问
Scenario: Detecting and analyzing unauthorized system access
python
from log_forensics import EventLogParser, AuthenticationAnalyzer场景:检测并分析未授权系统访问
python
from log_forensics import EventLogParser, AuthenticationAnalyzerParse Security event log
Parse Security event log
parser = EventLogParser("/evidence/Security.evtx")
parser = EventLogParser("/evidence/Security.evtx")
Get failed login attempts
Get failed login attempts
failed = parser.get_failed_logins()
print(f"Total failed logins: {len(failed)}")
failed = parser.get_failed_logins()
print(f"Total failed logins: {len(failed)}")
Group by target account
Group by target account
accounts = {}
for f in failed:
if f.target_account not in accounts:
accounts[f.target_account] = []
accounts[f.target_account].append(f)
accounts = {}
for f in failed:
if f.target_account not in accounts:
accounts[f.target_account] = []
accounts[f.target_account].append(f)
Find accounts with many failures
Find accounts with many failures
for account, failures in accounts.items():
if len(failures) > 10:
print(f"Account: {account}")
print(f" Failures: {len(failures)}")
unique_ips = set(f.source_ip for f in failures)
print(f" Source IPs: {unique_ips}")
for account, failures in accounts.items():
if len(failures) > 10:
print(f"Account: {account}")
print(f" Failures: {len(failures)}")
unique_ips = set(f.source_ip for f in failures)
print(f" Source IPs: {unique_ips}")
Check for successful logins after failures
Check for successful logins after failures
auth_analyzer = AuthenticationAnalyzer()
auth_analyzer.add_parser(parser)
compromise_indicators = auth_analyzer.find_success_after_failure()
undefinedauth_analyzer = AuthenticationAnalyzer()
auth_analyzer.add_parser(parser)
compromise_indicators = auth_analyzer.find_success_after_failure()
undefinedExample 2: Insider Threat Investigation
示例2:内部威胁调查
Scenario: Analyzing logs for insider threat indicators
python
from log_forensics import LogCorrelator, EventLogParser, FileAccessParser场景:分析日志以识别内部威胁指标
python
from log_forensics import LogCorrelator, EventLogParser, FileAccessParserCombine multiple log sources
Combine multiple log sources
correlator = LogCorrelator()
correlator.add_source("security", EventLogParser("/evidence/Security.evtx"))
correlator.add_source("files", FileAccessParser("/evidence/file_audit.evtx"))
correlator = LogCorrelator()
correlator.add_source("security", EventLogParser("/evidence/Security.evtx"))
correlator.add_source("files", FileAccessParser("/evidence/file_audit.evtx"))
Analyze specific user's activity
Analyze specific user's activity
user = "john.smith"
user_timeline = correlator.get_user_activity(user)
user = "john.smith"
user_timeline = correlator.get_user_activity(user)
Look for data collection indicators
Look for data collection indicators
data_access = correlator.find_bulk_file_access(
user=user,
threshold=100,
time_window_hours=1
)
data_access = correlator.find_bulk_file_access(
user=user,
threshold=100,
time_window_hours=1
)
Check for off-hours activity
Check for off-hours activity
off_hours = correlator.find_off_hours_activity(
user=user,
business_hours=(9, 18),
business_days=[0, 1, 2, 3, 4] # Mon-Fri
)
off_hours = correlator.find_off_hours_activity(
user=user,
business_hours=(9, 18),
business_days=[0, 1, 2, 3, 4] # 周一至周五
)
Generate insider threat report
Generate insider threat report
correlator.generate_insider_report(user, "/evidence/insider_report.html")
undefinedcorrelator.generate_insider_report(user, "/evidence/insider_report.html")
undefinedLimitations
局限性
- Large log files may require significant memory
- Some log formats may not be fully supported
- Timestamp parsing depends on consistent formats
- Anomaly detection requires sufficient baseline data
- Real-time analysis not supported
- Encrypted logs cannot be parsed
- Log rotation may cause gaps in analysis
- 大型日志文件可能需要大量内存
- 部分日志格式可能未完全支持
- 时间戳解析依赖于格式的一致性
- 异常检测需要足够的基准数据
- 不支持实时分析
- 无法解析加密日志
- 日志轮转可能导致分析出现缺口
Troubleshooting
故障排除
Common Issue 1: EVTX Parsing Errors
常见问题1:EVTX解析错误
Problem: Unable to parse Windows Event Log
Solution:
- Check file for corruption
- Ensure file is complete (not truncated)
- Try alternative parser
问题:无法解析Windows事件日志
解决方案:
- 检查文件是否损坏
- 确保文件完整(未被截断)
- 尝试使用其他解析器
Common Issue 2: Timestamp Misalignment
常见问题2:时间戳对齐错误
Problem: Events from different sources don't correlate
Solution:
- Verify source timezones
- Use normalize_timestamps option
- Check for clock skew
问题:不同来源的事件无法关联
解决方案:
- 验证源时区设置
- 使用normalize_timestamps选项
- 检查时钟偏差
Common Issue 3: Memory Exhaustion
常见问题3:内存耗尽
Problem: Out of memory on large log files
Solution:
- Use streaming mode
- Process in chunks
- Increase max_memory_mb setting
问题:处理大型日志文件时出现内存不足
解决方案:
- 使用流模式
- 分块处理
- 增加max_memory_mb设置
Related Skills
相关工具
- timeline-forensics: Super timeline creation
- memory-forensics: Correlate with memory analysis
- network-forensics: Correlate with network captures
- registry-forensics: Windows registry analysis
- incident-response: IR workflow integration
- timeline-forensics: 超级时间线创建工具
- memory-forensics: 与内存分析关联
- network-forensics: 与网络捕获关联
- registry-forensics: Windows注册表分析
- incident-response: 事件响应工作流集成
References
参考资料
- Log Forensics Reference
- Windows Event ID Guide
- Log Format Specifications
- 日志取证参考文档
- Windows事件ID指南
- 日志格式规范