multiAI Summary Pending

when-chaining-agent-pipelines-use-stream-chain

Chain agent outputs as inputs in sequential or parallel pipelines for data flow orchestration

231 stars

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/when-chaining-agent-pipelines-use-stream-chain/SKILL.md --create-dirs "https://raw.githubusercontent.com/aiskillstore/marketplace/main/skills/dnyoussef/when-chaining-agent-pipelines-use-stream-chain/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/when-chaining-agent-pipelines-use-stream-chain/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How when-chaining-agent-pipelines-use-stream-chain Compares

Feature / Agentwhen-chaining-agent-pipelines-use-stream-chainStandard Approach
Platform SupportmultiLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Chain agent outputs as inputs in sequential or parallel pipelines for data flow orchestration

Which AI agents support this skill?

This skill is compatible with multi.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Agent Pipeline Chaining SOP

## Overview

This skill implements agent pipeline chaining where outputs from one agent become inputs to the next, supporting both sequential and parallel execution patterns with streaming data flows.

## Agents & Responsibilities

### task-orchestrator
**Role:** Pipeline coordination and orchestration
**Responsibilities:**
- Design pipeline architecture
- Connect agent stages
- Monitor data flow
- Handle pipeline errors

### memory-coordinator
**Role:** Data flow and state management
**Responsibilities:**
- Store intermediate results
- Coordinate data passing
- Manage pipeline state
- Ensure data consistency

## Phase 1: Design Pipeline

### Objective
Design pipeline architecture with stages, data flows, and execution strategy.

### Scripts

```bash
# Design pipeline architecture
npx claude-flow@alpha pipeline design \
  --stages "research,analyze,code,test,review" \
  --flow sequential \
  --output pipeline-design.json

# Define data flow
npx claude-flow@alpha pipeline dataflow \
  --design pipeline-design.json \
  --output dataflow-spec.json

# Visualize pipeline
npx claude-flow@alpha pipeline visualize \
  --design pipeline-design.json \
  --output pipeline-diagram.png

# Store design in memory
npx claude-flow@alpha memory store \
  --key "pipeline/design" \
  --file pipeline-design.json
```

### Pipeline Patterns

**Sequential Pipeline:**
```
Agent1 → Agent2 → Agent3 → Agent4
```

**Parallel Pipeline:**
```
       ┌─ Agent2 ─┐
Agent1 ├─ Agent3 ─┤ Agent5
       └─ Agent4 ─┘
```

**Hybrid Pipeline:**
```
Agent1 → ┬─ Agent2 ─┐
         └─ Agent3 ─┴─ Agent4 → Agent5
```

## Phase 2: Connect Agents

### Objective
Connect agents with proper data flow channels and state management.

### Scripts

```bash
# Initialize pipeline
npx claude-flow@alpha pipeline init \
  --design pipeline-design.json

# Spawn pipeline agents
npx claude-flow@alpha agent spawn --type researcher --pipeline-stage 1
npx claude-flow@alpha agent spawn --type analyst --pipeline-stage 2
npx claude-flow@alpha agent spawn --type coder --pipeline-stage 3
npx claude-flow@alpha agent spawn --type tester --pipeline-stage 4

# Connect pipeline stages
npx claude-flow@alpha pipeline connect \
  --from-stage 1 --to-stage 2 \
  --data-channel "memory"

npx claude-flow@alpha pipeline connect \
  --from-stage 2 --to-stage 3 \
  --data-channel "stream"

# Verify connections
npx claude-flow@alpha pipeline status --show-connections
```

### Data Flow Mechanisms

**Memory-Based:**
```bash
# Agent 1 stores output
npx claude-flow@alpha memory store \
  --key "pipeline/stage-1/output" \
  --value "research findings..."

# Agent 2 retrieves input
npx claude-flow@alpha memory retrieve \
  --key "pipeline/stage-1/output"
```

**Stream-Based:**
```bash
# Agent 1 streams output
npx claude-flow@alpha stream write \
  --channel "stage-1-to-2" \
  --data "streaming data..."

# Agent 2 consumes stream
npx claude-flow@alpha stream read \
  --channel "stage-1-to-2"
```

## Phase 3: Execute Pipeline

### Objective
Execute pipeline with proper sequencing and data flow.

### Scripts

```bash
# Execute sequential pipeline
npx claude-flow@alpha pipeline execute \
  --design pipeline-design.json \
  --input initial-data.json \
  --strategy sequential

# Execute parallel pipeline
npx claude-flow@alpha pipeline execute \
  --design pipeline-design.json \
  --input initial-data.json \
  --strategy parallel \
  --max-parallelism 3

# Monitor execution
npx claude-flow@alpha pipeline monitor --interval 5

# Track stage progress
npx claude-flow@alpha pipeline stages --show-progress
```

### Execution Strategies

**Sequential:**
- Stages execute one after another
- Output of stage N is input to stage N+1
- Simple error handling
- Predictable execution time

**Parallel:**
- Independent stages execute simultaneously
- Outputs merged at synchronization points
- Complex error handling
- Faster overall execution

**Adaptive:**
- Dynamically switches between sequential and parallel
- Based on stage dependencies and resource availability
- Optimizes for throughput

## Phase 4: Monitor Streaming

### Objective
Monitor data flow and pipeline execution in real-time.

### Scripts

```bash
# Monitor data flow
npx claude-flow@alpha stream monitor \
  --all-channels \
  --interval 2 \
  --output stream-metrics.json

# Track stage throughput
npx claude-flow@alpha pipeline metrics \
  --metric throughput \
  --per-stage

# Monitor backpressure
npx claude-flow@alpha stream backpressure --detect

# Generate flow report
npx claude-flow@alpha pipeline report \
  --include-timing \
  --include-throughput \
  --output pipeline-report.md
```

### Key Metrics

- **Stage Throughput:** Items processed per minute per stage
- **Pipeline Latency:** End-to-end processing time
- **Backpressure:** Queue buildup at stage boundaries
- **Error Rate:** Failures per stage
- **Resource Utilization:** CPU/memory per agent

## Phase 5: Validate Results

### Objective
Validate pipeline outputs and ensure data integrity.

### Scripts

```bash
# Collect pipeline results
npx claude-flow@alpha pipeline results \
  --output pipeline-results.json

# Validate data integrity
npx claude-flow@alpha pipeline validate \
  --results pipeline-results.json \
  --schema validation-schema.json

# Compare with expected output
npx claude-flow@alpha pipeline compare \
  --actual pipeline-results.json \
  --expected expected-output.json

# Generate validation report
npx claude-flow@alpha pipeline report \
  --type validation \
  --output validation-report.md
```

## Success Criteria

- [ ] Pipeline design complete
- [ ] All stages connected
- [ ] Data flow functional
- [ ] Outputs validated
- [ ] Performance acceptable

### Performance Targets
- Stage latency: <30 seconds average
- Pipeline throughput: ≥10 items/minute
- Error rate: <2%
- Data integrity: 100%

## Best Practices

1. **Clear Stage Boundaries:** Each stage has single responsibility
2. **Data Validation:** Validate outputs before passing to next stage
3. **Error Handling:** Implement retry and fallback mechanisms
4. **Backpressure Management:** Prevent queue overflow
5. **Monitoring:** Track metrics continuously
6. **State Management:** Use memory coordination for state
7. **Testing:** Test each stage independently
8. **Documentation:** Document data schemas and flows

## Common Issues & Solutions

### Issue: Pipeline Stalls
**Symptoms:** Stages stop processing
**Solution:** Check for backpressure, increase buffer sizes

### Issue: Data Loss
**Symptoms:** Missing data in outputs
**Solution:** Implement acknowledgment mechanism, use reliable channels

### Issue: High Latency
**Symptoms:** Slow end-to-end processing
**Solution:** Identify bottleneck stage, add parallelism

## Integration Points

- **swarm-orchestration:** For complex multi-pipeline orchestration
- **advanced-swarm:** For optimized agent coordination
- **performance-analysis:** For bottleneck detection

## References

- Pipeline Design Patterns
- Stream Processing Theory
- Data Flow Architectures