
Analyzing Ransomware Leak Site Intelligence

Overview

Ransomware groups operating under double-extortion models maintain data leak sites (DLS) on Tor hidden services where they post victim names, stolen data samples, and countdown timers to pressure payment. In H1 2025, 96 unique ransomware groups were active, listing approximately 535 victims per month. Monitoring these sites provides intelligence on active threat groups, targeted sectors, geographic patterns, and emerging ransomware families. This skill covers safely collecting DLS intelligence, extracting structured data, tracking group activity trends, and producing sector-specific risk assessments.

When to Use

  • When investigating security incidents that require analyzing ransomware leak site intelligence
  • When building detection rules or threat hunting queries for this domain
  • When SOC analysts need structured procedures for this analysis type
  • When validating security monitoring coverage for related attack techniques

Prerequisites

  • Python 3.9+ with the `requests`, `beautifulsoup4`, `pandas`, and `matplotlib` libraries
  • Tor proxy (SOCKS5) for accessing .onion sites or commercial DLS monitoring feeds
  • Understanding of ransomware double-extortion business model
  • Familiarity with major ransomware families (Qilin, Akira, LockBit, BlackCat, Clop)
  • Access to ransomware tracking feeds (Ransomwatch, RansomLook, DarkFeed)

Key Concepts

Double Extortion Model

Modern ransomware groups encrypt victim data AND exfiltrate it before encryption. Leak sites serve as public pressure: victims are listed with a countdown timer, partial data samples, and file trees. If ransom is not paid, full data is published. Some groups have moved to triple extortion, adding DDoS threats or contacting victims' customers directly.

DLS Intelligence Value

Leak sites provide: victim identification (company name, sector, country), attack timeline (when listed, deadline, data published), data volume estimates, group capability assessment (sectors targeted, attack frequency, operational tempo), and trend analysis (new groups emerging, groups rebranding, law enforcement takedowns).
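These intelligence categories can be captured as one structured record per listing, which keeps later aggregation consistent. A minimal sketch; the field names mirror the categories above rather than any specific feed's schema:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class DLSListing:
    """One victim listing on a data leak site (illustrative model)."""
    victim_name: str
    group_name: str
    discovered: datetime              # when the post was first observed
    deadline: Optional[datetime] = None   # countdown expiry, if shown
    data_published: bool = False          # full dump released?
    sector: str = "unknown"
    country: str = "unknown"
    est_data_gb: Optional[float] = None   # claimed stolen-data volume

listing = DLSListing("Acme Corp", "qilin", datetime(2025, 4, 2))
print(listing)
```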

Safe Collection Practices

Never directly access DLS sites in a production environment. Use purpose-built monitoring services (Ransomwatch, DarkFeed, KELA, Flashpoint), Tor-isolated research VMs, commercial threat intelligence platforms, or community-maintained datasets. All analysis should be conducted in isolated environments with proper authorization.
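When direct collection in an isolated research VM is unavoidable, route all traffic through Tor. A minimal sketch, assuming a local Tor daemon listening on 127.0.0.1:9050 and the `requests[socks]` extra installed; `socks5h` resolves DNS through the proxy, which is required for .onion hostnames and avoids DNS leaks:

```python
import requests

# Tor SOCKS5 proxy (assumes a local Tor daemon on the default port)
TOR_PROXIES = {
    "http": "socks5h://127.0.0.1:9050",
    "https": "socks5h://127.0.0.1:9050",
}

def tor_session():
    """Build a requests Session whose traffic is isolated behind Tor."""
    s = requests.Session()
    s.proxies.update(TOR_PROXIES)
    # Use a non-identifying User-Agent; never reuse a corporate UA string
    s.headers["User-Agent"] = "Mozilla/5.0 (research)"
    return s
```

Verify the circuit before any collection (e.g., by fetching the Tor Project's check page through the session) and keep the VM firewalled so nothing can bypass the proxy.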

Workflow

Step 1: Ingest Ransomware Leak Site Data from Public Feeds

```python
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from collections import Counter

class RansomwareIntelCollector:
    """Collect ransomware DLS intelligence from public tracking sources."""

    RANSOMWATCH_API = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json"
    RANSOMWATCH_GROUPS = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/groups.json"

    def __init__(self):
        self.posts = []
        self.groups = []

    def fetch_ransomwatch_data(self):
        """Fetch ransomware victim posts from ransomwatch."""
        resp = requests.get(self.RANSOMWATCH_API, timeout=30)
        if resp.status_code == 200:
            self.posts = resp.json()
            print(f"[+] Loaded {len(self.posts)} victim posts from ransomwatch")
        else:
            print(f"[-] Failed to fetch posts: {resp.status_code}")

        resp = requests.get(self.RANSOMWATCH_GROUPS, timeout=30)
        if resp.status_code == 200:
            self.groups = resp.json()
            print(f"[+] Loaded {len(self.groups)} ransomware group profiles")

        return self.posts

    def get_recent_victims(self, days=30):
        """Get victims posted in the last N days."""
        cutoff = datetime.now() - timedelta(days=days)
        recent = []
        for post in self.posts:
            try:
                discovered = datetime.fromisoformat(
                    post.get("discovered", "").replace("Z", "+00:00")
                )
                if discovered.replace(tzinfo=None) >= cutoff:
                    recent.append(post)
            except (ValueError, TypeError):
                continue
        print(f"[+] {len(recent)} victims in last {days} days")
        return recent

    def get_group_activity(self, group_name):
        """Get all posts by a specific ransomware group."""
        group_posts = [
            p for p in self.posts
            if p.get("group_name", "").lower() == group_name.lower()
        ]
        print(f"[+] {group_name}: {len(group_posts)} total victims")
        return group_posts

collector = RansomwareIntelCollector()
collector.fetch_ransomwatch_data()
recent = collector.get_recent_victims(days=30)
```
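The raw feed is easier to slice once normalized into a pandas DataFrame (pandas is already in the prerequisites). A sketch using hypothetical posts that follow the ransomwatch schema above (`post_title`, `group_name`, `discovered`):

```python
import pandas as pd

# Hypothetical sample posts; real entries come from collector.posts
posts = [
    {"post_title": "acme-corp", "group_name": "lockbit3",
     "discovered": "2025-03-01 10:00:00.000000"},
    {"post_title": "example-ltd", "group_name": "akira",
     "discovered": "2025-03-05 09:30:00.000000"},
    {"post_title": "foo-inc", "group_name": "lockbit3",
     "discovered": "2025-04-02 18:15:00.000000"},
]

df = pd.DataFrame(posts)
# Coerce bad timestamps to NaT instead of raising
df["discovered"] = pd.to_datetime(df["discovered"], errors="coerce")
df["month"] = df["discovered"].dt.to_period("M")

# Victims per group per month, ready for trend analysis or plotting
pivot = df.pivot_table(index="month", columns="group_name",
                       values="post_title", aggfunc="count", fill_value=0)
print(pivot)
```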

Step 2: Analyze Group Activity and Trends

```python
def analyze_group_trends(posts, top_n=15):
    """Analyze ransomware group activity trends."""
    group_counts = Counter(p.get("group_name", "unknown") for p in posts)
    monthly_activity = {}

    for post in posts:
        try:
            date = datetime.fromisoformat(
                post.get("discovered", "").replace("Z", "+00:00")
            )
            month_key = date.strftime("%Y-%m")
            group = post.get("group_name", "unknown")
            if month_key not in monthly_activity:
                monthly_activity[month_key] = Counter()
            monthly_activity[month_key][group] += 1
        except (ValueError, TypeError):
            continue

    analysis = {
        "total_posts": len(posts),
        "unique_groups": len(group_counts),
        "top_groups": group_counts.most_common(top_n),
        "monthly_totals": {
            month: sum(counts.values())
            for month, counts in sorted(monthly_activity.items())
        },
        "monthly_top_groups": {
            month: counts.most_common(5)
            for month, counts in sorted(monthly_activity.items())
        },
    }

    print(f"\n=== Ransomware Group Activity ===")
    print(f"Total victims tracked: {analysis['total_posts']}")
    print(f"Active groups: {analysis['unique_groups']}")
    print(f"\nTop {top_n} Groups:")
    for group, count in analysis["top_groups"]:
        print(f"  {group}: {count} victims")

    return analysis

trends = analyze_group_trends(collector.posts)
```
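The `monthly_totals` series produced by `analyze_group_trends` plots directly with matplotlib (also in the prerequisites). The values below are illustrative placeholders, not real feed counts:

```python
import matplotlib
matplotlib.use("Agg")  # headless rendering for research VMs
import matplotlib.pyplot as plt

# Placeholder values; in practice use trends["monthly_totals"]
monthly_totals = {"2025-01": 480, "2025-02": 510,
                  "2025-03": 560, "2025-04": 530}

fig, ax = plt.subplots(figsize=(8, 4))
ax.bar(list(monthly_totals.keys()), list(monthly_totals.values()))
ax.set_xlabel("Month")
ax.set_ylabel("Victims listed")
ax.set_title("Ransomware DLS victim postings per month")
fig.tight_layout()
fig.savefig("monthly_victims.png", dpi=150)
plt.close(fig)
```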

Step 3: Sector and Geographic Risk Assessment

```python
def assess_sector_risk(posts, target_sector=None, target_country=None):
    """Assess ransomware risk for specific sector or geography."""
    sector_data = {}
    country_data = {}

    for post in posts:
        # Extract sector if available (not all feeds include this)
        sector = post.get("sector", post.get("industry", "unknown"))
        country = post.get("country", "unknown")

        if sector not in sector_data:
            sector_data[sector] = {"count": 0, "groups": Counter(), "recent": []}
        sector_data[sector]["count"] += 1
        sector_data[sector]["groups"][post.get("group_name", "")] += 1

        if country not in country_data:
            country_data[country] = {"count": 0, "groups": Counter()}
        country_data[country]["count"] += 1
        country_data[country]["groups"][post.get("group_name", "")] += 1

    # Sector risk scoring
    total = len(posts)
    risk_assessment = {
        "total_victims": total,
        "sectors": {},
        "countries": {},
    }

    for sector, data in sorted(sector_data.items(), key=lambda x: -x[1]["count"]):
        pct = (data["count"] / total * 100) if total > 0 else 0
        risk_assessment["sectors"][sector] = {
            "victim_count": data["count"],
            "percentage": round(pct, 1),
            "top_groups": data["groups"].most_common(5),
            "risk_level": (
                "critical" if pct > 15
                else "high" if pct > 8
                else "medium" if pct > 3
                else "low"
            ),
        }

    for country, data in sorted(country_data.items(), key=lambda x: -x[1]["count"]):
        pct = (data["count"] / total * 100) if total > 0 else 0
        risk_assessment["countries"][country] = {
            "victim_count": data["count"],
            "percentage": round(pct, 1),
            "top_groups": data["groups"].most_common(5),
        }

    # Highlight the caller's sector/country of interest, if provided
    if target_sector and target_sector in risk_assessment["sectors"]:
        print(f"[+] {target_sector}: {risk_assessment['sectors'][target_sector]}")
    if target_country and target_country in risk_assessment["countries"]:
        print(f"[+] {target_country}: {risk_assessment['countries'][target_country]}")

    return risk_assessment

risk = assess_sector_risk(collector.posts)
```
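Feeds rarely agree on country labels ("United States", "US", "USA"), so geographic counts should be normalized before aggregating. A minimal sketch; the alias table is an illustrative subset, not a complete mapping:

```python
# Map common label variants to ISO 3166-1 alpha-2 codes (partial table)
COUNTRY_ALIASES = {
    "united states": "US", "usa": "US", "us": "US",
    "united kingdom": "GB", "uk": "GB",
    "germany": "DE", "deutschland": "DE",
}

def normalize_country(label):
    """Normalize a free-text country label; fall back to uppercasing."""
    key = (label or "").strip().lower()
    return COUNTRY_ALIASES.get(key, key.upper() or "UNKNOWN")

print(normalize_country("USA"))  # → "US"
```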

Step 4: Track Emerging and Rebranding Groups

```python
def track_new_groups(posts, lookback_days=90):
    """Identify newly emerged ransomware groups."""
    group_first_seen = {}
    for post in posts:
        group = post.get("group_name", "")
        try:
            date = datetime.fromisoformat(
                post.get("discovered", "").replace("Z", "+00:00")
            )
            if group not in group_first_seen or date < group_first_seen[group]["first_seen"]:
                group_first_seen[group] = {
                    "first_seen": date,
                    "first_victim": post.get("post_title", ""),
                }
        except (ValueError, TypeError):
            continue

    cutoff = datetime.now() - timedelta(days=lookback_days)
    new_groups = {
        group: info for group, info in group_first_seen.items()
        if info["first_seen"].replace(tzinfo=None) >= cutoff
    }

    # Count total victims per new group
    for group in new_groups:
        victims = [p for p in posts if p.get("group_name") == group]
        new_groups[group]["total_victims"] = len(victims)
        new_groups[group]["avg_per_month"] = round(
            len(victims) / max(1, lookback_days / 30), 1
        )

    print(f"\n=== New Groups (last {lookback_days} days) ===")
    for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"]):
        print(f"  {group}: {info['total_victims']} victims, "
              f"first seen {info['first_seen'].strftime('%Y-%m-%d')}")

    return new_groups

new_groups = track_new_groups(collector.posts, lookback_days=90)
```
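The code above finds new groups; flagging a rebrand additionally requires linking a new group back to one that went quiet. One hedged heuristic is plausible timing plus victim overlap, since rebrands often repost unpaid victims from the old site. Names and dates below are hypothetical:

```python
from datetime import datetime

def rebrand_signal(old_posts, new_posts, gap_days_max=120):
    """Score a possible rebrand between a dormant and an emerging group."""
    old_last = max(p["discovered"] for p in old_posts)   # last old-brand post
    new_first = min(p["discovered"] for p in new_posts)  # first new-brand post
    gap = (new_first - old_last).days
    # Same victim names reposted under the new brand is a strong signal
    overlap = ({p["post_title"] for p in old_posts}
               & {p["post_title"] for p in new_posts})
    return {
        "gap_days": gap,
        "plausible_timing": 0 <= gap <= gap_days_max,
        "revictimized": sorted(overlap),
    }

old = [{"post_title": "acme", "discovered": datetime(2025, 1, 10)},
       {"post_title": "beta-co", "discovered": datetime(2025, 2, 1)}]
new = [{"post_title": "acme", "discovered": datetime(2025, 3, 15)}]
print(rebrand_signal(old, new))
```

Timing and overlap alone are circumstantial; corroborate with ransom-note wording, site templates, and malware code similarity before asserting a rebrand.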

Step 5: Generate Intelligence Report

```python
def generate_ransomware_intel_report(trends, risk, new_groups):
    """Generate ransomware threat intelligence report."""
    report = f"""# Ransomware Threat Intelligence Report
Generated: {datetime.now().isoformat()}

## Executive Summary

- Total victims tracked: {trends['total_posts']}
- Active ransomware groups: {trends['unique_groups']}
- New groups (last 90 days): {len(new_groups)}

## Top Active Groups

| Rank | Group | Victims |
|------|-------|---------|
"""
    for i, (group, count) in enumerate(trends["top_groups"][:10], 1):
        report += f"| {i} | {group} | {count} |\n"

    report += "\n## New Emerging Groups\n"
    for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"])[:10]:
        report += f"- **{group}**: {info['total_victims']} victims since {info['first_seen'].strftime('%Y-%m-%d')}\n"

    report += "\n## Sector Risk Assessment\n"
    report += "| Sector | Victims | % | Risk Level |\n|--------|---------|---|------------|\n"
    for sector, data in list(risk["sectors"].items())[:10]:
        report += f"| {sector} | {data['victim_count']} | {data['percentage']}% | {data['risk_level'].upper()} |\n"

    report += """
## Recommendations

1. Monitor DLS feeds daily for your organization and supply chain partners
2. Prioritize patching vulnerabilities exploited by top active groups
3. Implement offline backup strategy to reduce extortion leverage
4. Conduct tabletop exercises for ransomware scenario response
5. Share indicators with sector ISACs and threat sharing communities
"""

    with open("ransomware_intel_report.md", "w") as f:
        f.write(report)
    print("[+] Report saved: ransomware_intel_report.md")
    return report

generate_ransomware_intel_report(trends, risk, new_groups)
```
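The first recommendation can be automated with a simple watchlist matcher over incoming DLS posts. The organization names below are hypothetical, and production matching should also handle abbreviations and domain names:

```python
import difflib

# Hypothetical organization and supplier names to monitor
WATCHLIST = ["Acme Corp", "Example Logistics", "Contoso"]

def match_watchlist(posts, threshold=0.8):
    """Flag posts whose victim name matches a watchlist entry."""
    hits = []
    for post in posts:
        title = post.get("post_title", "").lower()
        for name in WATCHLIST:
            # Substring match, or fuzzy ratio to catch minor spelling drift
            ratio = difflib.SequenceMatcher(None, title, name.lower()).ratio()
            if name.lower() in title or ratio >= threshold:
                hits.append({"victim": post.get("post_title"),
                             "group": post.get("group_name"),
                             "matched": name, "score": round(ratio, 2)})
    return hits

sample = [{"post_title": "acme corp", "group_name": "qilin"},
          {"post_title": "unrelated co", "group_name": "akira"}]
print(match_watchlist(sample))
```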

Validation Criteria

  • Ransomware victim data ingested from public tracking feeds
  • Group activity trends analyzed with monthly breakdowns
  • Sector and geographic risk assessment produced
  • New and emerging groups identified with activity metrics
  • Intelligence report generated with actionable recommendations
  • All collection conducted through authorized public sources

References