Loading...
Loading...
Compare original and translation side by side
undefinedundefinedundefinedundefinedimport canimport canwhile time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'timestamp': msg.timestamp,
'arbitration_id': hex(msg.arbitration_id),
'data': msg.data.hex(),
'dlc': msg.dlc
})
return messageswhile time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'timestamp': msg.timestamp,
'arbitration_id': hex(msg.arbitration_id),
'data': msg.data.hex(),
'dlc': msg.dlc
})
return messagesundefinedundefinedfrom collections import Counter
import json
def analyze_can_traffic(messages):
"""Analyze CAN traffic for security assessment"""
analysis = {
'total_messages': len(messages),
'unique_ids': set(),
'id_frequency': Counter(),
'data_patterns': {}
}
for msg in messages:
arb_id = msg['arbitration_id']
analysis['unique_ids'].add(arb_id)
analysis['id_frequency'][arb_id] += 1
# Track data patterns per ID
if arb_id not in analysis['data_patterns']:
analysis['data_patterns'][arb_id] = []
analysis['data_patterns'][arb_id].append(msg['data'])
# Convert set to list for JSON serialization
analysis['unique_ids'] = list(analysis['unique_ids'])
analysis['id_frequency'] = dict(analysis['id_frequency'])
return analysis
def save_analysis(analysis, output_file='analysis.json'):
"""Save analysis results to file"""
with open(output_file, 'w') as f:
json.dump(analysis, f, indent=2)from collections import Counter
import json
def analyze_can_traffic(messages):
"""分析CAN流量以进行安全评估"""
analysis = {
'total_messages': len(messages),
'unique_ids': set(),
'id_frequency': Counter(),
'data_patterns': {}
}
for msg in messages:
arb_id = msg['arbitration_id']
analysis['unique_ids'].add(arb_id)
analysis['id_frequency'][arb_id] += 1
# 按ID跟踪数据模式
if arb_id not in analysis['data_patterns']:
analysis['data_patterns'][arb_id] = []
analysis['data_patterns'][arb_id].append(msg['data'])
# 将集合转换为列表以支持JSON序列化
analysis['unique_ids'] = list(analysis['unique_ids'])
analysis['id_frequency'] = dict(analysis['id_frequency'])
return analysis
def save_analysis(analysis, output_file='analysis.json'):
"""将分析结果保存到文件"""
with open(output_file, 'w') as f:
json.dump(analysis, f, indent=2)import random
import time
def fuzz_can_id(target_id, num_iterations=100, delay=0.1):
"""Fuzz a specific CAN ID with random data"""
print(f"Starting fuzzing on CAN ID: {hex(target_id)}")
for i in range(num_iterations):
# Generate random 8-byte payload
fuzz_data = bytes([random.randint(0, 255) for _ in range(8)])
msg = can.Message(
arbitration_id=target_id,
data=fuzz_data,
is_extended_id=False
)
try:
bus.send(msg)
print(f"[{i+1}/{num_iterations}] Sent: {fuzz_data.hex()}")
time.sleep(delay)
except Exception as e:
print(f"Error sending message: {e}")
def intelligent_fuzz(target_id, baseline_data, mutations=50):
"""Perform mutation-based fuzzing on known data"""
baseline = bytes.fromhex(baseline_data)
for i in range(mutations):
mutated = bytearray(baseline)
# Mutate random byte
byte_idx = random.randint(0, len(mutated) - 1)
mutated[byte_idx] = random.randint(0, 255)
msg = can.Message(
arbitration_id=target_id,
data=bytes(mutated),
is_extended_id=False
)
bus.send(msg)
time.sleep(0.05)import random
import time
def fuzz_can_id(target_id, num_iterations=100, delay=0.1):
"""对特定CAN ID进行随机数据模糊测试"""
print(f"开始对CAN ID进行模糊测试: {hex(target_id)}")
for i in range(num_iterations):
# 生成随机8字节负载
fuzz_data = bytes([random.randint(0, 255) for _ in range(8)])
msg = can.Message(
arbitration_id=target_id,
data=fuzz_data,
is_extended_id=False
)
try:
bus.send(msg)
print(f"[{i+1}/{num_iterations}] 已发送: {fuzz_data.hex()}")
time.sleep(delay)
except Exception as e:
print(f"发送消息出错: {e}")
def intelligent_fuzz(target_id, baseline_data, mutations=50):
"""对已知数据执行基于变异的模糊测试"""
baseline = bytes.fromhex(baseline_data)
for i in range(mutations):
mutated = bytearray(baseline)
# 变异随机字节
byte_idx = random.randint(0, len(mutated) - 1)
mutated[byte_idx] = random.randint(0, 255)
msg = can.Message(
arbitration_id=target_id,
data=bytes(mutated),
is_extended_id=False
)
bus.send(msg)
time.sleep(0.05)import pickle
def capture_session(output_file='session.pkl', duration=30):
"""Capture CAN session for replay"""
print(f"Capturing session for {duration} seconds...")
messages = []
start_time = time.time()
while time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'arbitration_id': msg.arbitration_id,
'data': bytes(msg.data),
'timestamp': msg.timestamp,
'is_extended_id': msg.is_extended_id
})
with open(output_file, 'wb') as f:
pickle.dump(messages, f)
print(f"Captured {len(messages)} messages")
return messages
def replay_session(input_file='session.pkl', speed_multiplier=1.0):
"""Replay captured CAN session"""
with open(input_file, 'rb') as f:
messages = pickle.load(f)
print(f"Replaying {len(messages)} messages...")
if not messages:
return
base_time = messages[0]['timestamp']
start_time = time.time()
for msg_data in messages:
# Calculate timing
original_delay = msg_data['timestamp'] - base_time
target_time = start_time + (original_delay / speed_multiplier)
# Wait until target time
sleep_time = target_time - time.time()
if sleep_time > 0:
time.sleep(sleep_time)
# Send message
msg = can.Message(
arbitration_id=msg_data['arbitration_id'],
data=msg_data['data'],
is_extended_id=msg_data['is_extended_id']
)
bus.send(msg)import pickle
def capture_session(output_file='session.pkl', duration=30):
"""捕获CAN会话用于重放"""
print(f"正在捕获会话,时长{duration}秒...")
messages = []
start_time = time.time()
while time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'arbitration_id': msg.arbitration_id,
'data': bytes(msg.data),
'timestamp': msg.timestamp,
'is_extended_id': msg.is_extended_id
})
with open(output_file, 'wb') as f:
pickle.dump(messages, f)
print(f"已捕获{len(messages)}条消息")
return messages
def replay_session(input_file='session.pkl', speed_multiplier=1.0):
"""重放捕获的CAN会话"""
with open(input_file, 'rb') as f:
messages = pickle.load(f)
print(f"正在重放{len(messages)}条消息...")
if not messages:
return
base_time = messages[0]['timestamp']
start_time = time.time()
for msg_data in messages:
# 计算时序
original_delay = msg_data['timestamp'] - base_time
target_time = start_time + (original_delay / speed_multiplier)
# 等待至目标时间
sleep_time = target_time - time.time()
if sleep_time > 0:
time.sleep(sleep_time)
# 发送消息
msg = can.Message(
arbitration_id=msg_data['arbitration_id'],
data=msg_data['data'],
is_extended_id=msg_data['is_extended_id']
)
bus.send(msg)undefinedundefinedundefinedundefinedconfig.json{
"interface": {
"channel": "can0",
"bustype": "socketcan",
"bitrate": 500000
},
"logging": {
"level": "INFO",
"file": "s800.log",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
},
"fuzzing": {
"default_iterations": 100,
"default_delay": 0.1,
"target_ids": ["0x123", "0x456", "0x789"]
},
"analysis": {
"output_dir": "./results",
"save_pcap": true
}
}config.json{
"interface": {
"channel": "can0",
"bustype": "socketcan",
"bitrate": 500000
},
"logging": {
"level": "INFO",
"file": "s800.log",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
},
"fuzzing": {
"default_iterations": 100,
"default_delay": 0.1,
"target_ids": ["0x123", "0x456", "0x789"]
},
"analysis": {
"output_dir": "./results",
"save_pcap": true
}
}import os
from datetime import datetime
def run_security_assessment(target_ids, duration=60):
"""Perform comprehensive security assessment"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"assessment_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
# 1. Capture baseline traffic
print("[1/4] Capturing baseline traffic...")
baseline = read_can_traffic(duration=duration)
with open(f"{output_dir}/baseline.json", 'w') as f:
json.dump(baseline, f, indent=2)
# 2. Analyze traffic
print("[2/4] Analyzing traffic...")
analysis = analyze_can_traffic(baseline)
save_analysis(analysis, f"{output_dir}/analysis.json")
# 3. Perform targeted fuzzing
print("[3/4] Fuzzing target IDs...")
for target_id in target_ids:
fuzz_can_id(int(target_id, 16), num_iterations=50)
time.sleep(2)
# 4. Capture post-fuzz traffic
print("[4/4] Capturing post-fuzz traffic...")
post_fuzz = read_can_traffic(duration=30)
with open(f"{output_dir}/post_fuzz.json", 'w') as f:
json.dump(post_fuzz, f, indent=2)
print(f"Assessment complete. Results in {output_dir}/")import os
from datetime import datetime
def run_security_assessment(target_ids, duration=60):
"""执行全面安全评估"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"assessment_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
# 1. 捕获基准流量
print("[1/4] 正在捕获基准流量...")
baseline = read_can_traffic(duration=duration)
with open(f"{output_dir}/baseline.json", 'w') as f:
json.dump(baseline, f, indent=2)
# 2. 分析流量
print("[2/4] 正在分析流量...")
analysis = analyze_can_traffic(baseline)
save_analysis(analysis, f"{output_dir}/analysis.json")
# 3. 执行定向模糊测试
print("[3/4] 正在对目标ID进行模糊测试...")
for target_id in target_ids:
fuzz_can_id(int(target_id, 16), num_iterations=50)
time.sleep(2)
# 4. 捕获模糊测试后流量
print("[4/4] 正在捕获模糊测试后流量...")
post_fuzz = read_can_traffic(duration=30)
with open(f"{output_dir}/post_fuzz.json", 'w') as f:
json.dump(post_fuzz, f, indent=2)
print(f"评估完成。结果保存在 {output_dir}/")def monitor_can_bus(alert_ids=None, callback=None):
"""Monitor CAN bus for specific IDs or anomalies"""
alert_ids = alert_ids or []
print("Starting CAN bus monitoring...")
print(f"Alert IDs: {[hex(id) for id in alert_ids]}")
while True:
msg = bus.recv(timeout=1.0)
if msg:
if msg.arbitration_id in alert_ids:
alert_msg = f"ALERT: Detected ID {hex(msg.arbitration_id)} - Data: {msg.data.hex()}"
print(alert_msg)
if callback:
callback(msg)def monitor_can_bus(alert_ids=None, callback=None):
"""监控CAN总线以识别特定ID或异常"""
alert_ids = alert_ids or []
print("开始CAN总线监控...")
print(f"告警ID: {[hex(id) for id in alert_ids]}")
while True:
msg = bus.recv(timeout=1.0)
if msg:
if msg.arbitration_id in alert_ids:
alert_msg = f"告警: 检测到ID {hex(msg.arbitration_id)} - 数据: {msg.data.hex()}"
print(alert_msg)
if callback:
callback(msg)undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefined