building-ioc-enrichment-pipeline-with-opencti

OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This skill covers building an automated IOC enrichment pipeline using OpenCTI's connector ecosystem.

Best use case

building-ioc-enrichment-pipeline-with-opencti is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using building-ioc-enrichment-pipeline-with-opencti should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/building-ioc-enrichment-pipeline-with-opencti/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/plugins/asi/skills/building-ioc-enrichment-pipeline-with-opencti/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/building-ioc-enrichment-pipeline-with-opencti/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How building-ioc-enrichment-pipeline-with-opencti Compares

| Feature / Agent | building-ioc-enrichment-pipeline-with-opencti | Standard Approach |
| --- | --- | --- |
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

SKILL.md Source

# Building IOC Enrichment Pipeline with OpenCTI

## Overview

OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This skill covers building an automated IOC enrichment pipeline using OpenCTI's connector ecosystem to enrich indicators with context from VirusTotal, Shodan, AbuseIPDB, GreyNoise, and other sources. The pipeline automatically enriches newly ingested indicators, correlates them with known threat actors and campaigns, and scores them for analyst prioritization.


## When to Use

- When deploying or configuring an IOC enrichment pipeline with OpenCTI in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation

## Prerequisites

- Docker and Docker Compose for OpenCTI deployment
- Python 3.9+ with `pycti` library
- API keys for enrichment services: VirusTotal, Shodan, AbuseIPDB, GreyNoise
- Understanding of STIX 2.1 data model and relationships
- Elasticsearch or OpenSearch for the OpenCTI backend
- RabbitMQ for connector messaging and Redis for caching and event streams
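
Before starting the stack, it can help to confirm that the credentials listed above are actually present. The following is a minimal sketch, assuming the settings are supplied as environment variables under the names used later in this skill; the helper name `missing_settings` is hypothetical.

```python
import os

# Variable names taken from the compose file and connector code in this skill.
REQUIRED_VARS = (
    "OPENCTI_URL",
    "OPENCTI_TOKEN",
    "VIRUSTOTAL_TOKEN",
    "SHODAN_TOKEN",
    "ABUSEIPDB_API_KEY",
    "GREYNOISE_API_KEY",
)


def missing_settings(env, required=REQUIRED_VARS):
    """Return the required names that are absent or blank in `env`."""
    return [name for name in required if not str(env.get(name, "")).strip()]


# Check the real process environment and report anything that is missing.
missing = missing_settings(os.environ)
if missing:
    print("Missing settings: " + ", ".join(missing))
else:
    print("All required settings are present.")
```

Running this in the shell that launches Docker Compose lists every missing key in one pass, rather than surfacing them one at a time as connectors fail.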

## Key Concepts

### OpenCTI Architecture

OpenCTI exposes a GraphQL API backed by Elasticsearch (or OpenSearch) for storage, with RabbitMQ carrying connector messages and Redis handling caching and event streams. Data is natively stored as STIX 2.1 objects with relationships. Connectors are categorized as: External Import (feed ingestion), Internal Import (file parsing), Internal Enrichment (context addition), and Stream (real-time export).
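
To make the GraphQL entry point concrete, here is a small sketch that assembles (but does not send) a request for a single observable. It assumes the API is served at `/graphql` with bearer-token authentication and that the `stixCyberObservable` query exposes the fields shown; verify these against the schema of your deployed version. The function name and the example ID are hypothetical.

```python
import json


def build_observable_query(base_url, token, observable_id):
    """Assemble the URL, headers, and JSON body for a GraphQL lookup."""
    # Assumed query shape; check field names against your OpenCTI schema.
    query = """
    query Observable($id: String!) {
      stixCyberObservable(id: $id) {
        id
        standard_id
        entity_type
        observable_value
      }
    }
    """
    return {
        "url": base_url.rstrip("/") + "/graphql",
        "headers": {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        "body": json.dumps({"query": query, "variables": {"id": observable_id}}),
    }


# Hypothetical placeholder values; nothing is sent over the network here.
request = build_observable_query(
    "http://opencti:8080/",
    "your-admin-token-uuid",
    "ipv4-addr--00000000-0000-4000-8000-000000000000",
)
print(request["url"])
```

In practice the `pycti` client wraps calls like this, so hand-built requests are mainly useful for debugging connectivity and tokens.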

### Enrichment Connector Model

Internal enrichment connectors are triggered automatically when new observables are created or manually by analysts. Each connector receives STIX objects, queries external services, and returns STIX 2.1 bundles that augment the original observable with additional context, labels, and relationships.
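
As an illustration of what such a returned bundle can look like, the sketch below builds a STIX 2.1 bundle containing one note that references the enriched observable. It uses only the standard library so the structure is visible; a real connector would more likely use the `stix2` package. The function name and the example identifier are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone


def make_enrichment_bundle(observable_id, summary, details):
    """Build a STIX 2.1 bundle holding one note about the given observable."""
    # STIX timestamps are UTC with a trailing "Z"; keep millisecond precision.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    note = {
        "type": "note",
        "spec_version": "2.1",
        "id": f"note--{uuid.uuid4()}",
        "created": now,
        "modified": now,
        "abstract": summary,
        "content": details,
        # Links the note back to the observable being enriched.
        "object_refs": [observable_id],
    }
    return {"type": "bundle", "id": f"bundle--{uuid.uuid4()}", "objects": [note]}


# Hypothetical observable identifier, used only for illustration.
example_id = "ipv4-addr--00000000-0000-4000-8000-000000000000"
bundle = make_enrichment_bundle(
    example_id, "GreyNoise: malicious", "- Classification: malicious"
)
print(json.dumps(bundle, indent=2))
```

The key point is `object_refs`: it must hold the observable's STIX identifier (`type--uuid`) so the platform can attach the new context to the right object.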

### Confidence Scoring

OpenCTI uses a 0-100 confidence scale for indicators. Enrichment connectors can update confidence scores based on external validation: VirusTotal detection ratios, Shodan exposure data, AbuseIPDB report counts, and GreyNoise classification results.
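
One way to combine these signals is a weighted blend clamped to the 0-100 range. The sketch below is illustrative only: the weights are hypothetical, it uses AbuseIPDB's abuse confidence percentage as a stand-in for raw report counts, and it leaves out Shodan exposure data. It is not how any prebuilt connector computes its score.

```python
# Hypothetical weights; tune them to your own analyst feedback.
WEIGHT_VT = 0.5
WEIGHT_ABUSE = 0.3
WEIGHT_GREYNOISE = 0.2

# Maps a GreyNoise classification to a 0.0-1.0 risk contribution.
GREYNOISE_RISK = {"malicious": 1.0, "unknown": 0.5, "benign": 0.0}


def score_indicator(vt_malicious, vt_total, abuse_confidence, greynoise_classification):
    """Blend three enrichment signals into an integer score from 0 to 100."""
    # Guard against division by zero when VirusTotal has no verdicts.
    vt_ratio = vt_malicious / vt_total if vt_total else 0.0
    # Unrecognized classifications are treated as "unknown".
    gn_risk = GREYNOISE_RISK.get(greynoise_classification, 0.5)
    raw = (
        WEIGHT_VT * vt_ratio
        + WEIGHT_ABUSE * (abuse_confidence / 100)
        + WEIGHT_GREYNOISE * gn_risk
    )
    return max(0, min(100, round(raw * 100)))


print(score_indicator(30, 60, 80, "malicious"))
```

Keeping the weights as named constants makes it easy to review and adjust how much each source influences prioritization.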

## Workflow

### Step 1: Deploy OpenCTI with Docker Compose

```yaml
# docker-compose.yml (key services only; the elasticsearch, minio,
# rabbitmq, and redis service definitions are omitted here)
version: '3'
services:
  opencti:
    image: opencti/platform:6.4.4
    environment:
      - APP__PORT=8080
      - APP__ADMIN__EMAIL=admin@opencti.io
      - APP__ADMIN__PASSWORD=ChangeMeNow
      - APP__ADMIN__TOKEN=your-admin-token-uuid
      - ELASTICSEARCH__URL=http://elasticsearch:9200
      - MINIO__ENDPOINT=minio
      - RABBITMQ__HOSTNAME=rabbitmq
      - REDIS__HOSTNAME=redis
    ports:
      - "8080:8080"
    depends_on:
      - elasticsearch
      - minio
      - rabbitmq
      - redis

  connector-virustotal:
    image: opencti/connector-virustotal:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-virustotal-id
      - CONNECTOR_NAME=VirusTotal
      - CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
      - CONNECTOR_AUTO=true
      - VIRUSTOTAL_TOKEN=your-vt-api-key
      - VIRUSTOTAL_MAX_TLP=TLP:AMBER

  connector-shodan:
    image: opencti/connector-shodan:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-shodan-id
      - CONNECTOR_NAME=Shodan
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - SHODAN_TOKEN=your-shodan-api-key
      - SHODAN_MAX_TLP=TLP:AMBER

  connector-abuseipdb:
    image: opencti/connector-abuseipdb:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-abuseipdb-id
      - CONNECTOR_NAME=AbuseIPDB
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - ABUSEIPDB_API_KEY=your-abuseipdb-key
```
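
The `your-admin-token-uuid` and `connector-*-id` values above are placeholders. OpenCTI expects the admin token and each `CONNECTOR_ID` to be a unique UUIDv4, so one option is to generate them up front. This is a minimal sketch; the output variable names (for example `CONNECTOR_VIRUSTOTAL_ID`) are a hypothetical `.env` naming convention, not something the compose file above reads as written.

```python
import uuid

# Connector names from the compose file above.
CONNECTORS = ("virustotal", "shodan", "abuseipdb")


def generate_ids(names):
    """Return a fresh random UUIDv4 string for each name."""
    return {name: str(uuid.uuid4()) for name in names}


ids = generate_ids(CONNECTORS)

# Print lines suitable for pasting into a .env file (hypothetical names).
print(f"OPENCTI_ADMIN_TOKEN={uuid.uuid4()}")
for name, connector_id in ids.items():
    print(f"CONNECTOR_{name.upper()}_ID={connector_id}")
```

Generate the values once and keep them stable across restarts, since the platform uses the ID to recognize a registered connector.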

### Step 2: Build Custom Enrichment Connector

```python
import os
import requests
from pycti import OpenCTIConnectorHelper
from stix2 import Bundle, Note


class CustomEnrichmentConnector:
    def __init__(self):
        config = {
            "opencti": {
                "url": os.environ.get("OPENCTI_URL"),
                "token": os.environ.get("OPENCTI_TOKEN"),
            },
            "connector": {
                "id": os.environ.get("CONNECTOR_ID"),
                "name": "CustomEnrichment",
                "scope": "IPv4-Addr,Domain-Name,Url",
                "auto": True,
                "type": "INTERNAL_ENRICHMENT",
            },
        }
        self.helper = OpenCTIConnectorHelper(config)
        self.helper.listen(self._process_message)

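    def _process_message(self, data):
        """Handle one enrichment request delivered by the platform."""
        entity_id = data["entity_id"]
        stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)

        if not stix_object:
            return "Observable not found"

        observable_type = stix_object["entity_type"]
        # Prefer "observable_value" and fall back to "value" if it is absent.
        observable_value = (
            stix_object.get("observable_value") or stix_object.get("value", "")
        )
        # STIX object_refs require a STIX identifier (type--uuid), so pass the
        # observable's standard_id rather than the ID from the message.
        stix_id = stix_object.get("standard_id", entity_id)

        enrichment_results = []

        if observable_type == "IPv4-Addr":
            enrichment_results = self._enrich_ip(observable_value, stix_id)
        elif observable_type == "Domain-Name":
            enrichment_results = self._enrich_domain(observable_value, stix_id)

        if enrichment_results:
            bundle = Bundle(objects=enrichment_results, allow_custom=True)
            self.helper.send_stix2_bundle(bundle.serialize())

        return "Enrichment completed"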

    def _enrich_ip(self, ip_address, entity_id):
        """Enrich IP address with GreyNoise, AbuseIPDB context."""
        objects = []

        # GreyNoise Community API
        try:
            gn_response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": os.environ.get("GREYNOISE_API_KEY")},
                timeout=30,
            )
            if gn_response.status_code == 200:
                gn_data = gn_response.json()
                classification = gn_data.get("classification", "unknown")
                noise = gn_data.get("noise", False)
                riot = gn_data.get("riot", False)

                note_content = (
                    f"## GreyNoise Enrichment\n"
                    f"- Classification: {classification}\n"
                    f"- Internet Noise: {noise}\n"
                    f"- RIOT (Benign Service): {riot}\n"
                    f"- Name: {gn_data.get('name', 'N/A')}\n"
                    f"- Last Seen: {gn_data.get('last_seen', 'N/A')}"
                )

                note = Note(
                    content=note_content,
                    object_refs=[entity_id],
                    abstract=f"GreyNoise: {classification}",
                    allow_custom=True,
                )
                objects.append(note)

                # Add labels based on classification
                if classification == "malicious":
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:malicious"
                    )
                elif riot:
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:benign-service"
                    )

        except Exception as e:
            self.helper.log_error(f"GreyNoise enrichment failed: {e}")

        return objects

    def _enrich_domain(self, domain, entity_id):
        """Enrich domain with WHOIS and DNS context."""
        objects = []

        try:
            # Use SecurityTrails API for domain enrichment
            st_response = requests.get(
                f"https://api.securitytrails.com/v1/domain/{domain}",
                headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
                timeout=30,
            )
            if st_response.status_code == 200:
                st_data = st_response.json()
                current_dns = st_data.get("current_dns", {})

                # Skip entries without an "ip" so the join below cannot fail.
                a_records = [
                    r["ip"]
                    for r in current_dns.get("a", {}).get("values", [])
                    if r.get("ip")
                ]

                note_content = (
                    f"## SecurityTrails Enrichment\n"
                    f"- A Records: {', '.join(a_records)}\n"
                    f"- Alexa Rank: {st_data.get('alexa_rank', 'N/A')}\n"
                    f"- Hostname: {st_data.get('hostname', 'N/A')}"
                )

                note = Note(
                    content=note_content,
                    object_refs=[entity_id],
                    abstract=f"SecurityTrails: {domain}",
                    allow_custom=True,
                )
                objects.append(note)

        except Exception as e:
            self.helper.log_error(f"SecurityTrails enrichment failed: {e}")

        return objects


if __name__ == "__main__":
    connector = CustomEnrichmentConnector()
```
