pulse-mcp-stream
Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence
Best use case
pulse-mcp-stream is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence
Teams using pulse-mcp-stream should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/pulse-mcp-stream/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How pulse-mcp-stream Compares
| Feature / Agent | pulse-mcp-stream | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# pulse-mcp-stream
> Layer 1: Real-Time Social Stream Monitoring via MCP
**Version**: 1.1.0 (music-topos enhanced)
**Trit**: +1 (Generator - produces live data)
**Bundle**: acquisition
## Overview
Pulse-MCP-stream provides real-time monitoring of social interactions, enabling the cognitive surrogate system to stay updated with the latest patterns. It streams mentions, engagement changes, and trending topics.
## Enhanced Integration: MCP + DuckDB
### MCP Server (TypeScript)
```typescript
// pulse-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server";
import { Firehose } from "@atproto/sync";
import * as duckdb from "duckdb";
const server = new Server({
name: "pulse-mcp-stream",
version: "1.0.0"
});
// Connect to DuckDB for persistence
const db = new duckdb.Database("pulse_stream.duckdb");
server.setRequestHandler("subscribe", async (params) => {
const { actor, filters } = params;
const firehose = new Firehose({
service: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
});
firehose.on("create", async (event) => {
if (event.author === actor) {
// Store in DuckDB
await db.run(`
INSERT INTO pulse_events (event_id, event_type, actor_did, text, created_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
`, [event.uri, event.type, event.author, event.record?.text]);
}
});
await firehose.start();
return { status: "subscribed", actor };
});
```
### DuckDB Schema
```sql
CREATE TABLE pulse_events (
event_id VARCHAR PRIMARY KEY,
event_type VARCHAR, -- 'post', 'reply', 'like', 'repost', 'mention'
actor_did VARCHAR,
actor_handle VARCHAR,
subject_uri VARCHAR,
text TEXT,
created_at TIMESTAMP,
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
gay_color VARCHAR -- Deterministic color via SPI seed
);
CREATE TABLE engagement_deltas (
delta_id VARCHAR PRIMARY KEY,
post_uri VARCHAR,
likes_delta INT,
reposts_delta INT,
replies_delta INT,
velocity FLOAT, -- engagements per minute
measured_at TIMESTAMP
);
-- Real-time velocity tracking
CREATE VIEW v_post_velocity AS
SELECT
post_uri,
COUNT(*) FILTER (WHERE event_type = 'like') as likes,
COUNT(*) FILTER (WHERE event_type = 'repost') as reposts,
COUNT(*) / (EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 60.0) as velocity_per_min
FROM pulse_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY post_uri;
```
### Python Client
```python
# pulse_client.py
import asyncio
import duckdb
from dataclasses import dataclass
from typing import AsyncIterator
@dataclass
class PulseEvent:
event_id: str
event_type: str
actor: str
text: str
created_at: str
class PulseClient:
def __init__(self, db_path: str = "pulse_stream.duckdb", seed: int = 0xf061ebbc2ca74d78):
self.db = duckdb.connect(db_path)
self.seed = seed
async def subscribe_actor(self, actor: str) -> AsyncIterator[PulseEvent]:
"""Subscribe to real-time updates for a user."""
# Poll DuckDB for new events
last_id = ""
while True:
result = self.db.execute("""
SELECT * FROM pulse_events
WHERE actor_handle = ? AND event_id > ?
ORDER BY created_at
LIMIT 10
""", [actor, last_id]).fetchall()
for row in result:
last_id = row[0]
yield PulseEvent(*row[:5])
await asyncio.sleep(1)
async def detect_trends(self, center_user: str, window_minutes: int = 60):
"""Detect trending topics in user's network."""
return self.db.execute("""
WITH word_counts AS (
SELECT
UNNEST(STRING_SPLIT(LOWER(text), ' ')) as word,
COUNT(*) as mentions
FROM pulse_events
WHERE created_at > NOW() - INTERVAL ? MINUTE
GROUP BY word
)
SELECT word, mentions
FROM word_counts
WHERE LENGTH(word) > 3
ORDER BY mentions DESC
LIMIT 10
""", [window_minutes]).fetchall()
```
### Ruby Integration
```ruby
# lib/pulse_stream.rb
require 'duckdb'
module PulseStream
def self.connect(db_path: "pulse_stream.duckdb")
@db = DuckDB::Database.open(db_path)
@conn = @db.connect
end
def self.latest_events(actor:, limit: 10)
@conn.query(<<~SQL, actor, limit)
SELECT event_id, event_type, text, created_at
FROM pulse_events
WHERE actor_handle = ?
ORDER BY created_at DESC
LIMIT ?
SQL
end
def self.velocity(post_uri:)
result = @conn.query(<<~SQL, post_uri)
SELECT velocity_per_min FROM v_post_velocity WHERE post_uri = ?
SQL
result.first&.first || 0.0
end
def self.viral?(post_uri:, threshold: 5.0)
velocity(post_uri: post_uri) > threshold
end
end
```
## GF(3) Triad Integration
| Trit | Skill | Role |
|------|-------|------|
| -1 | influence-propagation | Validates network patterns |
| 0 | bisimulation-game | Coordinates equivalence |
| +1 | **pulse-mcp-stream** | Generates live data |
**Conservation**: (-1) + (0) + (+1) = 0 ✓
## MCP Configuration
```json
{
"mcpServers": {
"pulse": {
"command": "node",
"args": ["pulse-mcp-server/dist/index.js"],
"env": {
"DUCKDB_PATH": "pulse_stream.duckdb",
"GAY_SEED": "0xf061ebbc2ca74d78"
}
}
}
}
```
## Justfile Recipes
```makefile
# Start pulse stream
pulse-start actor="barton.bsky.social":
python3 -c "import asyncio; from pulse_client import PulseClient; asyncio.run(PulseClient().subscribe_actor('{{actor}}'))"
# Check velocity
pulse-velocity uri:
ruby -I lib -r pulse_stream -e "PulseStream.connect; puts PulseStream.velocity(post_uri: '{{uri}}')"
# Detect trends
pulse-trends window="60":
duckdb pulse_stream.duckdb -c "SELECT * FROM v_post_velocity WHERE velocity_per_min > 1.0 LIMIT 10"
```
## Related Skills
- `atproto-ingest` (Layer 1) - Batch data collection
- `influence-propagation` (Layer 7) - Network analysis
- `cognitive-surrogate` (Layer 6) - Pattern consumption
- `duckdb-temporal-versioning` - Time-travel queriesRelated Skills
bluesky-jetstream
Bluesky Jetstream Firehose Skill
discohy-streams
DisCoPy categorical color streams via Hy with 3 parallel TAP streams
zx-calculus
Coecke's ZX-calculus for quantum circuit reasoning via string diagrams with Z-spiders (green) and X-spiders (red)
zulip-cogen
Zulip Cogen Skill 🐸⚡
zls-integration
zls-integration skill
zig
zig skill
zig-syrup-bci
Multimodal BCI pipeline in Zig: DSI-24 EEG, fNIRS mBLL, eye tracking IVT, LSL sync, EDF read/write, GF(3) conservation
zig-programming
zig-programming skill
zeroth-bot
Zeroth Bot - 3D-printed open-source humanoid robot platform for sim-to-real and RL research. Affordable entry point for humanoid robotics.
xlsx
Comprehensive spreadsheet creation, editing, and analysis with support
wycheproof
Google's Wycheproof test vectors for cryptographic implementation testing.
Writing Hookify Rules
This skill should be used when the user asks to "create a hookify rule", "write a hook rule", "configure hookify", "add a hookify rule", or needs guidance on hookify rule syntax and patterns.