analyzing-campaign-attribution-evidence

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Analyzing Campaign Attribution Evidence

活动归因证据分析

Overview

概述

Campaign attribution analysis involves systematically evaluating evidence to determine which threat actor or group is responsible for a cyber operation. This skill covers collecting and weighting attribution indicators using the Diamond Model and ACH (Analysis of Competing Hypotheses), analyzing infrastructure overlaps, TTP consistency, malware code similarities, operational timing patterns, and language artifacts to build confidence-weighted attribution assessments.
活动归因分析指的是系统评估证据,以确定哪个威胁行为者或组织对某一网络操作负责。本技能涵盖使用Diamond Model和ACH(Analysis of Competing Hypotheses,竞争性假设分析法)收集并权衡归因指标,分析基础设施重叠、TTP一致性、恶意软件代码相似性、操作时间模式以及语言工件,以构建带有置信度权重的归因评估。

When to Use

适用场景

  • When investigating security incidents that require analyzing campaign attribution evidence
  • When building detection rules or threat hunting queries for this domain
  • When SOC analysts need structured procedures for this analysis type
  • When validating security monitoring coverage for related attack techniques
  • 当调查需要分析活动归因证据的安全事件时
  • 当为此领域构建检测规则或威胁狩猎查询时
  • 当SOC分析师需要此类分析的结构化流程时
  • 当验证相关攻击技术的安全监控覆盖范围时

Prerequisites

前置条件

  • Python 3.9+ with
    attackcti
    ,
    stix2
    ,
    networkx
    libraries
  • Access to threat intelligence platforms (MISP, OpenCTI)
  • Understanding of Diamond Model of Intrusion Analysis
  • Familiarity with MITRE ATT&CK threat group profiles
  • Knowledge of malware analysis and infrastructure tracking techniques
  • 安装有
    attackcti
    stix2
    networkx
    库的Python 3.9+环境
  • 能够访问威胁情报平台(MISP、OpenCTI)
  • 了解Diamond Model of Intrusion Analysis(入侵分析钻石模型)
  • 熟悉MITRE ATT&CK威胁组织画像
  • 掌握恶意软件分析和基础设施追踪技术

Key Concepts

核心概念

Attribution Evidence Categories

归因证据类别

  1. Infrastructure Overlap: Shared C2 servers, domains, IP ranges, hosting providers
  2. TTP Consistency: Matching ATT&CK techniques and sub-techniques across campaigns
  3. Malware Code Similarity: Shared code bases, compilers, PDB paths, encryption routines
  4. Operational Patterns: Timing (working hours, time zones), targeting patterns, operational tempo
  5. Language Artifacts: Embedded strings, variable names, error messages in specific languages
  6. Victimology: Target sector, geography, and organizational profile consistency
  1. 基础设施重叠:共享的C2服务器、域名、IP范围、托管服务商
  2. TTP一致性:跨活动匹配ATT&CK技术及子技术
  3. 恶意软件代码相似性:共享代码库、编译器、PDB路径、加密例程
  4. 操作模式:时间安排(工作时段、时区)、目标模式、操作节奏
  5. 语言工件:特定语言的嵌入字符串、变量名、错误信息
  6. 受害者特征:目标行业、地域及组织画像的一致性

Confidence Levels

置信度等级

  • High Confidence: Multiple independent evidence categories converge on same actor
  • Moderate Confidence: Several evidence categories match, some ambiguity remains
  • Low Confidence: Limited evidence, possible false flags or shared tooling
  • 高置信度:多个独立证据类别指向同一行为者
  • 中等置信度:多个证据类别匹配,但仍存在一些模糊性
  • 低置信度:证据有限,可能存在虚假标记或共享工具

Analysis of Competing Hypotheses (ACH)

竞争性假设分析法(ACH)

Structured analytical method that evaluates evidence against multiple competing hypotheses. Each piece of evidence is scored as consistent, inconsistent, or neutral with respect to each hypothesis. The hypothesis with the least inconsistent evidence is favored.
一种结构化分析方法,针对多个竞争性假设评估证据。每一项证据会被评分,判断其与各假设是否一致、不一致或无关。不一致证据最少的假设会被优先考虑。

Workflow

工作流程

Step 1: Collect Attribution Evidence

步骤1:收集归因证据

python
from stix2 import MemoryStore, Filter
from collections import defaultdict

class AttributionAnalyzer:
    def __init__(self):
        self.evidence = []
        self.hypotheses = {}

    def add_evidence(self, category, description, value, confidence):
        self.evidence.append({
            "category": category,
            "description": description,
            "value": value,
            "confidence": confidence,
            "timestamp": None,
        })

    def add_hypothesis(self, actor_name, actor_id=""):
        self.hypotheses[actor_name] = {
            "actor_id": actor_id,
            "consistent_evidence": [],
            "inconsistent_evidence": [],
            "neutral_evidence": [],
            "score": 0,
        }

    def evaluate_evidence(self, evidence_idx, actor_name, assessment):
        """Assess evidence against a hypothesis: consistent/inconsistent/neutral."""
        if assessment == "consistent":
            self.hypotheses[actor_name]["consistent_evidence"].append(evidence_idx)
            self.hypotheses[actor_name]["score"] += self.evidence[evidence_idx]["confidence"]
        elif assessment == "inconsistent":
            self.hypotheses[actor_name]["inconsistent_evidence"].append(evidence_idx)
            self.hypotheses[actor_name]["score"] -= self.evidence[evidence_idx]["confidence"] * 2
        else:
            self.hypotheses[actor_name]["neutral_evidence"].append(evidence_idx)

    def rank_hypotheses(self):
        """Rank hypotheses by attribution score."""
        ranked = sorted(
            self.hypotheses.items(),
            key=lambda x: x[1]["score"],
            reverse=True,
        )
        return [
            {
                "actor": name,
                "score": data["score"],
                "consistent": len(data["consistent_evidence"]),
                "inconsistent": len(data["inconsistent_evidence"]),
                "confidence": self._score_to_confidence(data["score"]),
            }
            for name, data in ranked
        ]

    def _score_to_confidence(self, score):
        if score >= 80:
            return "HIGH"
        elif score >= 40:
            return "MODERATE"
        else:
            return "LOW"
python
from stix2 import MemoryStore, Filter
from collections import defaultdict

class AttributionAnalyzer:
    def __init__(self):
        self.evidence = []
        self.hypotheses = {}

    def add_evidence(self, category, description, value, confidence):
        self.evidence.append({
            "category": category,
            "description": description,
            "value": value,
            "confidence": confidence,
            "timestamp": None,
        })

    def add_hypothesis(self, actor_name, actor_id=""):
        self.hypotheses[actor_name] = {
            "actor_id": actor_id,
            "consistent_evidence": [],
            "inconsistent_evidence": [],
            "neutral_evidence": [],
            "score": 0,
        }

    def evaluate_evidence(self, evidence_idx, actor_name, assessment):
        """Assess evidence against a hypothesis: consistent/inconsistent/neutral."""
        if assessment == "consistent":
            self.hypotheses[actor_name]["consistent_evidence"].append(evidence_idx)
            self.hypotheses[actor_name]["score"] += self.evidence[evidence_idx]["confidence"]
        elif assessment == "inconsistent":
            self.hypotheses[actor_name]["inconsistent_evidence"].append(evidence_idx)
            self.hypotheses[actor_name]["score"] -= self.evidence[evidence_idx]["confidence"] * 2
        else:
            self.hypotheses[actor_name]["neutral_evidence"].append(evidence_idx)

    def rank_hypotheses(self):
        """Rank hypotheses by attribution score."""
        ranked = sorted(
            self.hypotheses.items(),
            key=lambda x: x[1]["score"],
            reverse=True,
        )
        return [
            {
                "actor": name,
                "score": data["score"],
                "consistent": len(data["consistent_evidence"]),
                "inconsistent": len(data["inconsistent_evidence"]),
                "confidence": self._score_to_confidence(data["score"]),
            }
            for name, data in ranked
        ]

    def _score_to_confidence(self, score):
        if score >= 80:
            return "HIGH"
        elif score >= 40:
            return "MODERATE"
        else:
            return "LOW"

Step 2: Infrastructure Overlap Analysis

步骤2:基础设施重叠分析

python
def analyze_infrastructure_overlap(campaign_a_infra, campaign_b_infra):
    """Compare infrastructure between two campaigns for attribution."""
    overlap = {
        "shared_ips": set(campaign_a_infra.get("ips", [])).intersection(
            campaign_b_infra.get("ips", [])
        ),
        "shared_domains": set(campaign_a_infra.get("domains", [])).intersection(
            campaign_b_infra.get("domains", [])
        ),
        "shared_asns": set(campaign_a_infra.get("asns", [])).intersection(
            campaign_b_infra.get("asns", [])
        ),
        "shared_registrars": set(campaign_a_infra.get("registrars", [])).intersection(
            campaign_b_infra.get("registrars", [])
        ),
    }

    overlap_score = 0
    if overlap["shared_ips"]:
        overlap_score += 30
    if overlap["shared_domains"]:
        overlap_score += 25
    if overlap["shared_asns"]:
        overlap_score += 15
    if overlap["shared_registrars"]:
        overlap_score += 10

    return {
        "overlap": {k: list(v) for k, v in overlap.items()},
        "overlap_score": overlap_score,
        "assessment": "STRONG" if overlap_score >= 40 else "MODERATE" if overlap_score >= 20 else "WEAK",
    }
python
def analyze_infrastructure_overlap(campaign_a_infra, campaign_b_infra):
    """Compare infrastructure between two campaigns for attribution."""
    overlap = {
        "shared_ips": set(campaign_a_infra.get("ips", [])).intersection(
            campaign_b_infra.get("ips", [])
        ),
        "shared_domains": set(campaign_a_infra.get("domains", [])).intersection(
            campaign_b_infra.get("domains", [])
        ),
        "shared_asns": set(campaign_a_infra.get("asns", [])).intersection(
            campaign_b_infra.get("asns", [])
        ),
        "shared_registrars": set(campaign_a_infra.get("registrars", [])).intersection(
            campaign_b_infra.get("registrars", [])
        ),
    }

    overlap_score = 0
    if overlap["shared_ips"]:
        overlap_score += 30
    if overlap["shared_domains"]:
        overlap_score += 25
    if overlap["shared_asns"]:
        overlap_score += 15
    if overlap["shared_registrars"]:
        overlap_score += 10

    return {
        "overlap": {k: list(v) for k, v in overlap.items()},
        "overlap_score": overlap_score,
        "assessment": "STRONG" if overlap_score >= 40 else "MODERATE" if overlap_score >= 20 else "WEAK",
    }

Step 3: TTP Comparison Across Campaigns

步骤3:跨活动TTP对比

python
from attackcti import attack_client

def compare_campaign_ttps(campaign_techniques, known_actor_techniques):
    """Compare campaign TTPs against known threat actor profiles."""
    campaign_set = set(campaign_techniques)
    actor_set = set(known_actor_techniques)

    common = campaign_set.intersection(actor_set)
    unique_campaign = campaign_set - actor_set
    unique_actor = actor_set - campaign_set

    jaccard = len(common) / len(campaign_set.union(actor_set)) if campaign_set.union(actor_set) else 0

    return {
        "common_techniques": sorted(common),
        "common_count": len(common),
        "unique_to_campaign": sorted(unique_campaign),
        "unique_to_actor": sorted(unique_actor),
        "jaccard_similarity": round(jaccard, 3),
        "overlap_percentage": round(len(common) / len(campaign_set) * 100, 1) if campaign_set else 0,
    }
python
from attackcti import attack_client

def compare_campaign_ttps(campaign_techniques, known_actor_techniques):
    """Compare campaign TTPs against known threat actor profiles."""
    campaign_set = set(campaign_techniques)
    actor_set = set(known_actor_techniques)

    common = campaign_set.intersection(actor_set)
    unique_campaign = campaign_set - actor_set
    unique_actor = actor_set - campaign_set

    jaccard = len(common) / len(campaign_set.union(actor_set)) if campaign_set.union(actor_set) else 0

    return {
        "common_techniques": sorted(common),
        "common_count": len(common),
        "unique_to_campaign": sorted(unique_campaign),
        "unique_to_actor": sorted(unique_actor),
        "jaccard_similarity": round(jaccard, 3),
        "overlap_percentage": round(len(common) / len(campaign_set) * 100, 1) if campaign_set else 0,
    }

Step 4: Generate Attribution Report

步骤4:生成归因报告

python
def generate_attribution_report(analyzer):
    """Generate structured attribution assessment report."""
    rankings = analyzer.rank_hypotheses()

    report = {
        "assessment_date": "2026-02-23",
        "total_evidence_items": len(analyzer.evidence),
        "hypotheses_evaluated": len(analyzer.hypotheses),
        "rankings": rankings,
        "primary_attribution": rankings[0] if rankings else None,
        "evidence_summary": [
            {
                "index": i,
                "category": e["category"],
                "description": e["description"],
                "confidence": e["confidence"],
            }
            for i, e in enumerate(analyzer.evidence)
        ],
    }

    return report
python
def generate_attribution_report(analyzer):
    """Generate structured attribution assessment report."""
    rankings = analyzer.rank_hypotheses()

    report = {
        "assessment_date": "2026-02-23",
        "total_evidence_items": len(analyzer.evidence),
        "hypotheses_evaluated": len(analyzer.hypotheses),
        "rankings": rankings,
        "primary_attribution": rankings[0] if rankings else None,
        "evidence_summary": [
            {
                "index": i,
                "category": e["category"],
                "description": e["description"],
                "confidence": e["confidence"],
            }
            for i, e in enumerate(analyzer.evidence)
        ],
    }

    return report

Validation Criteria

验证标准

  • Evidence collection covers all six attribution categories
  • ACH matrix properly evaluates evidence against competing hypotheses
  • Infrastructure overlap analysis identifies shared indicators
  • TTP comparison uses ATT&CK technique IDs for precision
  • Attribution confidence levels are properly justified
  • Report includes alternative hypotheses and false flag considerations
  • 证据收集覆盖所有六个归因类别
  • ACH矩阵正确针对竞争性假设评估证据
  • 基础设施重叠分析识别出共享指标
  • TTP对比使用ATT&CK技术ID以保证精度
  • 归因置信度等级有合理依据
  • 报告包含备选假设和虚假标记考量

References

参考资料