azure-eventhub-py
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
About this skill
This skill provides the Python SDK for Azure Event Hubs, a scalable, real-time data streaming platform capable of ingesting millions of events per second. It lets AI agents produce high volumes of streaming data (e.g., IoT telemetry, logs, clickstreams, financial transactions) and consume events from Event Hubs for real-time analytics and processing. The skill supports distributed consumption and includes built-in checkpointing via Azure Blob Storage, providing fault tolerance and state management for reliable event processing. It is a core tool for AI agents operating within large-scale, event-driven data pipelines in the Azure cloud ecosystem.
Best use case
Publishing real-time telemetry, logs, or user activity streams from an AI agent to an Azure Event Hub. Consuming and processing live streams of events (e.g., sensor data, financial transactions) for real-time analytics or alerting. Integrating AI agent outputs or inputs directly into Azure's big data streaming architecture. Building event-driven automation where the agent reacts to or triggers events in a high-throughput environment.
Expected outcomes
Successful, high-throughput ingestion of events into Azure Event Hubs by the AI agent. Reliable, scalable, and fault-tolerant consumption and processing of events from Event Hubs. Real-time integration of agent actions and data flows with Azure's streaming analytics services. Efficient management of event processing state through checkpointing.
Practical example
Example input
"Use Azure Event Hubs to send a batch of 5 sensor readings with values [22.5, 23.1, 22.9, 23.0, 22.7] to the 'sensor-data-stream' Event Hub." "Set up a consumer to listen to the 'user-activity-log' Event Hub for the next minute and report any 'login_failed' events."
Example output
"Successfully sent 5 events to the 'sensor-data-stream' Event Hub. Total events sent: 5." "Found 2 'login_failed' events in 'user-activity-log' within the last minute: User 'x' at [timestamp], User 'y' at [timestamp]."
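The first example input above could be sketched roughly as follows. The namespace is a placeholder, and `reading_to_json` is a hypothetical helper introduced here for illustration; actually running the send requires Azure credentials and a real Event Hubs namespace.

```python
import json


def reading_to_json(value: float) -> str:
    """Serialize one sensor reading as a JSON payload (hypothetical schema)."""
    return json.dumps({"sensor": "temperature", "value": value})


def send_readings(readings):
    """Send a batch of readings to the 'sensor-data-stream' Event Hub."""
    # Imported here so the helper above stays dependency-free.
    from azure.identity import DefaultAzureCredential
    from azure.eventhub import EventHubProducerClient, EventData

    producer = EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",  # placeholder
        eventhub_name="sensor-data-stream",
        credential=DefaultAzureCredential(),
    )
    with producer:
        batch = producer.create_batch()
        for value in readings:
            batch.add(EventData(reading_to_json(value)))
        producer.send_batch(batch)
    return len(readings)


# Usage: send_readings([22.5, 23.1, 22.9, 23.0, 22.7])
```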
When to use this skill
- When your AI agent needs to interact with large volumes of streaming data in real-time.
- For integrating AI agent functionality with existing Azure Event Hubs-based data pipelines.
- When fault-tolerant and scalable event ingestion or consumption is critical.
- If the AI agent needs to act as a producer or consumer in a microservices or event-driven architecture using Azure.
When not to use this skill
- For simple request-response interactions or low-volume messaging, where other Azure services might be more cost-effective.
- When the data processing is purely batch-oriented and does not require real-time streaming capabilities.
- If the AI agent operates entirely outside the Azure cloud ecosystem.
- For persistent storage of large files or structured data that is not event-based.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it at .claude/skills/azure-eventhub-py/SKILL.md inside your project
- Restart your AI agent — it will auto-discover the skill
How azure-eventhub-py Compares
| Feature / Agent | azure-eventhub-py | Standard Approach |
|---|---|---|
| Platform Support | Claude, ChatGPT, Gemini, Cursor, GitHub Copilot, DeepSeek, Roo Code, OpenCode, Aider, Continue | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Medium | N/A |
Frequently Asked Questions
What does this skill do?
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Which AI agents support this skill?
This skill is designed for Claude, ChatGPT, Gemini, Cursor, GitHub Copilot, DeepSeek, Roo Code, OpenCode, Aider, Continue.
How difficult is it to install?
The installation complexity is rated as medium. You can find the installation instructions above.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
Related Guides
AI Agents for Coding
Browse AI agent skills for coding, debugging, testing, refactoring, code review, and developer workflows across Claude, Cursor, and Codex.
Best AI Skills for Claude
Explore the best AI skills for Claude and Claude Code across coding, research, workflow automation, documentation, and agent operations.
ChatGPT vs Claude for Agent Skills
Compare ChatGPT and Claude for AI agent skills across coding, writing, research, and reusable workflow execution.
SKILL.md Source
# Azure Event Hubs SDK for Python
Big data streaming platform for high-throughput event ingestion.
## Installation
```bash
pip install azure-eventhub azure-identity
# For checkpointing with Blob Storage (sync and async variants)
pip install azure-eventhub-checkpointstoreblob azure-eventhub-checkpointstoreblob-aio
```
## Environment Variables
```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```
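The variables above can be collected into client keyword arguments with a small helper. This is a sketch, not part of the SDK; `load_eventhub_config` is a hypothetical name, and the defaults shown are illustrative.

```python
import os


def load_eventhub_config() -> dict:
    """Read the Event Hubs settings listed above from the environment."""
    return {
        # Required: fail loudly if the namespace is missing.
        "namespace": os.environ["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"],
        # Optional: fall back to illustrative defaults.
        "eventhub_name": os.environ.get("EVENT_HUB_NAME", "my-eventhub"),
        "storage_account_url": os.environ.get("STORAGE_ACCOUNT_URL", ""),
        "checkpoint_container": os.environ.get("CHECKPOINT_CONTAINER", "checkpoints"),
    }
```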
## Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"
# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential,
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential,
)
```
## Client Types
| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |
## Send Events
```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential(),
)

with producer:
    # Create a batch (enforces the size limit)
    event_data_batch = producer.create_batch()
    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full: send it and start a new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))
    # Send the remaining events
    producer.send_batch(event_data_batch)
```
### Send to Specific Partition
```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")
# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```
## Receive Events
### Simple Receive
```python
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # start from the beginning of the stream
    )
```
### With Blob Checkpoint Store (Production)
```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential(),
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store,
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint after the event has been processed
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
```
## Async Client
```python
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential

async def send_events():
    credential = DefaultAzureCredential()
    async with EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential,
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)

    async with EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=DefaultAzureCredential(),
    ) as consumer:
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())
```
## Event Properties
```python
event = EventData("My event body")
# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```
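A common pattern is a JSON body with `content_type="application/json"` plus custom properties for routing metadata. A minimal sketch of the producer/consumer symmetry; `encode_event_body` and `decode_event_body` are hypothetical helper names, not SDK functions.

```python
import json


def encode_event_body(payload: dict) -> str:
    # Producer side: serialize a dict before wrapping it, e.g.
    #   event = EventData(encode_event_body(payload))
    #   event.content_type = "application/json"
    return json.dumps(payload)


def decode_event_body(body: str) -> dict:
    # Consumer side: mirror of handling event.body_as_str()
    return json.loads(body)
```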
## Get Event Hub Info
```python
with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['name']}")
    print(f"Partitions: {info['partition_ids']}")

    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```
## Best Practices
1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits** — catch ValueError when batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications
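Practice 5 (flush on `ValueError`) can be factored into a reusable loop. A sketch, using a minimal stand-in batch so the control flow can be exercised offline; with the real SDK you would pass `producer.create_batch` and `producer.send_batch` instead. `send_all` and `_FakeBatch` are hypothetical names introduced here.

```python
class _FakeBatch:
    """Minimal stand-in for the SDK batch, used only to illustrate the pattern."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = []

    def add(self, event):
        if len(self.events) >= self.capacity:
            raise ValueError("batch full")  # the real EventDataBatch also raises ValueError
        self.events.append(event)


def send_all(events, create_batch, send_batch):
    """Flush-on-full loop from practice 5, generic over the batch factory/sender."""
    batch = create_batch()
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            send_batch(batch)          # current batch is full: flush it
            batch = create_batch()     # start a fresh one
            batch.add(event)           # retry the event that did not fit
    send_batch(batch)                  # flush the final partial batch


# Illustration with the stand-in batch (capacity 3):
sent = []
send_all(list(range(7)), lambda: _FakeBatch(3), lambda b: sent.append(list(b.events)))
# sent is now [[0, 1, 2], [3, 4, 5], [6]]
```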
## Reference Files
| File | Contents |
|------|----------|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |
## When to Use
This skill applies whenever the workflow or actions described in the overview need to be executed.
Related Skills
azure-eventhub-rust
Azure Event Hubs SDK for Rust. Use for sending and receiving events, streaming data ingestion.
azure-eventhub-dotnet
Azure Event Hubs SDK for .NET.
microsoft-azure-webjobs-extensions-authentication-events-dotnet
Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions.
azure-web-pubsub-ts
Real-time messaging with WebSocket connections and pub/sub patterns.
azure-storage-queue-ts
Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues.
azure-storage-queue-py
Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing.
azure-storage-file-share-ts
Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations.
azure-storage-file-share-py
Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud.
azure-storage-file-datalake-py
Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations.
azure-storage-blob-ts
Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers.
azure-storage-blob-rust
Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers.
azure-storage-blob-py
Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle.