database-engine-implementation

Guide for implementing database engines in IterableData. Use when adding support for new SQL or NoSQL databases, implementing DBDriver classes, or extending database capabilities.

Best use case

database-engine-implementation is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using database-engine-implementation should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/database-engine-implementation/SKILL.md --create-dirs "https://raw.githubusercontent.com/diegosouzapw/awesome-omni-skill/main/skills/development/database-engine-implementation/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/database-engine-implementation/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How database-engine-implementation Compares

| Feature / Agent | database-engine-implementation | Standard Approach |
| --- | --- | --- |
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

SKILL.md Source

# Database Engine Implementation Guide

## Overview

Database engines provide read-only access to SQL and NoSQL databases as iterable data sources. They wrap database drivers to provide a unified `BaseIterable` interface.

## Architecture

- **DBDriver** (`iterable/db/base.py`) - Abstract base class for database drivers
- **DatabaseIterable** (`iterable/db/iterable.py`) - Wrapper that makes DBDriver work as BaseIterable
- **Driver Registry** (`iterable/db/__init__.py`) - Registry for mapping engine names to driver classes

## Adding New Database Engines

### Step-by-Step Process

1. **Create driver file**: `iterable/db/<engine>.py`
2. **Implement DBDriver**: Inherit from `DBDriver` in `iterable/db/base.py`
3. **Required methods**: Implement `connect()` and `iterate()`; override `close()` only when needed, since the base class provides a default implementation
4. **Register driver**: Call `register_driver()` in `iterable/db/__init__.py`
5. **Add detection**: Update `iterable/helpers/detect.py` to recognize database URLs
6. **Create tests**: `tests/test_db_engines.py` or add to existing test file
7. **Update dependencies**: Add optional dependency to `pyproject.toml`
8. **Update documentation**: Add engine to docs

### Implementation Pattern

```python
from collections.abc import Iterator
from typing import Any
from ..types import Row
from .base import DBDriver

class NewEngineDriver(DBDriver):
    """Database driver for NewEngine.
    
    Supports streaming queries using batch processing.
    """
    
    def __init__(self, source: str | Any, **kwargs: Any) -> None:
        """Initialize driver.
        
        Args:
            source: Connection string/URL or existing connection object
            **kwargs: Additional parameters:
                - query: Query string or table name
                - batch_size: Rows per batch (default: 10000)
                - on_error: Error handling policy ('raise', 'skip', 'warn')
        """
        super().__init__(source, **kwargs)
        self._cursor: Any = None
    
    def connect(self) -> None:
        """Establish database connection.
        
        Raises:
            ImportError: If required driver library is not installed
            ConnectionError: If connection fails
        """
        try:
            import database_library
        except ImportError:
            raise ImportError(
                "database-library is required. Install with: pip install database-library"
            ) from None
        
        # Handle existing connection object
        if hasattr(self.source, "cursor"):
            self.conn = self.source
            self._connected = True
            return
        
        # Parse connection string
        if not isinstance(self.source, str):
            raise ValueError("Source must be connection string or connection object")
        
        try:
            self.conn = database_library.connect(self.source, **self.kwargs.get("connect_args", {}))
            self._connected = True
        except Exception as e:
            self._connected = False
            raise ConnectionError(f"Failed to connect: {e}") from e
    
    def iterate(self) -> Iterator[Row]:
        """Return iterator of dict rows.
        
        Yields:
            dict: Database row as dictionary
            
        Raises:
            RuntimeError: If not connected
        """
        if not self._connected:
            raise RuntimeError("Not connected. Call connect() first.")
        
        self._start_metrics()
        batch_size = self.kwargs.get("batch_size", 10000)
        
        cursor = None
        try:
            query = self._build_query()
            cursor = self.conn.cursor()
            cursor.execute(query)

            # Column names are fixed for the result set, so read them once
            column_names = [desc[0] for desc in cursor.description]

            # Fetch in batches so the full result set is never held in memory
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    self._update_metrics(rows_read=1)
                    yield dict(zip(column_names, row))

        except Exception as e:
            self._handle_error(e, "during iteration")
            if self._on_error == "raise":
                raise
        finally:
            # Release the cursor even if iteration stops early or fails
            if cursor is not None:
                cursor.close()
```

## Key Implementation Details

### Connection Handling

- Support both connection strings and existing connection objects
- Parse connection strings appropriately (URL format)
- Handle connection errors gracefully with clear messages
- Set `self._connected = True` on success

### Query Building

- Accept `query` parameter (SQL query or table name)
- If table name provided, build SELECT query
- Support filtering, projection, sorting via kwargs
- Use parameterized queries to prevent SQL injection
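
The query-building rules above can be sketched as a small helper. This is only an illustration under stated assumptions: `build_select` and `_quote_identifier` are hypothetical names, not part of IterableData, and the `?` placeholder is the qmark style used by `sqlite3` (other DB-API drivers may expect `%s` or named placeholders).

```python
from __future__ import annotations

import re
from typing import Any

# Conservative whitelist: letters, digits, underscores; no schema-qualified names.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def _quote_identifier(name: str) -> str:
    """Validate and double-quote an identifier (hypothetical helper)."""
    if not _IDENTIFIER.match(name):
        raise ValueError(f"Invalid identifier: {name!r}")
    return f'"{name}"'


def build_select(
    query: str,
    columns: list[str] | None = None,
    filters: dict[str, Any] | None = None,
) -> tuple[str, list[Any]]:
    """Return (sql, params) for a table name, or pass a full query through."""
    # Assumption: anything containing whitespace is a full SQL statement.
    if re.search(r"\s", query.strip()):
        return query, []

    projection = ", ".join(_quote_identifier(c) for c in columns) if columns else "*"
    sql = f"SELECT {projection} FROM {_quote_identifier(query.strip())}"

    params: list[Any] = []
    if filters:
        clauses = []
        for column, value in filters.items():
            # Values travel as bound parameters, never as string fragments.
            clauses.append(f"{_quote_identifier(column)} = ?")
            params.append(value)
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params
```

For example, `build_select("users", columns=["id", "name"], filters={"active": 1})` returns `('SELECT "id", "name" FROM "users" WHERE "active" = ?', [1])`. Identifiers cannot be bound as parameters, which is why the sketch validates them against a whitelist instead.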

### Iteration

- Use batch processing (`batch_size` parameter)
- Convert database rows to dict objects
- Update metrics: `self._update_metrics(rows_read=1)`
- Handle errors according to `on_error` policy
- Start metrics: `self._start_metrics()` before iteration
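
To try the batching behaviour without a database server, the same loop can be run against the standard-library `sqlite3` module. This is a runnable stand-in, not the driver itself: `iter_rows` is a hypothetical function, and metrics and error policies are left out to keep the sketch short.

```python
import sqlite3
from collections.abc import Iterator
from typing import Any


def iter_rows(
    conn: sqlite3.Connection, sql: str, batch_size: int = 10000
) -> Iterator[dict[str, Any]]:
    """Yield rows as dicts, fetching at most batch_size rows per call."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # Column names do not change between batches, so read them once.
        columns = [desc[0] for desc in cursor.description]
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            for row in batch:
                yield dict(zip(columns, row))
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO items VALUES (?, ?)", [(i, f"item-{i}") for i in range(5)]
)
# batch_size=2 forces three fetchmany() calls for five rows.
rows = list(iter_rows(conn, "SELECT id, name FROM items ORDER BY id", batch_size=2))
conn.close()
```

Because `iter_rows` is a generator, only one batch is held in memory at a time; `list(...)` is used here purely to inspect the result.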

### Error Handling

Three policies (set via `on_error` kwarg):
- `'raise'` - Raise exceptions (default)
- `'skip'` - Skip problematic rows
- `'warn'` - Warn and continue

Use `self._handle_error(error, context)` for consistent handling.
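
How the three policies might behave can be sketched as follows. The real `_handle_error` lives in the `DBDriver` base class and its internals are not shown in this guide, so `handle_error` and `convert_all` below are hypothetical stand-ins written only to make the policies concrete.

```python
import warnings


def handle_error(error: Exception, context: str, on_error: str = "raise") -> None:
    """Apply an on_error policy (hypothetical stand-in for _handle_error)."""
    if on_error == "raise":
        raise error
    if on_error == "warn":
        warnings.warn(f"Error {context}: {error}", RuntimeWarning, stacklevel=2)
        return
    if on_error == "skip":
        return
    raise ValueError(f"Unknown on_error policy: {on_error!r}")


def convert_all(values: list[str], on_error: str = "raise") -> list[int]:
    """Convert strings to ints, applying the policy to values that fail."""
    result = []
    for value in values:
        try:
            result.append(int(value))
        except ValueError as exc:
            # Under 'skip' and 'warn' the bad value is dropped and the loop continues.
            handle_error(exc, "while converting a row", on_error)
    return result
```

With `on_error="skip"`, `convert_all(["1", "x", "3"])` returns `[1, 3]`; with `"warn"` it returns the same list and emits a `RuntimeWarning`; with `"raise"` the `ValueError` propagates to the caller.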

### Metrics Tracking

Automatic metrics available via `self.metrics`:
- `rows_read` - Number of rows read
- `bytes_read` - Bytes read (may be None)
- `elapsed_seconds` - Time elapsed

Update during iteration: `self._update_metrics(rows_read=count)`
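
The base class's metrics object is not shown in this guide, so the following is only a guess at a compatible shape: a hypothetical `Metrics` dataclass exposing the three fields listed above, with an `update` method that mirrors the `_update_metrics(rows_read=...)` call style.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class Metrics:
    """Hypothetical metrics holder; field names follow the list above."""

    rows_read: int = 0
    bytes_read: int | None = None  # stays None when the driver cannot measure it
    started_at: float = field(default_factory=time.monotonic)

    @property
    def elapsed_seconds(self) -> float:
        # A monotonic clock is unaffected by system clock adjustments.
        return time.monotonic() - self.started_at

    def update(self, rows_read: int = 0, bytes_read: int | None = None) -> None:
        self.rows_read += rows_read
        if bytes_read is not None:
            self.bytes_read = (self.bytes_read or 0) + bytes_read


metrics = Metrics()
for batch in ([1, 2, 3], [4, 5]):
    # Updating per batch (or per row) keeps the counters accurate mid-iteration.
    metrics.update(rows_read=len(batch))
```

After the loop, `metrics.rows_read` is 5 and `metrics.bytes_read` is still `None`, since no byte counts were reported.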

## Registration

Register driver in `iterable/db/__init__.py`:

```python
from .newengine import NewEngineDriver

register_driver("newengine", NewEngineDriver)
```
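
For orientation, a name-to-class registry of this kind is often just a dictionary. The sketch below is an assumption about how such a registry could work, not the actual contents of `iterable/db/__init__.py`; only the two-argument `register_driver(name, cls)` call shape comes from this guide, and `get_driver` and `_DRIVERS` are hypothetical.

```python
_DRIVERS: dict[str, type] = {}


def register_driver(name: str, driver_class: type) -> None:
    """Map an engine name to a driver class (illustrative version)."""
    key = name.lower()
    existing = _DRIVERS.get(key)
    if existing is not None and existing is not driver_class:
        raise ValueError(f"Engine {name!r} is already registered to {existing.__name__}")
    _DRIVERS[key] = driver_class


def get_driver(name: str) -> type:
    """Look up a driver class by engine name (hypothetical helper)."""
    try:
        return _DRIVERS[name.lower()]
    except KeyError:
        available = ", ".join(sorted(_DRIVERS)) or "none"
        raise ValueError(
            f"Unknown database engine: {name!r}. Available: {available}"
        ) from None


class NewEngineDriver:
    """Placeholder class standing in for a real DBDriver subclass."""


register_driver("newengine", NewEngineDriver)
```

Rejecting a conflicting re-registration is a design choice in this sketch; it surfaces accidental name collisions early instead of silently replacing a driver.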

## Format Detection

Update `iterable/helpers/detect.py` to recognize database URLs:

```python
def detect_file_type(filename, content=None):
    # Database URL detection
    if filename.startswith(("postgresql://", "postgres://")):
        return "database"
    if filename.startswith("newengine://"):
        return "database"
    # ... existing detection
```

## Testing Requirements

### Test Structure

```python
import pytest
from iterable.db.newengine import NewEngineDriver

class TestNewEngineDriver:
    def test_connect(self):
        driver = NewEngineDriver("newengine://localhost/db")
        driver.connect()
        assert driver.is_connected
        driver.close()
    
    def test_iterate(self):
        driver = NewEngineDriver("newengine://localhost/db", query="SELECT * FROM table")
        driver.connect()
        rows = list(driver.iterate())
        assert len(rows) > 0
        assert isinstance(rows[0], dict)
        driver.close()
    
    def test_batch_processing(self):
        driver = NewEngineDriver("newengine://localhost/db", query="SELECT * FROM table", batch_size=100)
        driver.connect()
        # Verify batching works correctly
        driver.close()
    
    def test_error_handling(self):
        driver = NewEngineDriver("newengine://invalid", query="SELECT * FROM table", on_error="skip")
        # Test error handling policies
```

### Test Coverage

- Connection with string and object sources
- Query building (table name vs SQL)
- Batch processing
- Error handling policies
- Metrics tracking
- Context manager support (`with` statement)
- Missing dependencies (should raise ImportError)
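
The missing-dependency case can be tested without uninstalling anything: placing `None` in `sys.modules` under a module's name makes any import of that name fail. The sketch below uses a hypothetical `load_backend` helper in place of the driver's `connect()` import; `database_library` is the placeholder package name used throughout this guide.

```python
import importlib
import sys


def load_backend(module_name: str = "database_library"):
    """Import the backend lazily and raise a helpful ImportError if absent."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        raise ImportError(
            f"{module_name} is required. "
            "Install with: pip install iterabledata[newengine]"
        ) from None


def test_missing_dependency() -> None:
    name = "database_library"
    had_entry = name in sys.modules
    saved = sys.modules.get(name)
    # A None entry in sys.modules blocks the import, simulating an absent package.
    sys.modules[name] = None
    try:
        try:
            load_backend(name)
        except ImportError as exc:
            assert "pip install" in str(exc)
        else:
            raise AssertionError("expected ImportError")
    finally:
        # Restore the original state so other tests are unaffected.
        if had_entry:
            sys.modules[name] = saved
        else:
            del sys.modules[name]
```

Under pytest, `monkeypatch.setitem(sys.modules, "database_library", None)` achieves the same effect and handles the cleanup automatically.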

## Dependencies

### Optional Dependencies

Add to `pyproject.toml`:

```toml
[project.optional-dependencies]
newengine = ["database-library>=1.0.0"]
```

### Import Handling

Handle missing dependencies:

```python
try:
    import database_library
except ImportError:
    raise ImportError(
        "newengine support requires 'database-library'. "
        "Install with: pip install iterabledata[newengine]"
    ) from None
```

## Examples

Look at existing implementations:
- `iterable/db/postgres.py` - SQL database example (PostgreSQL)
- `iterable/db/mongo.py` - NoSQL database example (MongoDB)
- `iterable/db/clickhouse.py` - Columnar database example
- `iterable/db/elasticsearch.py` - Search engine example

## Common Patterns

### SQL Databases

- Use server-side cursors for memory efficiency
- Support read-only transactions
- Handle connection pooling
- Use parameterized queries

### NoSQL Databases

- Support query filters and projections
- Handle pagination/cursors
- Support aggregation pipelines
- Batch document retrieval
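
Token-based pagination, which many NoSQL and search APIs expose in some form, can be sketched generically. Everything here is hypothetical: `paginate`, `fake_fetch`, and the token format are illustrations, and a real driver would call its client library where `fake_fetch` slices an in-memory list.

```python
from collections.abc import Callable, Iterator
from typing import Any, Optional

Document = dict[str, Any]
Page = tuple[list[Document], Optional[str]]


def paginate(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[Document]:
    """Yield documents page by page until no continuation token is returned."""
    token: Optional[str] = None
    while True:
        documents, token = fetch_page(token)
        yield from documents
        if token is None:
            break


DOCS = [{"_id": i} for i in range(5)]


def fake_fetch(token: Optional[str], page_size: int = 2) -> Page:
    """In-memory stand-in for a server call; the token is the next offset."""
    start = int(token) if token is not None else 0
    chunk = DOCS[start:start + page_size]
    next_start = start + page_size
    next_token = str(next_start) if next_start < len(DOCS) else None
    return chunk, next_token


ids = [doc["_id"] for doc in paginate(fake_fetch)]
```

Here `ids` ends up as `[0, 1, 2, 3, 4]`, retrieved in three pages of at most two documents each.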

### Connection String Parsing

```python
from urllib.parse import urlparse

def parse_connection_string(conn_str: str) -> dict:
    parsed = urlparse(conn_str)
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "database": parsed.path.lstrip("/"),
        "username": parsed.username,
        "password": parsed.password,
    }
```
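
One detail worth handling: `urlparse` does not percent-decode the username or password, so credentials containing reserved characters such as `@` arrive still encoded. A possible variant, shown here as a sketch with the hypothetical name `parse_connection_string_decoded`, applies `unquote` to the affected parts.

```python
from urllib.parse import unquote, urlparse


def parse_connection_string_decoded(conn_str: str) -> dict:
    """Parse a connection URL and percent-decode credentials and database name."""
    parsed = urlparse(conn_str)
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "database": unquote(parsed.path.lstrip("/")),
        # username/password are None when absent, so guard before decoding.
        "username": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
    }


info = parse_connection_string_decoded(
    "newengine://app:p%40ss@db.example.com:5432/analytics"
)
```

In this example `info["password"]` is `"p@ss"`, `info["host"]` is `"db.example.com"`, and `info["port"]` is the integer `5432`.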

## Common Pitfalls

- **Memory issues**: Always use batch processing, don't load all rows at once
- **Connection leaks**: Always close connections, use context managers
- **Error handling**: Implement all three error policies consistently
- **Metrics**: Update metrics during iteration, not after
- **Reset support**: Most databases don't support reset (set `_reset_supported = False`)
- **Read-only**: Default to read-only access for safety
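
The connection-leak pitfall is easiest to avoid with a context manager that closes the connection on every exit path. The sketch below uses `sqlite3` and a hypothetical `ManagedConnection` wrapper to show the pattern; it is not the `DBDriver` implementation. Note that a `sqlite3.Connection` used directly in a `with` statement only commits or rolls back and does not close itself, which is why an explicit wrapper is used.

```python
import sqlite3


class ManagedConnection:
    """Hypothetical wrapper: connects on entry, always closes on exit."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.conn = None

    def __enter__(self) -> "ManagedConnection":
        self.conn = sqlite3.connect(self.path)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and on exceptions, so the connection never leaks.
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        # Returning None lets any exception propagate to the caller.


try:
    with ManagedConnection(":memory:") as managed:
        managed.conn.execute("SELECT 1")
        raise RuntimeError("simulated failure mid-iteration")
except RuntimeError:
    pass

closed_after_error = managed.conn is None
```

Even though the block raised, `closed_after_error` is `True`, showing that cleanup ran before the exception reached the caller.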

Related Skills

databases

16
from diegosouzapw/awesome-omni-skill

Work with MongoDB (document database, BSON documents, aggregation pipelines, Atlas cloud) and PostgreSQL (relational database, SQL queries, psql CLI, pgAdmin). Use when designing database schemas, writing queries and aggregations, optimizing indexes for performance, performing database migrations, configuring replication and sharding, implementing backup and restore strategies, managing database users and permissions, analyzing query performance, or administering production databases.

databases-architecture-skill

16
from diegosouzapw/awesome-omni-skill

Master database design (SQL, NoSQL), system architecture, API design (REST, GraphQL), and building scalable systems. Learn PostgreSQL, MongoDB, system design patterns, and enterprise architectures.

database-workflow

16
from diegosouzapw/awesome-omni-skill

Language-agnostic database best practices covering migrations, schema design, ORM patterns, query optimization, and testing strategies. Activate when working with database files, migrations, schema changes, SQL, ORM code, database tests, or when user mentions migrations, schema design, SQL optimization, NoSQL, database patterns, or connection pooling.

database-schema

16
from diegosouzapw/awesome-omni-skill

Design a database schema

database-schema-design

16
from diegosouzapw/awesome-omni-skill

Design and optimize database schemas for SQL and NoSQL databases. Use when creating new databases, designing tables, defining relationships, indexing strategies, or database migrations. Handles PostgreSQL, MySQL, MongoDB, normalization, and performance optimization.

database

16
from diegosouzapw/awesome-omni-skill

Work efficiently with PostgreSQL and MongoDB.

database-query

16
from diegosouzapw/awesome-omni-skill

Natural language database queries with multi-database support, query optimization, and visual results

database-patterns

16
from diegosouzapw/awesome-omni-skill

Use when designing database schemas, implementing repository patterns, writing optimized queries, managing migrations, or working with indexes and transactions for SQL/NoSQL databases.

database-optimizer

16
from diegosouzapw/awesome-omni-skill

Expert database optimizer specializing in modern performance tuning, query optimization, and scalable architectures. Masters advanced indexing, N+1 resolution, multi-tier caching, partitioning strategies, and cloud database optimization. Handles complex query analysis, migration strategies, and performance monitoring. Use PROACTIVELY for database optimization, performance issues, or scalability challenges.

database-optimization

16
from diegosouzapw/awesome-omni-skill

Use when optimizing database queries, indexes, N+1 problems, slow queries, or analyzing query performance. Triggers on keywords like "slow query", "N+1", "index", "query optimization", "database performance", "eager loading".

database-migrator

16
from diegosouzapw/awesome-omni-skill

Handle schema changes and data migrations safely. Creates migration files with rollback plans. Use when user says 'migration', 'schema change', 'add column', 'database change', or 'alter table'.

database-migrations-sql-migrations

16
from diegosouzapw/awesome-omni-skill

SQL database migrations with zero-downtime strategies for PostgreSQL, MySQL, and SQL Server. Use when: the user asks to run the `sql-migrations` workflow and the task requires multi-step orchestration. Do not use when: the task is small, single-step, and can be completed directly without orchestration overhead.