micropython-skills/algorithm

MicroPython on-device algorithms — PID controller, moving average, Kalman filter, state machine, task scheduler, data logger.

3,891 stars
Complexity: easy

About this skill

This skill offers a set of algorithms implemented in pure MicroPython and designed to run on resource-constrained embedded devices. These 'Safe tier' algorithms perform computations without direct hardware side effects, so the core logic can be integrated into larger projects. The collection includes a PID controller with anti-windup for control loops, moving average and Kalman filters for smoothing sensor data, a state machine for managing device logic, a task scheduler for cooperative multitasking, and a data logger.

The algorithms suit tasks that need closed-loop control (e.g., temperature, motor speed), signal processing from noisy sensors, management of complex device behaviors, or scheduling of multiple operations on a single-core microcontroller. Using ready-made implementations reduces development time and avoids the errors that often come with writing these algorithms from scratch.

The modules can be uploaded to a device as `.py` files and imported into a project, serving as a starting toolkit for IoT devices, automation systems, and other embedded solutions.

Best use case

This skill is primarily used by embedded systems developers, IoT engineers, and hobbyists working with MicroPython to implement robust control systems, data filtering, and device management on hardware. It streamlines the development of applications requiring precision control, reliable sensor data processing, or structured task execution on devices like ESP32, ESP8266, or Raspberry Pi Pico.

Users should expect a functional MicroPython script integrating robust algorithms for control, data filtering, or task management directly on their embedded device.

Practical example

Example input

Implement a PID controller in MicroPython for an oven to maintain a temperature of 180 degrees Celsius, using the provided `micropython-skills/algorithm` and suggest how to connect it to a heater output.

Example output

```python
import time

# Assuming algorithm.py is uploaded and PID class is available
# from algorithm import PID

class PID: # Placeholder if algorithm.py is not yet imported
    def __init__(self, kp, ki, kd, setpoint=0, output_min=-100, output_max=100):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.setpoint = setpoint
        self.output_min = output_min
        self.output_max = output_max
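        self._integral = 0
        self._last_error = None  # No previous error until the first compute()
        self._last_time = time.ticks_ms()

    def compute(self, measurement):
        now = time.ticks_ms()
        dt = time.ticks_diff(now, self._last_time) / 1000.0
        if dt <= 0:
            dt = 0.001
        self._last_time = now

        error = self.setpoint - measurement
        self._integral += error * dt
        # Skip the derivative on the first call to avoid a spike from a zero "previous" error
        if self._last_error is None:
            derivative = 0.0
        else:
            derivative = (error - self._last_error) / dt
        self._last_error = error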

        output = self.kp * error + self.ki * self._integral + self.kd * derivative

        if output > self.output_max:
            output = self.output_max
            self._integral -= error * dt
        elif output < self.output_min:
            output = self.output_min
            self._integral -= error * dt

        return output

# --- Example Application for Oven Control ---

# Initialize PID controller for oven temperature
# Assuming a heater controlled by PWM (0-100%)
oven_pid = PID(kp=5.0, ki=0.8, kd=0.2, setpoint=180.0, output_min=0, output_max=100)

# Simulate temperature sensor (replace with actual sensor reading)
def read_oven_temperature():
    # In a real scenario, this would read from a sensor like a thermistor or thermocouple
    # For simulation, let's just make it approach the setpoint
    global current_simulated_temp
    # Simple simulation for demonstration
    if current_simulated_temp < oven_pid.setpoint:
        current_simulated_temp += 0.5 # Increase temp
    else:
        current_simulated_temp -= 0.1 # Slow decrease if overshoot
    return current_simulated_temp

# Simulate heater control (replace with actual PWM output to a relay/MOSFET)
def set_heater_power(power_percent):
    # In a real scenario, this would control a GPIO pin with PWM
    # e.g., from machine import Pin, PWM; heater_pwm = PWM(Pin(2)); heater_pwm.duty_u16(int(power_percent/100 * 65535))
    print(f"Setting heater power to: {power_percent:.1f}%")

current_simulated_temp = 25.0 # Starting room temperature

print("Starting oven temperature control simulation...")
for i in range(50):
    current_temp = read_oven_temperature()
    control_output = oven_pid.compute(current_temp)

    print(f"Step {i+1}: Temp = {current_temp:.2f}°C, PID Output = {control_output:.1f}")
    set_heater_power(control_output)
    time.sleep_ms(500) # Simulate a control loop delay

print("Oven temperature control simulation finished.")
```
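The example leaves the heater connection as a comment. A minimal sketch of that step follows, assuming the heater driver sits on a PWM-capable pin and that the duty cycle is set through `duty_u16` with a 0 to 65535 range, as in MicroPython's `machine.PWM`. The helper name `percent_to_duty_u16`, the pin number, and the PWM frequency are illustrative assumptions. The hardware lines are commented out so the block also runs off-device.

```python
def percent_to_duty_u16(power_percent):
    """Map a 0-100 % controller output to a 16-bit PWM duty value."""
    # Clamp first so an out-of-range controller output cannot overdrive the pin.
    clamped = min(max(power_percent, 0.0), 100.0)
    return int(clamped * 65535 / 100)

# On the device (assumed wiring: heater driver on GPIO 2, 1 kHz PWM):
# from machine import Pin, PWM
# heater_pwm = PWM(Pin(2), freq=1000)
# heater_pwm.duty_u16(percent_to_duty_u16(control_output))

print(percent_to_duty_u16(50))
```

If the heater is switched by a mechanical relay rather than a MOSFET or solid-state relay, a much slower switching period than typical PWM frequencies is needed.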

When to use this skill

  • Developing MicroPython applications that require precise control loops (e.g., HVAC, motor control) using PID.
  • Filtering noisy sensor data from accelerometers, temperature sensors, etc., using moving average or Kalman filters.
  • Implementing finite state machines to manage complex device behaviors or user interfaces.
  • Scheduling multiple cooperative tasks on a MicroPython device without an RTOS.

When not to use this skill

  • For projects not involving MicroPython or embedded hardware development.
  • When extremely high-performance, real-time critical systems require bare-metal C/C++ implementations.
  • If a full-fledged RTOS (Real-Time Operating System) with preemptive multitasking is available and preferred.
  • For cloud-based or server-side data processing and analytics rather than on-device computation.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/algorithm/SKILL.md --create-dirs "https://raw.githubusercontent.com/openclaw/skills/main/skills/0x1abin/micropython-skills/skills/algorithm/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/algorithm/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How micropython-skills/algorithm Compares

| Feature / Agent | micropython-skills/algorithm | Standard Approach |
| --- | --- | --- |
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | easy | N/A |

Frequently Asked Questions

What does this skill do?

MicroPython on-device algorithms — PID controller, moving average, Kalman filter, state machine, task scheduler, data logger.

How difficult is it to install?

The installation complexity is rated as easy. You can find the installation instructions above.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# On-Device Algorithms

Pure MicroPython algorithm implementations that run on the device.
These are **Safe tier** — no direct hardware side effects.

These algorithms are typically combined with sensor reads (input) and actuator control (output).
Upload as `.py` files via `mpremote fs cp algorithm.py :` for reuse.

## PID Controller

Classic PID with anti-windup for control loops (e.g., temperature regulation):

```python
import time, json

class PID:
    def __init__(self, kp, ki, kd, setpoint=0, output_min=-100, output_max=100):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.setpoint = setpoint
        self.output_min = output_min
        self.output_max = output_max
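        self._integral = 0
        self._last_error = None  # No previous error until the first compute()
        self._last_time = time.ticks_ms()

    def compute(self, measurement):
        now = time.ticks_ms()
        dt = time.ticks_diff(now, self._last_time) / 1000.0
        if dt <= 0:
            dt = 0.001
        self._last_time = now

        error = self.setpoint - measurement
        self._integral += error * dt
        # Skip the derivative on the first call to avoid a spike from a zero "previous" error
        if self._last_error is None:
            derivative = 0.0
        else:
            derivative = (error - self._last_error) / dt
        self._last_error = error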

        output = self.kp * error + self.ki * self._integral + self.kd * derivative

        # Anti-windup: clamp output and freeze integral if saturated
        if output > self.output_max:
            output = self.output_max
            self._integral -= error * dt
        elif output < self.output_min:
            output = self.output_min
            self._integral -= error * dt

        return output

# Example usage: temperature control
try:
    pid = PID(kp=2.0, ki=0.5, kd=0.1, setpoint=25.0, output_min=0, output_max=100)
    # Simulate 5 steps
    temps = [22.0, 23.1, 24.0, 24.8, 25.1]
    results = []
    for t in temps:
        out = pid.compute(t)
        results.append({"temp": t, "output": round(out, 2)})
        time.sleep_ms(100)
    print("RESULT:" + json.dumps({"pid_steps": results}))
except Exception as e:
    print("ERROR:" + str(e))
```
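Because `compute()` reads `time.ticks_ms()` itself, the class is awkward to exercise on a desktop. The following is a minimal sketch of the same proportional, integral, and derivative arithmetic written as a function that takes `dt` explicitly and drives a made-up first-order heater model. The name `pid_step`, the state dictionary, and the plant constants are illustrative assumptions, not part of the skill.

```python
def pid_step(state, measurement, dt, kp, ki, kd, setpoint, out_min, out_max):
    """One PID update with explicit dt; `state` holds the integral and last error."""
    error = setpoint - measurement
    integral = state["integral"] + error * dt
    # Skip the derivative on the first sample: there is no previous error yet.
    if state["last_error"] is None:
        derivative = 0.0
    else:
        derivative = (error - state["last_error"]) / dt
    output = kp * error + ki * integral + kd * derivative
    if out_min <= output <= out_max:
        state["integral"] = integral  # Only accumulate while the output is not saturated
    output = min(max(output, out_min), out_max)
    state["last_error"] = error
    return output

state = {"integral": 0.0, "last_error": None}
temp = 20.0
for _ in range(1200):  # 120 simulated seconds at dt = 0.1 s
    power = pid_step(state, temp, 0.1, 2.0, 0.5, 0.1, 25.0, 0.0, 100.0)
    # Hypothetical plant: heating proportional to power, loss toward a 20 degC ambient.
    temp += (0.05 * power - 0.1 * (temp - 20.0)) * 0.1

print(round(temp, 2))
```

Passing `dt` in makes each step deterministic, which helps when tuning gains against a model before running on hardware.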

## Moving Average Filter

Smooths noisy sensor readings using a ring buffer:

```python
import json

class MovingAverage:
    def __init__(self, window_size=10):
        self._buf = [0.0] * window_size
        self._idx = 0
        self._count = 0
        self._sum = 0.0
        self._size = window_size

    def add(self, value):
        old = self._buf[self._idx]
        self._buf[self._idx] = value
        self._sum += value - old
        self._idx = (self._idx + 1) % self._size
        if self._count < self._size:
            self._count += 1
        return self._sum / self._count

# Example: smooth noisy ADC readings
try:
    filt = MovingAverage(window_size=5)
    raw = [512, 498, 525, 510, 503, 515, 508, 520, 502, 511]
    smoothed = []
    for v in raw:
        s = filt.add(v)
        smoothed.append(round(s, 1))
    print("RESULT:" + json.dumps({"raw": raw, "smoothed": smoothed}))
except Exception as e:
    print("ERROR:" + str(e))
```
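Where RAM for a window buffer is scarce, an exponential moving average is a common alternative. It is a different filter from the ring-buffer average above: it keeps a single value and weights recent samples more heavily. A minimal sketch, in which the class name `ExponentialAverage` and the `alpha` parameter are illustrative and not part of the skill:

```python
class ExponentialAverage:
    """Exponential moving average: value += alpha * (x - value)."""

    def __init__(self, alpha=0.2):
        self.alpha = alpha   # 0 < alpha <= 1; larger reacts faster, smooths less
        self.value = None    # No estimate until the first sample arrives

    def add(self, x):
        if self.value is None:
            self.value = float(x)  # Seed with the first sample
        else:
            self.value += self.alpha * (x - self.value)
        return self.value

ema = ExponentialAverage(alpha=0.5)
smoothed = [ema.add(v) for v in [512, 498, 525, 510]]
print(smoothed)  # [512.0, 505.0, 515.0, 512.5]
```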

## Kalman Filter (1D)

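Scalar Kalman filter for a slowly varying signal. The gain starts high and settles as the estimate variance `p` shrinks, so early samples are tracked quickly and later noise is rejected more strongly; `q` and `r` set the trade-off between responsiveness and smoothing: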

```python
import json

class KalmanFilter:
    def __init__(self, process_variance=0.01, measurement_variance=0.1, initial_estimate=0):
        self.q = process_variance      # Process noise
        self.r = measurement_variance  # Measurement noise
        self.x = initial_estimate      # Current estimate
        self.p = 1.0                   # Estimate error variance

    def update(self, measurement):
        # Prediction
        self.p += self.q
        # Update
        k = self.p / (self.p + self.r)  # Kalman gain
        self.x += k * (measurement - self.x)
        self.p *= (1 - k)
        return self.x

# Example: filter noisy temperature readings
try:
    kf = KalmanFilter(process_variance=0.01, measurement_variance=0.5, initial_estimate=23.0)
    readings = [23.2, 22.8, 23.5, 23.1, 24.0, 23.3, 22.9, 23.4, 23.0, 23.2]
    filtered = []
    for r in readings:
        f = kf.update(r)
        filtered.append(round(f, 2))
    print("RESULT:" + json.dumps({"raw": readings, "filtered": filtered}))
except Exception as e:
    print("ERROR:" + str(e))
```
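With constant `q` and `r`, the gain computed in `update()` does not depend on the measurements, so it can be examined on its own. The sketch below repeats the same variance recursion and collects the gain at each step. The function name `kalman_gains` is hypothetical. For the example's values (`q=0.01`, `r=0.5`) the gain falls from about 0.67 on the first step toward a constant near 0.13, at which point the filter behaves like a fixed-coefficient exponential smoother.

```python
def kalman_gains(q, r, p0=1.0, steps=50):
    """Return the gain sequence of the scalar filter for constant q and r."""
    p = p0
    gains = []
    for _ in range(steps):
        p += q               # Prediction inflates the variance
        k = p / (p + r)      # Kalman gain
        p *= (1 - k)         # Update shrinks the variance
        gains.append(k)
    return gains

gains = kalman_gains(q=0.01, r=0.5)
print(round(gains[0], 3), round(gains[-1], 3))
```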

## State Machine

Event-driven finite state machine for device behavior control:

```python
import json

class StateMachine:
    def __init__(self, initial_state, transitions):
        """
        transitions: dict mapping (state, event) -> (new_state, action_fn)
        action_fn receives (old_state, event, new_state) and returns optional data.
        """
        self.state = initial_state
        self.transitions = transitions
        self.history = []

    def handle(self, event):
        key = (self.state, event)
        if key not in self.transitions:
            return None
        old = self.state
        new_state, action = self.transitions[key]
        result = action(old, event, new_state) if action else None
        self.history.append({"from": old, "event": event, "to": new_state})
        self.state = new_state
        return result

# Example: simple thermostat (IDLE -> HEATING -> IDLE)
try:
    def start_heat(old, ev, new):
        return "heater ON"

    def stop_heat(old, ev, new):
        return "heater OFF"

    sm = StateMachine("IDLE", {
        ("IDLE", "too_cold"):    ("HEATING", start_heat),
        ("HEATING", "reached"):  ("IDLE", stop_heat),
        ("IDLE", "too_hot"):     ("COOLING", lambda o,e,n: "cooler ON"),
        ("COOLING", "reached"):  ("IDLE", lambda o,e,n: "cooler OFF"),
    })

    events = ["too_cold", "reached", "too_hot", "reached"]
    actions = []
    for ev in events:
        a = sm.handle(ev)
        actions.append({"event": ev, "action": a, "state": sm.state})

    print("RESULT:" + json.dumps({"transitions": actions}))
except Exception as e:
    print("ERROR:" + str(e))
```
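The thermostat example feeds the machine a fixed list of events. On a device, those events would be derived from readings. A minimal sketch of that step follows, assuming a setpoint with a symmetric dead band. The function `temperature_event`, the `NEXT` table, and the thresholds are illustrative; the table mirrors the transitions above with the actions omitted so the block is self-contained.

```python
def temperature_event(state, temp, setpoint=22.0, band=1.0):
    """Translate a reading into one of the thermostat events, or None."""
    if state == "IDLE":
        if temp < setpoint - band:
            return "too_cold"
        if temp > setpoint + band:
            return "too_hot"
    elif state == "HEATING" and temp >= setpoint:
        return "reached"
    elif state == "COOLING" and temp <= setpoint:
        return "reached"
    return None  # Reading does not call for a transition

# Transition table mirroring the thermostat example (actions omitted).
NEXT = {
    ("IDLE", "too_cold"): "HEATING",
    ("HEATING", "reached"): "IDLE",
    ("IDLE", "too_hot"): "COOLING",
    ("COOLING", "reached"): "IDLE",
}

state = "IDLE"
trace = []
for temp in [21.5, 20.5, 21.4, 22.1, 23.5, 22.6, 21.9]:
    ev = temperature_event(state, temp)
    if ev is not None:
        state = NEXT[(state, ev)]
    trace.append(state)
print(trace)
```

The dead band keeps the machine from toggling between states when the reading hovers near the setpoint.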

## Cooperative Task Scheduler

Run multiple periodic tasks without threads by polling `time.ticks_ms()` in a loop:

```python
import time, json

class Scheduler:
    def __init__(self):
        self.tasks = []

    def add(self, name, interval_ms, callback):
        self.tasks.append({
            "name": name,
            "interval": interval_ms,
            "callback": callback,
            "last_run": None,  # None = never run; a literal 0 breaks once ticks_ms() is large
        })

    def run(self, duration_ms=5000):
        """Run all tasks cooperatively for a fixed duration."""
        start = time.ticks_ms()
        log = []
        while time.ticks_diff(time.ticks_ms(), start) < duration_ms:
            now = time.ticks_ms()
            for task in self.tasks:
                last = task["last_run"]
                # Each task fires on the first pass, then every `interval` ms
                if last is None or time.ticks_diff(now, last) >= task["interval"]:
                    result = task["callback"]()
                    log.append({"task": task["name"], "time_ms": time.ticks_diff(now, start), "result": result})
                    task["last_run"] = now
            time.sleep_ms(10)
        return log

# Example: read sensor every 500ms, log every 2000ms
try:
    counter = {"reads": 0}

    def read_sensor():
        counter["reads"] += 1
        return counter["reads"]

    def log_status():
        return "reads=" + str(counter["reads"])

    sched = Scheduler()
    sched.add("sensor", 500, read_sensor)
    sched.add("log", 2000, log_status)

    events = sched.run(duration_ms=3000)
    print("RESULT:" + json.dumps({"events": events}))
except Exception as e:
    print("ERROR:" + str(e))
```
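The scheduler compares times with `time.ticks_diff` rather than plain subtraction because the `ticks_ms()` counter wraps around. The sketch below emulates that wrap-aware difference in plain Python so the behaviour can be checked off-device. The period of `1 << 30` is an assumption (the real value is port-specific), and this `ticks_diff` is a stand-in written for illustration, not the MicroPython built-in.

```python
TICKS_PERIOD = 1 << 30  # Assumed counter period; the real value depends on the port

def ticks_diff(a, b):
    """Signed, wrap-aware difference a - b, emulating time.ticks_diff."""
    half = TICKS_PERIOD // 2
    return ((a - b + half) & (TICKS_PERIOD - 1)) - half

# A task last ran 100 ms before the counter wrapped; "now" is 50 ms after the wrap.
last_run = TICKS_PERIOD - 100
now = 50
print(ticks_diff(now, last_run))  # 150
print(now - last_run)             # Plain subtraction gives a large negative number
```

The result is only meaningful when the two tick values are less than half a period apart, which is why a stored tick value should come from `ticks_ms()` and not from a constant.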

## Data Logger

Write sensor data to CSV on the device filesystem with size rotation:

```python
import os, time, json

class DataLogger:
    def __init__(self, filename="data.csv", max_size_kb=100):
        self.filename = filename
        self.max_bytes = max_size_kb * 1024
        self._ensure_header()

    def _ensure_header(self):
        try:
            os.stat(self.filename)
        except OSError:
            with open(self.filename, "w") as f:
                f.write("timestamp,value\n")

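    def _check_rotation(self):
        try:
            size = os.stat(self.filename)[6]  # Index 6 of the stat tuple is the size in bytes
            if size > self.max_bytes:
                try:
                    os.remove(self.filename + ".old")
                except OSError:
                    pass  # No earlier rotated file to discard
                os.rename(self.filename, self.filename + ".old")
                self._ensure_header()
                return True
        except OSError:
            pass  # Missing file or failed rename: keep logging to the current file
        return False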

    def log(self, value):
        self._check_rotation()
        with open(self.filename, "a") as f:
            f.write("{},{}\n".format(time.time(), value))

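    def read_last(self, n=5):
        """Return the header followed by the last n data rows."""
        header = None
        rows = []
        with open(self.filename, "r") as f:
            for line in f:
                line = line.strip()
                if header is None:
                    header = line  # First line of the file is the CSV header
                    continue
                rows.append(line)
                if len(rows) > n:
                    rows.pop(0)  # Keep at most n rows in memory
        return ([header] if header is not None else []) + rows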

# Example
try:
    logger = DataLogger("sensor_log.csv", max_size_kb=50)
    for i in range(5):
        logger.log(22.5 + i * 0.3)
    last = logger.read_last(5)
    print("RESULT:" + json.dumps({"log_file": "sensor_log.csv", "last_entries": last}))
except Exception as e:
    print("ERROR:" + str(e))
```
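`read_last()` returns raw CSV lines. A minimal sketch of turning those lines back into numbers follows, assuming integer timestamps and the `timestamp,value` layout written by the logger. The function name `parse_rows` and the sample data are illustrative.

```python
def parse_rows(lines):
    """Turn 'timestamp,value' CSV lines into (int, float) tuples, skipping the header."""
    rows = []
    for line in lines:
        parts = line.split(",")
        if len(parts) != 2 or parts[0] == "timestamp":
            continue  # Header or malformed row
        try:
            rows.append((int(parts[0]), float(parts[1])))
        except ValueError:
            continue  # Non-numeric field; skip rather than crash
    return rows

sample = ["timestamp,value", "775000000,22.5", "775000001,22.8", "garbage"]
print(parse_rows(sample))  # [(775000000, 22.5), (775000001, 22.8)]
```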

Related Skills

micropython-skills/sensor (from openclaw/skills)

MicroPython sensor reading: DHT11/22, BME280, MPU6050, ADC, ultrasonic HC-SR04, photoresistor, generic I2C sensors.

micropython-skills/network (from openclaw/skills)

MicroPython networking: WiFi STA/AP, HTTP requests, MQTT pub/sub, BLE, NTP time sync, WebSocket.