detecting-attacks-on-historian-servers

Detect cyber attacks targeting OT historian servers (OSIsoft PI, Ignition, Wonderware) that sit at the IT/OT boundary and serve as pivot points for lateral movement between enterprise and control networks, including data manipulation, unauthorized queries, and exploitation of historian-specific vulnerabilities.

4,032 stars

Best use case

detecting-attacks-on-historian-servers is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using detecting-attacks-on-historian-servers should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/detecting-attacks-on-historian-servers/SKILL.md --create-dirs "https://raw.githubusercontent.com/mukul975/Anthropic-Cybersecurity-Skills/main/skills/detecting-attacks-on-historian-servers/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/detecting-attacks-on-historian-servers/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How detecting-attacks-on-historian-servers Compares

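| Feature / Agent | detecting-attacks-on-historian-servers | Standard Approach |
|-----------------|----------------------------------------|-------------------|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |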

Frequently Asked Questions

What does this skill do?

Detect cyber attacks targeting OT historian servers (OSIsoft PI, Ignition, Wonderware) that sit at the IT/OT boundary and serve as pivot points for lateral movement between enterprise and control networks, including data manipulation, unauthorized queries, and exploitation of historian-specific vulnerabilities.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Detecting Attacks on Historian Servers

## When to Use

- When monitoring historian servers that bridge IT and OT networks for compromise indicators
- When detecting unauthorized queries or data manipulation in process historian databases
- When investigating lateral movement through historian servers between IT and OT zones
- When responding to alerts about exploitation of historian-specific vulnerabilities (for example, CVE-2025-0921)
- When validating historian data integrity after a suspected OT security incident

**Do not use** for general database security monitoring (see database security skills), for historian deployment and configuration, or for IT-only data warehouse security.

## Prerequisites

- Historian server inventory (OSIsoft PI, Ignition, GE Proficy, Wonderware InSQL)
- Network monitoring on historian network segments (both IT-facing and OT-facing interfaces)
- Historian API access for data integrity validation
- Baseline of normal historian query patterns (which applications query which tags)
- Understanding of historian architecture (data sources, interfaces, client connections)
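
The query-pattern baseline listed above can be represented as a mapping from client address to the tags that client normally reads, which is the shape `set_baseline` accepts for `authorized_query_patterns` in the workflow script. The following is a minimal sketch, assuming query logs have already been exported as `(client_ip, tag)` pairs; the function name `build_query_baseline`, the record layout, and the `min_count` threshold are hypothetical and not part of any historian API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def build_query_baseline(
    records: Iterable[Tuple[str, str]], min_count: int = 3
) -> Dict[str, List[str]]:
    """Map each client IP to the tags it queried at least `min_count` times.

    `records` is an assumed export format: one (client_ip, tag) pair per query.
    """
    counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for client_ip, tag in records:
        counts[client_ip][tag] += 1

    baseline: Dict[str, List[str]] = {}
    for client_ip, tag_counts in counts.items():
        # Keep only tags seen often enough to count as normal behaviour
        tags = sorted(t for t, n in tag_counts.items() if n >= min_count)
        if tags:
            baseline[client_ip] = tags
    return baseline
```

The threshold keeps one-off queries out of the baseline, so a single exploratory read during the learning period does not become permanently authorized.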

## Workflow

### Step 1: Monitor Historian for Attack Indicators

```python
#!/usr/bin/env python3
"""OT Historian Attack Detector.

Monitors historian servers for unauthorized access, data manipulation,
lateral movement indicators, and exploitation of historian-specific
vulnerabilities. Supports OSIsoft PI and Ignition platforms.
"""

import sys
from datetime import datetime
from typing import Dict, List

try:
    import requests
except ImportError:
    print("Install requests: pip install requests")
    sys.exit(1)


class HistorianAttackDetector:
    """Detects attacks targeting OT historian servers."""

    def __init__(self, historian_type: str, historian_url: str,
                 api_credentials: dict, verify_ssl: bool = True):
        self.historian_type = historian_type
        self.historian_url = historian_url.rstrip("/")
        self.credentials = api_credentials
        self.verify_ssl = verify_ssl
        self.alerts = []
        self.authorized_clients = set()
        self.authorized_queries = {}

    def set_baseline(self, authorized_clients: List[str],
                     authorized_query_patterns: Dict[str, List[str]]):
        """Set baseline of authorized historian clients and query patterns."""
        self.authorized_clients = set(authorized_clients)
        self.authorized_queries = authorized_query_patterns

    def check_active_connections(self) -> List[dict]:
        """Check for unauthorized connections to historian."""
        connections = []

        if self.historian_type == "osisoft_pi":
            try:
                resp = requests.get(
                    f"{self.historian_url}/piwebapi/system/status",
                    auth=(self.credentials.get("username"), self.credentials.get("password")),
                    verify=self.verify_ssl,
                    timeout=10,
                )
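                if resp.status_code == 200:
                    data = resp.json()
                    # NOTE: the "ConnectedClients" field is assumed here; confirm that
                    # your PI Web API version returns it from /system/status.
                    connections = data.get("ConnectedClients", [])
                else:
                    print(f"[!] PI Web API returned HTTP {resp.status_code}")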
            except requests.RequestException as e:
                print(f"[!] PI Web API error: {e}")

        elif self.historian_type == "ignition":
            try:
                resp = requests.get(
                    f"{self.historian_url}/data/status/connections",
                    headers={"Authorization": f"Bearer {self.credentials.get('token')}"},
                    verify=self.verify_ssl,
                    timeout=10,
                )
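                if resp.status_code == 200:
                    # NOTE: the endpoint path and the "connections" field are
                    # assumptions; adjust them to your gateway's API.
                    connections = resp.json().get("connections", [])
                else:
                    print(f"[!] Ignition API returned HTTP {resp.status_code}")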
            except requests.RequestException as e:
                print(f"[!] Ignition API error: {e}")

        # Check for unauthorized clients
        for conn in connections:
            client_ip = conn.get("client_ip", conn.get("address", ""))
            if self.authorized_clients and client_ip not in self.authorized_clients:
                self.alerts.append({
                    "severity": "HIGH",
                    "type": "UNAUTHORIZED_HISTORIAN_CLIENT",
                    "timestamp": datetime.now().isoformat(),
                    "source_ip": client_ip,
                    "details": f"Unauthorized client {client_ip} connected to {self.historian_type} historian",
                    "mitre": "T0802 - Automated Collection",
                })

        return connections

    def check_data_integrity(self, tags: List[str], hours_back: int = 24):
        """Check historian data for manipulation indicators."""
        print(f"[*] Checking data integrity for {len(tags)} tags over last {hours_back}h")

        integrity_issues = []
        for tag in tags:
            try:
                if self.historian_type == "osisoft_pi":
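                    # PI Web API addresses streams by WebId, so each entry in
                    # `tags` must be the point's WebId rather than its tag name.
                    resp = requests.get(
                        f"{self.historian_url}/piwebapi/streams/{tag}/recorded",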
                        params={"startTime": f"*-{hours_back}h", "endTime": "*"},
                        auth=(self.credentials.get("username"), self.credentials.get("password")),
                        verify=self.verify_ssl,
                        timeout=15,
                    )
                    if resp.status_code == 200:
                        items = resp.json().get("Items", [])
                        # Check for suspicious patterns
                        if len(items) == 0:
                            integrity_issues.append({
                                "tag": tag, "issue": "NO_DATA",
                                "detail": "No data points in expected timeframe - possible deletion",
                            })
                        else:
                            values = [i.get("Value", 0) for i in items if isinstance(i.get("Value"), (int, float))]
                            if values and len(set(values)) == 1 and len(values) > 100:
                                integrity_issues.append({
                                    "tag": tag, "issue": "FLATLINE",
                                    "detail": f"Constant value {values[0]} for {len(values)} points - possible replay/spoofing",
                                })
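            except requests.RequestException as e:
                # Report the failure instead of silently skipping the tag
                print(f"[!] Integrity check failed for {tag}: {e}")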

        for issue in integrity_issues:
            self.alerts.append({
                "severity": "HIGH",
                "type": f"DATA_INTEGRITY_{issue['issue']}",
                "timestamp": datetime.now().isoformat(),
                "tag": issue["tag"],
                "details": issue["detail"],
                "mitre": "T0809 - Data Destruction" if issue["issue"] == "NO_DATA" else "T0832 - Manipulation of View",
            })

        return integrity_issues

    def check_lateral_movement_indicators(self):
        """Check for indicators of historian being used as pivot point."""
        indicators = []

        # Check 1: Historian making outbound connections to Level 1 devices
        # (Historian should receive data, not initiate connections to PLCs)
        indicators.append({
            "check": "Outbound connections to PLC subnets",
            "description": "Historian initiating connections to Level 1 devices may indicate compromise",
            "detection": "Monitor firewall logs for historian IP connecting to PLC ports (502, 102, 44818)",
        })

        # Check 2: New processes or services on historian
        indicators.append({
            "check": "Unauthorized processes on historian server",
            "description": "Attackers may install tools on historian for lateral movement",
            "detection": "Monitor process creation events (Sysmon EventID 1) on historian",
        })

        # Check 3: Unusual authentication to historian
        indicators.append({
            "check": "Authentication from unexpected sources",
            "description": "Compromised IT systems authenticating to historian for pivoting",
            "detection": "Monitor Windows Security Event 4624 for logons from non-baseline sources",
        })

        return indicators

    def generate_report(self):
        """Generate historian attack detection report."""
        print(f"\n{'='*70}")
        print("HISTORIAN ATTACK DETECTION REPORT")
        print(f"{'='*70}")
        print(f"Historian Type: {self.historian_type}")
        print(f"Historian URL: {self.historian_url}")
        print(f"Report Time: {datetime.now().isoformat()}")
        print(f"Total Alerts: {len(self.alerts)}")

        if self.alerts:
            print(f"\n--- ALERTS ---")
            for alert in self.alerts:
                print(f"\n  [{alert['severity']}] {alert['type']}")
                print(f"    Time: {alert['timestamp']}")
                print(f"    Detail: {alert['details']}")
                print(f"    MITRE ICS: {alert.get('mitre', 'N/A')}")

        print(f"\n--- LATERAL MOVEMENT CHECKS ---")
        for indicator in self.check_lateral_movement_indicators():
            print(f"\n  Check: {indicator['check']}")
            print(f"    Risk: {indicator['description']}")
            print(f"    Detection: {indicator['detection']}")


if __name__ == "__main__":
    detector = HistorianAttackDetector(
        historian_type="osisoft_pi",
        historian_url="https://pi-server.plant.local",
        api_credentials={"username": "pi_reader", "password": "api_key_here"},
    )

    detector.set_baseline(
        authorized_clients=["10.10.2.10", "10.10.2.20", "10.10.3.50", "10.10.150.10"],
        authorized_query_patterns={},
    )

    detector.check_active_connections()
    detector.check_data_integrity(tags=["REACTOR_01.TEMP", "PUMP_03.FLOW"], hours_back=24)
    detector.generate_report()
```
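
Check 1 in `check_lateral_movement_indicators` only describes what to look for in firewall logs. As a rough illustration, the sketch below filters parsed flow records for connections that the historian itself initiates to PLC subnets on the three ports named in that check. The record keys (`src_ip`, `dst_ip`, `dst_port`), the function name `find_plc_connections`, and the alert type string are assumptions; real firewall exports will need their own parsing step first.

```python
import ipaddress
from typing import Dict, Iterable, List

# Ports named in the lateral movement check: Modbus/TCP, ISO-TSAP (S7comm), EtherNet/IP
PLC_PORTS = {502: "Modbus/TCP", 102: "S7comm", 44818: "EtherNet/IP"}


def find_plc_connections(
    flows: Iterable[Dict], historian_ip: str, plc_subnets: List[str]
) -> List[Dict]:
    """Return alerts for flows where the historian connects to a PLC port.

    Each flow is assumed to be a dict with src_ip, dst_ip and dst_port keys.
    """
    networks = [ipaddress.ip_network(s) for s in plc_subnets]
    hits = []
    for flow in flows:
        # Only flows initiated by the historian are of interest
        if flow.get("src_ip") != historian_ip:
            continue
        port = flow.get("dst_port")
        if port not in PLC_PORTS:
            continue
        dst = ipaddress.ip_address(flow["dst_ip"])
        if any(dst in net for net in networks):
            hits.append({
                "severity": "HIGH",
                "type": "HISTORIAN_TO_PLC_CONNECTION",  # hypothetical alert type
                "source_ip": historian_ip,
                "dest_ip": str(dst),
                "protocol": PLC_PORTS[port],
                "details": f"Historian {historian_ip} connected to {dst}:{port}",
            })
    return hits
```

For example, `find_plc_connections(flows, "10.10.3.50", ["10.10.1.0/24"])` returns one alert per matching flow. Some historian interfaces legitimately poll controllers, so known collector paths should be excluded before treating a hit as a compromise indicator.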

## Key Concepts

| Term | Definition |
|------|------------|
| OT Historian | Database server (OSIsoft PI, Ignition, Wonderware) storing time-series process data from SCADA/DCS systems |
| Pivot Point | Historian's position between IT and OT networks makes it a prime target for attackers to move between zones |
| Data Replay Attack | Feeding historical data to an HMI to mask real-time process manipulation (Stuxnet technique) |
| OSIsoft PI | Most widely deployed OT historian, used by 65% of Global 500 process companies |
| Ignition | Inductive Automation SCADA platform with historian module, increasingly targeted due to Python scripting capabilities |
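| CVE-2025-0921 | Execution-with-unnecessary-privileges flaw in privileged file system operations, published for Mitsubishi Electric Iconics Digital Solutions GENESIS64 and related SCADA products rather than Ignition; confirm affected products and versions against the vendor advisory |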
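
The flatline check in the workflow script catches constant values, but a data replay as defined in the table would more likely appear as an exact copy of an earlier stretch of readings. The sketch below is one possible heuristic, not an established detection method: it looks for an earlier, non-overlapping window that matches the most recent `window` values exactly. The function name `find_replayed_window` and the default window size are assumptions.

```python
from typing import Optional, Sequence


def find_replayed_window(values: Sequence[float], window: int = 20) -> Optional[int]:
    """Return the start index of an earlier exact copy of the last `window`
    values, or None if no such copy exists."""
    if window <= 0 or len(values) < 2 * window:
        return None
    recent = list(values[-window:])
    if len(set(recent)) == 1:
        return None  # constant signal: left to the flatline check
    # Scan backwards over earlier positions that do not overlap the recent window
    for start in range(len(values) - 2 * window, -1, -1):
        if list(values[start:start + window]) == recent:
            return start
    return None
```

Exact matching relies on noisy analog signals rarely repeating value for value. Slow-moving or heavily compressed tags can repeat legitimately, so a match is a prompt for investigation rather than proof of replay.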

## Output Format

```
HISTORIAN ATTACK DETECTION REPORT
====================================
Historian: [type and hostname]
Date: YYYY-MM-DD

CONNECTION ANALYSIS:
  Authorized Clients: [count]
  Unauthorized Clients Detected: [count with IPs]

DATA INTEGRITY:
  Tags Checked: [count]
  Integrity Issues: [count]
  Flatline Detections: [count]
  Data Gaps: [count]

LATERAL MOVEMENT INDICATORS:
  Outbound PLC Connections: [found/not found]
  Unauthorized Processes: [found/not found]
  Anomalous Authentication: [found/not found]
```
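
The template above can be filled from the alert dictionaries that the workflow script collects. This is a minimal sketch under stated assumptions: `format_report` is a hypothetical helper, the `lateral` dictionary and its keys (`plc`, `processes`, `authentication`) are invented for illustration, and the alert type strings are the ones the script emits.

```python
from datetime import date
from typing import Dict, List, Optional


def format_report(historian: str, alerts: List[Dict], authorized_count: int,
                  tags_checked: int, lateral: Optional[Dict[str, bool]] = None,
                  report_date: Optional[date] = None) -> str:
    """Render alerts in the plain-text layout shown in the Output Format section."""
    lateral = lateral or {}
    report_date = report_date or date.today()

    # Alert type names match those produced by HistorianAttackDetector
    unauthorized = sorted({a["source_ip"] for a in alerts
                           if a["type"] == "UNAUTHORIZED_HISTORIAN_CLIENT"})
    flatlines = sum(a["type"] == "DATA_INTEGRITY_FLATLINE" for a in alerts)
    gaps = sum(a["type"] == "DATA_INTEGRITY_NO_DATA" for a in alerts)

    def found(key: str) -> str:
        return "found" if lateral.get(key) else "not found"

    ips = f" ({', '.join(unauthorized)})" if unauthorized else ""
    lines = [
        "HISTORIAN ATTACK DETECTION REPORT",
        "=" * 36,
        f"Historian: {historian}",
        f"Date: {report_date.isoformat()}",
        "",
        "CONNECTION ANALYSIS:",
        f"  Authorized Clients: {authorized_count}",
        f"  Unauthorized Clients Detected: {len(unauthorized)}{ips}",
        "",
        "DATA INTEGRITY:",
        f"  Tags Checked: {tags_checked}",
        f"  Integrity Issues: {flatlines + gaps}",
        f"  Flatline Detections: {flatlines}",
        f"  Data Gaps: {gaps}",
        "",
        "LATERAL MOVEMENT INDICATORS:",
        f"  Outbound PLC Connections: {found('plc')}",
        f"  Unauthorized Processes: {found('processes')}",
        f"  Anomalous Authentication: {found('authentication')}",
    ]
    return "\n".join(lines)
```

Returning a string rather than printing keeps the helper usable for both console output and ticket or SIEM attachments.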

Related Skills

securing-historian-server-in-ot-environment

4032
from mukul975/Anthropic-Cybersecurity-Skills

This skill covers hardening and securing process historian servers (OSIsoft PI, Honeywell PHD, GE Proficy, AVEVA Historian) in OT environments. It addresses network placement across Purdue levels, access control for historian interfaces, data replication through DMZ using data diodes or PI-to-PI connectors, SQL injection prevention in historian queries, and integrity protection of process data used for safety analysis, regulatory reporting, and process optimization.

hunting-for-ntlm-relay-attacks

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect NTLM relay attacks by analyzing Windows Event 4624 logon type 3 with NTLMSSP authentication, identifying IP-to-hostname mismatches, Responder traffic signatures, SMB signing status, and suspicious authentication patterns across the domain.

hunting-for-dcsync-attacks

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect DCSync attacks by analyzing Windows Event ID 4662 for unauthorized DS-Replication-Get-Changes requests from non-domain-controller accounts.

hunting-credential-stuffing-attacks

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detects credential stuffing attacks by analyzing authentication logs for login velocity anomalies, ASN diversity, password spray patterns, and geographic distribution of failed logins. Uses statistical analysis on Splunk or raw log data. Use when investigating account takeover campaigns or building detection rules for auth abuse.

detecting-wmi-persistence

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect WMI event subscription persistence by analyzing Sysmon Event IDs 19, 20, and 21 for malicious EventFilter, EventConsumer, and FilterToConsumerBinding creation.

detecting-typosquatting-packages-in-npm-pypi

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detects typosquatting attacks in npm and PyPI package registries by analyzing package name similarity using Levenshtein distance and other string metrics, examining publish date heuristics to identify recently created packages mimicking established ones, and flagging download count anomalies where suspicious packages have disproportionately low usage compared to their legitimate targets. The analyst queries the PyPI JSON API and npm registry API to gather package metadata for automated comparison. Activates for requests involving package typosquatting detection, dependency confusion analysis, malicious package identification, or software supply chain threat hunting in package registries.

detecting-t1548-abuse-elevation-control-mechanism

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect abuse of elevation control mechanisms including UAC bypass, sudo exploitation, and setuid/setgid manipulation by monitoring registry modifications, process elevation flags, and unusual parent-child process relationships.

detecting-t1055-process-injection-with-sysmon

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect process injection techniques (T1055) including classic DLL injection, process hollowing, and APC injection by analyzing Sysmon events for cross-process memory operations, remote thread creation, and anomalous DLL loading patterns.

detecting-t1003-credential-dumping-with-edr

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect OS credential dumping techniques targeting LSASS memory, SAM database, NTDS.dit, and cached credentials using EDR telemetry, Sysmon process access monitoring, and Windows security event correlation.

detecting-suspicious-powershell-execution

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect suspicious PowerShell execution patterns including encoded commands, download cradles, AMSI bypass attempts, and constrained language mode evasion.

detecting-suspicious-oauth-application-consent

4032
from mukul975/Anthropic-Cybersecurity-Skills

Detect risky OAuth application consent grants in Azure AD / Microsoft Entra ID using Microsoft Graph API, audit logs, and permission analysis to identify illicit consent grant attacks.

detecting-supply-chain-attacks-in-ci-cd

4032
from mukul975/Anthropic-Cybersecurity-Skills

Scans GitHub Actions workflows and CI/CD pipeline configurations for supply chain attack vectors including unpinned actions, script injection via expressions, dependency confusion, and secrets exposure. Uses PyGithub and YAML parsing for automated audit. Use when hardening CI/CD pipelines or investigating compromised build systems.