agent-swarm-orchestrator

Designs multi-agent systems with coordinated agent swarms, task distribution, inter-agent communication, and emergent collective behavior.

16 stars

Best use case

agent-swarm-orchestrator is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Designs multi-agent systems with coordinated agent swarms, task distribution, inter-agent communication, and emergent collective behavior.

Teams using agent-swarm-orchestrator should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/agent-swarm-orchestrator/SKILL.md --create-dirs "https://raw.githubusercontent.com/diegosouzapw/awesome-omni-skill/main/skills/ai-agents/agent-swarm-orchestrator/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/agent-swarm-orchestrator/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How agent-swarm-orchestrator Compares

Feature / Agentagent-swarm-orchestratorStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Designs multi-agent systems with coordinated agent swarms, task distribution, inter-agent communication, and emergent collective behavior.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Agent Swarm Orchestrator

This skill provides guidance for designing multi-agent systems where multiple AI agents coordinate to accomplish complex tasks through distributed execution and emergent behavior.

## Core Competencies

- **Swarm Architecture**: Agent topologies, communication patterns
- **Task Distribution**: Work allocation, load balancing
- **Coordination Protocols**: Consensus, voting, delegation
- **Emergent Behavior**: Collective intelligence from simple rules

## Multi-Agent Fundamentals

### Why Multi-Agent Systems

```
Single Agent:                    Multi-Agent Swarm:
┌─────────────────┐             ┌─────────────────────────────┐
│                 │             │  ┌───┐ ┌───┐ ┌───┐ ┌───┐   │
│   One Agent     │             │  │ A │ │ A │ │ A │ │ A │   │
│   Sequential    │     vs      │  └───┘ └───┘ └───┘ └───┘   │
│   Single POV    │             │  Parallel, Diverse POV     │
│                 │             │  Specialization possible    │
└─────────────────┘             └─────────────────────────────┘

Benefits:
- Parallelism: Multiple agents work simultaneously
- Specialization: Agents can have different capabilities
- Resilience: System continues if one agent fails
- Diverse perspectives: Multiple approaches to problems
```

### Agent Roles

| Role | Responsibility | Characteristics |
|------|----------------|-----------------|
| Orchestrator | Coordinate swarm | Global view, task assignment |
| Worker | Execute tasks | Specialized skills, focused |
| Supervisor | Quality control | Review, approve, redirect |
| Specialist | Domain expertise | Deep knowledge, narrow scope |
| Scout | Exploration | Information gathering, research |

## Swarm Topologies

### Hierarchical

```
                    ┌──────────────┐
                    │ Orchestrator │
                    └──────┬───────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
    ┌─────┴─────┐    ┌─────┴─────┐    ┌─────┴─────┐
    │Supervisor │    │Supervisor │    │Supervisor │
    └─────┬─────┘    └─────┬─────┘    └─────┬─────┘
          │                │                │
    ┌─────┼─────┐    ┌─────┼─────┐    ┌─────┼─────┐
    │     │     │    │     │     │    │     │     │
   ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐
   │W│   │W│   │W│  │W│   │W│   │W│  │W│   │W│   │W│
   └─┘   └─┘   └─┘  └─┘   └─┘   └─┘  └─┘   └─┘   └─┘
         Workers

Best for: Clear task decomposition, quality control needed
```

### Peer-to-Peer

```
        ┌───┐───────────────┌───┐
        │ A │               │ A │
        └───┘               └───┘
          │ \             / │
          │   \         /   │
          │     \     /     │
          │       \ /       │
        ┌───┐     ╳       ┌───┐
        │ A │   /   \     │ A │
        └───┘ /       \   └───┘
            /           \
        ┌───┐           ┌───┐
        │ A │───────────│ A │
        └───┘           └───┘

Best for: Collaborative problem-solving, no single point of failure
```

### Blackboard

```
┌─────────────────────────────────────────────────────┐
│                     Blackboard                       │
│  ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│  │ Problem     │ │ Partial     │ │ Solutions     │ │
│  │ State       │ │ Results     │ │               │ │
│  └─────────────┘ └─────────────┘ └───────────────┘ │
└───────────────────────┬─────────────────────────────┘
                        │
    ┌───────────────────┼───────────────────┐
    │         Read/Write│                   │
    ▼                   ▼                   ▼
┌───────┐          ┌───────┐          ┌───────┐
│Agent A│          │Agent B│          │Agent C│
│Analyst│          │Builder│          │Critic │
└───────┘          └───────┘          └───────┘

Best for: Complex problems, agents contribute asynchronously
```

## Agent Implementation

### Base Agent Structure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, List
from enum import Enum
import asyncio

class AgentStatus(Enum):
    IDLE = "idle"
    WORKING = "working"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class AgentMessage:
    sender_id: str
    recipient_id: str  # Or "broadcast"
    message_type: str
    content: Any
    timestamp: float = field(default_factory=lambda: time.time())
    correlation_id: Optional[str] = None

@dataclass
class Task:
    id: str
    description: str
    priority: int = 0
    dependencies: List[str] = field(default_factory=list)
    assigned_to: Optional[str] = None
    status: str = "pending"
    result: Any = None


class BaseAgent(ABC):
    """Base class for swarm agents"""

    def __init__(self, agent_id: str, capabilities: List[str]):
        self.id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.current_task: Optional[Task] = None

    @abstractmethod
    async def process_task(self, task: Task) -> Any:
        """Process assigned task - implement in subclass"""
        pass

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        """Check if agent can handle this task type"""
        pass

    async def receive_message(self, message: AgentMessage):
        """Add message to queue for processing"""
        await self.message_queue.put(message)

    async def run(self):
        """Main agent loop"""
        while True:
            # Check for messages
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=0.1
                )
                await self._handle_message(message)
            except asyncio.TimeoutError:
                pass

            # Work on current task
            if self.current_task and self.status == AgentStatus.WORKING:
                await self._work_on_task()

    async def _handle_message(self, message: AgentMessage):
        """Handle incoming message"""
        if message.message_type == "assign_task":
            self.current_task = message.content
            self.status = AgentStatus.WORKING
        elif message.message_type == "cancel_task":
            self.current_task = None
            self.status = AgentStatus.IDLE
        elif message.message_type == "status_request":
            await self._send_status(message.sender_id)

    async def _work_on_task(self):
        """Execute current task"""
        try:
            result = await self.process_task(self.current_task)
            self.current_task.result = result
            self.current_task.status = "completed"
            self.status = AgentStatus.COMPLETED
        except Exception as e:
            self.current_task.status = "failed"
            self.status = AgentStatus.FAILED
```

### Specialized Agents

```python
class ResearchAgent(BaseAgent):
    """Agent specialized for information gathering"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["research", "search", "analyze"])
        self.search_tools = []

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["research", "find", "search", "investigate"])

    async def process_task(self, task: Task) -> dict:
        # Research implementation
        results = await self._search(task.description)
        analysis = await self._analyze(results)
        return {
            "sources": results,
            "analysis": analysis,
            "confidence": self._calculate_confidence(results)
        }


class CodeAgent(BaseAgent):
    """Agent specialized for code generation"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["code", "implement", "debug"])

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["implement", "code", "write", "fix", "debug"])

    async def process_task(self, task: Task) -> dict:
        # Code generation implementation
        code = await self._generate_code(task.description)
        tests = await self._generate_tests(code)
        return {
            "code": code,
            "tests": tests,
            "language": self._detect_language(code)
        }


class ReviewAgent(BaseAgent):
    """Agent specialized for quality review"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["review", "critique", "approve"])

    def can_handle(self, task: Task) -> bool:
        return "review" in task.description.lower()

    async def process_task(self, task: Task) -> dict:
        artifact = task.content  # What to review
        issues = await self._find_issues(artifact)
        suggestions = await self._generate_suggestions(issues)
        return {
            "approved": len(issues) == 0,
            "issues": issues,
            "suggestions": suggestions
        }
```

## Orchestration Patterns

### Task-Based Orchestration

```python
class SwarmOrchestrator:
    """Coordinate agent swarm for task completion"""

    def __init__(self):
        self.agents: dict[str, BaseAgent] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.completed_tasks: dict[str, Task] = {}
        self.message_bus = MessageBus()

    def register_agent(self, agent: BaseAgent):
        """Add agent to swarm"""
        self.agents[agent.id] = agent

    async def submit_task(self, task: Task):
        """Submit task for processing"""
        # Calculate effective priority (lower = higher priority)
        priority = -task.priority  # Negate for min-heap behavior
        await self.task_queue.put((priority, task))

    async def run(self):
        """Main orchestration loop"""
        while True:
            # Get highest priority task
            _, task = await self.task_queue.get()

            # Check dependencies
            if not self._dependencies_met(task):
                # Re-queue with lower priority
                await self.task_queue.put((1, task))
                continue

            # Find suitable agent
            agent = await self._find_available_agent(task)
            if agent:
                await self._assign_task(agent, task)
            else:
                # No agent available, re-queue
                await asyncio.sleep(0.1)
                await self.task_queue.put((0, task))

    def _dependencies_met(self, task: Task) -> bool:
        """Check if all dependencies are completed"""
        for dep_id in task.dependencies:
            if dep_id not in self.completed_tasks:
                return False
            if self.completed_tasks[dep_id].status != "completed":
                return False
        return True

    async def _find_available_agent(self, task: Task) -> Optional[BaseAgent]:
        """Find an idle agent that can handle the task"""
        for agent in self.agents.values():
            if agent.status == AgentStatus.IDLE and agent.can_handle(task):
                return agent
        return None

    async def _assign_task(self, agent: BaseAgent, task: Task):
        """Assign task to agent"""
        task.assigned_to = agent.id
        task.status = "assigned"

        message = AgentMessage(
            sender_id="orchestrator",
            recipient_id=agent.id,
            message_type="assign_task",
            content=task
        )
        await agent.receive_message(message)
```

### Workflow Orchestration

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class WorkflowStep:
    name: str
    agent_type: str
    input_transform: Callable[[dict], dict] = lambda x: x
    required_approval: bool = False

class WorkflowOrchestrator:
    """Execute multi-step workflows with agent swarm"""

    def __init__(self, swarm: SwarmOrchestrator):
        self.swarm = swarm
        self.workflows: dict[str, List[WorkflowStep]] = {}

    def register_workflow(self, name: str, steps: List[WorkflowStep]):
        """Register a multi-step workflow"""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: dict
    ) -> dict:
        """Execute workflow through agent swarm"""
        steps = self.workflows[workflow_name]
        current_data = initial_input
        results = []

        for i, step in enumerate(steps):
            # Transform input for this step
            step_input = step.input_transform(current_data)

            # Create task
            task = Task(
                id=f"{workflow_name}_{i}_{step.name}",
                description=f"Execute {step.name}",
                context=step_input
            )

            # Submit and wait for completion
            await self.swarm.submit_task(task)
            result = await self._wait_for_completion(task.id)

            # Handle approval if required
            if step.required_approval:
                approved = await self._request_approval(step, result)
                if not approved:
                    return {"status": "rejected", "step": step.name}

            results.append(result)
            current_data = {**current_data, **result}

        return {
            "status": "completed",
            "results": results,
            "final_output": current_data
        }
```

## Inter-Agent Communication

### Message Patterns

```python
class MessageBus:
    """Central message routing for swarm communication"""

    def __init__(self):
        self.subscribers: dict[str, List[BaseAgent]] = {}
        self.message_history: List[AgentMessage] = []

    def subscribe(self, topic: str, agent: BaseAgent):
        """Subscribe agent to topic"""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent)

    async def publish(self, topic: str, message: AgentMessage):
        """Publish message to topic subscribers"""
        self.message_history.append(message)

        for agent in self.subscribers.get(topic, []):
            if agent.id != message.sender_id:  # Don't send to self
                await agent.receive_message(message)

    async def send_direct(self, message: AgentMessage):
        """Send message to specific agent"""
        # Route to recipient
        pass

    async def broadcast(self, message: AgentMessage):
        """Broadcast to all agents"""
        for topic_subscribers in self.subscribers.values():
            for agent in topic_subscribers:
                if agent.id != message.sender_id:
                    await agent.receive_message(message)
```

### Consensus Protocol

```python
class ConsensusProtocol:
    """Achieve consensus among agents"""

    def __init__(self, agents: List[BaseAgent], threshold: float = 0.66):
        self.agents = agents
        self.threshold = threshold

    async def vote(self, proposal: Any) -> dict:
        """Collect votes from agents"""
        votes = {}

        for agent in self.agents:
            message = AgentMessage(
                sender_id="consensus",
                recipient_id=agent.id,
                message_type="vote_request",
                content=proposal
            )
            response = await self._request_vote(agent, message)
            votes[agent.id] = response

        return self._tally_votes(votes)

    def _tally_votes(self, votes: dict) -> dict:
        """Calculate consensus result"""
        approve_count = sum(1 for v in votes.values() if v.get("approve"))
        total = len(votes)

        consensus_reached = (approve_count / total) >= self.threshold

        return {
            "consensus_reached": consensus_reached,
            "approve_count": approve_count,
            "reject_count": total - approve_count,
            "threshold": self.threshold,
            "votes": votes
        }
```

## Emergent Behavior

### Stigmergy Pattern

Indirect coordination through environment modification:

```python
class SharedWorkspace:
    """Shared environment for stigmergic coordination"""

    def __init__(self):
        self.artifacts: dict[str, Any] = {}
        self.pheromones: dict[str, float] = {}  # Strength indicators
        self.decay_rate = 0.1

    def deposit(self, key: str, artifact: Any, strength: float = 1.0):
        """Add artifact to shared space"""
        self.artifacts[key] = artifact
        self.pheromones[key] = strength

    def sense(self, pattern: str) -> List[tuple[str, Any, float]]:
        """Find artifacts matching pattern"""
        matches = []
        for key, artifact in self.artifacts.items():
            if pattern in key:
                strength = self.pheromones.get(key, 0)
                matches.append((key, artifact, strength))
        return sorted(matches, key=lambda x: -x[2])  # Highest strength first

    def decay(self):
        """Reduce pheromone strengths over time"""
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.decay_rate)
            if self.pheromones[key] < 0.01:
                del self.pheromones[key]
                del self.artifacts[key]

    def reinforce(self, key: str, amount: float = 0.1):
        """Strengthen pheromone trail"""
        if key in self.pheromones:
            self.pheromones[key] = min(1.0, self.pheromones[key] + amount)
```

### Ant Colony Optimization

```python
class AntColonyTaskAllocator:
    """Allocate tasks using ant colony optimization"""

    def __init__(self, agents: List[BaseAgent], tasks: List[Task]):
        self.agents = agents
        self.tasks = tasks
        self.pheromones = {}  # (agent_id, task_id) -> strength
        self.alpha = 1.0  # Pheromone importance
        self.beta = 2.0   # Heuristic importance
        self.evaporation = 0.1

    def _calculate_probability(
        self,
        agent: BaseAgent,
        task: Task
    ) -> float:
        """Calculate probability of assigning task to agent"""
        key = (agent.id, task.id)

        # Pheromone component
        pheromone = self.pheromones.get(key, 0.1)

        # Heuristic: agent capability match
        heuristic = 1.0 if agent.can_handle(task) else 0.1

        return (pheromone ** self.alpha) * (heuristic ** self.beta)

    def allocate(self) -> dict[str, str]:
        """Generate task allocation"""
        allocation = {}
        available_tasks = set(t.id for t in self.tasks)

        for agent in self.agents:
            if not available_tasks:
                break

            # Calculate probabilities for available tasks
            probs = {}
            for task in self.tasks:
                if task.id in available_tasks:
                    probs[task.id] = self._calculate_probability(agent, task)

            # Normalize and select
            total = sum(probs.values())
            if total > 0:
                selected = self._weighted_choice(probs, total)
                allocation[agent.id] = selected
                available_tasks.remove(selected)

        return allocation

    def update_pheromones(self, allocation: dict, quality: dict):
        """Update pheromones based on solution quality"""
        # Evaporation
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.evaporation)

        # Deposit based on quality
        for agent_id, task_id in allocation.items():
            key = (agent_id, task_id)
            deposit = quality.get(task_id, 0.1)
            self.pheromones[key] = self.pheromones.get(key, 0) + deposit
```

## Best Practices

### Design Guidelines

1. **Keep agents simple**: Complex behavior emerges from simple rules
2. **Define clear interfaces**: Message formats, task structures
3. **Plan for failure**: Agents will fail; system should continue
4. **Monitor collective behavior**: Individual agents may be fine but swarm stuck
5. **Version coordination protocols**: Agents may run different versions

### Anti-Patterns to Avoid

- **God orchestrator**: One agent that does everything
- **Chatty agents**: Too much inter-agent communication
- **Tight coupling**: Agents depending on specific other agents
- **Missing deadlines**: No timeouts on task completion
- **State explosion**: Agents maintaining too much state

## References

- `references/swarm-topologies.md` - Detailed topology patterns
- `references/coordination-protocols.md` - Consensus and voting algorithms
- `references/emergent-patterns.md` - Stigmergy and self-organization

Related Skills

cascade-orchestrator

16
from diegosouzapw/awesome-omni-skill

Creates sophisticated workflow cascades coordinating multiple micro-skills with sequential pipelines, parallel execution, conditional branching, and Codex sandbox iteration. Enhanced with multi-model routing (Gemini/Codex), ruv-swarm coordination, memory persistence, and audit-pipeline patterns for production workflows.

careerswarm

16
from diegosouzapw/awesome-omni-skill

CareerSwarm repo: monitor, code sweeps, debugging, and sync with Cursor. Workspace is this repo; read OPENCLAW.md and CONTEXT/todo before big edits.

agent-multi-repo-swarm

16
from diegosouzapw/awesome-omni-skill

Agent skill for multi-repo-swarm - invoke with $agent-multi-repo-swarm

Proactive Orchestrator

16
from diegosouzapw/awesome-omni-skill

Event-driven orchestrator that chains existing skills, edge functions, and agent capabilities into automated workflows triggered by real-time events. Receives events from webhooks (MeetingBaaS, email, calendar), cron jobs (morning brief, pipeline scan), and Slack interactions (button clicks, approvals), then executes multi-step sequences with context-aware routing, HITL approval gates, and self-invocation for long-running chains. This is the conductor — it connects 30+ existing Slack functions, 6 specialist agents, and the sequence executor into coherent, event-driven sales workflows. NOT triggered by user chat messages. Triggered by system events only.

ln-1000-pipeline-orchestrator

16
from diegosouzapw/awesome-omni-skill

Meta-orchestrator (L0): reads kanban board, drives Stories through pipeline 300->310->400->500 in parallel via TeamCreate. Max 3 concurrent Stories. Auto squash-merge to develop on quality gate PASS.

archaeology-orchestrator

16
from diegosouzapw/awesome-omni-skill

고고학 발굴조사 고찰 작성 자동화 파이프라인 마스터 오케스트레이터

AOC Orchestrator

16
from diegosouzapw/awesome-omni-skill

Main coordinator for the automated Advent of Code workflow. Orchestrates puzzle fetching, TDD solving, and submission for daily AoC challenges. Use when running the full automated solving pipeline or when user requests to solve an AoC day.

adb-workflow-orchestrator

16
from diegosouzapw/awesome-omni-skill

TOON workflow orchestration engine for coordinating ADB automation scripts across phases with error recovery

swarm-mail

16
from diegosouzapw/awesome-omni-skill

Coordinate with other agents using Swarm mail and file locking

parallel-orchestrator

16
from diegosouzapw/awesome-omni-skill

Orchestrate parallel agent workflows using HtmlGraph's ParallelWorkflow. Activate when planning multi-agent work, using Task tool for sub-agents, or coordinating concurrent feature implementation.

n8n-mcp-orchestrator

16
from diegosouzapw/awesome-omni-skill

Expert MCP (Model Context Protocol) orchestration with n8n workflow automation. Master bidirectional MCP integration, expose n8n workflows as AI agent tools, consume MCP servers in workflows, build agentic systems, orchestrate multi-agent workflows, and create production-ready AI-powered automation pipelines with Claude Code integration.

dev-swarm-install-ai-code-agent

16
from diegosouzapw/awesome-omni-skill

Install AI code agent CLI tools including claude-code, gemini-cli, codex, and github copilot-cli. Use when setting up AI coding assistants or when the user asks to install an AI code agent.