building-threat-intelligence-feed-integration

Builds automated threat intelligence feed integration pipelines connecting STIX/TAXII feeds, open-source threat intel, and commercial TI platforms into SIEM and security tools for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intelligence by automating feed ingestion, normalization, scoring, and distribution to detection systems.

Best use case

building-threat-intelligence-feed-integration is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using building-threat-intelligence-feed-integration can expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/building-threat-intelligence-feed-integration/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/plugins/asi/skills/building-threat-intelligence-feed-integration/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/building-threat-intelligence-feed-integration/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How building-threat-intelligence-feed-integration Compares

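| Feature | building-threat-intelligence-feed-integration | Standard Approach |
|---------|-----------------------------------------------|-------------------|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |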

SKILL.md Source

# Building Threat Intelligence Feed Integration

## When to Use

Use this skill when:
- SOC teams need automated ingestion of threat intelligence feeds into SIEM platforms
- Multiple TI sources require normalization into a common format (STIX 2.1)
- Detection systems need real-time IOC matching against network and endpoint telemetry
- TI feed quality assessment and deduplication processes need to be established

**Do not use** for manual IOC lookup — use dedicated enrichment tools (VirusTotal, AbuseIPDB) for ad-hoc queries.

## Prerequisites

- MISP instance or Threat Intelligence Platform (TIP) for feed aggregation
- STIX/TAXII client library (`taxii2-client`, `stix2` Python packages)
- SIEM platform (Splunk ES, Elastic Security, or Sentinel) with TI framework configured
- API keys for commercial and open-source feeds (AlienVault OTX, Abuse.ch, CISA AIS)
- Python 3.8+ for feed processing automation

## Workflow

### Step 1: Identify and Catalog Intelligence Sources

Map available feeds by type, format, and update frequency:

| Feed Source | Format | IOC Types | Update Freq | Cost |
|-------------|--------|-----------|-------------|------|
| AlienVault OTX | STIX/JSON | IP, Domain, Hash, URL | Real-time | Free |
| Abuse.ch URLhaus | CSV/JSON | URL, Domain | Every 5 min | Free |
| Abuse.ch MalwareBazaar | JSON API | File Hash | Real-time | Free |
| CISA AIS | STIX/TAXII 2.1 | All types | Daily | Free (US Gov) |
| CrowdStrike Intel | STIX/JSON | All types + Actor TTP | Real-time | Commercial |
| Mandiant Advantage | STIX 2.1 | All types + Reports | Real-time | Commercial |
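
A catalog like the one above can also live in code so a scheduler knows which feeds are due for polling. The following is a minimal sketch, not part of the original skill: the `FeedSource` class, the `feeds_due` helper, and the format labels are hypothetical names, and only the two feeds with an explicit interval in the table are included.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class FeedSource:
    """One row of the feed catalog (hypothetical structure)."""
    name: str
    fmt: str
    ioc_types: tuple
    poll_interval: timedelta
    commercial: bool = False


# Intervals taken from the table above; format labels are illustrative.
CATALOG = [
    FeedSource("Abuse.ch URLhaus", "csv", ("url", "domain"), timedelta(minutes=5)),
    FeedSource("CISA AIS", "taxii21", ("all",), timedelta(days=1)),
]


def feeds_due(catalog, last_polled, now):
    """Return feeds whose poll interval has elapsed.

    last_polled maps feed name -> datetime of the last successful poll.
    Feeds that were never polled are always due.
    """
    due = []
    for feed in catalog:
        last = last_polled.get(feed.name)
        if last is None or now - last >= feed.poll_interval:
            due.append(feed)
    return due


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    for feed in feeds_due(CATALOG, {}, now):
        print(f"Due for polling: {feed.name}")
```

Keeping the interval next to the feed definition means onboarding a new source is a one-line change rather than a new cron entry.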

### Step 2: Ingest STIX/TAXII Feeds

Connect to a TAXII 2.1 server and download indicators:

```python
from datetime import datetime, timedelta, timezone

from stix2 import parse
from taxii2client.v21 import Collection, Server

# Connect to TAXII server (example: CISA AIS)
server = Server(
    "https://taxii.cisa.gov/taxii2/",
    user="your_username",
    password="your_password"
)

# List available collections
for api_root in server.api_roots:
    print(f"API Root: {api_root.title}")
    for collection in api_root.collections:
        print(f"  Collection: {collection.title} (ID: {collection.id})")

# Fetch indicators from a collection
collection = Collection(
    "https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
    user="your_username",
    password="your_password"
)

# Get indicators added in the last 24 hours (TAXII timestamps are UTC)
since = datetime.now(timezone.utc) - timedelta(days=1)
added_after = since.strftime("%Y-%m-%dT%H:%M:%S.000Z")

# A single call returns one page; large collections need pagination
response = collection.get_objects(added_after=added_after, type=["indicator"])
for obj in response.get("objects", []):
    indicator = parse(obj, allow_custom=True)
    print(f"Type: {indicator.type}")
    print(f"Pattern: {indicator.pattern}")
    # valid_until and confidence are optional in STIX 2.1, so read them with .get()
    print(f"Valid Until: {indicator.get('valid_until')}")
    print(f"Confidence: {indicator.get('confidence')}")
    print("---")
```

### Step 3: Ingest Open-Source Feeds

**Abuse.ch URLhaus Feed:**

```python
import csv
from io import StringIO

import requests

# Download URLhaus recent URLs
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/", timeout=30)
response.raise_for_status()
reader = csv.reader(StringIO(response.text), delimiter=',')

indicators = []
for row in reader:
    # Skip blank lines, the "#" comment header, and malformed short rows
    if len(row) < 7 or row[0].startswith("#"):
        continue
    indicators.append({
        "id": row[0],
        "dateadded": row[1],
        "url": row[2],
        "url_status": row[3],
        "threat": row[5],
        "tags": row[6]
    })

print(f"Ingested {len(indicators)} URLs from URLhaus")

# Filter for active threats only
active = [i for i in indicators if i["url_status"] == "online"]
print(f"Active threats: {len(active)}")
```

**AlienVault OTX Pulse Feed:**

```python
from datetime import datetime, timedelta, timezone

from OTXv2 import OTXv2

otx = OTXv2("YOUR_OTX_API_KEY")

# Get subscribed pulses modified in the last 24 hours
since = datetime.now(timezone.utc) - timedelta(days=1)
pulses = otx.getall(modified_since=since.strftime("%Y-%m-%dT%H:%M:%S"))

for pulse in pulses:
    print(f"Pulse: {pulse['name']}")
    print(f"Tags: {pulse['tags']}")
    for indicator in pulse["indicators"]:
        print(f"  IOC: {indicator['indicator']} ({indicator['type']})")
```

**Abuse.ch Feodo Tracker (C2 IPs):**

```python
import requests

response = requests.get(
    "https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json",
    timeout=30,
)
response.raise_for_status()
c2_data = response.json()

for entry in c2_data:
    print(f"IP: {entry['ip_address']}:{entry['port']}")
    print(f"Malware: {entry['malware']}")
    print(f"First Seen: {entry['first_seen']}")
    print(f"Last Online: {entry['last_online']}")
```

### Step 4: Normalize and Deduplicate

Convert all feeds to STIX 2.1 format for standardization:

```python
import hashlib
from datetime import datetime, timezone

from stix2 import Bundle, Indicator

def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
    """Convert raw IOC to STIX 2.1 indicator"""
    pattern_map = {
        "ipv4": f"[ipv4-addr:value = '{ioc_value}']",
        "domain": f"[domain-name:value = '{ioc_value}']",
        "url": f"[url:value = '{ioc_value}']",
        "sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']",
        "md5": f"[file:hashes.MD5 = '{ioc_value}']",
    }
    if ioc_type not in pattern_map:
        raise ValueError(f"Unsupported IOC type: {ioc_type}")

    return Indicator(
        name=f"{ioc_type}: {ioc_value}",
        pattern=pattern_map[ioc_type],
        pattern_type="stix",
        valid_from=datetime.now(timezone.utc),  # time of ingestion, not a fixed date
        confidence=confidence,
        labels=[source],
        custom_properties={"x_source_feed": source}
    )

# all_collected_iocs is the combined output of Steps 2-3: a list of dicts
# with "type", "value", and "source" keys. The entry below is a placeholder
# (documentation address) so the block runs on its own.
all_collected_iocs = [
    {"type": "ipv4", "value": "198.51.100.7", "source": "example_feed"},
]

# Deduplicate across sources
seen_iocs = set()
unique_indicators = []

for ioc in all_collected_iocs:
    ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest()
    if ioc_hash not in seen_iocs:
        seen_iocs.add(ioc_hash)
        unique_indicators.append(
            create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
        )

bundle = Bundle(objects=unique_indicators)
print(f"Unique indicators: {len(unique_indicators)}")
```
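
The Key Concepts section describes deduplication as preserving multi-source attribution, which a plain seen-set does not do because later sources for the same IOC are discarded. One possible way to keep them is sketched below. The `merge_iocs` helper and the lowercase normalization for domains and hashes are assumptions for illustration, not part of the original skill.

```python
def merge_iocs(raw_iocs):
    """Collapse duplicates on (type, normalized value) and keep every reporting source.

    raw_iocs is an iterable of dicts with "type", "value", and "source" keys.
    Returns one dict per unique IOC with a "sources" set, in first-seen order.
    """
    merged = {}
    for ioc in raw_iocs:
        value = ioc["value"].strip()
        # Domains and hex digests are case-insensitive; URLs and paths are not.
        if ioc["type"] in ("domain", "md5", "sha256"):
            value = value.lower()
        key = (ioc["type"], value)
        entry = merged.setdefault(
            key, {"type": ioc["type"], "value": value, "sources": set()}
        )
        entry["sources"].add(ioc["source"])
    return list(merged.values())


if __name__ == "__main__":
    sample = [
        {"type": "domain", "value": "Bad.example", "source": "otx"},
        {"type": "domain", "value": "bad.example", "source": "urlhaus"},
    ]
    for entry in merge_iocs(sample):
        print(entry["value"], sorted(entry["sources"]))
```

The number of sources per entry can then feed a corroboration-based confidence score before the indicator is converted to STIX.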

### Step 5: Push to SIEM Threat Intelligence Framework

**Push to Splunk ES Threat Intelligence:**

```python
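import json
import os

import requests

splunk_url = "https://splunk.company.com:8089"
splunk_token = os.environ["SPLUNK_TOKEN"]  # example variable name; load the token however you manage secrets
headers = {"Authorization": f"Bearer {splunk_token}"}

# Set SKIP_TLS_VERIFY=true only for self-signed certs in lab environments
verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"

for indicator in unique_indicators:
    # ip_intel holds IP indicators, so skip other IOC types in this loop
    if not indicator.pattern.startswith("[ipv4-addr:"):
        continue

    # Extract IOC value from STIX pattern (the last quoted string)
    ioc_value = indicator.pattern.split("'")[-2]

    # Upload to Splunk ES threat intel collection
    item = {
        "ip": ioc_value,
        "description": indicator.name,
        "weight": indicator.confidence // 10,
        "threat_key": indicator.id,
        "source_feed": indicator.get("x_source_feed", "unknown")
    }

    response = requests.post(
        f"{splunk_url}/services/data/threat_intel/item/ip_intel",
        headers=headers,
        # The ES REST reference describes a JSON-encoded "item" parameter;
        # confirm against the documentation for your ES version
        data={"item": json.dumps(item)},
        verify=verify_tls,
        timeout=30,
    )
    response.raise_for_status()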
```
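
The upload above targets a single collection, while the normalized indicators cover IPs, domains, URLs, and file hashes. A small routing helper can pick the collection and field from the STIX pattern. This is a sketch under assumptions: `route_indicator` is a hypothetical helper, it handles only single-comparison patterns of the kind Step 4 generates, and the `file_intel` and `http_intel` collection and field names should be checked against your Splunk ES version.

```python
import re

# Matches single-comparison patterns such as [ipv4-addr:value = '198.51.100.7'].
# Compound patterns (AND/OR, multiple comparisons) deliberately do not match.
_SIMPLE_PATTERN = re.compile(
    r"^\[([a-z0-9-]+):([^=]+?)\s*=\s*'((?:[^'\\]|\\.)*)'\]$"
)

# (STIX object type, object path) -> (collection, field). Names are assumptions.
ROUTES = {
    ("ipv4-addr", "value"): ("ip_intel", "ip"),
    ("domain-name", "value"): ("ip_intel", "domain"),
    ("url", "value"): ("http_intel", "url"),
    ("file", "hashes.'SHA-256'"): ("file_intel", "file_hash"),
    ("file", "hashes.MD5"): ("file_intel", "file_hash"),
}


def route_indicator(pattern):
    """Return (collection, {field: value}) for a simple pattern, else None."""
    match = _SIMPLE_PATTERN.match(pattern.strip())
    if match is None:
        return None
    obj_type, path, value = match.group(1), match.group(2).strip(), match.group(3)
    route = ROUTES.get((obj_type, path))
    if route is None:
        return None
    collection, field = route
    return collection, {field: value}


if __name__ == "__main__":
    print(route_indicator("[file:hashes.'SHA-256' = 'abc123']"))
    print(route_indicator("[url:value = 'http://bad.example/?id=1']"))
```

Parsing with a regular expression also avoids a pitfall of splitting on quotes: in a SHA-256 pattern the first quoted string is the hash algorithm name, not the hash value.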

**Push to MISP for centralized management:**

```python
from datetime import datetime

from pymisp import MISPEvent, PyMISP

misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")

# Map STIX pattern prefixes to MISP attribute types
misp_types = {
    "[ipv4-addr:": "ip-dst",
    "[domain-name:": "domain",
    "[url:": "url",
    "[file:hashes.'SHA-256'": "sha256",
    "[file:hashes.MD5": "md5",
}

# Create event for feed batch
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2  # Medium
event.analysis = 2  # Completed

# Add indicators as attributes
for ioc in unique_indicators:
    attr_type = next(
        (t for prefix, t in misp_types.items() if ioc.pattern.startswith(prefix)),
        None,
    )
    if attr_type is None:
        continue  # no MISP mapping for this pattern
    event.add_attribute(
        attr_type,
        ioc.pattern.split("'")[-2],  # the last quoted string is the IOC value
        to_ids=True,
        comment=f"Source: {ioc.get('x_source_feed', 'mixed')}",
    )

result = misp.add_event(event)
print(f"MISP Event created: {result['Event']['id']}")
```

### Step 6: Monitor Feed Health and Quality

Track feed effectiveness metrics:

```spl
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
        dc(threat_key) AS unique_iocs
  by source_feed
| join type=left source_feed [
    search index=notable source="Threat Intelligence"
    | stats count AS matches by source_feed
  ]
| fillnull value=0 matches
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
```
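
The same match-rate arithmetic can be reproduced outside the SIEM, for example when assembling a daily report from exported counts. The sketch below mirrors the `eval` step of the query; `match_rate` and `rank_feeds` are hypothetical helper names, and a feed with zero IOCs is reported as 0.0 rather than raising a division error.

```python
def match_rate(matches, unique_iocs):
    """Percentage of a feed's unique IOCs that produced a detection match."""
    if unique_iocs == 0:
        return 0.0
    return round(matches / unique_iocs * 100, 2)


def rank_feeds(stats):
    """Sort feeds by match rate, highest first.

    stats maps feed name -> (unique_iocs, matches).
    Returns a list of (feed, unique_iocs, matches, match_rate) tuples.
    """
    rows = [
        (feed, iocs, hits, match_rate(hits, iocs))
        for feed, (iocs, hits) in stats.items()
    ]
    return sorted(rows, key=lambda row: row[3], reverse=True)


if __name__ == "__main__":
    example = {"feed_a": (12340, 47), "feed_b": (1203, 12)}
    for feed, iocs, hits, rate in rank_feeds(example):
        print(f"{feed:10} {iocs:>7} {hits:>5} {rate:>6.2f}%")
```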

## Key Concepts

| Term | Definition |
|------|-----------|
| **STIX 2.1** | Structured Threat Information Expression, a standardized JSON format for sharing threat intelligence objects |
| **TAXII** | Trusted Automated Exchange of Intelligence Information, the transport protocol for sharing STIX data through a REST API over HTTPS |
| **TIP** | Threat Intelligence Platform, a centralized system for aggregating, scoring, and distributing threat intelligence |
| **IOC Scoring** | Process of assigning confidence values to indicators based on source reliability and corroboration |
| **Feed Deduplication** | Removing duplicate IOCs across multiple sources while preserving multi-source attribution |
| **IOC Expiration** | Time-to-live policy removing aged indicators (IP: 30 days, Domain: 90 days, Hash: 1 year) |
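
The scoring and expiration concepts in the table can be sketched in a few lines. The time-to-live values below come from the IOC Expiration row. Everything else is an assumption for illustration: the reliability weights, the 10-point corroboration bonus, and the helper names `score` and `is_expired` are hypothetical and would need tuning against your own feeds.

```python
from datetime import datetime, timedelta, timezone

# TTLs from the IOC Expiration policy above (IP: 30 days, Domain: 90 days, Hash: 1 year).
TTL = {
    "ipv4": timedelta(days=30),
    "domain": timedelta(days=90),
    "md5": timedelta(days=365),
    "sha256": timedelta(days=365),
}

# Hypothetical per-source reliability on the STIX 0-100 confidence scale.
SOURCE_RELIABILITY = {"crowdstrike": 80, "cisa_ais": 70, "urlhaus": 60, "otx": 40}


def score(sources, default=30, bonus=10):
    """Confidence = most reliable source, plus a bonus per corroborating source, capped at 100."""
    unique = set(sources)
    if not unique:
        raise ValueError("an IOC needs at least one source")
    base = max(SOURCE_RELIABILITY.get(s, default) for s in unique)
    return min(100, base + bonus * (len(unique) - 1))


def is_expired(ioc_type, last_seen, now):
    """True when the IOC is older than the TTL for its type; unknown types never expire."""
    ttl = TTL.get(ioc_type)
    return ttl is not None and now - last_seen > ttl


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(score({"otx", "urlhaus"}))
    print(is_expired("ipv4", now - timedelta(days=45), now))
```

Capping the score at 100 keeps the result valid as a STIX `confidence` value.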

## Tools & Systems

- **MISP**: Open-source threat intelligence platform for feed aggregation, correlation, and sharing
- **AlienVault OTX**: Free threat intelligence sharing platform with community pulse feeds
- **Abuse.ch**: Suite of free threat feeds (URLhaus, MalwareBazaar, Feodo Tracker, ThreatFox)
- **OpenCTI**: Open-source cyber threat intelligence platform supporting STIX 2.1 native storage
- **TAXII2 Client**: Python library for connecting to STIX/TAXII 2.1 servers for automated indicator retrieval

## Common Scenarios

- **New Feed Onboarding**: Evaluate feed quality, map fields to STIX, configure automated ingestion pipeline
- **Multi-SIEM Distribution**: Push normalized IOCs from MISP to Splunk, Elastic, and Sentinel simultaneously
- **False Positive Reduction**: Score IOCs by source count and age, expire stale indicators automatically
- **Feed Quality Audit**: Compare detection match rates across feeds to identify highest-value sources
- **Incident IOC Sharing**: Package investigation IOCs as STIX bundle and share with ISACs via TAXII

## Output Format

```
THREAT INTEL FEED STATUS — Daily Report
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Date:         2024-03-15
Total IOCs:   45,892 active indicators

Feed Health:
  Feed                  IOCs    Matches  Match Rate  Status
  Abuse.ch URLhaus      12,340  47       0.38%       HEALTHY
  AlienVault OTX        18,567  23       0.12%       HEALTHY
  Abuse.ch Feodo        1,203   12       1.00%       HEALTHY
  CISA AIS              8,945   8        0.09%       HEALTHY
  CrowdStrike Intel     4,837   31       0.64%       HEALTHY

Actions Today:
  New IOCs ingested:    1,247
  IOCs expired:         892
  Duplicates removed:   156
  SIEM matches:         121 notable events generated
  False positives:      3 (CDN IPs removed from feed)
```
