river-ts-streaming

Type-safe Server-Sent Events (SSE) and WebSocket communication using the river.ts library. Use when working with this codebase to: (1) Define typed event schemas with the RiverEvents builder, (2) Implement SSE streaming on the server with RiverEmitter, (3) Consume SSE streams on the client with RiverClient, (4) Handle WebSocket communication with RiverSocketAdapter, (5) Implement request/response RPC patterns over WebSocket, (6) Work with chunked/streamed data events.

Best use case

river-ts-streaming is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using river-ts-streaming should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/river-ts-streaming/SKILL.md --create-dirs "https://raw.githubusercontent.com/diegosouzapw/awesome-omni-skill/main/skills/development/river-ts-streaming/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/river-ts-streaming/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How river-ts-streaming Compares

| Feature / Agent | river-ts-streaming | Standard Approach |
| --- | --- | --- |
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

Frequently Asked Questions

Where can I find the source code?

The source is on GitHub in the diegosouzapw/awesome-omni-skill repository, at skills/development/river-ts-streaming/SKILL.md.

SKILL.md Source

## Quick Reference

river.ts provides four main components:
- `RiverEvents` - Type-safe event schema builder
- `RiverEmitter` - Server-side SSE streaming
- `RiverClient` - Client-side SSE consumption
- `RiverSocketAdapter` - WebSocket message handling with request/response support

## Event Definition

Define events using the builder pattern:

```typescript
import { RiverEvents } from 'river.ts';

const events = new RiverEvents()
  .defineEvent('message', { message: 'Hello' })
  .defineEvent('data', { data: {} as { id: number; name: string } })
  .defineEvent('stream', { data: [] as string[], stream: true, chunkSize: 100 })
  // Request/response pattern with explicit response type
  .defineEvent('rpc.call', {
    data: {} as { method: string; params: unknown },
    response: {} as { result: unknown; error?: string }
  })
  .build();
```

Reserved event types: `close`, `error` - do not define these.
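
To make the builder pattern concrete, the sketch below shows one way a builder could accumulate event types across chained `defineEvent` calls and reject the reserved names at runtime. `MiniEvents` and its internals are hypothetical and written only for illustration; they are not river.ts's implementation, and the real `RiverEvents` may validate names differently.

```typescript
// Hypothetical mini-builder for illustration only; not river.ts's implementation.
const RESERVED_TYPES = ['close', 'error'];

class MiniEvents<M = {}> {
  private readonly defs: Record<string, unknown> = {};

  // Each call widens the accumulated map type M with one more key.
  defineEvent<K extends string, P>(name: K, payload: P): MiniEvents<M & Record<K, P>> {
    if (RESERVED_TYPES.indexOf(name) !== -1) {
      throw new Error(`"${name}" is a reserved event type`);
    }
    this.defs[name] = payload;
    return this as unknown as MiniEvents<M & Record<K, P>>;
  }

  // Return a plain object typed as the accumulated map.
  build(): M {
    return { ...this.defs } as unknown as M;
  }
}

const demoEvents = new MiniEvents()
  .defineEvent('message', { message: 'Hello' })
  .defineEvent('data', { data: { id: 1, name: 'test' } })
  .build();

// The compiler knows `demoEvents.message.message` is a string
// and `demoEvents.data.data.id` is a number.
console.log(demoEvents.message.message, demoEvents.data.data.id);
```

The point of the chained return type is that a typo such as `demoEvents.mesage` fails at compile time instead of at runtime.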

## Server-Side SSE (RiverEmitter)

```typescript
import { RiverEmitter } from 'river.ts/server';

// `events` is the schema built in "Event Definition" above
const emitter = RiverEmitter.init(events);

// Create the SSE stream inside your HTTP request handler
// (`handleRequest` is a placeholder name for your framework's handler)
function handleRequest(): Response {
  const stream = emitter.stream({
    callback: async (emit, clientId) => {
      await emit('message', { message: 'Connected' });
      await emit('data', { data: { id: 1, name: 'test' } });
    },
    clientId: 'optional-custom-id',
    ondisconnect: (clientId) => console.log(`${clientId} disconnected`)
  });

  return new Response(stream, { headers: emitter.headers() });
}

// Elsewhere, once clients are connected: broadcast to all clients
await emitter.broadcast('message', { message: 'Update' });

// Send to specific client
await emitter.sendToClient('client-id', 'data', { data: { id: 2, name: 'specific' } });
```
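
For orientation, this is roughly what an SSE response carries on the wire. The sketch formats one event as a `text/event-stream` frame using the standard SSE framing (`event:` and `data:` fields terminated by a blank line). It assumes the event type travels in the `event:` field and the payload is JSON-encoded in `data:`; whether `RiverEmitter` frames events exactly this way is not stated here. `formatSseFrame` and `typicalSseHeaders` are illustrative names, not part of river.ts.

```typescript
// Headers commonly sent with an SSE response. What emitter.headers() actually
// returns is not shown in this document; this is an assumption.
const typicalSseHeaders: Record<string, string> = {
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  Connection: 'keep-alive'
};

// Serialize one event as an SSE frame: "event: <type>\ndata: <json>\n\n".
function formatSseFrame(type: string, payload: unknown): string {
  // JSON.stringify without an indent argument emits no raw newlines,
  // so the payload always fits on a single data: line.
  return `event: ${type}\ndata: ${JSON.stringify(payload)}\n\n`;
}

const demoFrame = formatSseFrame('message', { message: 'Connected' });
// Print with escapes visible so the frame terminator can be seen.
console.log(JSON.stringify(demoFrame));
```

The trailing blank line matters: an SSE consumer only dispatches an event once it sees the empty line that ends the frame.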

## Client-Side SSE (RiverClient)

```typescript
import { RiverClient } from 'river.ts/client';

const client = RiverClient.init(events, { reconnect: true });

client
  .prepare('http://localhost:3000/events', { method: 'GET' })
  .on('message', (data) => console.log(data.message))
  .on('data', (data) => console.log(data.id, data.name))
  .stream();

// Close connection
client.close();
```
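
On the receiving side, a client has to split the stream back into events before dispatching them to `.on(...)` handlers. The following is a simplified sketch of that parsing step under the standard SSE framing. It handles only the `event:` and `data:` fields on a complete string, whereas a real client (presumably including `RiverClient`) must also handle partial chunks, `id:` and `retry:` fields, and reconnection. `parseSseFrames` and `SseMessage` are hypothetical names.

```typescript
interface SseMessage {
  event: string; // event type; SSE defaults to "message" when no event: field is present
  data: string;  // raw data payload (JSON text in this sketch)
}

function parseSseFrames(text: string): SseMessage[] {
  const messages: SseMessage[] = [];
  // Frames are separated by a blank line.
  for (const rawFrame of text.split(/\r?\n\r?\n/)) {
    let eventType = 'message';
    const dataLines: string[] = [];
    for (const line of rawFrame.split(/\r?\n/)) {
      // Skip empty lines and comment lines (those starting with ':').
      if (line === '' || line.charAt(0) === ':') continue;
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      // A single leading space after the colon is not part of the value.
      if (value.charAt(0) === ' ') value = value.slice(1);
      if (field === 'event') eventType = value;
      else if (field === 'data') dataLines.push(value);
    }
    // A frame without any data line dispatches nothing.
    if (dataLines.length > 0) {
      messages.push({ event: eventType, data: dataLines.join('\n') });
    }
  }
  return messages;
}

const parsedFrames = parseSseFrames(
  'event: message\ndata: {"message":"Connected"}\n\n' +
  ': keep-alive comment\n\n' +
  'event: data\ndata: {"data":{"id":1,"name":"test"}}\n\n'
);
for (const msg of parsedFrames) {
  console.log(msg.event, JSON.parse(msg.data));
}
```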

## WebSocket Adapter (RiverSocketAdapter)

```typescript
import { RiverSocketAdapter } from 'river.ts/websocket';

const adapter = new RiverSocketAdapter(events, { debug: false });

// Register event handlers (keep a reference so the handler can be removed later)
const handler = (data: unknown) => console.log(data);
adapter.on('message', handler);
adapter.off('message', handler); // Unregister

// Handle incoming messages (`ws` is your WebSocket instance)
ws.onmessage = (e) => adapter.handleMessage(e.data);

// Send messages
adapter.send('data', { data: { id: 1, name: 'test' } }, (msg) => ws.send(msg));
```

## WebSocket Request/Response Pattern

For RPC-style communication with automatic type inference:

```typescript
import { RiverEvents } from 'river.ts';
import {
  RiverSocketAdapter,
  RequestTimeoutError,
  WebSocketClosedError
} from 'river.ts/websocket';

// Events with explicit response types
const events = new RiverEvents()
  .defineEvent('instance.spawn', {
    data: {} as { cwd: string },
    response: {} as { instanceId: string; status: 'created' | 'error' }
  })
  .build();

const adapter = new RiverSocketAdapter(events);

// Route messages through adapter
ws.onmessage = (e) => adapter.handleMessage(e.data);
ws.onclose = () => adapter.clearPendingRequests();

// Make request - response type is inferred from event definition
const response = await adapter.request(
  'instance.spawn',
  { cwd: '/app' },
  (msg) => ws.send(msg),
  10000 // timeout in ms (default: 30000)
);
// response is typed as { instanceId: string; status: 'created' | 'error' }
```

Wire format for request/response:
```jsonc
// Request (outgoing)
{ "type": "instance.spawn", "data": { "cwd": "/app" }, "id": "uuid" }

// Response (incoming) - server echoes back the id
{ "type": "instance.spawn", "data": { "instanceId": "123", "status": "created" }, "id": "uuid" }
```
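
The wire format implies a simple correlation scheme: the sender remembers each outgoing `id`, and a reply carrying the same `id` settles the matching promise. The sketch below shows that idea in isolation. `PendingRequests` and `Envelope` are hypothetical names, the ids come from a counter rather than a UUID, and plain `Error` objects stand in for the library's error classes; none of this is river.ts's actual code.

```typescript
interface Envelope {
  type: string;
  data: unknown;
  id?: string;
}

interface PendingEntry {
  resolve: (data: unknown) => void;
  reject: (err: Error) => void;
  timer: ReturnType<typeof setTimeout>;
}

class PendingRequests {
  private nextId = 0;
  private readonly pending = new Map<string, PendingEntry>();

  // Send a request and return a promise settled by the reply with the same id.
  request(type: string, data: unknown, send: (msg: string) => void, timeoutMs = 30000): Promise<unknown> {
    const id = String(++this.nextId);
    return new Promise<unknown>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(id);
        reject(new Error(`Request ${id} (${type}) timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(id, { resolve, reject, timer });
      try {
        send(JSON.stringify({ type, data, id }));
      } catch (err) {
        // Sending failed: clean up so the entry does not linger until the timeout.
        clearTimeout(timer);
        this.pending.delete(id);
        reject(err instanceof Error ? err : new Error(String(err)));
      }
    });
  }

  // Returns true if the message was a reply to a pending request.
  handleMessage(raw: string): boolean {
    const msg = JSON.parse(raw) as Envelope;
    if (msg.id === undefined) return false;
    const entry = this.pending.get(msg.id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(msg.id);
    entry.resolve(msg.data);
    return true;
  }

  // Reject everything still in flight, e.g. when the socket closes.
  clear(reason = 'socket closed'): void {
    this.pending.forEach((entry) => {
      clearTimeout(entry.timer);
      entry.reject(new Error(reason));
    });
    this.pending.clear();
  }
}

const demoTracker = new PendingRequests();
const demoReply = demoTracker.request('instance.spawn', { cwd: '/app' }, (msg) => {
  // Stand-in for ws.send: echo a reply with the same id, as a server would.
  const { id } = JSON.parse(msg) as Envelope;
  const reply = { type: 'instance.spawn', data: { instanceId: '123', status: 'created' }, id };
  setTimeout(() => demoTracker.handleMessage(JSON.stringify(reply)), 0);
}, 1000);
demoReply.then((data) => console.log('reply:', data));
```

Clearing the timer on every exit path (reply, send failure, socket close) keeps a settled request from rejecting later or holding the process open.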

## Key Types

```typescript
import { EventData, ResponseData, EmitPayload } from 'river.ts';

// EventData<T, K> - Extract data type for receiving/handling
// ResponseData<T, K> - Extract response type for request() return value
// EmitPayload<T, K> - Extract payload type for emitting (excludes type/stream/chunkSize)
```
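
As a rough illustration of how helper types like these can work, the sketch below extracts the `data` and `response` fields from an event map with conditional types. `DemoEvents`, `DemoData`, and `DemoResponse` are hypothetical stand-ins written for this example; the actual definitions in `src/types/core.ts` may differ in shape and in how they treat events without a `data` field.

```typescript
// Hypothetical event map, mirroring the 'instance.spawn' definition above.
type DemoEvents = {
  'instance.spawn': {
    data: { cwd: string };
    response: { instanceId: string; status: 'created' | 'error' };
  };
  message: { message: string };
};

// Extract the `data` field of event K, or `never` if the event has none.
type DemoData<M, K extends keyof M> = M[K] extends { data: infer D } ? D : never;

// Extract the `response` field of event K, or `never` if the event has none.
type DemoResponse<M, K extends keyof M> = M[K] extends { response: infer R } ? R : never;

// Both annotations are checked by the compiler against the event map.
const spawnRequest: DemoData<DemoEvents, 'instance.spawn'> = { cwd: '/app' };
const spawnResponse: DemoResponse<DemoEvents, 'instance.spawn'> = {
  instanceId: '123',
  status: 'created'
};

console.log(spawnRequest.cwd, spawnResponse.status);
```

With types like these, changing the `response` shape in the event definition surfaces as a compile error at every call site that consumes it.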

## Project Structure

```
src/
├── index.ts          # Main exports (RiverEvents, types)
├── builder.ts        # RiverEvents builder class
├── client/           # RiverClient for SSE consumption
├── server/           # RiverEmitter for SSE streaming
├── websocket/        # RiverSocketAdapter for WebSocket
└── types/
    ├── core.ts       # BaseEvent, EventMap, EventData, ResponseData
    └── http.ts       # HTTPMethods type
```

## Testing

Run tests with: `bun test`

Test files are in `tests/` directory. WebSocket request tests are in `tests/websocket/request.test.ts`.

## Build

Build with: `npm run build` (uses unbuild)

Output goes to `dist/` with separate entry points for `/client`, `/server`, `/websocket`.
