azure-storage-queue-py

Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing.


Best use case

azure-storage-queue-py is best used when you need a repeatable AI agent workflow instead of a one-off prompt.


Teams using azure-storage-queue-py can expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/azure-storage-queue-py/SKILL.md --create-dirs "https://raw.githubusercontent.com/christophacham/agent-skills-library/main/skills/ai-ml/azure-storage-queue-py/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/azure-storage-queue-py/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How azure-storage-queue-py Compares

| Feature / Agent | azure-storage-queue-py | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

Frequently Asked Questions

What does this skill do?

Azure Queue Storage SDK for Python. Use for reliable message queuing, task distribution, and asynchronous processing.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Azure Queue Storage SDK for Python

Simple, cost-effective message queuing for asynchronous communication.

## Installation

```bash
pip install azure-storage-queue azure-identity
```

## Environment Variables

```bash
AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.net
```

## Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient, QueueClient

credential = DefaultAzureCredential()
account_url = "https://<account>.queue.core.windows.net"

# Service client
service_client = QueueServiceClient(account_url=account_url, credential=credential)

# Queue client
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)
```
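
The authentication snippet above hardcodes the account URL, while the previous section defines `AZURE_STORAGE_ACCOUNT_URL`. The following is a minimal sketch of reading and sanity-checking that variable using only the standard library. The helper name `get_account_url` is hypothetical and not part of the SDK.

```python
import os
from urllib.parse import urlparse


def get_account_url(env=None):
    """Return the queue endpoint from AZURE_STORAGE_ACCOUNT_URL, validated.

    `env` is any mapping; it defaults to os.environ (hypothetical helper).
    """
    env = os.environ if env is None else env
    url = env.get("AZURE_STORAGE_ACCOUNT_URL", "").strip()
    if not url:
        raise RuntimeError("AZURE_STORAGE_ACCOUNT_URL is not set")
    parsed = urlparse(url)
    if parsed.scheme != "https" or not parsed.netloc:
        raise ValueError(f"Expected an https:// queue endpoint, got {url!r}")
    # Drop any trailing slash so the value is stable when logged or compared
    return url.rstrip("/")
```

The result can then be passed as `account_url` to `QueueServiceClient` or `QueueClient` in place of the literal string.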

## Queue Operations

```python
# Create queue
service_client.create_queue("myqueue")

# Get queue client
queue_client = service_client.get_queue_client("myqueue")

# Delete queue
service_client.delete_queue("myqueue")

# List queues
for queue in service_client.list_queues():
    print(queue.name)
```

## Send Messages

```python
# Send message (string)
queue_client.send_message("Hello, Queue!")

# Send with options
queue_client.send_message(
    content="Delayed message",
    visibility_timeout=60,  # Hidden for 60 seconds
    time_to_live=3600       # Expires in 1 hour
)

# Send JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))
```
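
When sending JSON, it can help to serialize in one place and check the payload size before calling `send_message`. The sketch below assumes the documented 64 KiB per-message limit for Azure Queue Storage; treat `MAX_MESSAGE_BYTES` as an assumption to verify against the current service limits. `encode_task` and `decode_task` are hypothetical helper names.

```python
import json

# Assumed service limit for one queue message; verify before relying on it
MAX_MESSAGE_BYTES = 64 * 1024


def encode_task(payload):
    """Serialize a dict to compact JSON and check its UTF-8 size."""
    body = json.dumps(payload, separators=(",", ":"), sort_keys=True)
    size = len(body.encode("utf-8"))
    if size > MAX_MESSAGE_BYTES:
        raise ValueError(f"Message is {size} bytes; limit is {MAX_MESSAGE_BYTES}")
    return body


def decode_task(content):
    """Parse message content produced by encode_task back into a dict."""
    return json.loads(content)
```

A producer would call `queue_client.send_message(encode_task(data))`, and a consumer would call `decode_task(message.content)` on each received message.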

## Receive Messages

```python
# Receive messages (makes them invisible temporarily)
messages = queue_client.receive_messages(
    messages_per_page=10,
    visibility_timeout=30  # 30 seconds to process
)

for message in messages:
    print(f"ID: {message.id}")
    print(f"Content: {message.content}")
    print(f"Dequeue count: {message.dequeue_count}")
    
    # Process message...
    
    # Delete after processing
    queue_client.delete_message(message)
```

## Peek Messages

```python
# Peek without hiding (doesn't affect visibility)
messages = queue_client.peek_messages(max_messages=5)

for message in messages:
    print(message.content)
```

## Update Message

```python
# Extend visibility or update content
messages = queue_client.receive_messages()
for message in messages:
    # Extend timeout (need more time). update_message returns the message
    # with a new pop receipt, so keep the returned object for later calls.
    message = queue_client.update_message(
        message,
        visibility_timeout=60
    )
    
    # Update content and timeout
    message = queue_client.update_message(
        message,
        content="Updated content",
        visibility_timeout=60
    )
```
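
For long-running work, one possible pattern is to renew the visibility timeout before each processing step and always carry forward the message object that `update_message` returns, since it holds the current pop receipt. This is a sketch, not the SDK's prescribed approach. `extend_visibility` and `process_in_steps` are hypothetical helpers, and `steps` is assumed to be a list of callables that each take the message.

```python
def extend_visibility(queue_client, message, seconds=60):
    """Extend a message's invisibility window and return the refreshed message.

    The returned object carries the new pop receipt, so it should replace the
    old one for any later update_message or delete_message call.
    """
    return queue_client.update_message(message, visibility_timeout=seconds)


def process_in_steps(queue_client, message, steps, seconds=60):
    """Run each step, renewing the visibility timeout before every one."""
    for step in steps:
        message = extend_visibility(queue_client, message, seconds)
        step(message)
    # Delete with the most recent message object (latest pop receipt)
    queue_client.delete_message(message)
    return message
```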

## Delete Message

```python
# Delete after successful processing
messages = queue_client.receive_messages()
for message in messages:
    try:
        # Process...
        queue_client.delete_message(message)
    except Exception:
        # Message becomes visible again after timeout
        pass
```
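
The loop above silently swallows failures. A slightly more observable variant, sketched below, logs each failure and reports how many messages were processed or left for retry. `process_messages` and its `handler` argument (a callable taking the message content) are hypothetical; the client is assumed to behave like the `QueueClient` used above.

```python
import logging

logger = logging.getLogger(__name__)


def process_messages(queue_client, handler, visibility_timeout=30):
    """Handle each received message; delete only those the handler completes."""
    processed = failed = 0
    for message in queue_client.receive_messages(visibility_timeout=visibility_timeout):
        try:
            handler(message.content)
        except Exception:
            # Leave the message in place; it reappears once the timeout expires
            logger.exception("Handler failed for message %s", message.id)
            failed += 1
            continue
        queue_client.delete_message(message)
        processed += 1
    return processed, failed
```

Keeping `delete_message` outside the `try` block means an error raised while deleting is not mistaken for a handler failure.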

## Clear Queue

```python
# Delete all messages
queue_client.clear_messages()
```

## Queue Properties

```python
# Get queue properties
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")

# Set/get metadata
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)
```

## Async Client

```python
import asyncio

from azure.identity.aio import DefaultAzureCredential
from azure.storage.queue.aio import QueueClient

async def queue_operations():
    # Close the credential and the client when done to release connections
    async with DefaultAzureCredential() as credential, QueueClient(
        account_url="https://<account>.queue.core.windows.net",
        queue_name="myqueue",
        credential=credential
    ) as client:
        # Send
        await client.send_message("Async message")
        
        # Receive
        async for message in client.receive_messages():
            print(message.content)
            await client.delete_message(message)

asyncio.run(queue_operations())
```
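
The async example handles messages one at a time. If the handler is I/O-bound, a bounded-concurrency consumer may improve throughput. The sketch below uses only `asyncio`. `consume` is a hypothetical helper, `handler` is assumed to be an async callable taking the message content, and `client` is assumed to behave like the async `QueueClient` above.

```python
import asyncio


async def consume(client, handler, max_concurrency=5):
    """Process received messages concurrently, deleting each one on success.

    Returns the number of messages handled and deleted without error.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def handle(message):
        # The semaphore caps how many handlers run at the same time
        async with semaphore:
            await handler(message.content)
            await client.delete_message(message)

    tasks = []
    async for message in client.receive_messages():
        tasks.append(asyncio.create_task(handle(message)))
    # Collect exceptions instead of cancelling the remaining tasks
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return sum(1 for result in results if not isinstance(result, Exception))
```

Messages whose handler raises are not deleted, so they become visible again after their visibility timeout.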

## Base64 Encoding

```python
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy

# For binary data
queue_client = QueueClient(
    account_url=account_url,
    queue_name="myqueue",
    credential=credential,
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)

# Send bytes
queue_client.send_message(b"Binary content")
```
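
Base64 encoding grows a payload by roughly a third, which matters when messages approach the size limit. The standard-library sketch below estimates that overhead. The 64 KiB default is an assumed service limit to verify, and all three function names are hypothetical; `to_wire` only approximates what an encode policy does and is not the SDK's implementation.

```python
import base64
import math


def to_wire(data: bytes) -> str:
    """Base64-encode raw bytes into ASCII text."""
    return base64.b64encode(data).decode("ascii")


def encoded_size(raw_len: int) -> int:
    """Length in characters of the Base64 encoding of raw_len bytes."""
    return 4 * math.ceil(raw_len / 3)


def max_raw_payload(limit_bytes: int = 64 * 1024) -> int:
    """Largest raw payload whose Base64 form still fits within limit_bytes."""
    return (limit_bytes // 4) * 3
```

Under the assumed 64 KiB limit, `max_raw_payload()` works out to 48 KiB of raw binary data per message.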

## Best Practices

1. **Delete messages after processing** to prevent reprocessing
2. **Set appropriate visibility timeout** based on processing time
3. **Handle `dequeue_count`** for poison message detection
4. **Use async client** for high-throughput scenarios
5. **Use `peek_messages`** for monitoring without affecting queue
6. **Set `time_to_live`** to prevent stale messages
7. **Consider Service Bus** for advanced features (sessions, topics)
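
Practice 3 can be sketched as follows: once a message has been dequeued more than a chosen number of times, copy it to a separate queue and remove it from the main one. Queue Storage has no built-in dead-letter queue, so the poison queue here is an ordinary second queue by convention. `route_message`, `poison_client`, and the threshold of 5 are illustrative assumptions.

```python
def route_message(queue_client, poison_client, message, handler, max_dequeue_count=5):
    """Process one message, or park it on a poison queue after repeated failures.

    Returns "poisoned", "processed", or "retry".
    """
    if message.dequeue_count > max_dequeue_count:
        # Copy the content aside, then remove it from the main queue
        poison_client.send_message(message.content)
        queue_client.delete_message(message)
        return "poisoned"
    try:
        handler(message.content)
    except Exception:
        # Not deleted: the message becomes visible again after its timeout
        return "retry"
    queue_client.delete_message(message)
    return "processed"
```

A second client for the poison queue could be obtained with `service_client.get_queue_client("myqueue-poison")`, where the queue name is a placeholder.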

## When to Use
Use this skill when you need to carry out the workflows or actions described in the overview.

Related Skills

azure-speech-to-text-rest-py

23
from christophacham/agent-skills-library

Azure Speech to Text REST API for short audio (Python). Use for simple speech recognition of audio files up to 60 seconds without the Speech SDK.

azure-mgmt-apimanagement-py

23
from christophacham/agent-skills-library

Azure API Management SDK for Python. Use for managing APIM services, APIs, products, subscriptions, and policies.

azure-mgmt-apimanagement-dotnet

23
from christophacham/agent-skills-library

Azure Resource Manager SDK for API Management in .NET.

azure-mgmt-apicenter-py

23
from christophacham/agent-skills-library

Azure API Center Management SDK for Python. Use for managing API inventory, metadata, and governance across your organization.

azure-mgmt-apicenter-dotnet

23
from christophacham/agent-skills-library

Azure API Center SDK for .NET. Centralized API inventory management with governance, versioning, and discovery.

azure-communication-callingserver-java

23
from christophacham/agent-skills-library

Azure Communication Services CallingServer (legacy) Java SDK. Note - This SDK is deprecated. Use azure-communication-callautomation instead for new projects. Only use this skill when maintaining le...

azure-storage-queue-ts

23
from christophacham/agent-skills-library

Azure Queue Storage JavaScript/TypeScript SDK (@azure/storage-queue) for message queue operations. Use for sending, receiving, peeking, and deleting messages in queues.

azure-storage-file-share-ts

23
from christophacham/agent-skills-library

Azure File Share JavaScript/TypeScript SDK (@azure/storage-file-share) for SMB file share operations.

azure-storage-file-share-py

23
from christophacham/agent-skills-library

Azure Storage File Share SDK for Python. Use for SMB file shares, directories, and file operations in the cloud.

azure-storage-file-datalake-py

23
from christophacham/agent-skills-library

Azure Data Lake Storage Gen2 SDK for Python. Use for hierarchical file systems, big data analytics, and file/directory operations.

azure-storage-blob-ts

23
from christophacham/agent-skills-library

Azure Blob Storage JavaScript/TypeScript SDK (@azure/storage-blob) for blob operations. Use for uploading, downloading, listing, and managing blobs and containers.

azure-storage-blob-rust

23
from christophacham/agent-skills-library

Azure Blob Storage SDK for Rust. Use for uploading, downloading, and managing blobs and containers.