pulse-mcp-stream

Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence

16 stars

Best use case

pulse-mcp-stream is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence

Teams using pulse-mcp-stream should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/pulse-mcp-stream/SKILL.md --create-dirs "https://raw.githubusercontent.com/plurigrid/asi/main/ies/music-topos/.agents/skills/pulse-mcp-stream/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/pulse-mcp-stream/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How pulse-mcp-stream Compares

Feature / Agentpulse-mcp-streamStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# pulse-mcp-stream

> Layer 1: Real-Time Social Stream Monitoring via MCP

**Version**: 1.1.0 (music-topos enhanced)
**Trit**: +1 (Generator - produces live data)
**Bundle**: acquisition

## Overview

Pulse-MCP-stream provides real-time monitoring of social interactions, enabling the cognitive surrogate system to stay updated with the latest patterns. It streams mentions, engagement changes, and trending topics.

## Enhanced Integration: MCP + DuckDB

### MCP Server (TypeScript)

```typescript
// pulse-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server";
import { Firehose } from "@atproto/sync";
import * as duckdb from "duckdb";

const server = new Server({
  name: "pulse-mcp-stream",
  version: "1.0.0"
});

// Connect to DuckDB for persistence
const db = new duckdb.Database("pulse_stream.duckdb");

server.setRequestHandler("subscribe", async (params) => {
  const { actor, filters } = params;
  
  const firehose = new Firehose({
    service: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
  });
  
  firehose.on("create", async (event) => {
    if (event.author === actor) {
      // Store in DuckDB
      await db.run(`
        INSERT INTO pulse_events (event_id, event_type, actor_did, text, created_at)
        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
      `, [event.uri, event.type, event.author, event.record?.text]);
    }
  });
  
  await firehose.start();
  return { status: "subscribed", actor };
});
```

### DuckDB Schema

```sql
CREATE TABLE pulse_events (
    event_id VARCHAR PRIMARY KEY,
    event_type VARCHAR,  -- 'post', 'reply', 'like', 'repost', 'mention'
    actor_did VARCHAR,
    actor_handle VARCHAR,
    subject_uri VARCHAR,
    text TEXT,
    created_at TIMESTAMP,
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    gay_color VARCHAR  -- Deterministic color via SPI seed
);

CREATE TABLE engagement_deltas (
    delta_id VARCHAR PRIMARY KEY,
    post_uri VARCHAR,
    likes_delta INT,
    reposts_delta INT,
    replies_delta INT,
    velocity FLOAT,  -- engagements per minute
    measured_at TIMESTAMP
);

-- Real-time velocity tracking
CREATE VIEW v_post_velocity AS
SELECT 
    post_uri,
    COUNT(*) FILTER (WHERE event_type = 'like') as likes,
    COUNT(*) FILTER (WHERE event_type = 'repost') as reposts,
    COUNT(*) / (EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 60.0) as velocity_per_min
FROM pulse_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY post_uri;
```

### Python Client

```python
# pulse_client.py
import asyncio
import duckdb
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class PulseEvent:
    event_id: str
    event_type: str
    actor: str
    text: str
    created_at: str

class PulseClient:
    def __init__(self, db_path: str = "pulse_stream.duckdb", seed: int = 0xf061ebbc2ca74d78):
        self.db = duckdb.connect(db_path)
        self.seed = seed
    
    async def subscribe_actor(self, actor: str) -> AsyncIterator[PulseEvent]:
        """Subscribe to real-time updates for a user."""
        # Poll DuckDB for new events
        last_id = ""
        while True:
            result = self.db.execute("""
                SELECT * FROM pulse_events 
                WHERE actor_handle = ? AND event_id > ?
                ORDER BY created_at
                LIMIT 10
            """, [actor, last_id]).fetchall()
            
            for row in result:
                last_id = row[0]
                yield PulseEvent(*row[:5])
            
            await asyncio.sleep(1)
    
    async def detect_trends(self, center_user: str, window_minutes: int = 60):
        """Detect trending topics in user's network."""
        return self.db.execute("""
            WITH word_counts AS (
                SELECT 
                    UNNEST(STRING_SPLIT(LOWER(text), ' ')) as word,
                    COUNT(*) as mentions
                FROM pulse_events
                WHERE created_at > NOW() - INTERVAL ? MINUTE
                GROUP BY word
            )
            SELECT word, mentions
            FROM word_counts
            WHERE LENGTH(word) > 3
            ORDER BY mentions DESC
            LIMIT 10
        """, [window_minutes]).fetchall()
```

### Ruby Integration

```ruby
# lib/pulse_stream.rb
require 'duckdb'

module PulseStream
  def self.connect(db_path: "pulse_stream.duckdb")
    @db = DuckDB::Database.open(db_path)
    @conn = @db.connect
  end
  
  def self.latest_events(actor:, limit: 10)
    @conn.query(<<~SQL, actor, limit)
      SELECT event_id, event_type, text, created_at
      FROM pulse_events
      WHERE actor_handle = ?
      ORDER BY created_at DESC
      LIMIT ?
    SQL
  end
  
  def self.velocity(post_uri:)
    result = @conn.query(<<~SQL, post_uri)
      SELECT velocity_per_min FROM v_post_velocity WHERE post_uri = ?
    SQL
    result.first&.first || 0.0
  end
  
  def self.viral?(post_uri:, threshold: 5.0)
    velocity(post_uri: post_uri) > threshold
  end
end
```

## GF(3) Triad Integration

| Trit | Skill | Role |
|------|-------|------|
| -1 | influence-propagation | Validates network patterns |
| 0 | bisimulation-game | Coordinates equivalence |
| +1 | **pulse-mcp-stream** | Generates live data |

**Conservation**: (-1) + (0) + (+1) = 0 ✓

## MCP Configuration

```json
{
  "mcpServers": {
    "pulse": {
      "command": "node",
      "args": ["pulse-mcp-server/dist/index.js"],
      "env": {
        "DUCKDB_PATH": "pulse_stream.duckdb",
        "GAY_SEED": "0xf061ebbc2ca74d78"
      }
    }
  }
}
```

## Justfile Recipes

```makefile
# Start pulse stream
pulse-start actor="barton.bsky.social":
    python3 -c "import asyncio; from pulse_client import PulseClient; asyncio.run(PulseClient().subscribe_actor('{{actor}}'))"

# Check velocity
pulse-velocity uri:
    ruby -I lib -r pulse_stream -e "PulseStream.connect; puts PulseStream.velocity(post_uri: '{{uri}}')"

# Detect trends
pulse-trends window="60":
    duckdb pulse_stream.duckdb -c "SELECT * FROM v_post_velocity WHERE velocity_per_min > 1.0 LIMIT 10"
```

## Related Skills

- `atproto-ingest` (Layer 1) - Batch data collection
- `influence-propagation` (Layer 7) - Network analysis
- `cognitive-surrogate` (Layer 6) - Pattern consumption
- `duckdb-temporal-versioning` - Time-travel queries