apscheduler

Advanced Python Scheduler - Task scheduling and job queue system

25 stars

Best use case

apscheduler is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Advanced Python Scheduler - Task scheduling and job queue system

Teams using apscheduler should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/apscheduler/SKILL.md --create-dirs "https://raw.githubusercontent.com/ComeOnOliver/skillshub/main/skills/aiskillstore/marketplace/bossjones/apscheduler/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/apscheduler/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How apscheduler Compares

Feature / AgentapschedulerStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Advanced Python Scheduler - Task scheduling and job queue system

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# APScheduler

APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.

## Quick Start

### Basic Synchronous Scheduler

```python
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    print(f"Tick: {datetime.now()}")

# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()
```

### Async Scheduler with FastAPI

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

def cleanup_task():
    print("Running cleanup task...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup"
        )
        await scheduler.start_in_background()
        yield

app = FastAPI(lifespan=lifespan)
```

## Common Patterns

### Schedulers

**In-memory scheduler (development):**

```python
from apscheduler import AsyncScheduler

async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```

**Persistent scheduler (production):**

```python
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```

**Distributed scheduler:**

```python
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()

# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()
```

### Jobs

**Simple function jobs:**

```python
def send_daily_report():
    generate_report()
    email_report("admin@example.com")

scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # 9 AM daily
)
```

**Jobs with arguments:**

```python
def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass

scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000
    }
)
```

**Async jobs:**

```python
async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)

scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)
)
```

### Triggers

**Interval trigger:**

```python
from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)
```

**Cron trigger:**

```python
from apscheduler.triggers.cron import CronTrigger

# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM weekdays
```

**Date trigger (one-time):**

```python
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger

# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
```

**Calendar interval:**

```python
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# First day of every month at 9 AM
CalendarIntervalTrigger(months=1, hour=9, minute=0)

# Every Monday at 10 AM
CalendarIntervalTrigger(weeks=1, day_of_week='mon', hour=10, minute=0)
```

### Persistence

**SQLite:**

```python
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
```

**PostgreSQL:**

```python
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
```

**Redis (event broker):**

```python
from apscheduler.eventbrokers.redis import RedisEventBroker

event_broker = RedisEventBroker.from_url("redis://localhost:6379")
```

### Job Management

**Get job results:**

```python
async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1)
        )

        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")
```

**Schedule management:**

```python
# Pause schedule
await scheduler.pause_schedule("my_schedule")

# Resume schedule
await scheduler.unpause_schedule("my_schedule")

# Remove schedule
await scheduler.remove_schedule("my_schedule")

# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
```

**Event handling:**

```python
from apscheduler import JobAdded, JobReleased

def on_job_completed(event: JobReleased):
    if event.outcome == Outcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        print(f"Job {event.job_id} failed: {event.exception}")

scheduler.subscribe(on_job_completed, JobReleased)
```

## Configuration

**Task defaults:**

```python
from apscheduler import TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5)
)

scheduler = AsyncScheduler(task_defaults=task_defaults)
```

**Job execution options:**

```python
# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10)
)

# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest
)
```

## Requirements

```bash
# Core package
uv add apscheduler

# Database backends
uv add "apscheduler[postgresql]"  # PostgreSQL
uv add "apscheduler[mongodb]"     # MongoDB
uv add "apscheduler[sqlite]"      # SQLite

# Event brokers
uv add "apscheduler[redis]"       # Redis
uv add "apscheduler[mqtt]"        # MQTT
```

**Dependencies by use case:**

- **Basic scheduling**: `apscheduler`
- **PostgreSQL persistence**: `asyncpg`, `sqlalchemy`
- **Redis distributed**: `redis`
- **MongoDB**: `motor`
- **SQLite**: `aiosqlite`

## Best Practices

1. **Use persistent storage** for production to survive restarts
2. **Set appropriate misfire_grace_time** to handle system delays
3. **Use conflict policies** when updating schedules
4. **Subscribe to events** for monitoring and debugging
5. **Choose appropriate executors** (threadpool for I/O, processpool for CPU)
6. **Implement error handling** in job functions
7. **Use unique schedule IDs** for management operations
8. **Set result expiration** to prevent memory leaks

Related Skills

Daily Logs

25
from ComeOnOliver/skillshub

Record the user's daily activities, progress, decisions, and learnings in a structured, chronological format.

Socratic Method: The Dialectic Engine

25
from ComeOnOliver/skillshub

This skill transforms Claude into a Socratic agent — a cognitive partner who guides

Sokratische Methode: Die Dialektik-Maschine

25
from ComeOnOliver/skillshub

Dieser Skill verwandelt Claude in einen sokratischen Agenten — einen kognitiven Partner, der Nutzende durch systematisches Fragen zur Wissensentdeckung führt, anstatt direkt zu instruieren.

College Football Data (CFB)

25
from ComeOnOliver/skillshub

Before writing queries, consult `references/api-reference.md` for endpoints, conference IDs, team IDs, and data shapes.

College Basketball Data (CBB)

25
from ComeOnOliver/skillshub

Before writing queries, consult `references/api-reference.md` for endpoints, conference IDs, team IDs, and data shapes.

Betting Analysis

25
from ComeOnOliver/skillshub

Before writing queries, consult `references/api-reference.md` for odds formats, command parameters, and key concepts.

Research Proposal Generator

25
from ComeOnOliver/skillshub

Generate high-quality academic research proposals for PhD applications following Nature Reviews-style academic writing conventions.

Paper Slide Deck Generator

25
from ComeOnOliver/skillshub

Transform academic papers and content into professional slide deck images with automatic figure extraction.

Medical Imaging AI Literature Review Skill

25
from ComeOnOliver/skillshub

Write comprehensive literature reviews following a systematic 7-phase workflow.

Meeting Briefing Skill

25
from ComeOnOliver/skillshub

You are a meeting preparation assistant for an in-house legal team. You gather context from connected sources, prepare structured briefings for meetings with legal relevance, and help track action items that arise from meetings.

Canned Responses Skill

25
from ComeOnOliver/skillshub

You are a response template assistant for an in-house legal team. You help manage, customize, and generate templated responses for common legal inquiries, and you identify when a situation should NOT use a templated response and instead requires individualized attention.

Copywriting

25
from ComeOnOliver/skillshub

## Purpose