apscheduler
Advanced Python Scheduler - Task scheduling and job queue system
Best use case
apscheduler is best used when you need a repeatable AI agent workflow instead of a one-off prompt. It is especially useful for teams working in multi.
Users should expect a more consistent workflow output, faster repeated execution, and less time spent rewriting prompts from scratch.
Practical example
Example input
Use the "apscheduler" skill to help with this workflow task. Context: Advanced Python Scheduler - Task scheduling and job queue system
Example output
A structured workflow result with clearer steps, more consistent formatting, and an output that is easier to reuse in the next run.
When to use this skill
- Use this skill when you want a reusable workflow rather than writing the same prompt again and again.
When not to use this skill
- Do not use this when you only need a one-off answer and do not need a reusable workflow.
- Do not use it if you cannot install or maintain the related files, repository context, or supporting tools.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in .claude/skills/apscheduler/SKILL.md inside your project
- Restart your AI agent; it will auto-discover the skill
How apscheduler Compares
| Feature / Agent | apscheduler | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Advanced Python Scheduler - Task scheduling and job queue system
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# APScheduler
APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.
## Quick Start
### Basic Synchronous Scheduler
```python
from datetime import datetime

from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger


def tick():
    print(f"Tick: {datetime.now()}")


# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()
```
### Async Scheduler with FastAPI
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger


def cleanup_task():
    print("Running cleanup task...")


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup",
        )
        await scheduler.start_in_background()
        yield


app = FastAPI(lifespan=lifespan)
```
## Common Patterns
### Schedulers
**In-memory scheduler (development):**
```python
from apscheduler import AsyncScheduler


async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
**Persistent scheduler (production):**
```python
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore


async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)
    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
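The connection string above hard-codes credentials. A minimal sketch of building the same URL from environment variables follows; the variable names (`SCHEDULER_DB_USER` and so on) and the `build_db_url` helper are hypothetical and not part of APScheduler.

```python
import os
from urllib.parse import quote


def build_db_url(env=os.environ) -> str:
    """Assemble a SQLAlchemy URL from (hypothetical) environment variables."""
    # Percent-encode credentials so characters like '@' or '/' do not break the URL
    user = quote(env["SCHEDULER_DB_USER"], safe="")
    password = quote(env["SCHEDULER_DB_PASSWORD"], safe="")
    host = env.get("SCHEDULER_DB_HOST", "localhost")
    name = env.get("SCHEDULER_DB_NAME", "db")
    return f"postgresql+asyncpg://{user}:{password}@{host}/{name}"


example_env = {"SCHEDULER_DB_USER": "user", "SCHEDULER_DB_PASSWORD": "p@ss/word"}
url = build_db_url(example_env)
print(url)
```

The resulting string can be passed to `create_async_engine` in place of the literal URL.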
**Distributed scheduler:**
```python
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# data_store and event_broker must be shared by all nodes;
# see the Persistence section for how to create them.


# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler,
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()


# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker,
    ) as scheduler:
        await scheduler.run_until_stopped()
```
### Jobs
**Simple function jobs:**
```python
from apscheduler.triggers.cron import CronTrigger


def send_daily_report():
    generate_report()
    email_report("admin@example.com")


scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0),  # 9 AM daily
)
```
**Jobs with arguments:**
```python
from apscheduler.triggers.interval import IntervalTrigger


def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass


scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000,
    },
)
```
**Async jobs:**
```python
import aiohttp

from apscheduler.triggers.interval import IntervalTrigger


async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
    await save_to_database(data)


scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5),
)
```
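Job functions are ordinary callables, so error handling can live in a small decorator rather than in every job. The sketch below uses only the standard library; `log_failures` and `flaky_job` are hypothetical names, and the wrapper covers synchronous functions only (an `async def` job would need an async wrapper).

```python
import functools
import logging

logger = logging.getLogger("jobs")


def log_failures(func):
    """Log any exception raised by a job function, then re-raise it."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception("Job %s failed", func.__name__)
            raise  # re-raise so the caller still sees the failure

    return wrapper


@log_failures
def flaky_job(value: int) -> int:
    if value < 0:
        raise ValueError("value must be non-negative")
    return value * 2


print(flaky_job(2))
```

Re-raising keeps the failure visible to whatever runs the job, so the log entry is added without hiding the error.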
### Triggers
**Interval trigger:**
```python
from apscheduler.triggers.interval import IntervalTrigger
# Every 30 seconds
IntervalTrigger(seconds=30)
# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)
# Every 3 days
IntervalTrigger(days=3)
```
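To make the arithmetic concrete, here is a stdlib-only sketch of the fire times a fixed interval produces. It assumes the first run happens at the start time; `interval_fire_times` is a hypothetical helper, not APScheduler's implementation.

```python
from datetime import datetime, timedelta


def interval_fire_times(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    """Return the first `count` fire times of a fixed-interval schedule."""
    return [start + i * interval for i in range(count)]


start = datetime(2024, 1, 1, 9, 0, 0)
times = interval_fire_times(start, timedelta(hours=2, minutes=15), 3)
for t in times:
    print(t.isoformat())
# 2024-01-01T09:00:00
# 2024-01-01T11:15:00
# 2024-01-01T13:30:00
```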
**Cron trigger:**
```python
from apscheduler.triggers.cron import CronTrigger
# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# Every 15 minutes
CronTrigger(minute='*/15')
# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)
# Using crontab syntax (weekday names avoid ambiguity in numeric weekday numbering)
CronTrigger.from_crontab('0 9 * * mon-fri')  # 9 AM weekdays
```
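A five-field crontab string lines up with the `CronTrigger` keyword names in the order minute, hour, day, month, day_of_week. The sketch below only splits and labels the fields; `split_crontab` is a hypothetical helper and does no validation beyond counting fields.

```python
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def split_crontab(expr: str) -> dict[str, str]:
    """Label the five whitespace-separated fields of a crontab expression."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expr!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = split_crontab("0 9 * * mon-fri")
print(fields)
```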
**Date trigger (one-time):**
```python
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
```
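The datetimes above are naive (no timezone attached), and the source does not say how they are interpreted. A minimal sketch of building timezone-aware run times with the standard library, assuming UTC is the desired zone:

```python
from datetime import datetime, timedelta, timezone

# 5 minutes from now, with an explicit UTC offset
run_time = datetime.now(timezone.utc) + timedelta(minutes=5)

# Specific datetime, also pinned to UTC
deadline = datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc)

print(run_time.isoformat())
print(deadline.isoformat())
```

Either value can be passed as `run_time` in the same way as the naive datetimes.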
**Calendar interval:**
```python
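from datetime import date

from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# Monthly at 9 AM, counted from start_date (the 1st here, so it fires on the 1st)
CalendarIntervalTrigger(months=1, hour=9, minute=0, start_date=date(2025, 1, 1))
# Weekly at 10 AM; there is no day_of_week argument, so the weekday comes from start_date (a Monday here)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2025, 1, 6))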
```
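Assuming calendar intervals are counted from the trigger's start date, a weekly schedule lands on whatever weekday that date falls on. The stdlib sketch below finds a suitable start date; `next_weekday` is a hypothetical helper, not part of APScheduler.

```python
from datetime import date, timedelta


def next_weekday(after: date, weekday: int) -> date:
    """Return the first date on or after `after` that falls on `weekday` (Monday=0)."""
    return after + timedelta(days=(weekday - after.weekday()) % 7)


first_monday = next_weekday(date(2025, 1, 1), 0)
print(first_monday)  # 2025-01-06
```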
### Persistence
**SQLite:**
```python
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
```
**PostgreSQL:**
```python
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
```
**Redis (event broker):**
```python
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
```
### Job Management
**Get job results:**
```python
from datetime import timedelta

from apscheduler import AsyncScheduler


async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()
        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1),
        )
        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")
```
**Schedule management:**
```python
# Pause schedule
await scheduler.pause_schedule("my_schedule")
# Resume schedule
await scheduler.unpause_schedule("my_schedule")
# Remove schedule
await scheduler.remove_schedule("my_schedule")
# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
```
**Event handling:**
```python
from apscheduler import JobOutcome, JobReleased


def on_job_completed(event: JobReleased):
    if event.outcome == JobOutcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        # Report the outcome; exception details vary by APScheduler version
        print(f"Job {event.job_id} did not succeed: {event.outcome}")


scheduler.subscribe(on_job_completed, JobReleased)
```
## Configuration
**Task defaults:**
```python
from datetime import timedelta

from apscheduler import AsyncScheduler, TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5),
)
scheduler = AsyncScheduler(task_defaults=task_defaults)
```
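`misfire_grace_time` bounds how late a job may start relative to its scheduled time. The sketch below illustrates that check with plain datetimes; `missed_deadline` is a hypothetical helper that shows the idea, not APScheduler's internal logic.

```python
from datetime import datetime, timedelta


def missed_deadline(scheduled: datetime, actual_start: datetime, grace: timedelta) -> bool:
    """True if the job would start later than the grace period allows."""
    return actual_start - scheduled > grace


scheduled = datetime(2024, 1, 1, 9, 0)
grace = timedelta(minutes=5)

on_time = missed_deadline(scheduled, datetime(2024, 1, 1, 9, 3), grace)
too_late = missed_deadline(scheduled, datetime(2024, 1, 1, 9, 7), grace)
print(on_time)   # False: 3 minutes late is within the 5-minute grace period
print(too_late)  # True: 7 minutes late exceeds it
```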
**Job execution options:**
```python
from datetime import timedelta

from apscheduler import CoalescePolicy

# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10),
)
# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest,
)
```
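The `coalesce` option decides what happens when several fire times have piled up, for example after downtime. The sketch below models the three policies (earliest, latest, all) with a stand-in enum; `Coalesce` and `fire_times_to_run` are hypothetical and only illustrate the selection rule.

```python
from datetime import datetime
from enum import Enum


class Coalesce(Enum):
    """Stand-in for the library's coalesce policies (illustration only)."""

    EARLIEST = "earliest"
    LATEST = "latest"
    ALL = "all"


def fire_times_to_run(missed: list[datetime], policy: Coalesce) -> list[datetime]:
    """Pick which accumulated fire times should each produce a job."""
    if not missed:
        return []
    ordered = sorted(missed)
    if policy is Coalesce.EARLIEST:
        return ordered[:1]   # one job, using the oldest fire time
    if policy is Coalesce.LATEST:
        return ordered[-1:]  # one job, using the newest fire time
    return ordered           # one job per accumulated fire time


missed = [datetime(2024, 1, 1, h) for h in (9, 10, 11)]
print(fire_times_to_run(missed, Coalesce.LATEST))
```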
## Requirements
```bash
# Core package
uv add apscheduler
# Database backends
uv add "apscheduler[postgresql]" # PostgreSQL
uv add "apscheduler[mongodb]" # MongoDB
uv add "apscheduler[sqlite]" # SQLite
# Event brokers
uv add "apscheduler[redis]" # Redis
uv add "apscheduler[mqtt]" # MQTT
```
**Dependencies by use case:**
- **Basic scheduling**: `apscheduler`
- **PostgreSQL persistence**: `asyncpg`, `sqlalchemy`
- **Redis distributed**: `redis`
- **MongoDB**: `motor`
- **SQLite**: `aiosqlite`
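Before starting a scheduler, it can help to confirm that the optional modules for the chosen backend are importable. A minimal stdlib sketch follows; `missing_modules` is a hypothetical helper, and the module names are the ones listed above.

```python
from importlib.util import find_spec


def missing_modules(names: list[str]) -> list[str]:
    """Return the top-level module names that cannot be imported."""
    return [name for name in names if find_spec(name) is None]


# Modules for a PostgreSQL-backed setup, per the list above
needed = ["asyncpg", "sqlalchemy"]
missing = missing_modules(needed)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All required modules are available")
```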
## Best Practices
1. **Use persistent storage** for production to survive restarts
2. **Set appropriate misfire_grace_time** to handle system delays
3. **Use conflict policies** when updating schedules
4. **Subscribe to events** for monitoring and debugging
5. **Choose appropriate executors** (threadpool for I/O, processpool for CPU)
6. **Implement error handling** in job functions
7. **Use unique schedule IDs** for management operations
8. **Set result expiration** to prevent memory leaks