analyzing-supply-chain-malware-artifacts

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Analyzing Supply Chain Malware Artifacts

供应链恶意软件工件分析

Overview

概述

Supply chain attacks compromise legitimate software distribution channels to deliver malware through trusted update mechanisms. Notable examples include SolarWinds SUNBURST (2020, affecting 18,000+ customers), 3CX SmoothOperator (2023, a cascading supply chain attack originating from Trading Technologies), and numerous npm/PyPI package poisoning campaigns. Analysis involves comparing trojanized binaries against legitimate versions, identifying injected code in build artifacts, examining code signing anomalies, and tracing the infection chain from initial compromise through payload delivery. As of 2025, supply chain attacks account for 30% of all breaches, a 100% increase from prior years.
供应链攻击通过攻陷合法软件分发渠道,借助可信更新机制传播恶意软件。典型案例包括2020年影响18000+客户的SolarWinds SUNBURST、2023年源自Trading Technologies的3CX SmoothOperator级联供应链攻击,以及众多npm/PyPI包投毒事件。分析工作包括对比被植入木马的二进制文件与合法版本、识别构建工件中注入的代码、检查代码签名异常,并追踪从初始攻陷到 payload 投递的感染链。截至2025年,供应链攻击占所有数据泄露事件的30%,较前几年增长了100%。

When to Use

适用场景

  • When investigating security incidents that require analyzing supply chain malware artifacts
  • When building detection rules or threat hunting queries for this domain
  • When SOC analysts need structured procedures for this analysis type
  • When validating security monitoring coverage for related attack techniques
  • 当需要分析供应链恶意软件工件来调查安全事件时
  • 当针对该领域构建检测规则或威胁狩猎查询时
  • 当SOC分析师需要此类分析的结构化流程时
  • 当验证相关攻击技术的安全监控覆盖范围时

Prerequisites

前置条件

  • Python 3.9+ with
    pefile
    ,
    ssdeep
    ,
    hashlib
  • Binary diff tools (BinDiff, Diaphora)
  • Code signing verification tools (sigcheck, codesign)
  • Software composition analysis (SCA) tools
  • Access to legitimate software versions for comparison
  • Package repository monitoring (npm, PyPI, NuGet)
  • 安装Python 3.9+及
    pefile
    ssdeep
    hashlib
  • 二进制对比工具(BinDiff、Diaphora)
  • 代码签名验证工具(sigcheck、codesign)
  • 软件成分分析(SCA)工具
  • 可获取合法软件版本用于对比
  • 包仓库监控(npm、PyPI、NuGet)

Workflow

工作流程

Step 1: Binary Comparison Analysis

步骤1:二进制对比分析

python
#!/usr/bin/env python3
"""Compare trojanized binary against legitimate version."""
import hashlib
import pefile
import sys
import json


def compare_pe_files(legitimate_path, suspect_path):
    """Compare PE file structures between legitimate and suspect versions."""
    legit_pe = pefile.PE(legitimate_path)
    suspect_pe = pefile.PE(suspect_path)

    report = {"differences": [], "suspicious_sections": [], "import_changes": []}

    # Compare sections
    legit_sections = {s.Name.rstrip(b'\x00').decode(): {
        "size": s.SizeOfRawData,
        "entropy": s.get_entropy(),
        "characteristics": s.Characteristics,
    } for s in legit_pe.sections}

    suspect_sections = {s.Name.rstrip(b'\x00').decode(): {
        "size": s.SizeOfRawData,
        "entropy": s.get_entropy(),
        "characteristics": s.Characteristics,
    } for s in suspect_pe.sections}

    # Find new or modified sections
    for name, props in suspect_sections.items():
        if name not in legit_sections:
            report["suspicious_sections"].append({
                "name": name, "reason": "New section not in legitimate version",
                "size": props["size"], "entropy": round(props["entropy"], 2),
            })
        elif abs(props["size"] - legit_sections[name]["size"]) > 1024:
            report["suspicious_sections"].append({
                "name": name, "reason": "Section size significantly changed",
                "legit_size": legit_sections[name]["size"],
                "suspect_size": props["size"],
            })

    # Compare imports
    legit_imports = set()
    if hasattr(legit_pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in legit_pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                if imp.name:
                    legit_imports.add(f"{entry.dll.decode()}!{imp.name.decode()}")

    suspect_imports = set()
    if hasattr(suspect_pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in suspect_pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                if imp.name:
                    suspect_imports.add(f"{entry.dll.decode()}!{imp.name.decode()}")

    new_imports = suspect_imports - legit_imports
    if new_imports:
        report["import_changes"] = list(new_imports)

    # Check code signing
    report["legit_signed"] = bool(legit_pe.OPTIONAL_HEADER.DATA_DIRECTORY[4].Size)
    report["suspect_signed"] = bool(suspect_pe.OPTIONAL_HEADER.DATA_DIRECTORY[4].Size)

    return report


def hash_file(filepath):
    """Calculate multiple hashes for a file."""
    hashes = {}
    with open(filepath, 'rb') as f:
        data = f.read()
    for algo in ['md5', 'sha1', 'sha256']:
        h = hashlib.new(algo)
        h.update(data)
        hashes[algo] = h.hexdigest()
    return hashes


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <legitimate_binary> <suspect_binary>")
        sys.exit(1)
    report = compare_pe_files(sys.argv[1], sys.argv[2])
    print(json.dumps(report, indent=2))
python
#!/usr/bin/env python3
"""Compare trojanized binary against legitimate version."""
import hashlib
import pefile
import sys
import json


def compare_pe_files(legitimate_path, suspect_path):
    """Compare PE file structures between legitimate and suspect versions."""
    legit_pe = pefile.PE(legitimate_path)
    suspect_pe = pefile.PE(suspect_path)

    report = {"differences": [], "suspicious_sections": [], "import_changes": []}

    # Compare sections
    legit_sections = {s.Name.rstrip(b'\x00').decode(): {
        "size": s.SizeOfRawData,
        "entropy": s.get_entropy(),
        "characteristics": s.Characteristics,
    } for s in legit_pe.sections}

    suspect_sections = {s.Name.rstrip(b'\x00').decode(): {
        "size": s.SizeOfRawData,
        "entropy": s.get_entropy(),
        "characteristics": s.Characteristics,
    } for s in suspect_pe.sections}

    # Find new or modified sections
    for name, props in suspect_sections.items():
        if name not in legit_sections:
            report["suspicious_sections"].append({
                "name": name, "reason": "New section not in legitimate version",
                "size": props["size"], "entropy": round(props["entropy"], 2),
            })
        elif abs(props["size"] - legit_sections[name]["size"]) > 1024:
            report["suspicious_sections"].append({
                "name": name, "reason": "Section size significantly changed",
                "legit_size": legit_sections[name]["size"],
                "suspect_size": props["size"],
            })

    # Compare imports
    legit_imports = set()
    if hasattr(legit_pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in legit_pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                if imp.name:
                    legit_imports.add(f"{entry.dll.decode()}!{imp.name.decode()}")

    suspect_imports = set()
    if hasattr(suspect_pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in suspect_pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                if imp.name:
                    suspect_imports.add(f"{entry.dll.decode()}!{imp.name.decode()}")

    new_imports = suspect_imports - legit_imports
    if new_imports:
        report["import_changes"] = list(new_imports)

    # Check code signing
    report["legit_signed"] = bool(legit_pe.OPTIONAL_HEADER.DATA_DIRECTORY[4].Size)
    report["suspect_signed"] = bool(suspect_pe.OPTIONAL_HEADER.DATA_DIRECTORY[4].Size)

    return report


def hash_file(filepath):
    """Calculate multiple hashes for a file."""
    hashes = {}
    with open(filepath, 'rb') as f:
        data = f.read()
    for algo in ['md5', 'sha1', 'sha256']:
        h = hashlib.new(algo)
        h.update(data)
        hashes[algo] = h.hexdigest()
    return hashes


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <legitimate_binary> <suspect_binary>")
        sys.exit(1)
    report = compare_pe_files(sys.argv[1], sys.argv[2])
    print(json.dumps(report, indent=2))

Validation Criteria

验证标准

  • Trojanized components identified through binary diffing
  • Injected code isolated and analyzed separately
  • Code signing anomalies documented
  • Infection timeline reconstructed from build artifacts
  • Downstream impact scope assessed across affected systems
  • IOCs extracted for detection and blocking
  • 通过二进制对比识别出被植入木马的组件
  • 隔离并单独分析注入的代码
  • 记录代码签名异常
  • 从构建工件重建感染时间线
  • 评估受影响系统的下游影响范围
  • 提取IOC用于检测和拦截

References

参考资料