#!/usr/bin/env python3
# Investigate supply chain attack artifacts including trojanized software
# updates, compromised build pipelines, and sideloaded dependencies to
# identify intrusion vectors and scope of compromise.
"""Compare trojanized binary against legitimate version."""
import hashlib
import pefile
import sys
import json
def _section_map(pe):
    """Map section name -> {size, entropy, characteristics} for one PE.

    Malicious binaries frequently carry non-ASCII / malformed section names,
    so decode with errors="replace" instead of letting UnicodeDecodeError
    abort the whole comparison.
    """
    return {
        s.Name.rstrip(b'\x00').decode(errors="replace"): {
            "size": s.SizeOfRawData,
            "entropy": s.get_entropy(),
            "characteristics": s.Characteristics,
        }
        for s in pe.sections
    }


def _import_set(pe):
    """Collect 'dll!function' strings for every named import of a PE."""
    imports = set()
    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            dll = entry.dll.decode(errors="replace")
            for imp in entry.imports:
                if imp.name:
                    imports.add(f"{dll}!{imp.name.decode(errors='replace')}")
    return imports


def compare_pe_files(legitimate_path, suspect_path):
    """Compare PE file structures between legitimate and suspect versions.

    Args:
        legitimate_path: Path to the known-good binary.
        suspect_path: Path to the potentially trojanized binary.

    Returns:
        dict with keys:
            "differences": reserved; currently always empty.
            "suspicious_sections": sections new to the suspect, or whose raw
                size differs from the legitimate version by more than 1 KiB.
            "import_changes": imports present only in the suspect binary.
            "legit_signed" / "suspect_signed": True when the security
                (Authenticode) data directory is non-empty.  NOTE: presence
                of the directory does not mean the signature is *valid*.

    Raises:
        pefile.PEFormatError: if either file is not a parseable PE.
    """
    legit_pe = pefile.PE(legitimate_path)
    try:
        suspect_pe = pefile.PE(suspect_path)
        try:
            report = {"differences": [], "suspicious_sections": [], "import_changes": []}

            legit_sections = _section_map(legit_pe)
            suspect_sections = _section_map(suspect_pe)

            # New sections (injected code) or grossly resized sections
            # (patched/appended payload) are the classic trojanizing tells.
            for name, props in suspect_sections.items():
                if name not in legit_sections:
                    report["suspicious_sections"].append({
                        "name": name,
                        "reason": "New section not in legitimate version",
                        "size": props["size"],
                        "entropy": round(props["entropy"], 2),
                    })
                elif abs(props["size"] - legit_sections[name]["size"]) > 1024:
                    report["suspicious_sections"].append({
                        "name": name,
                        "reason": "Section size significantly changed",
                        "legit_size": legit_sections[name]["size"],
                        "suspect_size": props["size"],
                    })

            # Imports that only the suspect binary has often reveal added
            # capability (networking, crypto, process injection APIs).
            new_imports = _import_set(suspect_pe) - _import_set(legit_pe)
            if new_imports:
                report["import_changes"] = list(new_imports)

            # Named constant instead of the magic index 4.
            security_idx = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']
            report["legit_signed"] = bool(
                legit_pe.OPTIONAL_HEADER.DATA_DIRECTORY[security_idx].Size)
            report["suspect_signed"] = bool(
                suspect_pe.OPTIONAL_HEADER.DATA_DIRECTORY[security_idx].Size)
            return report
        finally:
            # pefile memory-maps the input; close explicitly to avoid
            # leaking the mapping / file handle.
            suspect_pe.close()
    finally:
        legit_pe.close()
def hash_file(filepath):
    """Calculate MD5, SHA-1 and SHA-256 digests of a file.

    Reads the file in fixed-size chunks so arbitrarily large binaries
    (installers, firmware images) do not need to fit in memory, feeding
    each chunk to all three hashers in a single pass over the file.

    Args:
        filepath: Path to the file to hash.

    Returns:
        dict mapping algorithm name ('md5', 'sha1', 'sha256') to the
        hexadecimal digest string.
    """
    hashers = {algo: hashlib.new(algo) for algo in ('md5', 'sha1', 'sha256')}
    with open(filepath, 'rb') as f:
        # iter(callable, sentinel) yields 64 KiB chunks until EOF (b'').
        for chunk in iter(lambda: f.read(65536), b''):
            for h in hashers.values():
                h.update(chunk)
    return {algo: h.hexdigest() for algo, h in hashers.items()}
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <legitimate_binary> <suspect_binary>")
        sys.exit(1)
    report = compare_pe_files(sys.argv[1], sys.argv[2])
    # hash_file was previously defined but never called; include both files'
    # digests so the JSON report is self-contained for IOC sharing/lookup.
    report["legit_hashes"] = hash_file(sys.argv[1])
    report["suspect_hashes"] = hash_file(sys.argv[2])
    print(json.dumps(report, indent=2))