micropython-skills/algorithm
MicroPython on-device algorithms — PID controller, moving average, Kalman filter, state machine, task scheduler, data logger.
About this skill
This skill provides a set of algorithms implemented in pure MicroPython and designed to run on resource-constrained embedded devices. These "Safe tier" algorithms perform computation without direct hardware side effects, so the core logic can be integrated into larger projects alongside sensor reads and actuator control. The collection includes a PID controller with anti-windup for control loops, moving average and Kalman filters for smoothing sensor data, a state machine for managing device logic, a task scheduler for cooperative multitasking, and a data logger.

The algorithms suit closed-loop control (e.g., temperature or motor speed), signal processing from noisy sensors, managing complex device behavior, and scheduling multiple operations on a single-core microcontroller. Using the provided implementations avoids rewriting these algorithms from scratch: the modules can be uploaded to the device and imported from project code for IoT devices, automation systems, and other embedded applications.
Best use case
This skill is aimed at embedded systems developers, IoT engineers, and hobbyists who use MicroPython to implement control systems, data filtering, and device management. It suits applications that need precise control, reliable sensor data processing, or structured task execution on boards such as the ESP32, ESP8266, or Raspberry Pi Pico.
Users should expect a functional MicroPython script integrating robust algorithms for control, data filtering, or task management directly on their embedded device.
Practical example
Example input
Implement a PID controller in MicroPython for an oven to maintain a temperature of 180 degrees Celsius, using the provided `micropython-skills/algorithm` and suggest how to connect it to a heater output.
Example output
```python
import time

# Assuming algorithm.py is uploaded and the PID class is available:
# from algorithm import PID

class PID:  # Placeholder if algorithm.py is not yet imported
    def __init__(self, kp, ki, kd, setpoint=0, output_min=-100, output_max=100):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.setpoint = setpoint
        self.output_min = output_min
        self.output_max = output_max
        self._integral = 0
        self._last_error = None  # No previous error until the first compute()
        self._last_time = time.ticks_ms()

    def compute(self, measurement):
        now = time.ticks_ms()
        dt = time.ticks_diff(now, self._last_time) / 1000.0
        if dt <= 0:
            dt = 0.001
        self._last_time = now
        error = self.setpoint - measurement
        self._integral += error * dt
        # Skip the derivative on the first call to avoid a spurious kick
        if self._last_error is None:
            derivative = 0.0
        else:
            derivative = (error - self._last_error) / dt
        self._last_error = error
        output = self.kp * error + self.ki * self._integral + self.kd * derivative
        # Anti-windup: clamp the output and undo this step's integration
        if output > self.output_max:
            output = self.output_max
            self._integral -= error * dt
        elif output < self.output_min:
            output = self.output_min
            self._integral -= error * dt
        return output

# --- Example Application for Oven Control ---
# Initialize PID controller for oven temperature
# Assuming a heater controlled by PWM (0-100%)
oven_pid = PID(kp=5.0, ki=0.8, kd=0.2, setpoint=180.0, output_min=0, output_max=100)

# Simulate temperature sensor (replace with actual sensor reading)
def read_oven_temperature():
    # In a real scenario, this would read from a sensor like a thermistor or thermocouple.
    # For simulation, just make the temperature approach the setpoint.
    global current_simulated_temp
    if current_simulated_temp < oven_pid.setpoint:
        current_simulated_temp += 0.5  # Increase temp
    else:
        current_simulated_temp -= 0.1  # Slow decrease if overshoot
    return current_simulated_temp

# Simulate heater control (replace with actual PWM output to a relay/MOSFET)
def set_heater_power(power_percent):
    # In a real scenario, this would control a GPIO pin with PWM, e.g.:
    #   from machine import Pin, PWM
    #   heater_pwm = PWM(Pin(2))
    #   heater_pwm.duty_u16(int(power_percent / 100 * 65535))
    print(f"Setting heater power to: {power_percent:.1f}%")

current_simulated_temp = 25.0  # Starting room temperature

print("Starting oven temperature control simulation...")
for i in range(50):
    current_temp = read_oven_temperature()
    control_output = oven_pid.compute(current_temp)
    print(f"Step {i+1}: Temp = {current_temp:.2f}°C, PID Output = {control_output:.1f}")
    set_heater_power(control_output)
    time.sleep_ms(500)  # Simulate a control loop delay
print("Oven temperature control simulation finished.")
```
When to use this skill
- Developing MicroPython applications that require precise control loops (e.g., HVAC, motor control) using PID.
- Filtering noisy sensor data from accelerometers, temperature sensors, etc., using moving average or Kalman filters.
- Implementing finite state machines to manage complex device behaviors or user interfaces.
- Scheduling multiple cooperative tasks on a MicroPython device without an RTOS.
When not to use this skill
- For projects not involving MicroPython or embedded hardware development.
- When extremely high-performance, real-time critical systems require bare-metal C/C++ implementations.
- If a full-fledged RTOS (Real-Time Operating System) with preemptive multitasking is available and preferred.
- For cloud-based or server-side data processing and analytics rather than on-device computation.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/algorithm/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
How micropython-skills/algorithm Compares
| Feature / Agent | micropython-skills/algorithm | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Easy | N/A |
SKILL.md Source
# On-Device Algorithms
Pure MicroPython algorithm implementations that run on the device.
These are **Safe tier** — no direct hardware side effects.
These algorithms are typically combined with sensor reads (input) and actuator control (output).
Upload as `.py` files via `mpremote fs cp algorithm.py :` for reuse.
## PID Controller
Classic PID with anti-windup for control loops (e.g., temperature regulation):
```python
import time, json

class PID:
    def __init__(self, kp, ki, kd, setpoint=0, output_min=-100, output_max=100):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.setpoint = setpoint
        self.output_min = output_min
        self.output_max = output_max
        self._integral = 0
        self._last_error = None  # No previous error until the first compute()
        self._last_time = time.ticks_ms()

    def compute(self, measurement):
        now = time.ticks_ms()
        dt = time.ticks_diff(now, self._last_time) / 1000.0
        if dt <= 0:
            dt = 0.001
        self._last_time = now
        error = self.setpoint - measurement
        self._integral += error * dt
        # Skip the derivative on the first call to avoid a spurious kick
        if self._last_error is None:
            derivative = 0.0
        else:
            derivative = (error - self._last_error) / dt
        self._last_error = error
        output = self.kp * error + self.ki * self._integral + self.kd * derivative
        # Anti-windup: clamp output and freeze integral if saturated
        if output > self.output_max:
            output = self.output_max
            self._integral -= error * dt
        elif output < self.output_min:
            output = self.output_min
            self._integral -= error * dt
        return output

# Example usage: temperature control
try:
    pid = PID(kp=2.0, ki=0.5, kd=0.1, setpoint=25.0, output_min=0, output_max=100)
    # Simulate 5 steps
    temps = [22.0, 23.1, 24.0, 24.8, 25.1]
    results = []
    for t in temps:
        out = pid.compute(t)
        results.append({"temp": t, "output": round(out, 2)})
        time.sleep_ms(100)
    print("RESULT:" + json.dumps({"pid_steps": results}))
except Exception as e:
    print("ERROR:" + str(e))
```
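To see what the anti-windup branch does, the sketch below applies the same clamping rule with an explicit `dt` instead of `time.ticks_ms()`, so it also runs on desktop Python. The `pid_step` function, its `state` dict, and the numbers are hypothetical and only for illustration; they are not part of `algorithm.py`.

```python
def pid_step(state, error, dt, kp, ki, kd, out_min, out_max):
    """One PID update using the same anti-windup rule as PID.compute.

    `state` holds "integral" and "last_error" (hypothetical helper).
    """
    state["integral"] += error * dt
    derivative = (error - state["last_error"]) / dt
    state["last_error"] = error
    output = kp * error + ki * state["integral"] + kd * derivative
    if output > out_max:
        output = out_max
        state["integral"] -= error * dt  # undo this step's accumulation
    elif output < out_min:
        output = out_min
        state["integral"] -= error * dt
    return output

# A large constant error keeps the output pinned at out_max.
state = {"integral": 0.0, "last_error": 50.0}
outputs = [pid_step(state, 50.0, 0.1, 2.0, 0.5, 0.0, 0, 100) for _ in range(20)]
print(outputs[-1], state["integral"])
```

While the output stays saturated, the integral remains at zero. Without the rollback it would have accumulated 50 × 0.1 × 20 = 100 over these steps, and the controller would overshoot while that stored error unwound.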
## Moving Average Filter
Smooths noisy sensor readings using a ring buffer:
```python
import json

class MovingAverage:
    def __init__(self, window_size=10):
        self._buf = [0.0] * window_size
        self._idx = 0
        self._count = 0
        self._sum = 0.0
        self._size = window_size

    def add(self, value):
        # Replace the oldest sample and update the running sum in O(1)
        old = self._buf[self._idx]
        self._buf[self._idx] = value
        self._sum += value - old
        self._idx = (self._idx + 1) % self._size
        if self._count < self._size:
            self._count += 1
        return self._sum / self._count

# Example: smooth noisy ADC readings
try:
    filt = MovingAverage(window_size=5)
    raw = [512, 498, 525, 510, 503, 515, 508, 520, 502, 511]
    smoothed = []
    for v in raw:
        s = filt.add(v)
        smoothed.append(round(s, 1))
    print("RESULT:" + json.dumps({"raw": raw, "smoothed": smoothed}))
except Exception as e:
    print("ERROR:" + str(e))
```
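During warm-up the filter divides by the number of samples seen so far, not by the window size. A naive reference that recomputes each window from scratch can be handy for checking the ring-buffer version on desktop Python. The function name `naive_moving_average` is hypothetical and not part of the skill.

```python
def naive_moving_average(values, window_size):
    """Recompute each windowed mean from scratch (O(window) per sample)."""
    out = []
    for i in range(len(values)):
        # Window covers at most `window_size` samples ending at index i
        window = values[max(0, i - window_size + 1): i + 1]
        out.append(sum(window) / len(window))
    return out

raw = [512, 498, 525, 510, 503, 515, 508, 520, 502, 511]
reference = naive_moving_average(raw, 5)
print([round(v, 1) for v in reference])
```

The ring-buffer `MovingAverage.add` is expected to return the same values while doing constant work per sample.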
## Kalman Filter (1D)
Weights each measurement by the current estimate uncertainty, which often gives better noise rejection than a moving average:
```python
import json

class KalmanFilter:
    def __init__(self, process_variance=0.01, measurement_variance=0.1, initial_estimate=0):
        self.q = process_variance      # Process noise
        self.r = measurement_variance  # Measurement noise
        self.x = initial_estimate      # Current estimate
        self.p = 1.0                   # Estimation error

    def update(self, measurement):
        # Prediction
        self.p += self.q
        # Update
        k = self.p / (self.p + self.r)  # Kalman gain
        self.x += k * (measurement - self.x)
        self.p *= (1 - k)
        return self.x

# Example: filter noisy temperature readings
try:
    kf = KalmanFilter(process_variance=0.01, measurement_variance=0.5, initial_estimate=23.0)
    readings = [23.2, 22.8, 23.5, 23.1, 24.0, 23.3, 22.9, 23.4, 23.0, 23.2]
    filtered = []
    for r in readings:
        f = kf.update(r)
        filtered.append(round(f, 2))
    print("RESULT:" + json.dumps({"raw": readings, "filtered": filtered}))
except Exception as e:
    print("ERROR:" + str(e))
```
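With fixed `q` and `r`, the gain in this scalar filter does not depend on the measurements: it starts high (trusting early readings) and settles to a steady value, after which the filter behaves like exponential smoothing with that fixed factor. The sketch below traces the gain using the same prediction and update equations. The helper `kalman_gains` and its `p0` and `steps` parameters are hypothetical, for illustration only.

```python
def kalman_gains(q, r, p0=1.0, steps=50):
    """Return the sequence of gains produced by the scalar filter equations."""
    p = p0
    gains = []
    for _ in range(steps):
        p += q            # prediction: uncertainty grows by process noise
        k = p / (p + r)   # Kalman gain
        p *= (1 - k)      # update: uncertainty shrinks
        gains.append(k)
    return gains

gains = kalman_gains(q=0.01, r=0.5)
print(round(gains[0], 3), round(gains[-1], 3))
```

A larger `q` relative to `r` raises the steady gain (faster tracking, less smoothing), and a smaller ratio lowers it.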
## State Machine
Event-driven finite state machine for device behavior control:
```python
import json

class StateMachine:
    def __init__(self, initial_state, transitions):
        """
        transitions: dict mapping (state, event) -> (new_state, action_fn)
        action_fn receives (old_state, event, new_state) and returns optional data.
        """
        self.state = initial_state
        self.transitions = transitions
        self.history = []

    def handle(self, event):
        key = (self.state, event)
        if key not in self.transitions:
            return None  # Event is not valid in the current state; ignore it
        old = self.state
        new_state, action = self.transitions[key]
        result = action(old, event, new_state) if action else None
        self.history.append({"from": old, "event": event, "to": new_state})
        self.state = new_state
        return result

# Example: simple thermostat (IDLE -> HEATING -> IDLE)
try:
    def start_heat(old, ev, new):
        return "heater ON"

    def stop_heat(old, ev, new):
        return "heater OFF"

    sm = StateMachine("IDLE", {
        ("IDLE", "too_cold"): ("HEATING", start_heat),
        ("HEATING", "reached"): ("IDLE", stop_heat),
        ("IDLE", "too_hot"): ("COOLING", lambda o, e, n: "cooler ON"),
        ("COOLING", "reached"): ("IDLE", lambda o, e, n: "cooler OFF"),
    })
    events = ["too_cold", "reached", "too_hot", "reached"]
    actions = []
    for ev in events:
        a = sm.handle(ev)
        actions.append({"event": ev, "action": a, "state": sm.state})
    print("RESULT:" + json.dumps({"transitions": actions}))
except Exception as e:
    print("ERROR:" + str(e))
```
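The thermostat example feeds the machine a fixed list of events. In practice the events would be derived from sensor readings. One possible mapping, with a small dead band so the machine does not flip between states on noise, is sketched here. The `classify` function, the `band` parameter, and the thresholds are assumptions for illustration; only the event and state names come from the example above.

```python
def classify(temp, target, band=0.5, state="IDLE"):
    """Map a temperature reading to a thermostat event name, or None."""
    if state == "IDLE":
        if temp < target - band:
            return "too_cold"
        if temp > target + band:
            return "too_hot"
    elif state == "HEATING" and temp >= target:
        return "reached"
    elif state == "COOLING" and temp <= target:
        return "reached"
    return None  # Nothing to report; stay in the current state

for reading in (20.0, 22.2, 23.0):
    print(reading, classify(reading, 22.0))
```

A non-`None` result would then be passed to `sm.handle(event)`, with `state=sm.state` supplied on each call.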
## Cooperative Task Scheduler
Run multiple periodic tasks without threads by polling `time.ticks_ms()` in a loop:
```python
import time, json

class Scheduler:
    def __init__(self):
        self.tasks = []

    def add(self, name, interval_ms, callback):
        self.tasks.append({
            "name": name,
            "interval": interval_ms,
            "callback": callback,
            "last_run": None,  # None means "never run"; the task fires on the first pass
        })

    def run(self, duration_ms=5000):
        """Run all tasks cooperatively for a fixed duration."""
        start = time.ticks_ms()
        log = []
        while time.ticks_diff(time.ticks_ms(), start) < duration_ms:
            now = time.ticks_ms()
            for task in self.tasks:
                last = task["last_run"]
                # ticks_diff handles wraparound of the millisecond counter
                if last is None or time.ticks_diff(now, last) >= task["interval"]:
                    result = task["callback"]()
                    log.append({"task": task["name"], "time_ms": time.ticks_diff(now, start), "result": result})
                    task["last_run"] = now
            time.sleep_ms(10)
        return log

# Example: read sensor every 500ms, log every 2000ms
try:
    counter = {"reads": 0}

    def read_sensor():
        counter["reads"] += 1
        return counter["reads"]

    def log_status():
        return "reads=" + str(counter["reads"])

    sched = Scheduler()
    sched.add("sensor", 500, read_sensor)
    sched.add("log", 2000, log_status)
    events = sched.run(duration_ms=3000)
    print("RESULT:" + json.dumps({"events": events}))
except Exception as e:
    print("ERROR:" + str(e))
```
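The scheduler compares timestamps with `time.ticks_diff` rather than plain subtraction because the millisecond counter wraps around. The sketch below emulates that arithmetic in plain Python to show the difference. The emulated `ticks_diff` and the period of `2**30` are assumptions for illustration (the real period is port-dependent); on a device, use the functions from the `time` module.

```python
TICKS_PERIOD = 1 << 30  # assumed period; the real value depends on the port

def ticks_diff(end, start):
    """Signed difference between two wrapped tick values (desktop emulation)."""
    half = TICKS_PERIOD // 2
    return ((end - start + half) & (TICKS_PERIOD - 1)) - half

before_wrap = TICKS_PERIOD - 100  # 100 ms before the counter wraps
after_wrap = 150                  # 150 ms after it wrapped to zero

elapsed = ticks_diff(after_wrap, before_wrap)  # 250
naive = after_wrap - before_wrap               # large negative number
print(elapsed, naive)
```

Plain subtraction would make an interval check such as `naive >= interval` fail across the wrap, so the task would stall. The wrapped difference stays correct as long as the two timestamps are less than half a period apart.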
## Data Logger
Write sensor data to CSV on the device filesystem with size rotation:
```python
import os, time, json

class DataLogger:
    def __init__(self, filename="data.csv", max_size_kb=100):
        self.filename = filename
        self.max_bytes = max_size_kb * 1024
        self._ensure_header()

    def _ensure_header(self):
        # Create the file with a CSV header if it does not exist yet
        try:
            os.stat(self.filename)
        except OSError:
            with open(self.filename, "w") as f:
                f.write("timestamp,value\n")

    def _check_rotation(self):
        # Rotate to "<filename>.old" once the file exceeds max_bytes
        try:
            size = os.stat(self.filename)[6]  # Index 6 is the file size in bytes
            if size > self.max_bytes:
                try:
                    os.remove(self.filename + ".old")
                except OSError:
                    pass  # No previous rotated file to remove
                os.rename(self.filename, self.filename + ".old")
                self._ensure_header()
                return True
        except OSError:
            pass
        return False

    def log(self, value):
        self._check_rotation()
        with open(self.filename, "a") as f:
            f.write("{},{}\n".format(time.time(), value))

    def read_last(self, n=5):
        lines = []
        with open(self.filename, "r") as f:
            for line in f:
                lines.append(line.strip())
        # Return the header followed by the last n data rows
        rows = lines[1:]
        return lines[:1] + rows[max(0, len(rows) - n):]

# Example
try:
    logger = DataLogger("sensor_log.csv", max_size_kb=50)
    for i in range(5):
        logger.log(22.5 + i * 0.3)
    last = logger.read_last(5)
    print("RESULT:" + json.dumps({"log_file": "sensor_log.csv", "last_entries": last}))
except Exception as e:
    print("ERROR:" + str(e))
```
Related Skills
micropython-skills/sensor
MicroPython sensor reading — DHT11/22, BME280, MPU6050, ADC, ultrasonic HC-SR04, photoresistor, generic I2C sensors.
micropython-skills/network
MicroPython networking — WiFi STA/AP, HTTP requests, MQTT pub/sub, BLE, NTP time sync, WebSocket.