azure-eventhub-py

Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing. Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".

242 stars

Best use case

azure-eventhub-py is best used when you need a repeatable AI agent workflow instead of a one-off prompt. It is especially useful for teams working in multi. Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing. Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".

Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing. Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".

Users should expect a more consistent workflow output, faster repeated execution, and less time spent rewriting prompts from scratch.

Practical example

Example input

Use the "azure-eventhub-py" skill to help with this workflow task. Context: Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".

Example output

A structured workflow result with clearer steps, more consistent formatting, and an output that is easier to reuse in the next run.

When to use this skill

  • Use this skill when you want a reusable workflow rather than writing the same prompt again and again.

When not to use this skill

  • Do not use this when you only need a one-off answer and do not need a reusable workflow.
  • Do not use it if you cannot install or maintain the related files, repository context, or supporting tools.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/azure-eventhub-py/SKILL.md --create-dirs "https://raw.githubusercontent.com/aiskillstore/marketplace/main/skills/sickn33/azure-eventhub-py/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/azure-eventhub-py/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How azure-eventhub-py Compares

Feature / Agentazure-eventhub-pyStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing. Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Azure Event Hubs SDK for Python

Big data streaming platform for high-throughput event ingestion.

## Installation

```bash
pip install azure-eventhub azure-identity
# For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio
```

## Environment Variables

```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```

## Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential
)
```

## Client Types

| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |

## Send Events

```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential()
)

with producer:
    # Create batch (handles size limits)
    event_data_batch = producer.create_batch()
    
    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full, send and create new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))
    
    # Send remaining
    producer.send_batch(event_data_batch)
```

### Send to Specific Partition

```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")

# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```

## Receive Events

### Simple Receive

```python
from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential()
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # Beginning of stream
    )
```

### With Blob Checkpoint Store (Production)

```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential()
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint after processing
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
```

## Async Client

```python
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio

async def send_events():
    credential = DefaultAzureCredential()
    
    async with EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)
    
    async with EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=DefaultAzureCredential()
    ) as consumer:
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())
```

## Event Properties

```python
event = EventData("My event body")

# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"

# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```

## Get Event Hub Info

```python
with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['name']}")
    print(f"Partitions: {info['partition_ids']}")
    
    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```

## Best Practices

1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits** — catch ValueError when batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications

## Reference Files

| File | Contents |
|------|----------|
| [references/checkpointing.md](references/checkpointing.md) | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| [references/partitions.md](references/partitions.md) | Partition management, load balancing, starting positions |
| [scripts/setup_consumer.py](scripts/setup_consumer.py) | CLI for Event Hub info, consumer setup, and event sending/receiving |

Related Skills

azure-quotas

242
from aiskillstore/marketplace

Check/manage Azure quotas and usage across providers. For deployment planning, capacity validation, region selection. WHEN: "check quotas", "service limits", "current usage", "request quota increase", "quota exceeded", "validate capacity", "regional availability", "provisioning limits", "vCPU limit", "how many vCPUs available in my subscription".

DevOps & Infrastructure

microsoft-azure-webjobs-extensions-authentication-events-dotnet

242
from aiskillstore/marketplace

Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions. Use for token enrichment, custom claims, attribute collection, and OTP customization in Entra ID. Triggers: "Authentication Events", "WebJobsAuthenticationEventsTrigger", "OnTokenIssuanceStart", "OnAttributeCollectionStart", "custom claims", "token enrichment", "Entra custom extension", "authentication extension".

azure-web-pubsub-ts

242
from aiskillstore/marketplace

Build real-time messaging applications using Azure Web PubSub SDKs for JavaScript (@azure/web-pubsub, @azure/web-pubsub-client). Use when implementing WebSocket-based real-time features, pub/sub messaging, group chat, or live notifications.

azure-storage-queue-ts

242
from aiskillstore/marketplace

Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues. Supports visibility timeout, message encoding, and batch operations. Triggers: "queue storage", "@azure/storage-queue", "QueueServiceClient", "QueueClient", "send message", "receive message", "dequeue", "visibility timeout".

azure-storage-queue-py

242
from aiskillstore/marketplace

Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing. Triggers: "queue storage", "QueueServiceClient", "QueueClient", "message queue", "dequeue".

azure-storage-file-share-ts

242
from aiskillstore/marketplace

Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations. Use for creating shares, managing directories, uploading/downloading files, and handling file metadata. Supports Azure Files SMB protocol scenarios. Triggers: "file share", "@azure/storage-file-share", "ShareServiceClient", "ShareClient", "SMB", "Azure Files".

azure-storage-file-share-py

242
from aiskillstore/marketplace

Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud. Triggers: "azure-storage-file-share", "ShareServiceClient", "ShareClient", "file share", "SMB".

azure-storage-file-datalake-py

242
from aiskillstore/marketplace

Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations. Triggers: "data lake", "DataLakeServiceClient", "FileSystemClient", "ADLS Gen2", "hierarchical namespace".

azure-storage-blob-ts

242
from aiskillstore/marketplace

Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers. Supports block blobs, append blobs, page blobs, SAS tokens, and streaming. Triggers: "blob storage", "@azure/storage-blob", "BlobServiceClient", "ContainerClient", "upload blob", "download blob", "SAS token", "block blob".

azure-storage-blob-rust

242
from aiskillstore/marketplace

Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers. Triggers: "blob storage rust", "BlobClient rust", "upload blob rust", "download blob rust", "container rust".

azure-storage-blob-py

242
from aiskillstore/marketplace

Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle. Triggers: "blob storage", "BlobServiceClient", "ContainerClient", "BlobClient", "upload blob", "download blob".

azure-storage-blob-java

242
from aiskillstore/marketplace

Build blob storage applications with Azure Storage Blob SDK for Java. Use when uploading, downloading, or managing files in Azure Blob Storage, working with containers, or implementing streaming data operations.