
azure-eventhub-py

Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.

31,392 stars
Complexity: medium

About this skill

This skill provides the Python SDK for Azure Event Hubs, a scalable, real-time data streaming platform capable of ingesting millions of events per second. It empowers AI agents to produce high volumes of streaming data (e.g., IoT telemetry, logs, clickstreams, financial transactions) and consume events from Event Hubs for real-time analytics and processing. The skill supports distributed consumption and includes built-in checkpointing capabilities using Azure Blob Storage, ensuring fault tolerance and state management for reliable event processing. It's an essential tool for AI agents operating within large-scale, event-driven data pipelines in the Azure cloud ecosystem.

Best use case

  • Publishing real-time telemetry, logs, or user activity streams from an AI agent to an Azure Event Hub.
  • Consuming and processing live streams of events (e.g., sensor data, financial transactions) for real-time analytics or alerting.
  • Integrating AI agent outputs or inputs directly into Azure's big data streaming architecture.
  • Building event-driven automation where the agent reacts to or triggers events in a high-throughput environment.

Successful, high-throughput ingestion of events into Azure Event Hubs by the AI agent. Reliable, scalable, and fault-tolerant consumption and processing of events from Event Hubs. Real-time integration of AI agent actions and data flows with Azure's streaming analytics services. Efficient management of event processing state using checkpointing.

Practical example

Example input

"Use Azure Event Hubs to send a batch of 5 sensor readings with values [22.5, 23.1, 22.9, 23.0, 22.7] to the 'sensor-data-stream' Event Hub."
"Set up a consumer to listen to the 'user-activity-log' Event Hub for the next minute and report any 'login_failed' events."

Example output

"Successfully sent 5 events to the 'sensor-data-stream' Event Hub. Total events sent: 5."
"Found 2 'login_failed' events in 'user-activity-log' within the last minute: User 'x' at [timestamp], User 'y' at [timestamp]."

When to use this skill

  • When your AI agent needs to interact with large volumes of streaming data in real-time.
  • For integrating AI agent functionality with existing Azure Event Hubs-based data pipelines.
  • When fault-tolerant and scalable event ingestion or consumption is critical.
  • If the AI agent needs to act as a producer or consumer in a microservices or event-driven architecture using Azure.

When not to use this skill

  • For simple request-response interactions or low-volume messaging, where other Azure services might be more cost-effective.
  • When the data processing is purely batch-oriented and does not require real-time streaming capabilities.
  • If the AI agent operates entirely outside the Azure cloud ecosystem.
  • For persistent storage of large files or structured data that is not event-based.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/azure-eventhub-py/SKILL.md --create-dirs "https://raw.githubusercontent.com/sickn33/antigravity-awesome-skills/main/plugins/antigravity-awesome-skills-claude/skills/azure-eventhub-py/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/azure-eventhub-py/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How azure-eventhub-py Compares

| Feature / Agent | azure-eventhub-py | Standard Approach |
|-----------------|-------------------|-------------------|
| Platform Support | Claude, ChatGPT, Gemini, Cursor, GitHub Copilot, DeepSeek, Roo Code, OpenCode, Aider, Continue | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Medium | N/A |

Frequently Asked Questions

What does this skill do?

Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.

Which AI agents support this skill?

This skill is designed for Claude, ChatGPT, Gemini, Cursor, GitHub Copilot, DeepSeek, Roo Code, OpenCode, Aider, Continue.

How difficult is it to install?

The installation complexity is rated as medium. You can find the installation instructions above.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Azure Event Hubs SDK for Python

Big data streaming platform for high-throughput event ingestion.

## Installation

```bash
pip install azure-eventhub azure-identity
# Blob storage checkpointing (sync client)
pip install azure-eventhub-checkpointstoreblob
# Blob storage checkpointing (async client)
pip install azure-eventhub-checkpointstoreblob-aio
```

## Environment Variables

```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```
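These variables can be read with `os.environ` before constructing clients. A minimal sketch (the placeholder fallbacks are illustrative, not SDK behavior):

```python
import os

# Read the Event Hubs settings listed above; fall back to placeholders
# so the sketch runs even without a configured environment
namespace = os.environ.get(
    "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE", "<namespace>.servicebus.windows.net"
)
eventhub_name = os.environ.get("EVENT_HUB_NAME", "my-eventhub")

# Checkpointing settings are only needed by consumers using a blob store
storage_account_url = os.environ.get("STORAGE_ACCOUNT_URL")
checkpoint_container = os.environ.get("CHECKPOINT_CONTAINER", "checkpoints")
```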

## Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential
)
```

## Client Types

| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |

## Send Events

```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential()
)

with producer:
    # Create batch (handles size limits)
    event_data_batch = producer.create_batch()
    
    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full, send and create new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))
    
    # Send remaining
    producer.send_batch(event_data_batch)
```

### Send to Specific Partition

```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")

# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```

## Receive Events

### Simple Receive

```python
from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    # Not persisted without a checkpoint_store; see the blob checkpoint example below
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential()
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # Beginning of stream
    )
```

### With Blob Checkpoint Store (Production)

```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential()
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint after processing
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
```

## Async Client

```python
from azure.eventhub import EventData  # EventData is shared with the sync client
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio

async def send_events():
    credential = DefaultAzureCredential()
    
    async with EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)
    
    async with EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=DefaultAzureCredential()
    ) as consumer:
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())
# asyncio.run(receive_events())  # receive runs until cancelled
```

## Event Properties

```python
event = EventData("My event body")

# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"

# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```

## Get Event Hub Info

```python
with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['name']}")
    print(f"Partitions: {info['partition_ids']}")
    
    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```

## Best Practices

1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits** — catch ValueError when batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications

## Reference Files

| File | Contents |
|------|----------|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |

## When to Use
Use this skill whenever an agent must publish events to, or consume events from, Azure Event Hubs: high-throughput ingestion, real-time stream processing, and checkpointed, fault-tolerant consumption as described above.

Related Skills

All entries below are from sickn33/antigravity-awesome-skills (31,392 stars):

  • azure-eventhub-rust: Azure Event Hubs SDK for Rust. Use for sending and receiving events, streaming data ingestion. (Data Streaming & Integration)
  • azure-eventhub-dotnet: Azure Event Hubs SDK for .NET. (Data Streaming & Integration)
  • microsoft-azure-webjobs-extensions-authentication-events-dotnet: Microsoft Entra Authentication Events SDK for .NET. Azure Functions triggers for custom authentication extensions. (Identity Management / Authentication & Authorization)
  • azure-web-pubsub-ts: Real-time messaging with WebSocket connections and pub/sub patterns. (Messaging & Communication)
  • azure-storage-queue-ts: Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues. (Cloud Integration)
  • azure-storage-queue-py: Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing. (Cloud Integration)
  • azure-storage-file-share-ts: Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations. (Cloud Storage Management)
  • azure-storage-file-share-py: Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud. (Cloud Storage Management)
  • azure-storage-file-datalake-py: Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations. (Cloud Storage Management)
  • azure-storage-blob-ts: Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers. (Cloud Storage Management)
  • azure-storage-blob-rust: Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers. (Cloud Storage Management)
  • azure-storage-blob-py: Azure Blob Storage SDK for Python. Use for uploading, downloading, listing blobs, managing containers, and blob lifecycle. (Cloud Storage Management)