azure-eventhub-ts
High-throughput event streaming and real-time data ingestion.
About this skill
This skill empowers an AI agent to interact with Azure Event Hubs, Microsoft's highly scalable data streaming platform. Leveraging the Azure Event Hubs SDK for TypeScript, agents can perform robust event publishing and consumption. It supports sending high volumes of events to an Event Hub and reliably processing incoming events from consumer groups, including checkpointing capabilities using Azure Blob Storage. This integration extends an agent's ability to participate in event-driven architectures, handle real-time data streams from IoT devices, process application logs, and support other data ingestion scenarios requiring high throughput and low latency.
Best use case
Ingesting real-time data streams from various sources like IoT devices, sensors, or applications. Implementing event-driven architectures where an AI agent needs to react to or publish continuous streams of data. Collecting and processing large volumes of application logs or telemetry data in real-time. Building data pipelines for analytics or machine learning that require efficient, high-throughput data ingestion. Enabling agents to communicate asynchronously via a scalable event bus.
High-throughput event streaming and real-time data ingestion.
Successful publication of events to a specified Azure Event Hub, contributing to a real-time data stream. Reliable consumption and processing of events from an Azure Event Hub, with proper checkpointing to ensure data integrity and continuity. Integration of real-time data streams into AI agent workflows for immediate analysis or action.
Practical example
Example input
Publish a JSON event to the 'iot-telemetry' Event Hub with data: { "device_id": "temp-sensor-001", "temperature": 22.7, "humidity": 60.1, "timestamp": "2023-10-27T14:30:00Z" } and ensure it's partitioned by 'device_id'.Example output
Event successfully published to Azure Event Hub 'iot-telemetry'. Partition ID: 3. Acknowledgement received. Ready for next event.
When to use this skill
- When an AI agent needs to send or receive events from Azure Event Hubs.
- For applications or workflows requiring real-time processing of large volumes of event data.
- When building scalable, event-driven solutions on the Microsoft Azure platform.
- When reliable event consumption with consumer groups and checkpointing for fault tolerance is critical.
When not to use this skill
- For small-scale, infrequent data transfers that do not require high-throughput event streaming capabilities.
- If the target cloud provider is not Azure or if an alternative streaming technology (e.g., Apache Kafka, AWS Kinesis) is already in use.
- For simple request-response operations where a direct API call or a message queue might be more appropriate.
- If the AI agent lacks the necessary Azure credentials and permissions to access Event Hubs and Blob Storage.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/azure-eventhub-ts/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How azure-eventhub-ts Compares
| Feature / Agent | azure-eventhub-ts | Standard Approach |
|---|---|---|
| Platform Support | Claude | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | medium | N/A |
Frequently Asked Questions
What does this skill do?
High-throughput event streaming and real-time data ingestion.
Which AI agents support this skill?
This skill is designed for Claude.
How difficult is it to install?
The installation complexity is rated as medium. You can find the installation instructions above.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
Related Guides
AI Agents for Coding
Browse AI agent skills for coding, debugging, testing, refactoring, code review, and developer workflows across Claude, Cursor, and Codex.
Best AI Skills for Claude
Explore the best AI skills for Claude and Claude Code across coding, research, workflow automation, documentation, and agent operations.
AI Agent for YouTube Script Writing
Find AI agent skills for YouTube script writing, video research, content outlining, and repeatable channel production workflows.
SKILL.md Source
# Azure Event Hubs SDK for TypeScript
High-throughput event streaming and real-time data ingestion.
## Installation
```bash
npm install @azure/event-hubs @azure/identity
```
For checkpointing with consumer groups:
```bash
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
```
## Environment Variables
```bash
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
```
## Authentication
```typescript
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();
// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer
const consumer = new EventHubConsumerClient(
"$Default", // Consumer group
fullyQualifiedNamespace,
eventHubName,
credential
);
```
## Core Workflow
### Send Events
```typescript
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// Create batch and add events
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch);
await producer.close();
```
### Send to Specific Partition
```typescript
// By partition ID
const batch = await producer.createBatch({ partitionId: "0" });
// By partition key (consistent hashing)
const batch = await producer.createBatch({ partitionKey: "device-123" });
```
### Receive Events (Simple)
```typescript
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},
});
// Stop after some time
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);
```
### Receive with Checkpointing (Production)
```typescript
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
namespace,
eventHubName,
credential,
checkpointStore
);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Processing: ${JSON.stringify(event.body)}`);
}
// Checkpoint after processing batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Error: ${err.message}`);
},
});
```
### Receive from Specific Position
```typescript
const subscription = consumer.subscribe({
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
}, {
startPosition: {
// Start from beginning
"0": { offset: "@earliest" },
// Start from end (new events only)
"1": { offset: "@latest" },
// Start from specific offset
"2": { offset: "12345" },
// Start from specific time
"3": { enqueuedOn: new Date("2024-01-01") },
},
});
```
## Event Hub Properties
```typescript
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
```
## Batch Processing Options
```typescript
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // Max events per batch
maxWaitTimeInSeconds: 30, // Max wait for batch
}
);
```
## Key Types
```typescript
import {
EventHubProducerClient,
EventHubConsumerClient,
EventData,
ReceivedEventData,
PartitionContext,
Subscription,
SubscriptionEventHandlers,
CreateBatchOptions,
EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
```
## Event Properties
```typescript
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
body: { data: "payload" },
properties: {
eventType: "telemetry",
deviceId: "sensor-1",
},
contentType: "application/json",
correlationId: "request-123",
});
// Access in receiver
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Type: ${event.properties?.eventType}`);
console.log(`Sequence: ${event.sequenceNumber}`);
console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
console.log(`Offset: ${event.offset}`);
}
},
});
```
## Error Handling
```typescript
consumer.subscribe({
processEvents: async (events, context) => {
try {
for (const event of events) {
await processEvent(event);
}
await context.updateCheckpoint(events[events.length - 1]);
} catch (error) {
// Don't checkpoint on error - events will be reprocessed
console.error("Processing failed:", error);
}
},
processError: async (err, context) => {
if (err.name === "MessagingError") {
// Transient error - SDK will retry
console.warn("Transient error:", err.message);
} else {
// Fatal error
console.error("Fatal error:", err);
}
},
});
```
## Best Practices
1. **Use checkpointing** - Always checkpoint in production for exactly-once processing
2. **Batch sends** - Use `createBatch()` for efficient sending
3. **Partition keys** - Use partition keys to ensure ordering for related events
4. **Consumer groups** - Use separate consumer groups for different processing pipelines
5. **Handle errors gracefully** - Don't checkpoint on processing failures
6. **Close clients** - Always close producer/consumer when done
7. **Monitor lag** - Track `lastEnqueuedSequenceNumber` vs processed sequence
## When to Use
This skill is applicable to execute the workflow or actions described in the overview.Related Skills
microsoft-azure-webjobs-extensions-authentication-events-dotnet
Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions.
azure-web-pubsub-ts
Real-time messaging with WebSocket connections and pub/sub patterns.
azure-storage-queue-ts
Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues.
azure-storage-queue-py
Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing.
azure-storage-file-share-ts
Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations.
azure-storage-file-share-py
Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud.
azure-storage-file-datalake-py
Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations.
azure-storage-blob-ts
Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers.
azure-storage-blob-rust
Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers.
azure-storage-blob-py
Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle.
azure-speech-to-text-rest-py
Azure Speech to Text REST API for short audio (Python). Use for simple speech recognition of audio files up to 60 seconds without the Speech SDK.
azure-servicebus-py
Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.