azure-eventhub-ts

High-throughput event streaming and real-time data ingestion.

31,392 stars
Complexity: medium

About this skill

This skill enables an AI agent to interact with Azure Event Hubs, Microsoft's highly scalable data-streaming platform. Using the Azure Event Hubs SDK for TypeScript, agents can publish and consume events reliably: sending high volumes of events to an Event Hub and processing incoming events from consumer groups, with checkpointing backed by Azure Blob Storage. This lets an agent participate in event-driven architectures, handle real-time data streams from IoT devices, process application logs, and support other ingestion scenarios that require high throughput and low latency.

Best use case

Ingesting real-time data streams from various sources like IoT devices, sensors, or applications. Implementing event-driven architectures where an AI agent needs to react to or publish continuous streams of data. Collecting and processing large volumes of application logs or telemetry data in real-time. Building data pipelines for analytics or machine learning that require efficient, high-throughput data ingestion. Enabling agents to communicate asynchronously via a scalable event bus.

Success criteria

Successful publication of events to a specified Azure Event Hub, contributing to a real-time data stream. Reliable consumption and processing of events from an Azure Event Hub, with proper checkpointing to ensure data integrity and continuity. Integration of real-time data streams into AI agent workflows for immediate analysis or action.

Practical example

Example input

Publish a JSON event to the 'iot-telemetry' Event Hub with data: { "device_id": "temp-sensor-001", "temperature": 22.7, "humidity": 60.1, "timestamp": "2023-10-27T14:30:00Z" } and ensure it's partitioned by 'device_id'.

Example output

Event successfully published to Azure Event Hub 'iot-telemetry'. Partition ID: 3. Acknowledgement received. Ready for next event.
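A minimal sketch of how the example input above could be shaped for the SDK. Actually sending requires a live Event Hub and credentials, so only the payload and batch-option construction are shown; the `Telemetry` interface name is illustrative.

```typescript
// Illustrative payload matching the example input; sending it would use
// EventHubProducerClient from @azure/event-hubs (see SKILL.md below).
interface Telemetry {
  device_id: string;
  temperature: number;
  humidity: number;
  timestamp: string;
}

const telemetry: Telemetry = {
  device_id: "temp-sensor-001",
  temperature: 22.7,
  humidity: 60.1,
  timestamp: "2023-10-27T14:30:00Z",
};

// Partitioning by device_id: pass it as the partitionKey so all events
// from the same device hash to the same partition.
const batchOptions = { partitionKey: telemetry.device_id };
const event = { body: telemetry, contentType: "application/json" };
```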

When to use this skill

  • When an AI agent needs to send or receive events from Azure Event Hubs.
  • For applications or workflows requiring real-time processing of large volumes of event data.
  • When building scalable, event-driven solutions on the Microsoft Azure platform.
  • When reliable event consumption with consumer groups and checkpointing for fault tolerance is critical.

When not to use this skill

  • For small-scale, infrequent data transfers that do not require high-throughput event streaming capabilities.
  • If the target cloud provider is not Azure or if an alternative streaming technology (e.g., Apache Kafka, AWS Kinesis) is already in use.
  • For simple request-response operations where a direct API call or a message queue might be more appropriate.
  • If the AI agent lacks the necessary Azure credentials and permissions to access Event Hubs and Blob Storage.

Installation

Claude Code / Cursor / Codex

curl -o ~/.claude/skills/azure-eventhub-ts/SKILL.md --create-dirs "https://raw.githubusercontent.com/sickn33/antigravity-awesome-skills/main/plugins/antigravity-awesome-skills-claude/skills/azure-eventhub-ts/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/azure-eventhub-ts/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How azure-eventhub-ts Compares

| Feature / Agent         | azure-eventhub-ts | Standard Approach |
|-------------------------|-------------------|-------------------|
| Platform Support        | Claude            | Limited / Varies  |
| Context Awareness       | High              | Baseline          |
| Installation Complexity | medium            | N/A               |

Frequently Asked Questions

What does this skill do?

High-throughput event streaming and real-time data ingestion.

Which AI agents support this skill?

This skill is designed for Claude.

How difficult is it to install?

The installation complexity is rated as medium. You can find the installation instructions above.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Azure Event Hubs SDK for TypeScript

High-throughput event streaming and real-time data ingestion.

## Installation

```bash
npm install @azure/event-hubs @azure/identity
```

For checkpointing with consumer groups:
```bash
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
```

## Environment Variables

```bash
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
```
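Since missing configuration otherwise surfaces as a less obvious error at connect time, it can help to validate these variables up front. A small sketch; the `requireEnv` helper is hypothetical, not part of the SDK:

```typescript
// Hypothetical helper (not part of the SDK): fail fast on missing
// configuration instead of letting the client fail later at connect time.
function requireEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}
```

Usage would look like `const eventHubName = requireEnv("EVENTHUB_NAME");` before constructing any client.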

## Authentication

```typescript
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);
```

## Core Workflow

### Send Events

```typescript
const producer = new EventHubProducerClient(namespace, eventHubName, credential);

// Create batch and add events
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();
```

### Send to Specific Partition

```typescript
// By partition ID
const batchById = await producer.createBatch({ partitionId: "0" });

// By partition key (consistent hashing keeps related events together)
const batchByKey = await producer.createBatch({ partitionKey: "device-123" });
```

### Receive Events (Simple)

```typescript
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after some time
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);
```

### Receive with Checkpointing (Production)

```typescript
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);

const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  namespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
```

### Receive from Specific Position

```typescript
import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ... */ },
}, {
  startPosition: {
    // Start from the beginning of the partition
    "0": earliestEventPosition,
    // Start from the end (new events only)
    "1": latestEventPosition,
    // Start from a specific sequence number
    "2": { sequenceNumber: 12345 },
    // Start from a specific enqueue time
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});
});
```

## Event Hub Properties

```typescript
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
```

## Batch Processing Options

```typescript
const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100,           // Max events per batch
    maxWaitTimeInSeconds: 30,    // Max wait for batch
  }
);
```

## Key Types

```typescript
import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";

import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
```

## Event Properties

```typescript
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
});
```

## Error Handling

```typescript
consumer.subscribe({
  processEvents: async (events, context) => {
    if (events.length === 0) return; // nothing to process or checkpoint
    try {
      for (const event of events) {
        await processEvent(event);
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Don't checkpoint on error - events will be reprocessed
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Transient error - SDK will retry
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});
```
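For transient failures inside `processEvents`, retrying the per-event work before letting the error propagate can reduce how often a batch is skipped and reprocessed. A hedged sketch; `withRetry` is a generic hypothetical helper, not part of the SDK, and the defaults are illustrative:

```typescript
// Hypothetical helper (not part of the SDK): retry an async operation with
// exponential backoff; if all attempts fail, the last error propagates,
// the checkpoint is skipped, and the events are reprocessed.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // wait baseDelayMs * 2^i between attempts
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i));
    }
  }
  throw lastError;
}
```

Inside `processEvents` this might wrap the per-event call, e.g. `await withRetry(() => processEvent(event))`.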

## Best Practices

1. **Use checkpointing** - Always checkpoint in production; checkpointing provides at-least-once delivery, so make event processing idempotent
2. **Batch sends** - Use `createBatch()` for efficient sending
3. **Partition keys** - Use partition keys to ensure ordering for related events
4. **Consumer groups** - Use separate consumer groups for different processing pipelines
5. **Handle errors gracefully** - Don't checkpoint on processing failures
6. **Close clients** - Always close producer/consumer when done
7. **Monitor lag** - Track `lastEnqueuedSequenceNumber` vs processed sequence

## When to Use
Use this skill whenever a workflow needs to publish events to, or consume events from, Azure Event Hubs as described in the sections above.

Related Skills (all from sickn33/antigravity-awesome-skills)

  • microsoft-azure-webjobs-extensions-authentication-events-dotnet — Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions. (Identity Management / Authentication & Authorization · Claude)
  • azure-web-pubsub-ts — Real-time messaging with WebSocket connections and pub/sub patterns. (Messaging & Communication · Claude)
  • azure-storage-queue-ts — Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues. (Cloud Integration · Claude)
  • azure-storage-queue-py — Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing. (Cloud Integration · Claude)
  • azure-storage-file-share-ts — Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations. (Cloud Storage Management · Claude)
  • azure-storage-file-share-py — Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud. (Cloud Storage Management · Claude)
  • azure-storage-file-datalake-py — Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations. (Cloud Storage Management · Claude)
  • azure-storage-blob-ts — Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers. (Cloud Storage Management · Claude)
  • azure-storage-blob-rust — Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers. (Cloud Storage Management · Claude)
  • azure-storage-blob-py — Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle. (Cloud Storage Management · Claude)
  • azure-speech-to-text-rest-py — Azure Speech to Text REST API for short audio (Python). Use for simple speech recognition of audio files up to 60 seconds without the Speech SDK. (Speech-to-Text · Claude)
  • azure-servicebus-py — Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns. (Cloud Messaging · Claude)