implementing-ot-network-traffic-analysis-with-nozomi

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Implementing OT Network Traffic Analysis with Nozomi

基于Nozomi实现OT网络流量分析

When to Use

适用场景

  • When deploying passive OT network monitoring using Nozomi Networks Guardian sensors
  • When requiring asset visibility without active scanning in sensitive ICS environments
  • When building a Nozomi-based OT SOC with centralized management via Vantage or CMC
  • When integrating OT network monitoring with Fortinet, Splunk, or ServiceNow ecosystems
  • When monitoring compliance with IEC 62443 network segmentation policies
Do not use for active vulnerability scanning of OT devices (see performing-ot-vulnerability-scanning-safely), for environments standardized on Dragos (see implementing-dragos-platform-for-ot-monitoring), or for IT-only network monitoring.
  • 使用Nozomi Networks Guardian传感器部署被动式OT网络监控时
  • 在敏感的ICS环境中需要无需主动扫描的资产可见性时
  • 构建基于Nozomi、通过Vantage或CMC进行集中管理的OT SOC时
  • 将OT网络监控与Fortinet、Splunk或ServiceNow生态系统集成时
  • 监控是否符合IEC 62443网络分段策略时
不适用场景:对OT设备进行主动漏洞扫描(请参考performing-ot-vulnerability-scanning-safely)、标准化使用Dragos的环境(请参考implementing-dragos-platform-for-ot-monitoring),或仅针对IT网络的监控。

Prerequisites

前提条件

  • Nozomi Networks Guardian sensor (hardware, VM, or container)
  • Network TAP or SPAN port configured on monitored OT network segments
  • Nozomi Vantage (cloud) or Central Management Console for multi-sensor management
  • Nozomi Threat Intelligence subscription for updated detection signatures
  • Network architecture documentation for sensor placement planning
  • Nozomi Networks Guardian传感器(硬件、虚拟机或容器版本)
  • 在受监控的OT网络分段上配置好的网络TAP或SPAN端口
  • 用于多传感器管理的Nozomi Vantage(云端)或Central Management Console
  • 用于获取更新检测特征的Nozomi威胁情报订阅
  • 用于传感器部署规划的网络架构文档

Workflow

工作流程

Step 1: Deploy Guardian Sensors for Passive Monitoring

步骤1:部署Guardian传感器以实现被动监控

python
#!/usr/bin/env python3
"""Nozomi Guardian Deployment Manager and Alert Analyzer.

Manages Nozomi Guardian sensor deployment validation, asset inventory
extraction, and threat alert analysis for OT environments.
"""

import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional

try:
    import requests
except ImportError:
    print("Install requests: pip install requests")
    sys.exit(1)


class NozomiGuardianManager:
    """Manages Nozomi Networks Guardian for OT monitoring."""

    def __init__(self, guardian_url: str, api_token: str, verify_ssl: bool = False):
        self.guardian_url = guardian_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        })
        self.session.verify = verify_ssl

    def get_nodes(self, node_type: Optional[str] = None) -> List[Dict]:
        """Retrieve discovered network nodes (assets)."""
        params = {}
        if node_type:
            params["type"] = node_type
        resp = self.session.get(f"{self.guardian_url}/api/v1/nodes", params=params)
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_alerts(self, severity: str = "high", limit: int = 100) -> List[Dict]:
        """Retrieve security alerts."""
        params = {"severity": severity, "limit": limit, "status": "open"}
        resp = self.session.get(f"{self.guardian_url}/api/v1/alerts", params=params)
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_links(self) -> List[Dict]:
        """Retrieve communication links between nodes."""
        resp = self.session.get(f"{self.guardian_url}/api/v1/links")
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_vulnerabilities(self) -> List[Dict]:
        """Retrieve detected vulnerabilities."""
        resp = self.session.get(f"{self.guardian_url}/api/v1/vulnerabilities")
        resp.raise_for_status()
        return resp.json().get("result", [])

    def validate_deployment(self):
        """Validate Guardian sensor deployment and coverage."""
        print(f"\n{'='*65}")
        print("NOZOMI GUARDIAN DEPLOYMENT VALIDATION")
        print(f"{'='*65}")
        print(f"Guardian URL: {self.guardian_url}")
        print(f"Validation Time: {datetime.now().isoformat()}")

        # Check system status
        try:
            resp = self.session.get(f"{self.guardian_url}/api/v1/system/status")
            if resp.status_code == 200:
                status = resp.json()
                print(f"\n--- SYSTEM STATUS ---")
                print(f"  Version: {status.get('version', 'N/A')}")
                print(f"  Uptime: {status.get('uptime', 'N/A')}")
                print(f"  Packets Processed: {status.get('packets_processed', 'N/A')}")
                print(f"  Threat Intelligence: {status.get('threat_intelligence_version', 'N/A')}")
        except requests.RequestException as e:
            print(f"  [!] System status unavailable: {e}")

        # Asset discovery summary
        nodes = self.get_nodes()
        print(f"\n--- ASSET DISCOVERY ---")
        print(f"  Total Nodes Discovered: {len(nodes)}")

        type_counts = defaultdict(int)
        vendor_counts = defaultdict(int)
        protocol_set = set()
        for node in nodes:
            type_counts[node.get("type", "unknown")] += 1
            vendor_counts[node.get("vendor", "Unknown")] += 1
            for proto in node.get("protocols", []):
                protocol_set.add(proto)

        print(f"\n  By Type:")
        for ntype, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            print(f"    {ntype}: {count}")

        print(f"\n  By Vendor:")
        for vendor, count in sorted(vendor_counts.items(), key=lambda x: -x[1])[:10]:
            print(f"    {vendor}: {count}")

        print(f"\n  Protocols Observed: {', '.join(sorted(protocol_set))}")

        # Alert summary
        alerts = self.get_alerts(severity="high")
        print(f"\n--- ALERT SUMMARY ---")
        print(f"  High/Critical Alerts: {len(alerts)}")

        alert_types = defaultdict(int)
        for alert in alerts:
            alert_types[alert.get("type_id", "unknown")] += 1

        for atype, count in sorted(alert_types.items(), key=lambda x: -x[1])[:10]:
            print(f"    {atype}: {count}")

        # Vulnerability summary
        vulns = self.get_vulnerabilities()
        print(f"\n--- VULNERABILITY SUMMARY ---")
        print(f"  Total Vulnerabilities: {len(vulns)}")

        sev_counts = defaultdict(int)
        for vuln in vulns:
            sev_counts[vuln.get("severity", "unknown")] += 1

        for sev in ["critical", "high", "medium", "low"]:
            if sev in sev_counts:
                print(f"    {sev.capitalize()}: {sev_counts[sev]}")

    def analyze_communication_patterns(self):
        """Analyze OT communication patterns for anomalies."""
        links = self.get_links()
        nodes = {n.get("id"): n for n in self.get_nodes()}

        print(f"\n--- COMMUNICATION ANALYSIS ---")
        print(f"  Total Communication Links: {len(links)}")

        # Identify cross-zone communications
        cross_zone = []
        for link in links:
            src_node = nodes.get(link.get("source_id"), {})
            dst_node = nodes.get(link.get("destination_id"), {})
            src_zone = src_node.get("zone", "unknown")
            dst_zone = dst_node.get("zone", "unknown")

            if src_zone != dst_zone and src_zone != "unknown" and dst_zone != "unknown":
                cross_zone.append({
                    "source": src_node.get("label", "Unknown"),
                    "source_zone": src_zone,
                    "destination": dst_node.get("label", "Unknown"),
                    "dest_zone": dst_zone,
                    "protocols": link.get("protocols", []),
                })

        if cross_zone:
            print(f"\n  Cross-Zone Communications: {len(cross_zone)}")
            for comm in cross_zone[:10]:
                print(f"    {comm['source']} ({comm['source_zone']}) -> "
                      f"{comm['destination']} ({comm['dest_zone']}) "
                      f"via {', '.join(comm['protocols'])}")


if __name__ == "__main__":
    manager = NozomiGuardianManager(
        guardian_url="https://nozomi-guardian.plant.local",
        api_token="your-api-token",
    )

    manager.validate_deployment()
    manager.analyze_communication_patterns()
python
#!/usr/bin/env python3
"""Nozomi Guardian Deployment Manager and Alert Analyzer.

Manages Nozomi Guardian sensor deployment validation, asset inventory
extraction, and threat alert analysis for OT environments.
"""

import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional

try:
    import requests
except ImportError:
    print("Install requests: pip install requests")
    sys.exit(1)


class NozomiGuardianManager:
    """Manages Nozomi Networks Guardian for OT monitoring."""

    def __init__(self, guardian_url: str, api_token: str, verify_ssl: bool = False):
        self.guardian_url = guardian_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        })
        self.session.verify = verify_ssl

    def get_nodes(self, node_type: Optional[str] = None) -> List[Dict]:
        """Retrieve discovered network nodes (assets)."""
        params = {}
        if node_type:
            params["type"] = node_type
        resp = self.session.get(f"{self.guardian_url}/api/v1/nodes", params=params)
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_alerts(self, severity: str = "high", limit: int = 100) -> List[Dict]:
        """Retrieve security alerts."""
        params = {"severity": severity, "limit": limit, "status": "open"}
        resp = self.session.get(f"{self.guardian_url}/api/v1/alerts", params=params)
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_links(self) -> List[Dict]:
        """Retrieve communication links between nodes."""
        resp = self.session.get(f"{self.guardian_url}/api/v1/links")
        resp.raise_for_status()
        return resp.json().get("result", [])

    def get_vulnerabilities(self) -> List[Dict]:
        """Retrieve detected vulnerabilities."""
        resp = self.session.get(f"{self.guardian_url}/api/v1/vulnerabilities")
        resp.raise_for_status()
        return resp.json().get"result", [])

    def validate_deployment(self):
        """Validate Guardian sensor deployment and coverage."""
        print(f"\n{'='*65}")
        print("NOZOMI GUARDIAN DEPLOYMENT VALIDATION")
        print(f"{'='*65}")
        print(f"Guardian URL: {self.guardian_url}")
        print(f"Validation Time: {datetime.now().isoformat()}")

        # Check system status
        try:
            resp = self.session.get(f"{self.guardian_url}/api/v1/system/status")
            if resp.status_code == 200:
                status = resp.json()
                print(f"\n--- SYSTEM STATUS ---")
                print(f"  Version: {status.get('version', 'N/A')}")
                print(f"  Uptime: {status.get('uptime', 'N/A')}")
                print(f"  Packets Processed: {status.get('packets_processed', 'N/A')}")
                print(f"  Threat Intelligence: {status.get('threat_intelligence_version', 'N/A')}")
        except requests.RequestException as e:
            print(f"  [!] System status unavailable: {e}")

        # Asset discovery summary
        nodes = self.get_nodes()
        print(f"\n--- ASSET DISCOVERY ---")
        print(f"  Total Nodes Discovered: {len(nodes)}")

        type_counts = defaultdict(int)
        vendor_counts = defaultdict(int)
        protocol_set = set()
        for node in nodes:
            type_counts[node.get("type", "unknown")] += 1
            vendor_counts[node.get("vendor", "Unknown")] += 1
            for proto in node.get("protocols", []):
                protocol_set.add(proto)

        print(f"\n  By Type:")
        for ntype, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            print(f"    {ntype}: {count}")

        print(f"\n  By Vendor:")
        for vendor, count in sorted(vendor_counts.items(), key=lambda x: -x[1])[:10]:
            print(f"    {vendor}: {count}")

        print(f"\n  Protocols Observed: {', '.join(sorted(protocol_set))}")

        # Alert summary
        alerts = self.get_alerts(severity="high")
        print(f"\n--- ALERT SUMMARY ---")
        print(f"  High/Critical Alerts: {len(alerts)}")

        alert_types = defaultdict(int)
        for alert in alerts:
            alert_types[alert.get("type_id", "unknown")] += 1

        for atype, count in sorted(alert_types.items(), key=lambda x: -x[1])[:10]:
            print(f"    {atype}: {count}")

        # Vulnerability summary
        vulns = self.get_vulnerabilities()
        print(f"\n--- VULNERABILITY SUMMARY ---")
        print(f"  Total Vulnerabilities: {len(vulns)}")

        sev_counts = defaultdict(int)
        for vuln in vulns:
            sev_counts[vuln.get("severity", "unknown")] += 1

        for sev in ["critical", "high", "medium", "low"]:
            if sev in sev_counts:
                print(f"    {sev.capitalize()}: {sev_counts[sev]}")

    def analyze_communication_patterns(self):
        """Analyze OT communication patterns for anomalies."""
        links = self.get_links()
        nodes = {n.get("id"): n for n in self.get_nodes()}

        print(f"\n--- COMMUNICATION ANALYSIS ---")
        print(f"  Total Communication Links: {len(links)}")

        # Identify cross-zone communications
        cross_zone = []
        for link in links:
            src_node = nodes.get(link.get("source_id"), {})
            dst_node = nodes.get(link.get("destination_id"), {})
            src_zone = src_node.get("zone", "unknown")
            dst_zone = dst_node.get("zone", "unknown")

            if src_zone != dst_zone and src_zone != "unknown" and dst_zone != "unknown":
                cross_zone.append({
                    "source": src_node.get("label", "Unknown"),
                    "source_zone": src_zone,
                    "destination": dst_node.get"label", "Unknown"),
                    "dest_zone": dst_zone,
                    "protocols": link.get("protocols", []),
                })

        if cross_zone:
            print(f"\n  Cross-Zone Communications: {len(cross_zone)}")
            for comm in cross_zone[:10]:
                print(f"    {comm['source']} ({comm['source_zone']}) -> "
                      f"{comm['destination']} ({comm['dest_zone']}) "
                      f"via {', '.join(comm['protocols'])}")


if __name__ == "__main__":
    manager = NozomiGuardianManager(
        guardian_url="https://nozomi-guardian.plant.local",
        api_token="your-api-token",
    )

    manager.validate_deployment()
    manager.analyze_communication_patterns()

Key Concepts

核心概念

TermDefinition
GuardianNozomi Networks passive sensor that monitors OT network traffic via SPAN/TAP without generating additional traffic
VantageNozomi cloud-based central management platform for aggregating data across multiple Guardian sensors
Behavioral Anomaly Detection (BAD)Nozomi's AI-driven approach to detecting deviations from learned normal OT network behavior
Smart PollingNozomi's active query feature using native protocols to safely extract additional device details
Asset IntelligenceNozomi's automatic identification and classification of OT/IoT assets from network traffic
Threat Intelligence FeedNozomi Labs-maintained feed of OT-specific threat indicators, updated based on global honeypot data
术语定义
GuardianNozomi Networks的被动传感器,通过SPAN/TAP监控OT网络流量,不会产生额外流量
VantageNozomi的云端集中管理平台,用于聚合多个Guardian传感器的数据
Behavioral Anomaly Detection (BAD)Nozomi基于AI的检测方法,用于识别与已学习的正常OT网络行为的偏差
Smart PollingNozomi主动查询功能,使用原生协议安全提取更多设备细节
Asset IntelligenceNozomi从网络流量中自动识别和分类OT/IoT资产的功能
Threat Intelligence FeedNozomi Labs维护的OT特定威胁指标源,基于全球蜜罐数据更新

Output Format

输出格式

NOZOMI GUARDIAN OT MONITORING REPORT
=======================================
Site: [site name]
Date: YYYY-MM-DD

ASSET VISIBILITY:
  Total Assets: [count]
  PLCs: [count] | HMIs: [count] | Switches: [count]
  Protocols: [list]
  Vendors: [top 5]

THREAT DETECTION:
  Critical Alerts: [count]
  High Alerts: [count]
  Top Alert Categories: [list]

VULNERABILITIES:
  Critical: [count]
  High: [count]

NETWORK ANALYSIS:
  Communication Links: [count]
  Cross-Zone Flows: [count]
NOZOMI GUARDIAN OT MONITORING REPORT
=======================================
Site: [site name]
Date: YYYY-MM-DD

ASSET VISIBILITY:
  Total Assets: [count]
  PLCs: [count] | HMIs: [count] | Switches: [count]
  Protocols: [list]
  Vendors: [top 5]

THREAT DETECTION:
  Critical Alerts: [count]
  High Alerts: [count]
  Top Alert Categories: [list]

VULNERABILITIES:
  Critical: [count]
  High: [count]

NETWORK ANALYSIS:
  Communication Links: [count]
  Cross-Zone Flows: [count]