azure-eventhub-py
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Best use case
azure-eventhub-py is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Teams using azure-eventhub-py should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/azure-eventhub-py/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How azure-eventhub-py Compares
| Feature / Agent | azure-eventhub-py | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Azure Event Hubs SDK for Python
Big data streaming platform for high-throughput event ingestion.
## Installation
```bash
pip install azure-eventhub azure-identity
# For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio
```
## Environment Variables
```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```
## Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"
# Producer
producer = EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
)
# Consumer
consumer = EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
)
```
## Client Types
| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |
## Send Events
```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
)
with producer:
# Create batch (handles size limits)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Batch is full, send and create new one
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Send remaining
producer.send_batch(event_data_batch)
```
### Send to Specific Partition
```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")
# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```
## Receive Events
### Simple Receive
```python
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
)
with consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Beginning of stream
)
```
### With Blob Checkpoint Store (Production)
```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint after processing
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event)
```
## Async Client
```python
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())
```
## Event Properties
```python
event = EventData("My event body")
# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```
## Get Event Hub Info
```python
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```
## Best Practices
1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits** — catch ValueError when batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications
## Reference Files
| File | Contents |
|------|----------|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |
## When to Use
This skill is applicable to execute the workflow or actions described in the overview.Related Skills
microsoft-azure-webjobs-extensions-authentication-events-dotnet
Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions.
azure-web-pubsub-ts
Build real-time messaging applications using Azure Web PubSub SDKs for JavaScript (@azure/web-pubsub, @azure/web-pubsub-client). Use when implementing WebSocket-based real-time features, pub/sub me...
azure-storage-queue-ts
Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues.
azure-storage-queue-py
Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing.
azure-storage-file-share-ts
Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations.
azure-storage-file-share-py
Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud.
azure-storage-file-datalake-py
Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations.
azure-storage-blob-ts
Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers.
azure-storage-blob-rust
Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers.
azure-storage-blob-py
Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle.
azure-storage-blob-java
Build blob storage applications with Azure Storage Blob SDK for Java. Use when uploading, downloading, or managing files in Azure Blob Storage, working with containers, or implementing streaming da...
azure-speech-to-text-rest-py
Azure Speech to Text REST API for short audio (Python). Use for simple speech recognition of audio files up to 60 seconds without the Speech SDK.