building-threat-intelligence-enrichment-in-splunk

Build automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, and the Threat Intelligence Framework.

16 stars

Best use case

building-threat-intelligence-enrichment-in-splunk is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Build automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, and the Threat Intelligence Framework.

Teams using building-threat-intelligence-enrichment-in-splunk should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/building-threat-intelligence-enrichment-in-splunk/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/plugins/asi/skills/building-threat-intelligence-enrichment-in-splunk/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/building-threat-intelligence-enrichment-in-splunk/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How building-threat-intelligence-enrichment-in-splunk Compares

Feature / Agentbuilding-threat-intelligence-enrichment-in-splunkStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Build automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, and the Threat Intelligence Framework.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Building Threat Intelligence Enrichment in Splunk

## Overview

Splunk's Threat Intelligence Framework in Enterprise Security enables SOC teams to automatically correlate indicators of compromise (IOCs) against security events. The framework ingests threat feeds, normalizes indicators into KV Store collections, and uses lookup-based correlation searches to flag matching events. Splunk Threat Intelligence Management centralizes collection, normalization, and enrichment from multiple sources, reducing triage time by providing analysts with immediate context.


## When to Use

- When deploying or configuring building threat intelligence enrichment in splunk capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation

## Prerequisites

- Splunk Enterprise Security (ES) 7.x or later
- Threat Intelligence Management add-on or Threat Intelligence Framework
- API keys for external threat intelligence feeds (MISP, OTX, VirusTotal, AbuseIPDB)
- KV Store enabled and properly configured
- Admin access for modular input configuration

## Threat Intelligence Framework Architecture

```
External TI Sources (STIX/TAXII, CSV, API)
    |
    v
Modular Inputs (download and parse feeds)
    |
    v
KV Store Collections (normalized IOC storage)
    |-- ip_intel
    |-- domain_intel
    |-- file_intel
    |-- url_intel
    |-- email_intel
    |
    v
Threat Intelligence Lookups
    |
    v
Correlation Searches (match events against IOCs)
    |
    v
Notable Events (enriched with TI context)
```

## Configuring Threat Intelligence Sources

### STIX/TAXII Feed Integration

```conf
# inputs.conf - TAXII feed configuration
[threatlist://taxii_feed_example]
description = TAXII 2.1 Threat Feed
type = taxii
url = https://threatfeed.example.com/taxii2/
collection = threat-indicators-v21
polling_interval = 3600
api_key = <encrypted_api_key>
disabled = false
```

### CSV-Based Threat List

```conf
# inputs.conf - CSV threat list
[threatlist://custom_blocklist]
description = Internal threat blocklist
type = csv
url = https://internal.company.com/threat-feeds/blocklist.csv
polling_interval = 1800
disabled = false
```

### Custom Modular Input for API-Based Feeds

```python
# bin/threatfeed_otx.py - OTX AlienVault feed collector
import json
import sys
import requests
from splunklib.modularinput import Script, Scheme, Argument, Event


class OTXFeedInput(Script):
    def get_scheme(self):
        scheme = Scheme("OTX AlienVault Feed")
        scheme.description = "Collects IOCs from AlienVault OTX"
        scheme.use_external_validation = False
        scheme.streaming_mode = Scheme.streaming_mode_xml

        api_key_arg = Argument("api_key")
        api_key_arg.data_type = Argument.data_type_string
        api_key_arg.required_on_create = True
        scheme.add_argument(api_key_arg)

        pulse_days_arg = Argument("pulse_days")
        pulse_days_arg.data_type = Argument.data_type_number
        pulse_days_arg.required_on_create = False
        scheme.add_argument(pulse_days_arg)

        return scheme

    def stream_events(self, inputs, ew):
        for input_name, input_item in inputs.inputs.items():
            api_key = input_item["api_key"]
            pulse_days = int(input_item.get("pulse_days", 30))

            headers = {"X-OTX-API-KEY": api_key}
            url = f"https://otx.alienvault.com/api/v1/pulses/subscribed?modified_since={pulse_days}d"

            try:
                response = requests.get(url, headers=headers, timeout=60)
                response.raise_for_status()
                data = response.json()

                for pulse in data.get("results", []):
                    for indicator in pulse.get("indicators", []):
                        event = Event()
                        event.stanza = input_name
                        event.data = json.dumps({
                            "indicator": indicator["indicator"],
                            "type": indicator["type"],
                            "pulse_name": pulse["name"],
                            "pulse_id": pulse["id"],
                            "description": indicator.get("description", ""),
                            "created": indicator.get("created", ""),
                            "threat_source": "OTX",
                            "confidence": pulse.get("adversary", "unknown"),
                        })
                        ew.write_event(event)
            except requests.RequestException as e:
                ew.log("ERROR", f"OTX feed collection failed: {str(e)}")


if __name__ == "__main__":
    sys.exit(OTXFeedInput().run(sys.argv))
```

## Building Enrichment Lookups

### KV Store Collection Configuration

```conf
# collections.conf
[ip_threat_intel]
field.ip = string
field.threat_type = string
field.confidence = number
field.source = string
field.description = string
field.first_seen = time
field.last_seen = time
field.severity = string

[domain_threat_intel]
field.domain = string
field.threat_type = string
field.confidence = number
field.source = string
field.whois_registrar = string
field.whois_created = string

[file_hash_intel]
field.file_hash = string
field.hash_type = string
field.malware_family = string
field.confidence = number
field.source = string
field.detection_names = string
```

### Lookup Table Definitions

```conf
# transforms.conf
[ip_threat_intel_lookup]
external_type = kvstore
collection = ip_threat_intel
fields_list = ip, threat_type, confidence, source, description, severity

[domain_threat_intel_lookup]
external_type = kvstore
collection = domain_threat_intel
fields_list = domain, threat_type, confidence, source

[file_hash_intel_lookup]
external_type = kvstore
collection = file_hash_intel
fields_list = file_hash, hash_type, malware_family, confidence, source
```

## Enrichment Correlation Searches

### IP-Based Threat Intelligence Correlation

```spl
| tstats summariesonly=true count from datamodel=Network_Traffic
    where All_Traffic.action=allowed
    by All_Traffic.src_ip, All_Traffic.dest_ip, All_Traffic.dest_port, _time span=5m
| rename "All_Traffic.*" as *
| lookup ip_threat_intel_lookup ip as dest_ip OUTPUT threat_type, confidence, source as ti_source, severity as ti_severity
| where isnotnull(threat_type)
| lookup asset_lookup ip as src_ip OUTPUT asset_name, asset_owner, asset_priority
| eval urgency=case(
    ti_severity=="critical" AND asset_priority=="critical", "critical",
    ti_severity=="high" OR asset_priority=="critical", "high",
    ti_severity=="medium", "medium",
    true(), "low"
)
| eval description="Connection from ".src_ip." (".asset_name.") to known malicious IP ".dest_ip." (".threat_type.") - Source: ".ti_source
```

### Domain-Based Threat Intelligence Correlation

```spl
index=dns sourcetype=stream:dns query_type=A OR query_type=AAAA
| lookup domain_threat_intel_lookup domain as query OUTPUT threat_type as domain_threat, confidence as domain_confidence, source as ti_source
| where isnotnull(domain_threat) AND domain_confidence > 70
| stats count dc(src_ip) as unique_sources values(src_ip) as source_ips by query, domain_threat, ti_source
| eval severity=case(domain_confidence > 90, "critical", domain_confidence > 70, "high", true(), "medium")
| eval description="DNS queries to malicious domain ".query." from ".unique_sources." hosts - Threat: ".domain_threat
```

### File Hash Correlation

```spl
index=endpoint sourcetype=sysmon EventCode=1
| lookup file_hash_intel_lookup file_hash as Hashes OUTPUT malware_family, confidence as hash_confidence, source as ti_source
| where isnotnull(malware_family)
| stats count values(ParentCommandLine) as parent_commands by Computer, User, Image, malware_family, ti_source
| eval severity="critical"
| eval description="Known malware ".malware_family." executed on ".Computer." by ".User." - Binary: ".Image
```

## Multi-Source Enrichment Pipeline

```spl
index=firewall sourcetype=pan:traffic action=allowed
| eval indicators=mvappend(src_ip, dest_ip)
| mvexpand indicators
| lookup ip_threat_intel_lookup ip as indicators OUTPUT threat_type as ip_threat, confidence as ip_confidence, source as ip_ti_source
| lookup geo_ip_lookup ip as indicators OUTPUT country, city, latitude, longitude
| lookup whois_lookup ip as indicators OUTPUT org as ip_org, asn as ip_asn
| where isnotnull(ip_threat)
| stats count
    values(ip_threat) as threat_types
    values(ip_ti_source) as intel_sources
    values(country) as countries
    values(ip_org) as organizations
    latest(_time) as last_seen
    earliest(_time) as first_seen
    by src_ip, dest_ip, dest_port
| eval enrichment_context="Threat: ".mvjoin(threat_types, ", ")." | Geo: ".mvjoin(countries, ", ")." | Org: ".mvjoin(organizations, ", ")
```

## Threat Intelligence Dashboards

### IOC Coverage Statistics

```spl
| inputlookup ip_threat_intel_lookup
| stats count by source, threat_type
| sort -count
| head 20
```

### Feed Freshness Monitoring

```spl
| inputlookup ip_threat_intel_lookup
| eval age_days=round((now() - strptime(last_seen, "%Y-%m-%dT%H:%M:%S")) / 86400, 0)
| stats count avg(age_days) as avg_age_days max(age_days) as max_age_days by source
| eval status=case(avg_age_days > 30, "STALE", avg_age_days > 7, "AGING", true(), "FRESH")
```

## References

- [Splunk Threat Intelligence Framework Documentation](https://help.splunk.com/en/splunk-enterprise-security-8/administer/8.2/threat-intelligence/overview-of-threat-intelligence-in-splunk-enterprise-security)
- [Splunk Lantern - Threat Intelligence Enrichment](https://lantern.splunk.com/Security/UCE/Guided_Insights/Threat_intelligence)
- [Integrated Intelligence Enrichment - Splunk Blog](https://www.splunk.com/en_us/blog/security/integrated-intelligence-enrichment-with-threat-intelligence-management.html)
- [Cisco Talos Threat Intelligence in Splunk](https://www.splunk.com/en_us/blog/security/cisco-talos-threat-intelligence-splunk-security.html)

Related Skills

triaging-security-alerts-in-splunk

16
from plurigrid/asi

Triages security alerts in Splunk Enterprise Security by classifying severity, investigating notable events, correlating related telemetry, and making escalation or closure decisions using SPL queries and the Incident Review dashboard. Use when SOC analysts face queued alerts from correlation searches, need to prioritize investigation order, or must document triage decisions for handoff to Tier 2/3 analysts.

tracking-threat-actor-infrastructure

16
from plurigrid/asi

Threat actor infrastructure tracking involves monitoring and mapping adversary-controlled assets including command-and-control (C2) servers, phishing domains, exploit kit hosts, bulletproof hosting, a

profiling-threat-actor-groups

16
from plurigrid/asi

Develops comprehensive threat actor profiles for APT groups, criminal organizations, and hacktivist collectives by aggregating TTP documentation, historical campaign data, tooling fingerprints, and attribution indicators from multiple intelligence sources. Use when briefing executives on sector-specific threats, updating threat model assumptions, or prioritizing defensive controls against specific adversaries. Activates for requests involving MITRE ATT&CK Groups, Mandiant APT profiles, CrowdStrike adversary naming, or sector-specific threat briefings.

performing-threat-modeling-with-owasp-threat-dragon

16
from plurigrid/asi

Use OWASP Threat Dragon to create data flow diagrams, identify threats using STRIDE and LINDDUN methodologies, and generate threat model reports for secure design review.

performing-threat-landscape-assessment-for-sector

16
from plurigrid/asi

Conduct a sector-specific threat landscape assessment by analyzing threat actor targeting patterns, common attack vectors, and industry-specific vulnerabilities to inform organizational risk management.

performing-threat-intelligence-sharing-with-misp

16
from plurigrid/asi

Use PyMISP to create, enrich, and share threat intelligence events on a MISP platform, including IOC management, feed integration, STIX export, and community sharing workflows.

performing-threat-hunting-with-yara-rules

16
from plurigrid/asi

Use YARA pattern-matching rules to hunt for malware, suspicious files, and indicators of compromise across filesystems and memory dumps. Covers rule authoring, yara-python scanning, and integration with threat intel feeds.

performing-threat-hunting-with-elastic-siem

16
from plurigrid/asi

Performs proactive threat hunting in Elastic Security SIEM using KQL/EQL queries, detection rules, and Timeline investigation to identify threats that evade automated detection. Use when SOC teams need to hunt for specific ATT&CK techniques, investigate anomalous behaviors, or validate detection coverage gaps using Elasticsearch and Kibana Security.

performing-threat-emulation-with-atomic-red-team

16
from plurigrid/asi

Executes Atomic Red Team tests for MITRE ATT&CK technique validation using the atomic-operator Python framework. Loads test definitions from YAML atomics, runs attack simulations, and validates detection coverage. Use when testing SIEM detection rules, validating EDR coverage, or conducting purple team exercises.

performing-open-source-intelligence-gathering

16
from plurigrid/asi

Open Source Intelligence (OSINT) gathering is the first active phase of a red team engagement, where operators collect publicly available information about the target organization to identify attack s

performing-malware-hash-enrichment-with-virustotal

16
from plurigrid/asi

Enrich malware file hashes using the VirusTotal API to retrieve detection rates, behavioral analysis, YARA matches, and contextual threat intelligence for incident triage and IOC validation.

performing-ioc-enrichment-automation

16
from plurigrid/asi

Automates Indicator of Compromise (IOC) enrichment by orchestrating lookups across VirusTotal, AbuseIPDB, Shodan, MISP, and other intelligence sources to provide contextual scoring and disposition recommendations. Use when SOC analysts need rapid multi-source enrichment of IPs, domains, URLs, and file hashes during alert triage or incident investigation.