Best use case
agent-workflow-designer is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Agent Workflow Designer
Teams using agent-workflow-designer should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/agent-workflow-designer/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How agent-workflow-designer Compares
| Feature / Agent | agent-workflow-designer | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Agent Workflow Designer
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
Related Guides
Best AI Skills for Claude
Explore the best AI skills for Claude and Claude Code across coding, research, workflow automation, documentation, and agent operations.
ChatGPT vs Claude for Agent Skills
Compare ChatGPT and Claude for AI agent skills across coding, writing, research, and reusable workflow execution.
Cursor vs Codex for AI Workflows
Compare Cursor and Codex for AI coding workflows, repository assistance, debugging, refactoring, and reusable developer skills.
SKILL.md Source
# Agent Workflow Designer
**Tier:** POWERFUL
**Category:** Engineering
**Domain:** Multi-Agent Systems / AI Orchestration
---
## Overview
Design production-grade multi-agent orchestration systems. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven, consensus), platform-specific implementations, handoff protocols, state management, error recovery, context window budgeting, and cost optimization.
---
## Core Capabilities
- Pattern selection guide for any orchestration requirement
- Handoff protocol templates (structured context passing)
- State management patterns for multi-agent workflows
- Error recovery and retry strategies
- Context window budget management
- Cost optimization strategies per platform
- Platform-specific configs: Claude Code Agent Teams, OpenClaw, CrewAI, AutoGen
---
## When to Use
- Building a multi-step AI pipeline that exceeds one agent's context capacity
- Parallelizing research, generation, or analysis tasks for speed
- Creating specialist agents with defined roles and handoff contracts
- Designing fault-tolerant AI workflows for production
---
## Pattern Selection Guide
```
Is the task sequential (each step needs previous output)?
YES → Sequential Pipeline
NO → Can tasks run in parallel?
YES → Parallel Fan-out/Fan-in
NO → Is there a hierarchy of decisions?
YES → Hierarchical Delegation
NO → Is it event-triggered?
YES → Event-Driven
NO → Need consensus/validation?
YES → Consensus Pattern
```
---
## Pattern 1: Sequential Pipeline
**Use when:** Each step depends on the previous output. Research → Draft → Review → Polish.
```python
# sequential_pipeline.py
from dataclasses import dataclass
from typing import Callable, Any
import anthropic
@dataclass
class PipelineStage:
name: "str"
system_prompt: str
input_key: str # what to take from state
output_key: str # what to write to state
model: str = "claude-3-5-sonnet-20241022"
max_tokens: int = 2048
class SequentialPipeline:
def __init__(self, stages: list[PipelineStage]):
self.stages = stages
self.client = anthropic.Anthropic()
def run(self, initial_input: str) -> dict:
state = {"input": initial_input}
for stage in self.stages:
print(f"[{stage.name}] Processing...")
stage_input = state.get(stage.input_key, "")
response = self.client.messages.create(
model=stage.model,
max_tokens=stage.max_tokens,
system=stage.system_prompt,
messages=[{"role": "user", "content": stage_input}],
)
state[stage.output_key] = response.content[0].text
state[f"{stage.name}_tokens"] = response.usage.input_tokens + response.usage.output_tokens
print(f"[{stage.name}] Done. Tokens: {state[f'{stage.name}_tokens']}")
return state
# Example: Blog post pipeline
pipeline = SequentialPipeline([
PipelineStage(
name="researcher",
system_prompt="You are a research specialist. Given a topic, produce a structured research brief with: key facts, statistics, expert perspectives, and controversy points.",
input_key="input",
output_key="research",
),
PipelineStage(
name="writer",
system_prompt="You are a senior content writer. Using the research provided, write a compelling 800-word blog post with a clear hook, 3 main sections, and a strong CTA.",
input_key="research",
output_key="draft",
),
PipelineStage(
name="editor",
system_prompt="You are a copy editor. Review the draft for: clarity, flow, grammar, and SEO. Return the improved version only, no commentary.",
input_key="draft",
output_key="final",
),
])
```
---
## Pattern 2: Parallel Fan-out / Fan-in
**Use when:** Independent tasks that can run concurrently. Research 5 competitors simultaneously.
```python
# parallel_fanout.py
import asyncio
import anthropic
from typing import Any
async def run_agent(client, task_name: "str-system-str-user-str-model-str"claude-3-5-sonnet-20241022") -> dict:
"""Single async agent call"""
loop = asyncio.get_event_loop()
def _call():
return client.messages.create(
model=model,
max_tokens=2048,
system=system,
messages=[{"role": "user", "content": user}],
)
response = await loop.run_in_executor(None, _call)
return {
"task": task_name,
"output": response.content[0].text,
"tokens": response.usage.input_tokens + response.usage.output_tokens,
}
async def parallel_research(competitors: list[str], research_type: str) -> dict:
"""Fan-out: research all competitors in parallel. Fan-in: synthesize results."""
client = anthropic.Anthropic()
# FAN-OUT: spawn parallel agent calls
tasks = [
run_agent(
client,
task_name=competitor,
system=f"You are a competitive intelligence analyst. Research {competitor} and provide: pricing, key features, target market, and known weaknesses.",
user=f"Analyze {competitor} for comparison with our product in the {research_type} market.",
)
for competitor in competitors
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle failures gracefully
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
if failed:
print(f"Warning: {len(failed)} research tasks failed: {failed}")
# FAN-IN: synthesize
combined_research = "\n\n".join([
f"## {r['task']}\n{r['output']}" for r in successful
])
synthesis = await run_agent(
client,
task_name="synthesizer",
system="You are a strategic analyst. Synthesize competitor research into a concise comparison matrix and strategic recommendations.",
user=f"Synthesize these competitor analyses:\n\n{combined_research}",
model="claude-3-5-sonnet-20241022",
)
return {
"individual_analyses": successful,
"synthesis": synthesis["output"],
"total_tokens": sum(r["tokens"] for r in successful) + synthesis["tokens"],
}
```
---
## Pattern 3: Hierarchical Delegation
**Use when:** Complex tasks with subtask discovery. Orchestrator breaks down work, delegates to specialists.
```python
# hierarchical_delegation.py
import json
import anthropic
ORCHESTRATOR_SYSTEM = """You are an orchestration agent. Your job is to:
1. Analyze the user's request
2. Break it into subtasks
3. Assign each to the appropriate specialist agent
4. Collect results and synthesize
Available specialists:
- researcher: finds facts, data, and information
- writer: creates content and documents
- coder: writes and reviews code
- analyst: analyzes data and produces insights
Respond with a JSON plan:
{
"subtasks": [
{"id": "1", "agent": "researcher", "task": "...", "depends_on": []},
{"id": "2", "agent": "writer", "task": "...", "depends_on": ["1"]}
]
}"""
SPECIALIST_SYSTEMS = {
"researcher": "You are a research specialist. Find accurate, relevant information and cite sources when possible.",
"writer": "You are a professional writer. Create clear, engaging content in the requested format.",
"coder": "You are a senior software engineer. Write clean, well-commented code with error handling.",
"analyst": "You are a data analyst. Provide structured analysis with evidence-backed conclusions.",
}
class HierarchicalOrchestrator:
def __init__(self):
self.client = anthropic.Anthropic()
def run(self, user_request: str) -> str:
# 1. Orchestrator creates plan
plan_response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=ORCHESTRATOR_SYSTEM,
messages=[{"role": "user", "content": user_request}],
)
plan = json.loads(plan_response.content[0].text)
results = {}
# 2. Execute subtasks respecting dependencies
for subtask in self._topological_sort(plan["subtasks"]):
context = self._build_context(subtask, results)
specialist = SPECIALIST_SYSTEMS[subtask["agent"]]
result = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
system=specialist,
messages=[{"role": "user", "content": f"{context}\n\nTask: {subtask['task']}"}],
)
results[subtask["id"]] = result.content[0].text
# 3. Final synthesis
all_results = "\n\n".join([f"### {k}\n{v}" for k, v in results.items()])
synthesis = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
system="Synthesize the specialist outputs into a coherent final response.",
messages=[{"role": "user", "content": f"Original request: {user_request}\n\nSpecialist outputs:\n{all_results}"}],
)
return synthesis.content[0].text
def _build_context(self, subtask: dict, results: dict) -> str:
if not subtask.get("depends_on"):
return ""
deps = [f"Output from task {dep}:\n{results[dep]}" for dep in subtask["depends_on"] if dep in results]
return "Previous results:\n" + "\n\n".join(deps) if deps else ""
def _topological_sort(self, subtasks: list) -> list:
# Simple ordered execution respecting depends_on
ordered, remaining = [], list(subtasks)
completed = set()
while remaining:
for task in remaining:
if all(dep in completed for dep in task.get("depends_on", [])):
ordered.append(task)
completed.add(task["id"])
remaining.remove(task)
break
return ordered
```
---
## Handoff Protocol Template
```python
# Standard handoff context format — use between all agents
@dataclass
class AgentHandoff:
"""Structured context passed between agents in a workflow."""
task_id: str
workflow_id: str
step_number: int
total_steps: int
# What was done
previous_agent: str
previous_output: str
artifacts: dict # {"filename": "content"} for any files produced
# What to do next
current_agent: str
current_task: str
constraints: list[str] # hard rules for this step
# Metadata
context_budget_remaining: int # tokens left for this agent
cost_so_far_usd: float
def to_prompt(self) -> str:
return f"""
# Agent Handoff — Step {self.step_number}/{self.total_steps}
## Your Task
{self.current_task}
## Constraints
{chr(10).join(f'- {c}' for c in self.constraints)}
## Context from Previous Step ({self.previous_agent})
{self.previous_output[:2000]}{"... [truncated]" if len(self.previous_output) > 2000 else ""}
## Context Budget
You have approximately {self.context_budget_remaining} tokens remaining. Be concise.
"""
```
---
## Error Recovery Patterns
```python
import time
from functools import wraps
def with_retry(max_attempts=3, backoff_seconds=2, fallback_model=None):
"""Decorator for agent calls with exponential backoff and model fallback."""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_attempts):
try:
return fn(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < max_attempts - 1:
wait = backoff_seconds * (2 ** attempt)
print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s...")
time.sleep(wait)
# Fall back to cheaper/faster model on rate limit
if fallback_model and "rate_limit" in str(e).lower():
kwargs["model"] = fallback_model
raise last_error
return wrapper
return decorator
@with_retry(max_attempts=3, fallback_model="claude-3-haiku-20240307")
def call_agent(model, system, user):
...
```
---
## Context Window Budgeting
```python
# Budget context across a multi-step pipeline
# Rule: never let any step consume more than 60% of remaining budget
CONTEXT_LIMITS = {
"claude-3-5-sonnet-20241022": 200_000,
"gpt-4o": 128_000,
}
class ContextBudget:
def __init__(self, model: str, reserve_pct: float = 0.2):
total = CONTEXT_LIMITS.get(model, 128_000)
self.total = total
self.reserve = int(total * reserve_pct) # keep 20% as buffer
self.used = 0
@property
def remaining(self):
return self.total - self.reserve - self.used
def allocate(self, step_name: "str-requested-int-int"
allocated = min(requested, int(self.remaining * 0.6)) # max 60% of remaining
print(f"[Budget] {step_name}: allocated {allocated:,} tokens (remaining: {self.remaining:,})")
return allocated
def consume(self, tokens_used: int):
self.used += tokens_used
def truncate_to_budget(text: str, token_budget: int, chars_per_token: float = 4.0) -> str:
"""Rough truncation — use tiktoken for precision."""
char_budget = int(token_budget * chars_per_token)
if len(text) <= char_budget:
return text
return text[:char_budget] + "\n\n[... truncated to fit context budget ...]"
```
---
## Cost Optimization Strategies
| Strategy | Savings | Tradeoff |
|---|---|---|
| Use Haiku for routing/classification | 85-90% | Slightly less nuanced judgment |
| Cache repeated system prompts | 50-90% | Requires prompt caching setup |
| Truncate intermediate outputs | 20-40% | May lose detail in handoffs |
| Batch similar tasks | 50% | Latency increases |
| Use Sonnet for most, Opus for final step only | 60-70% | Final quality may improve |
| Short-circuit on confidence threshold | 30-50% | Need confidence scoring |
---
## Common Pitfalls
- **Circular dependencies** — agents calling each other in loops; enforce DAG structure at design time
- **Context bleed** — passing entire previous output to every step; summarize or extract only what's needed
- **No timeout** — a stuck agent blocks the whole pipeline; always set max_tokens and wall-clock timeouts
- **Silent failures** — agent returns plausible but wrong output; add validation steps for critical paths
- **Ignoring cost** — 10 parallel Opus calls is $0.50 per workflow; model selection is a cost decision
- **Over-orchestration** — if a single prompt can do it, it should; only add agents when genuinely neededRelated Skills
ux-researcher-designer
UX research and design toolkit for Senior UX Designer/Researcher including data-driven persona generation, journey mapping, usability testing frameworks, and research synthesis. Use for user research, persona creation, journey mapping, and design validation.
observability-designer
Observability Designer (POWERFUL)
n8n-workflow-automation
Designs and outputs n8n workflow JSON with robust triggers, idempotency, error handling, logging, retries, and human-in-the-loop review queues. Use when you need an auditable automation that won’t silently fail.
interview-system-designer
This skill should be used when the user asks to "design interview processes", "create hiring pipelines", "calibrate interview loops", "generate interview questions", "design competency matrices", "analyze interviewer bias", "create scoring rubrics", "build question banks", or "optimize hiring systems". Use for designing role-specific interview loops, competency assessments, and hiring calibration systems.
Git (Essentials + Workflows + Advanced)
Full version control coverage with essential commands, team workflows, branching strategies, and recovery techniques.
expo-cicd-workflows
Helps understand and write EAS workflow YAML files for Expo projects. Use this skill when the user asks about CI/CD or workflows in an Expo or EAS context, mentions .eas/workflows/, or wants help with EAS build pipelines or deployment automation.
experiment-designer
Use when planning product experiments, writing testable hypotheses, estimating sample size, prioritizing tests, or interpreting A/B outcomes with practical statistical rigor.
database-schema-designer
Database Schema Designer
database-designer
Database Designer - POWERFUL Tier Skill
automation-workflows
Design and implement automation workflows to save time and scale operations as a solopreneur. Use when identifying repetitive tasks to automate, building workflows across tools, setting up triggers and actions, or optimizing existing automations. Covers automation opportunity identification, workflow design, tool selection (Zapier, Make, n8n), testing, and maintenance. Trigger on "automate", "automation", "workflow automation", "save time", "reduce manual work", "automate my business", "no-code automation".
agent-designer
Agent Designer - Multi-Agent System Architecture
youtube-watcher
Fetch and read transcripts from YouTube videos. Use when you need to summarize a video, answer questions about its content, or extract information from it.