anth-core-workflow-b

Build Claude streaming and Message Batches API workflows. Use when implementing real-time streaming responses, SSE event handling, or processing bulk requests with the 50% cheaper Batches API. Trigger with phrases like "claude streaming", "anthropic batch", "message batches api", "SSE events anthropic", "stream claude response".

1,868 stars

Best use case

anth-core-workflow-b is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Build Claude streaming and Message Batches API workflows. Use when implementing real-time streaming responses, SSE event handling, or processing bulk requests with the 50% cheaper Batches API. Trigger with phrases like "claude streaming", "anthropic batch", "message batches api", "SSE events anthropic", "stream claude response".

Teams using anth-core-workflow-b should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/anth-core-workflow-b/SKILL.md --create-dirs "https://raw.githubusercontent.com/jeremylongshore/claude-code-plugins-plus-skills/main/plugins/saas-packs/anthropic-pack/skills/anth-core-workflow-b/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/anth-core-workflow-b/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How anth-core-workflow-b Compares

Feature / Agentanth-core-workflow-bStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Build Claude streaming and Message Batches API workflows. Use when implementing real-time streaming responses, SSE event handling, or processing bulk requests with the 50% cheaper Batches API. Trigger with phrases like "claude streaming", "anthropic batch", "message batches api", "SSE events anthropic", "stream claude response".

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

Related Guides

SKILL.md Source

# Anthropic Core Workflow B — Streaming & Batches

## Overview

Two complementary patterns: real-time streaming for interactive UIs (SSE events via `POST /v1/messages` with `stream: true`) and the Message Batches API (`POST /v1/messages/batches`) for processing up to 100,000 requests asynchronously at 50% cost reduction.

## Prerequisites

- Completed `anth-install-auth` setup
- Familiarity with `anth-core-workflow-a` (Messages API basics)
- For batches: understanding of async/polling patterns

## Instructions

### Streaming — Python SDK

```python
import anthropic

client = anthropic.Anthropic()

# Method 1: High-level streaming (recommended)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a short story about a robot."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After stream completes, access full message
    final_message = stream.get_final_message()
    print(f"\nUsage: {final_message.usage.input_tokens}+{final_message.usage.output_tokens}")

# Method 2: Event-level streaming (for custom event handling)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Explain REST APIs."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
        elif event.type == "message_stop":
            print("\n[Stream complete]")
```

### Streaming — TypeScript SDK

```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// High-level streaming
const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 2048,
  messages: [{ role: 'user', content: 'Write a haiku about code.' }],
});

stream.on('text', (text) => process.stdout.write(text));
stream.on('finalMessage', (msg) => {
  console.log(`\nTokens: ${msg.usage.input_tokens}+${msg.usage.output_tokens}`);
});

await stream.finalMessage();
```

### Streaming with Tool Use

```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,  # Same tools array from core-workflow-a
    messages=[{"role": "user", "content": "What's the weather?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                print(f"Tool call: {event.content_block.name}")
        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                print(event.delta.partial_json, end="")  # Tool input arrives incrementally
```

### Message Batches API — Bulk Processing

```python
# Create a batch of up to 100,000 requests (50% cost savings)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "req-001",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article1..."}]
            }
        },
        {
            "custom_id": "req-002",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article2..."}]
            }
        },
        # ... up to 100,000 requests
    ]
)

print(f"Batch ID: {batch.id}")          # msgbatch_01HBMt...
print(f"Status: {batch.processing_status}")  # in_progress
print(f"Counts: {batch.request_counts}")     # {processing: 2, succeeded: 0, ...}
```

### Poll for Batch Completion

```python
import time

while True:
    batch_status = client.messages.batches.retrieve(batch.id)
    if batch_status.processing_status == "ended":
        break
    print(f"Processing... {batch_status.request_counts}")
    time.sleep(30)

# Stream results (returns JSONL)
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        print(f"[{result.custom_id}]: {text[:100]}...")
    elif result.result.type == "errored":
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
```

## SSE Event Types Reference

| Event | Description | Key Fields |
|-------|-------------|------------|
| `message_start` | Stream begins | `message.id`, `message.model`, `message.usage` |
| `content_block_start` | New content block | `content_block.type` (text/tool_use) |
| `content_block_delta` | Incremental content | `delta.text` or `delta.partial_json` |
| `content_block_stop` | Block complete | `index` |
| `message_delta` | Message-level update | `delta.stop_reason`, `usage.output_tokens` |
| `message_stop` | Stream complete | (empty) |
| `ping` | Keepalive | (empty) |

## Error Handling

| Error | Cause | Solution |
|-------|-------|----------|
| Stream disconnects mid-response | Network timeout | Implement reconnection with partial content |
| Batch `expired` status | Not processed within 24h | Resubmit batch |
| `errored` results in batch | Individual request invalid | Check `result.error` for each failed request |
| 429 on batch creation | Too many concurrent batches | Wait; limit is ~100 concurrent batches |

## Resources

- [Streaming Messages](https://docs.anthropic.com/en/api/messages-streaming)
- [Message Batches API](https://docs.anthropic.com/en/api/creating-message-batches)
- [Batch Results](https://docs.anthropic.com/en/api/retrieving-message-batch-results)

## Next Steps

For common errors, see `anth-common-errors`.

Related Skills

calendar-to-workflow

1868
from jeremylongshore/claude-code-plugins-plus-skills

Converts calendar events and schedules into Claude Code workflows, meeting prep documents, and standup notes. Use when the user mentions calendar events, meeting prep, standup generation, or scheduling workflows. Trigger with phrases like "prep for my meetings", "generate standup notes", "create workflow from calendar", or "summarize today's schedule".

workhuman-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Workhuman core workflow b for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow b".

workhuman-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Workhuman core workflow a for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow a".

wispr-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Wispr Flow core workflow b for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow b".

wispr-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Wispr Flow core workflow a for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow a".

windsurf-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Windsurf's secondary workflow: Workflows, Memories, and reusable automation. Use when creating reusable Cascade workflows, managing persistent memories, or automating repetitive development tasks. Trigger with phrases like "windsurf workflow", "windsurf automation", "windsurf memories", "cascade workflow", "windsurf slash command".

windsurf-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Windsurf's primary workflow: Cascade Write mode for multi-file agentic coding. Use when building features, refactoring across files, or performing complex code tasks. Trigger with phrases like "windsurf cascade write", "windsurf agentic coding", "windsurf multi-file edit", "cascade write mode", "windsurf build feature".

webflow-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Webflow secondary workflows — Sites management, Pages API, Forms submissions, Ecommerce (products/orders/inventory), and Custom Code via the Data API v2. Use when managing sites, reading pages, handling form data, or working with Webflow Ecommerce products and orders. Trigger with phrases like "webflow sites", "webflow pages", "webflow forms", "webflow ecommerce", "webflow products", "webflow orders".

webflow-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute the primary Webflow workflow — CMS content management: list collections, CRUD items, publish items, and manage content lifecycle via the Data API v2. Use when working with Webflow CMS collections and items, managing blog posts, team members, or any dynamic content. Trigger with phrases like "webflow CMS", "webflow collections", "webflow items", "create webflow content", "manage webflow CMS", "webflow content management".

veeva-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Veeva Vault core workflow b for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow b".

veeva-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Veeva Vault core workflow a for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow a".

vastai-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Vast.ai secondary workflow: multi-instance orchestration, spot recovery, and cost optimization. Use when running distributed training, handling spot preemption, or optimizing GPU spend across multiple instances. Trigger with phrases like "vastai distributed training", "vastai spot recovery", "vastai multi-gpu", "vastai cost optimization".