river-ts-streaming
Type-safe Server-Sent Events (SSE) and WebSocket communication using river.ts library. Use when working with this codebase to: (1) Define typed event schemas with RiverEvents builder, (2) Implement SSE streaming on server with RiverEmitter, (3) Consume SSE streams on client with RiverClient, (4) Handle WebSocket communication with RiverSocketAdapter, (5) Implement request/response RPC patterns over WebSocket, (6) Work with chunked/streamed data events.
Best use case
river-ts-streaming is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Type-safe Server-Sent Events (SSE) and WebSocket communication using river.ts library. Use when working with this codebase to: (1) Define typed event schemas with RiverEvents builder, (2) Implement SSE streaming on server with RiverEmitter, (3) Consume SSE streams on client with RiverClient, (4) Handle WebSocket communication with RiverSocketAdapter, (5) Implement request/response RPC patterns over WebSocket, (6) Work with chunked/streamed data events.
Teams using river-ts-streaming should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/river-ts-streaming/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How river-ts-streaming Compares
| Feature / Agent | river-ts-streaming | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Type-safe Server-Sent Events (SSE) and WebSocket communication using river.ts library. Use when working with this codebase to: (1) Define typed event schemas with RiverEvents builder, (2) Implement SSE streaming on server with RiverEmitter, (3) Consume SSE streams on client with RiverClient, (4) Handle WebSocket communication with RiverSocketAdapter, (5) Implement request/response RPC patterns over WebSocket, (6) Work with chunked/streamed data events.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
## Quick Reference
river.ts provides three main components:
- `RiverEvents` - Type-safe event schema builder
- `RiverEmitter` - Server-side SSE streaming
- `RiverClient` - Client-side SSE consumption
- `RiverSocketAdapter` - WebSocket message handling with request/response support
## Event Definition
Define events using the builder pattern:
```typescript
import { RiverEvents } from 'river.ts';
const events = new RiverEvents()
.defineEvent('message', { message: 'Hello' })
.defineEvent('data', { data: {} as { id: number; name: string } })
.defineEvent('stream', { data: [] as string[], stream: true, chunkSize: 100 })
// Request/response pattern with explicit response type
.defineEvent('rpc.call', {
data: {} as { method: string; params: unknown },
response: {} as { result: unknown; error?: string }
})
.build();
```
Reserved event types: `close`, `error` - do not define these.
## Server-Side SSE (RiverEmitter)
```typescript
import { RiverEmitter } from 'river.ts/server';
const emitter = RiverEmitter.init(events);
// Create SSE stream for HTTP response
const stream = emitter.stream({
callback: async (emit, clientId) => {
await emit('message', { message: 'Connected' });
await emit('data', { data: { id: 1, name: 'test' } });
},
clientId: 'optional-custom-id',
ondisconnect: (clientId) => console.log(`${clientId} disconnected`)
});
return new Response(stream, { headers: emitter.headers() });
// Broadcast to all clients
await emitter.broadcast('message', { message: 'Update' });
// Send to specific client
await emitter.sendToClient('client-id', 'data', { data: { id: 2, name: 'specific' } });
```
## Client-Side SSE (RiverClient)
```typescript
import { RiverClient } from 'river.ts/client';
const client = RiverClient.init(events, { reconnect: true });
client
.prepare('http://localhost:3000/events', { method: 'GET' })
.on('message', (data) => console.log(data.message))
.on('data', (data) => console.log(data.id, data.name))
.stream();
// Close connection
client.close();
```
## WebSocket Adapter (RiverSocketAdapter)
```typescript
import { RiverSocketAdapter } from 'river.ts/websocket';
const adapter = new RiverSocketAdapter(events, { debug: false });
// Register event handlers
adapter.on('message', (data) => console.log(data));
adapter.off('message', handler); // Unregister
// Handle incoming messages (call from ws.onmessage)
adapter.handleMessage(messageData);
// Send messages
adapter.send('data', { data: { id: 1, name: 'test' } }, (msg) => ws.send(msg));
```
## WebSocket Request/Response Pattern
For RPC-style communication with automatic type inference:
```typescript
import {
RiverSocketAdapter,
RequestTimeoutError,
WebSocketClosedError
} from 'river.ts/websocket';
// Events with explicit response types
const events = new RiverEvents()
.defineEvent('instance.spawn', {
data: {} as { cwd: string },
response: {} as { instanceId: string; status: 'created' | 'error' }
})
.build();
const adapter = new RiverSocketAdapter(events);
// Route messages through adapter
ws.onmessage = (e) => adapter.handleMessage(e.data);
ws.onclose = () => adapter.clearPendingRequests();
// Make request - response type is inferred from event definition
const response = await adapter.request(
'instance.spawn',
{ cwd: '/app' },
(msg) => ws.send(msg),
10000 // timeout in ms (default: 30000)
);
// response is typed as { instanceId: string; status: 'created' | 'error' }
```
Wire format for request/response:
```json
// Request (outgoing)
{ "type": "instance.spawn", "data": { "cwd": "/app" }, "id": "uuid" }
// Response (incoming) - server echoes back the id
{ "type": "instance.spawn", "data": { "instanceId": "123", "status": "created" }, "id": "uuid" }
```
## Key Types
```typescript
import { EventData, ResponseData, EmitPayload } from 'river.ts';
// EventData<T, K> - Extract data type for receiving/handling
// ResponseData<T, K> - Extract response type for request() return value
// EmitPayload<T, K> - Extract payload type for emitting (excludes type/stream/chunkSize)
```
## Project Structure
```
src/
├── index.ts # Main exports (RiverEvents, types)
├── builder.ts # RiverEvents builder class
├── client/ # RiverClient for SSE consumption
├── server/ # RiverEmitter for SSE streaming
├── websocket/ # RiverSocketAdapter for WebSocket
└── types/
├── core.ts # BaseEvent, EventMap, EventData, ResponseData
└── http.ts # HTTPMethods type
```
## Testing
Run tests with: `bun test`
Test files are in `tests/` directory. WebSocket request tests are in `tests/websocket/request.test.ts`.
## Build
Build with: `npm run build` (uses unbuild)
Output goes to `dist/` with separate entry points for `/client`, `/server`, `/websocket`.Related Skills
asciinema-streaming-backup
Real-time asciinema backup to GitHub orphan branch. TRIGGERS - streaming backup, asciinema backup, session backup, recording backup.
anthropic-streaming-patterns
Use when integrating Claude API with streaming responses, implementing tool execution in streams, tracking API costs, or encountering streaming errors - provides Anthropic SDK 0.30.1+ patterns with mandatory cost monitoring
add-driver
Scaffold a new LLM provider driver for Prompture. Creates sync + async driver classes, registers them in the driver registry, adds settings, env template, setup.py extras, package exports, discovery integration, and models.dev pricing. Use when adding support for a new LLM provider.
Adaptive Bitrate Streaming
Automatically adjusting video quality based on network conditions using HLS, DASH protocols and player implementation for smooth playback and optimal user experience.
bgo
Automates the complete Blender build-go workflow, from building and packaging your extension/add-on to removing old versions, installing, enabling, and launching Blender for quick testing and iteration.
bio-liquid-biopsy-pipeline
Cell-free DNA analysis pipeline from plasma sequencing to tumor monitoring. Preprocesses cfDNA reads, analyzes fragment patterns, estimates tumor fraction from sWGS, and optionally detects mutations from targeted panels. Use when analyzing liquid biopsy samples for cancer detection or monitoring.
bicep
Expert assistance for Azure Bicep infrastructure-as-code. Provides best practices for authoring Bicep templates, Azure resource type discovery with API versions, resource schema retrieval, and Azure Verified Modules (AVM) guidance. Use when writing Bicep files, deploying Azure resources, looking up resource types/schemas, or working with AVM modules.
bicep-diagrams
Generates architecture diagrams from Azure Bicep files. Use when user has .bicep files or asks to visualize Bicep infrastructure.
beat
16ステップビートを生成 (JSON + ASCII grid + MIDI + WAV)。スタイルプロンプトからビートを作成。トリガー: /beat, ビートを生成, ビートを作って
bats-testing-patterns
Master Bash Automated Testing System (Bats) for comprehensive shell script testing. Use when writing tests for shell scripts, CI/CD pipelines, or requiring test-driven development of shell utilities.
babysit-pr
Babysit a GitHub pull request after creation by continuously polling CI checks/workflow runs, new review comments, and mergeability state until the PR is ready to merge (or merged/closed). Diagnose failures, retry likely flaky failures up to 3 times, auto-fix/push branch-related issues when appropriate, and stop only when user help is required (for example CI infrastructure issues, exhausted flaky retries, or ambiguous/blocking situations). Use when the user asks Codex to monitor a PR, watch CI, handle review comments, or keep an eye on failures and feedback on an open PR.
azure-storage-file-share-py
Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud.