building-threat-feed-aggregation-with-misp

Deploy MISP (Malware Information Sharing Platform) to aggregate, correlate, and distribute threat intelligence feeds from multiple sources for centralized IOC management and automated SIEM integration.

16 stars

Best use case

building-threat-feed-aggregation-with-misp is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Deploy MISP (Malware Information Sharing Platform) to aggregate, correlate, and distribute threat intelligence feeds from multiple sources for centralized IOC management and automated SIEM integration.

Teams using building-threat-feed-aggregation-with-misp should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/building-threat-feed-aggregation-with-misp/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/plugins/asi/skills/building-threat-feed-aggregation-with-misp/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/building-threat-feed-aggregation-with-misp/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How building-threat-feed-aggregation-with-misp Compares

Feature / Agentbuilding-threat-feed-aggregation-with-mispStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Deploy MISP (Malware Information Sharing Platform) to aggregate, correlate, and distribute threat intelligence feeds from multiple sources for centralized IOC management and automated SIEM integration.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Building Threat Feed Aggregation with MISP

## Overview

MISP is the leading open-source threat intelligence platform for collecting, storing, distributing, and sharing cybersecurity indicators and threat intelligence. It aggregates feeds from OSINT sources, commercial providers, and sharing communities into a unified platform with automatic correlation, STIX/TAXII export, and direct integration with SIEMs and security tools. This skill covers deploying MISP via Docker, configuring feeds from sources like abuse.ch, AlienVault OTX, and CIRCL, setting up automated feed synchronization, and integrating with Splunk, Elasticsearch, and SOAR platforms.


## When to Use

- When deploying or configuring building threat feed aggregation with misp capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation

## Prerequisites

- Docker and Docker Compose for deployment
- Python 3.9+ with `pymisp` library for API interaction
- Linux server with 8GB+ RAM for production deployment
- Understanding of IOC types and threat intelligence lifecycle
- Network access to external feed URLs

## Key Concepts

### MISP Architecture

MISP stores threat intelligence as Events containing Attributes (IOCs) organized by type and category. Events can have Tags (MITRE ATT&CK, TLP marking, sector tags), Galaxies (threat actor profiles, malware families, attack patterns), and Objects (structured groupings of related attributes). Events are correlated automatically across the instance.

### Feed Types

MISP supports three feed formats: MISP format (native JSON events), CSV (comma-separated IOCs), and freetext (unstructured text with automatic IOC extraction). Feeds can be remote (fetched from URLs) or local (uploaded files). MISP ships with 80+ default OSINT feeds including abuse.ch URLhaus, Botvrij, CIRCL OSINT, and malware traffic analysis.

### Sharing and Synchronization

MISP instances can synchronize with other MISP instances via push/pull mechanisms. Sharing groups control distribution (organization only, this community, connected communities, all communities). The TAXII server module enables integration with STIX/TAXII consumers.

## Workflow

### Step 1: Deploy MISP with Docker

```yaml
# docker-compose.yml for MISP deployment
version: '3.8'
services:
  misp:
    image: coolacid/misp-docker:core-latest
    container_name: misp
    restart: unless-stopped
    ports:
      - "443:443"
      - "80:80"
    environment:
      - MYSQL_HOST=misp-db
      - MYSQL_DATABASE=misp
      - MYSQL_USER=misp
      - MYSQL_PASSWORD=misp_db_password_change_me
      - MISP_ADMIN_EMAIL=admin@organization.com
      - MISP_ADMIN_PASSPHRASE=admin_password_change_me
      - MISP_BASEURL=https://misp.organization.com
      - POSTFIX_RELAY_HOST=smtp.organization.com
      - TIMEZONE=UTC
    volumes:
      - misp-data:/var/www/MISP/app/files
      - misp-config:/var/www/MISP/app/Config
    depends_on:
      - misp-db
      - misp-redis

  misp-db:
    image: mysql:8.0
    container_name: misp-db
    restart: unless-stopped
    environment:
      - MYSQL_DATABASE=misp
      - MYSQL_USER=misp
      - MYSQL_PASSWORD=misp_db_password_change_me
      - MYSQL_ROOT_PASSWORD=root_password_change_me
    volumes:
      - misp-db-data:/var/lib/mysql

  misp-redis:
    image: redis:7
    container_name: misp-redis
    restart: unless-stopped

volumes:
  misp-data:
  misp-config:
  misp-db-data:
```

### Step 2: Configure Feeds via PyMISP API

```python
from pymisp import PyMISP, MISPFeed
import json

class MISPFeedManager:
    def __init__(self, misp_url, misp_key, verify_ssl=False):
        self.misp = PyMISP(misp_url, misp_key, verify_ssl)
        print(f"[+] Connected to MISP: {misp_url}")

    def list_feeds(self):
        """List all configured feeds."""
        feeds = self.misp.feeds()
        enabled = [f for f in feeds if f.get("Feed", {}).get("enabled")]
        disabled = [f for f in feeds if not f.get("Feed", {}).get("enabled")]
        print(f"[+] Feeds: {len(enabled)} enabled, {len(disabled)} disabled")
        return feeds

    def enable_default_feeds(self):
        """Enable recommended default OSINT feeds."""
        recommended_feeds = [
            "CIRCL OSINT Feed",
            "Botvrij.eu - Indicators of Compromise",
            "abuse.ch URLhaus Host file",
            "abuse.ch Feodo Tracker",
            "abuse.ch SSL Blacklist",
            "malwaredomainlist",
            "CyberCure - IP Feed",
        ]

        feeds = self.misp.feeds()
        enabled_count = 0
        for feed in feeds:
            feed_data = feed.get("Feed", {})
            if feed_data.get("name") in recommended_feeds:
                if not feed_data.get("enabled"):
                    self.misp.enable_feed(feed_data["id"])
                    self.misp.enable_feed_cache(feed_data["id"])
                    enabled_count += 1
                    print(f"  [+] Enabled: {feed_data['name']}")

        print(f"[+] Enabled {enabled_count} feeds")

    def add_custom_feed(self, name, url, provider, feed_format="csv",
                        input_source="network", enabled=True):
        """Add a custom threat intelligence feed."""
        feed = MISPFeed()
        feed.name = name
        feed.provider = provider
        feed.url = url
        feed.source_format = feed_format
        feed.input_source = input_source
        feed.enabled = enabled
        feed.caching_enabled = True
        feed.publish = False
        feed.distribution = "3"  # All communities

        result = self.misp.add_feed(feed)
        if "Feed" in result:
            feed_id = result["Feed"]["id"]
            print(f"[+] Added feed: {name} (ID: {feed_id})")
            return feed_id
        else:
            print(f"[-] Error adding feed: {result}")
            return None

    def fetch_all_feeds(self):
        """Trigger fetch for all enabled feeds."""
        feeds = self.misp.feeds()
        for feed in feeds:
            feed_data = feed.get("Feed", {})
            if feed_data.get("enabled"):
                self.misp.fetch_feed(feed_data["id"])
                print(f"  [*] Fetching: {feed_data['name']}")
        print("[+] Feed fetch triggered for all enabled feeds")

manager = MISPFeedManager(
    "https://misp.organization.com",
    "YOUR_MISP_API_KEY",
)
manager.enable_default_feeds()
manager.add_custom_feed(
    name="Abuse.ch MalwareBazaar Recent",
    url="https://bazaar.abuse.ch/export/csv/recent/",
    provider="abuse.ch",
    feed_format="csv",
)
manager.fetch_all_feeds()
```

### Step 3: Search and Correlate Indicators

```python
def search_indicators(misp, value=None, type_attribute=None, tags=None, last_days=30):
    """Search MISP for indicators with correlation."""
    from datetime import datetime, timedelta
    date_from = (datetime.now() - timedelta(days=last_days)).strftime("%Y-%m-%d")

    search_params = {
        "date_from": date_from,
        "published": True,
        "enforceWarninglist": True,
    }
    if value:
        search_params["value"] = value
    if type_attribute:
        search_params["type_attribute"] = type_attribute
    if tags:
        search_params["tags"] = tags

    results = misp.search("attributes", **search_params)
    attributes = results.get("Attribute", [])
    print(f"[+] Search returned {len(attributes)} attributes")

    # Group by event for context
    events = {}
    for attr in attributes:
        event_id = attr.get("event_id", "")
        if event_id not in events:
            events[event_id] = {"attributes": [], "tags": set()}
        events[event_id]["attributes"].append({
            "type": attr.get("type", ""),
            "value": attr.get("value", ""),
            "category": attr.get("category", ""),
            "timestamp": attr.get("timestamp", ""),
        })
        for tag in attr.get("Tag", []):
            events[event_id]["tags"].add(tag.get("name", ""))

    return {"attributes": attributes, "events": events}

# Search for specific IOC
misp = manager.misp
results = search_indicators(misp, value="203.0.113.1")
results_by_type = search_indicators(misp, type_attribute="ip-dst", last_days=7)
results_by_tag = search_indicators(misp, tags=["tlp:white", "type:OSINT"])
```

### Step 4: Export to SIEM (Splunk / Elasticsearch)

```python
import requests
from datetime import datetime, timedelta

class MISPSIEMExporter:
    def __init__(self, misp_client):
        self.misp = misp_client

    def export_to_splunk(self, splunk_url, hec_token, days=7):
        """Export recent MISP indicators to Splunk via HEC."""
        date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
        results = self.misp.search("attributes", date_from=date_from,
                                    published=True, enforceWarninglist=True)
        attributes = results.get("Attribute", [])

        headers = {"Authorization": f"Splunk {hec_token}"}
        exported = 0
        for attr in attributes:
            event = {
                "event": {
                    "ioc_type": attr.get("type", ""),
                    "ioc_value": attr.get("value", ""),
                    "category": attr.get("category", ""),
                    "event_id": attr.get("event_id", ""),
                    "timestamp": attr.get("timestamp", ""),
                    "tags": [t.get("name", "") for t in attr.get("Tag", [])],
                },
                "sourcetype": "misp:attribute",
                "source": "misp",
                "index": "threat_intel",
            }
            resp = requests.post(
                f"{splunk_url}/services/collector/event",
                headers=headers, json=event,
                verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true",  # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
            )
            if resp.status_code == 200:
                exported += 1

        print(f"[+] Exported {exported}/{len(attributes)} indicators to Splunk")

    def export_ioc_list(self, output_file, ioc_types=None, days=30):
        """Export flat IOC list for firewall/proxy blocklists."""
        ioc_types = ioc_types or ["ip-dst", "domain", "hostname", "url"]
        date_from = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

        all_iocs = []
        for ioc_type in ioc_types:
            results = self.misp.search(
                "attributes", type_attribute=ioc_type,
                date_from=date_from, published=True,
                enforceWarninglist=True,
            )
            for attr in results.get("Attribute", []):
                all_iocs.append(attr.get("value", ""))

        unique_iocs = sorted(set(all_iocs))
        with open(output_file, "w") as f:
            for ioc in unique_iocs:
                f.write(f"{ioc}\n")

        print(f"[+] Exported {len(unique_iocs)} unique IOCs to {output_file}")

exporter = MISPSIEMExporter(misp)
exporter.export_ioc_list("blocklist_ips.txt", ioc_types=["ip-dst"], days=7)
```

## Validation Criteria

- MISP deployed and accessible via web interface and API
- Default OSINT feeds enabled and fetching data
- Custom feeds added and ingesting indicators
- Indicators searchable with correlation across events
- IOCs exported to SIEM (Splunk/Elasticsearch) successfully
- Blocklists generated for firewall/proxy integration

## References

- [MISP Project](https://www.misp-project.org/)
- [MISP GitHub Repository](https://github.com/MISP/MISP)
- [MISP Default Feeds](https://www.misp-project.org/feeds/)
- [PyMISP Documentation](https://pymisp.readthedocs.io/)
- [Kraven Security: MISP Using Feeds](https://kravensecurity.com/threat-intelligence-with-misp-part-4-using-feeds/)
- [Cosive: What is MISP](https://www.cosive.com/blog/what-is-misp-the-ultimate-introduction)

Related Skills

tracking-threat-actor-infrastructure

16
from plurigrid/asi

Threat actor infrastructure tracking involves monitoring and mapping adversary-controlled assets including command-and-control (C2) servers, phishing domains, exploit kit hosts, bulletproof hosting, a

profiling-threat-actor-groups

16
from plurigrid/asi

Develops comprehensive threat actor profiles for APT groups, criminal organizations, and hacktivist collectives by aggregating TTP documentation, historical campaign data, tooling fingerprints, and attribution indicators from multiple intelligence sources. Use when briefing executives on sector-specific threats, updating threat model assumptions, or prioritizing defensive controls against specific adversaries. Activates for requests involving MITRE ATT&CK Groups, Mandiant APT profiles, CrowdStrike adversary naming, or sector-specific threat briefings.

processing-stix-taxii-feeds

16
from plurigrid/asi

Processes STIX 2.1 threat intelligence bundles delivered via TAXII 2.1 servers, normalizing objects into platform-native schemas and routing them to appropriate consuming systems. Use when onboarding new TAXII collection endpoints, automating bi-directional intelligence sharing with ISACs, or building pipeline validation for malformed STIX bundles. Activates for requests involving OASIS STIX, TAXII server configuration, MISP TAXII, or Cortex XSOAR feed integrations.

performing-threat-modeling-with-owasp-threat-dragon

16
from plurigrid/asi

Use OWASP Threat Dragon to create data flow diagrams, identify threats using STRIDE and LINDDUN methodologies, and generate threat model reports for secure design review.

performing-threat-landscape-assessment-for-sector

16
from plurigrid/asi

Conduct a sector-specific threat landscape assessment by analyzing threat actor targeting patterns, common attack vectors, and industry-specific vulnerabilities to inform organizational risk management.

performing-threat-intelligence-sharing-with-misp

16
from plurigrid/asi

Use PyMISP to create, enrich, and share threat intelligence events on a MISP platform, including IOC management, feed integration, STIX export, and community sharing workflows.

performing-threat-hunting-with-yara-rules

16
from plurigrid/asi

Use YARA pattern-matching rules to hunt for malware, suspicious files, and indicators of compromise across filesystems and memory dumps. Covers rule authoring, yara-python scanning, and integration with threat intel feeds.

performing-threat-hunting-with-elastic-siem

16
from plurigrid/asi

Performs proactive threat hunting in Elastic Security SIEM using KQL/EQL queries, detection rules, and Timeline investigation to identify threats that evade automated detection. Use when SOC teams need to hunt for specific ATT&CK techniques, investigate anomalous behaviors, or validate detection coverage gaps using Elasticsearch and Kibana Security.

performing-threat-emulation-with-atomic-red-team

16
from plurigrid/asi

Executes Atomic Red Team tests for MITRE ATT&CK technique validation using the atomic-operator Python framework. Loads test definitions from YAML atomics, runs attack simulations, and validates detection coverage. Use when testing SIEM detection rules, validating EDR coverage, or conducting purple team exercises.

performing-insider-threat-investigation

16
from plurigrid/asi

Investigates insider threat incidents involving employees, contractors, or trusted partners who misuse authorized access to steal data, sabotage systems, or violate security policies. Combines digital forensics, user behavior analytics, and HR/legal coordination to build an evidence-based case. Activates for requests involving insider threat investigation, employee data theft, privilege misuse, user behavior anomaly, or internal threat detection.

performing-dark-web-monitoring-for-threats

16
from plurigrid/asi

Dark web monitoring involves systematically scanning Tor hidden services, underground forums, paste sites, and dark web marketplaces to identify threats targeting an organization, including leaked cre

investigating-insider-threat-indicators

16
from plurigrid/asi

Investigates insider threat indicators including data exfiltration attempts, unauthorized access patterns, policy violations, and pre-departure behaviors using SIEM analytics, DLP alerts, and HR data correlation. Use when SOC teams receive insider threat referrals from HR, detect anomalous data movement by employees, or need to build investigation timelines for potential insider threats.