building-automated-malware-submission-pipeline

Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.

16 stars

Best use case

building-automated-malware-submission-pipeline is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.

Teams using building-automated-malware-submission-pipeline should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/building-automated-malware-submission-pipeline/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/plugins/asi/skills/building-automated-malware-submission-pipeline/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/building-automated-malware-submission-pipeline/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How building-automated-malware-submission-pipeline Compares

Feature / Agentbuilding-automated-malware-submission-pipelineStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration. Use when SOC teams need to scale malware analysis beyond manual sandbox submissions for high-volume alert triage.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Building Automated Malware Submission Pipeline

## When to Use

Use this skill when:
- SOC teams face high volume of suspicious file alerts requiring sandbox analysis
- Manual sandbox submission creates bottlenecks in alert triage workflow
- Endpoint and email security tools quarantine files needing automated verdict determination
- Incident response requires rapid malware family identification and IOC extraction

**Do not use** for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.

## Prerequisites

- Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
- VirusTotal API key (Enterprise for submission, free for lookup)
- MalwareBazaar API access for known malware lookup
- File collection mechanism: EDR quarantine API, email gateway export, network capture
- Python 3.8+ with `requests`, `vt-py`, `pefile` libraries
- Isolated analysis network with no production connectivity

## Workflow

### Step 1: Build File Collection Pipeline

Collect suspicious files from multiple sources:

```python
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime

class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}

        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50}
        )
        file_ids = response.json()["resources"]

        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id}
            )
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()

            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            msg = email.message_from_binary_file(
                eml_file.open("rb"), policy=policy.default
            )
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"

                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"]
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content)
        }
```

### Step 2: Pre-Screen with Hash Lookups

Check if the file is already known before sandbox submission:

```python
import vt

class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown")
            }
        except vt.APIError:
            return {"found": False}

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256}
        )
        data = response.json()
        if data["query_status"] == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags", []),
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown")
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN"
        }

    def close(self):
        self.vt_client.close()
```

### Step 3: Submit to Sandbox for Dynamic Analysis

**Cuckoo Sandbox Submission:**

```python
class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64"
                }
            )
        task_id = response.json()["task_id"]
        return task_id

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        import time
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status == "failed_analysis":
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
        report = response.json()

        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in report.get("network", {}).get("http", [])
                ],
                "hosts": report.get("network", {}).get("hosts", [])
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in report.get("behavior", {}).get("processes", [])
            ],
            "registry_keys": [
                k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
            ]
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True
                }
            )
        return response.json()["data"]["webid"]
```

### Step 4: Extract IOCs and Generate Verdict

```python
class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": []
        }

        # Extract IOCs from sandbox report
        if sandbox_report:
            iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
            iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
            iocs["urls"] = [
                h["url"] for h in sandbox_report.get("network", {}).get("http", [])
            ]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0

        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)

        if combined_score >= 100:
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            confidence = "HIGH"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else []
        }
```

### Step 5: Push Results to SIEM

```python
def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC"""
    import json

    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]]
        }
    }

    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json"
        },
        json=event,
        verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true",  # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
    )
    return response.status_code == 200

def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"}
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"}
        )
```

### Step 6: Orchestrate the Full Pipeline

```python
def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()

    # Step 1: Hash and pre-screen
    hashes = collector.compute_hashes(sample_path)
    pre_screen = screener.pre_screen(hashes["sha256"])

    # Step 2: Submit to sandbox if unknown
    sandbox_report = None
    if pre_screen["needs_sandbox"]:
        task_id = submitter.submit_to_cuckoo(sample_path)
        sandbox_report = submitter.wait_for_analysis(task_id)

    # Step 3: Generate verdict
    verdict = generator.generate_verdict(pre_screen, sandbox_report)
    verdict["sha256"] = hashes["sha256"]
    verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

    # Step 4: Push to SIEM
    push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

    # Step 5: Block if malicious
    if verdict["verdict"] == "MALICIOUS":
        push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

    screener.close()
    return verdict
```

## Key Concepts

| Term | Definition |
|------|-----------|
| **Dynamic Analysis** | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| **Static Analysis** | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| **Sandbox Evasion** | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| **IOC Extraction** | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| **Multi-AV Scanning** | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| **Verdict** | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |

## Tools & Systems

- **Cuckoo Sandbox**: Open-source automated malware analysis platform with behavioral analysis and network capture
- **Joe Sandbox**: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
- **Any.Run**: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
- **VirusTotal**: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
- **CAPE Sandbox**: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping

## Common Scenarios

- **Email Attachment Triage**: Auto-submit quarantined email attachments, generate verdict in <5 minutes
- **EDR Quarantine Processing**: Batch-process files quarantined by endpoint security for detailed analysis
- **Incident Investigation**: Submit suspicious binaries found during IR for malware family identification and IOC extraction
- **Threat Intel Enrichment**: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
- **Zero-Day Detection**: Sandbox catches novel malware missed by signature-based AV through behavioral analysis

## Output Format

```
MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample:       invoice_march.docx
SHA256:       a1b2c3d4e5f6a7b8...
File Type:    Microsoft Word Document (macro-enabled)

Pre-Screening:
  VirusTotal:    34/72 malicious (Emotet.Downloader)
  MalwareBazaar: Tags: emotet, macro, downloader

Sandbox Analysis (Cuckoo):
  Score:         9.2/10 (MALICIOUS)
  Signatures:
    - Macro executes PowerShell download cradle (severity: 8)
    - Process injection into explorer.exe (severity: 9)
    - Connects to known Emotet C2 server (severity: 9)

Extracted IOCs:
  C2 IPs:       185.234.218[.]50:8080, 45.77.123[.]45:443
  Domains:       update-service[.]evil[.]com
  Dropped Files: payload.dll (SHA256: b2c3d4e5...)
  Registry:      HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
  [DONE] IOCs pushed to Splunk threat intel
  [DONE] C2 IPs blocked on firewall
  [DONE] Domain sinkholed on DNS
  [DONE] Hash blocked on endpoint
```

Related Skills

reverse-engineering-rust-malware

16
from plurigrid/asi

Reverse engineer Rust-compiled malware using IDA Pro and Ghidra with techniques for handling non-null-terminated strings, crate dependency extraction, and Rust-specific control flow analysis.

reverse-engineering-malware-with-ghidra

16
from plurigrid/asi

Reverse engineers malware binaries using NSA's Ghidra disassembler and decompiler to understand internal logic, cryptographic routines, C2 protocols, and evasion techniques at the assembly and pseudo-C level. Activates for requests involving malware reverse engineering, disassembly analysis, decompilation, binary analysis, or understanding malware internals.

reverse-engineering-dotnet-malware-with-dnspy

16
from plurigrid/asi

Reverse engineers .NET malware using dnSpy decompiler and debugger to analyze C#/VB.NET source code, identify obfuscation techniques, extract configurations, and understand malicious functionality including stealers, RATs, and loaders. Activates for requests involving .NET malware analysis, C# malware decompilation, managed code reverse engineering, or .NET obfuscation analysis.

reverse-engineering-android-malware-with-jadx

16
from plurigrid/asi

Reverse engineers malicious Android APK files using JADX decompiler to analyze Java/Kotlin source code, identify malicious functionality including data theft, C2 communication, privilege escalation, and overlay attacks. Examines manifest permissions, receivers, services, and native libraries. Activates for requests involving Android malware analysis, APK reverse engineering, mobile malware investigation, or Android threat analysis.

performing-static-malware-analysis-with-pe-studio

16
from plurigrid/asi

Performs static analysis of Windows PE (Portable Executable) malware samples using PEStudio to examine file headers, imports, strings, resources, and indicators without executing the binary. Identifies suspicious characteristics including packing, anti-analysis techniques, and malicious imports. Activates for requests involving static malware analysis, PE file inspection, Windows executable analysis, or pre-execution malware triage.

performing-malware-triage-with-yara

16
from plurigrid/asi

Performs rapid malware triage and classification using YARA rules to match file patterns, strings, byte sequences, and structural characteristics against known malware families and suspicious indicators. Covers rule writing, scanning, and integration with analysis pipelines. Activates for requests involving YARA rule creation, malware classification, pattern matching, sample triage, or signature-based detection.

performing-malware-persistence-investigation

16
from plurigrid/asi

Systematically investigate all persistence mechanisms on Windows and Linux systems to identify how malware survives reboots and maintains access.

performing-malware-ioc-extraction

16
from plurigrid/asi

Malware IOC extraction is the process of analyzing malicious software to identify actionable indicators of compromise including file hashes, network indicators (C2 domains, IP addresses, URLs), regist

performing-malware-hash-enrichment-with-virustotal

16
from plurigrid/asi

Enrich malware file hashes using the VirusTotal API to retrieve detection rates, behavioral analysis, YARA matches, and contextual threat intelligence for incident triage and IOC validation.

performing-firmware-malware-analysis

16
from plurigrid/asi

Analyzes firmware images for embedded malware, backdoors, and unauthorized modifications targeting routers, IoT devices, UEFI/BIOS, and embedded systems. Covers firmware extraction, filesystem analysis, binary reverse engineering, and bootkit detection. Activates for requests involving firmware security analysis, IoT malware investigation, UEFI rootkit detection, or embedded device compromise assessment.

performing-automated-malware-analysis-with-cape

16
from plurigrid/asi

Deploy and operate CAPEv2 sandbox for automated malware analysis with behavioral monitoring, payload extraction, configuration parsing, and anti-evasion capabilities.

integrating-sast-into-github-actions-pipeline

16
from plurigrid/asi

This skill covers integrating Static Application Security Testing (SAST) tools—CodeQL and Semgrep—into GitHub Actions CI/CD pipelines. It addresses configuring automated code scanning on pull requests and pushes, tuning rules to reduce false positives, uploading SARIF results to GitHub Advanced Security, and establishing quality gates that block merges when high-severity vulnerabilities are detected.