building-rag-systems
Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval. Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion. NOT when doing simple similarity search without production requirements.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/building-rag-systems/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
SKILL.md Source
# Building RAG Systems
Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.
## Quick Start
```bash
# Dependencies
pip install qdrant-client openai pydantic python-frontmatter
# Core components
# 1. Crawler → discovers files, extracts path metadata
# 2. Parser → extracts frontmatter, computes file hash
# 3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads
```
---
## Ingestion Pipeline
### Architecture
```
┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │               │
 Discovers      Extracts      Splits by      Generates       Upserts to
 files          frontmatter   semantic       vectors         Qdrant
                + file hash   boundaries     (batched)       (batched)
```
### Semantic Chunking (NOT Fixed-Size)
```python
import hashlib
import re


class SemanticChunker:
    """
    Production chunking:
    - Split on ## headers (semantic boundaries)
    - Target 400 tokens (cited as optimal in an NVIDIA benchmark)
    - 15% overlap for context continuity
    - Track prev/next for context expansion

    Note: chunk() only performs the header split and the prev/next linking.
    target_words and overlap_words are computed for a size-limiting pass
    that is not part of this snippet.
    """
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3  # Rough words-to-tokens ratio

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        self.max_tokens = max_tokens
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        # Chunk is the project's chunk model; next_chunk_id is assumed to default to None.
        # Drop empty pieces, e.g. the leading "" produced when a file starts with "## ".
        sections = [s for s in self.SECTION_PATTERN.split(content) if s.strip()]
        chunks = []
        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))
            # Set next_chunk_id on previous
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id
        return chunks
```
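The chunker above computes `target_words` and `overlap_words` but does not show how an oversized section would be cut down. The following is a minimal sketch of one way to do it, assuming plain word windows are acceptable; the function name `split_with_overlap` is hypothetical, and the defaults are derived from the 400-token target and 15% overlap given above.

```python
def split_with_overlap(
    text: str,
    target_words: int = 307,   # int(400 tokens / 1.3 tokens per word)
    overlap_words: int = 46,   # int(307 * 0.15)
) -> list[str]:
    """Split text into windows of at most target_words words.

    Each window after the first repeats the last overlap_words words of the
    previous window. Hypothetical helper, not part of the original skill.
    """
    if overlap_words >= target_words:
        raise ValueError("overlap_words must be smaller than target_words")

    words = text.split()
    if not words:
        return []
    if len(words) <= target_words:
        return [text]  # Short enough: keep the original formatting intact

    step = target_words - overlap_words
    windows = []
    for start in range(0, len(words), step):
        # Joining on spaces discards line breaks inside the window
        windows.append(" ".join(words[start:start + target_words]))
        if start + target_words >= len(words):
            break  # This window already reaches the end of the text
    return windows
```

Splitting on whitespace flattens Markdown line structure inside a window, so a production version would more likely split on paragraph or sentence boundaries and use a real tokenizer instead of the words-to-tokens ratio.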
### Change Detection (Incremental Updates)
```python
import hashlib

from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue


def compute_file_hash(file_path: str) -> str:
    """SHA-256 for change detection."""
    with open(file_path, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()


class QdrantStateTracker:
    """Query Qdrant payloads directly - no external state DB needed."""

    def __init__(self, client: QdrantClient, collection: str):
        self.client = client
        self.collection = collection

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Returns {file_path: file_hash} from Qdrant."""
        indexed = {}
        offset = None
        while True:
            # Page through all points for this book, fetching only the two fields needed
            points, next_offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )
            for point in points:
                indexed[point.payload["source_file"]] = point.payload["source_file_hash"]
            if next_offset is None:
                break
            offset = next_offset
        return indexed

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """Compare filesystem vs index."""
        new = [p for p in current if p not in indexed]
        deleted = [p for p in indexed if p not in current]
        modified = [p for p in current if p in indexed and current[p] != indexed[p]]
        return new, modified, deleted
```
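`detect_changes` expects a `current` mapping of file paths to hashes, which the snippet above does not build. A minimal sketch of that step follows, assuming `source_file` payloads store POSIX-style paths relative to the content root; `scan_directory` and `plan_sync` are hypothetical names, not part of the original skill.

```python
import hashlib
from pathlib import Path


def scan_directory(root: str, pattern: str = "*.md") -> dict[str, str]:
    """Return {relative_posix_path: sha256_hex} for matching files under root."""
    root_path = Path(root)
    current = {}
    for path in sorted(root_path.rglob(pattern)):
        if path.is_file():
            rel = path.relative_to(root_path).as_posix()
            current[rel] = hashlib.sha256(path.read_bytes()).hexdigest()
    return current


def plan_sync(current: dict[str, str], indexed: dict[str, str]) -> tuple[list[str], list[str]]:
    """Turn the two hash maps into work lists.

    to_embed:  new or modified files that need chunking and embedding.
    to_delete: deleted or modified files whose old chunks should be removed
               from the index before new ones are upserted.
    """
    to_embed = sorted(p for p, h in current.items() if indexed.get(p) != h)
    to_delete = sorted(p for p, h in indexed.items() if current.get(p) != h)
    return to_embed, to_delete
```

Modified files appear in both lists on purpose: their stale chunks are removed first, so sections that no longer exist in the file do not linger in the index.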
### Batched Embeddings
```python
from openai import OpenAI


class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        self.client = OpenAI()  # Reads OPENAI_API_KEY from the environment
        self.model = model
        self.batch_size = batch_size  # Inputs per request; tune to your rate limits

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        embedded = []
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            # One API call per batch instead of one per chunk
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )
            for chunk, data in zip(batch, response.data):
                # model_dump() is Pydantic v2; on Pydantic v1 use chunk.dict()
                embedded.append(EmbeddedChunk(**chunk.model_dump(), embedding=data.embedding))
        return embedded
```
### Qdrant Collection with Payload Indexes
```python
from qdrant_client.models import Distance, PayloadSchemaType, VectorParams


# Method of the uploader class (self.client is a QdrantClient)
def create_collection(self, recreate: bool = False):
    """Create collection with proper indexes for filtered retrieval."""
    if self.client.collection_exists(self.collection):
        if not recreate:
            return  # Already set up; keep the existing data
        self.client.delete_collection(self.collection)

    self.client.create_collection(
        collection_name=self.collection,
        # 1536 is the output dimension of text-embedding-3-small
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )
    # Index ALL fields you filter by
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),            # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),             # Content filter
        ("chapter", PayloadSchemaType.INTEGER),            # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),      # Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),      # Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),   # Change detection
    ]
    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
```
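One detail for the Uploader step: Qdrant accepts only unsigned 64-bit integers or UUIDs as point IDs, so string chunk IDs of the form `{file_hash}_{content_hash}_{idx}` cannot be used as point IDs directly. A minimal sketch of one workaround is to derive a deterministic UUID from the chunk ID. The namespace string and the name `to_point_id` are assumptions made for illustration.

```python
import uuid

# Hypothetical namespace. Any fixed UUID works, but it must never change,
# otherwise the same chunk ID would map to a different point ID.
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "building-rag-systems/chunks")


def to_point_id(chunk_id: str) -> str:
    """Map a string chunk ID to a stable UUIDv5 string usable as a Qdrant point ID."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, chunk_id))
```

Because the mapping is deterministic, re-ingesting an unchanged chunk produces the same point ID, so an upsert overwrites the existing point instead of duplicating it. The original string ID can stay in the payload for `prev_chunk_id` / `next_chunk_id` links, with lookups converting through `to_point_id`.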
---
## Retrieval Patterns
### Comprehensive Filter Builder
```python
from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue, Range


def build_filter(self, query: SearchQuery) -> Filter:
    """Build Qdrant filter with all conditions (AND logic)."""
    conditions = []
    # Required: Tenant isolation
    conditions.append(FieldCondition(
        key="book_id", match=MatchValue(value=query.book_id)
    ))
    # Required: Hardware tier (lte = "tier X or lower")
    conditions.append(FieldCondition(
        key="hardware_tier", range=Range(lte=query.hardware_tier)
    ))
    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))
    # Optional: Chapter range (compare against None so that 0 is a valid bound)
    if query.chapter_min is not None or query.chapter_max is not None:
        conditions.append(FieldCondition(
            key="chapter",
            range=Range(gte=query.chapter_min, lte=query.chapter_max),
        ))
    # Optional: Proficiency OR logic (matches any of the listed levels)
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))
    return Filter(must=conditions)
```
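`SearchQuery` is used by the filter builder but never defined in this document. The sketch below shows what it might look like, with fields inferred from the attributes `build_filter` reads, plus a pure-Python `matches` helper (a hypothetical name) that mirrors the same AND logic so filter behavior can be unit-tested against sample payloads without a running Qdrant instance.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SearchQuery:
    """Assumed shape, inferred from the attributes build_filter accesses."""
    book_id: str
    hardware_tier: int
    module: Optional[str] = None
    chapter_min: Optional[int] = None
    chapter_max: Optional[int] = None
    proficiency_levels: list[str] = field(default_factory=list)


def matches(payload: dict, query: SearchQuery) -> bool:
    """Return True if a payload would pass every condition (AND logic)."""
    if payload["book_id"] != query.book_id:
        return False                      # Tenant isolation
    if payload["hardware_tier"] > query.hardware_tier:
        return False                      # lte: only tier X or lower
    if query.module and payload["module"] != query.module:
        return False
    # Bounds are compared against None, so chapter 0 counts as a valid bound
    if query.chapter_min is not None and payload["chapter"] < query.chapter_min:
        return False
    if query.chapter_max is not None and payload["chapter"] > query.chapter_max:
        return False
    if query.proficiency_levels and payload["proficiency_level"] not in query.proficiency_levels:
        return False                      # OR across the listed levels
    return True
```

A helper like this is only a test aid; the real filtering still happens inside Qdrant, and the two should be kept in sync by hand.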
### Context Expansion (Walk Chunk Chain)
```python
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Walk prev_chunk_id/next_chunk_id chain for surrounding context."""
    current = self.get_chunk_by_id(chunk_id)
    if not current:
        return []

    # Walk backwards
    prev_chunks = []
    prev_id = current.prev_chunk_id
    for _ in range(prev):
        if not prev_id:
            break
        chunk = self.get_chunk_by_id(prev_id)
        if not chunk:
            break
        prev_chunks.insert(0, chunk)
        prev_id = chunk.prev_chunk_id

    # Walk forwards
    next_chunks = []
    next_id = current.next_chunk_id
    for _ in range(next):
        if not next_id:
            break
        chunk = self.get_chunk_by_id(next_id)
        if not chunk:
            break
        next_chunks.append(chunk)
        next_id = chunk.next_chunk_id

    return prev_chunks + [current] + next_chunks
```
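When several search hits come from the same document, their expanded windows can overlap, and the same chunk would then appear twice in the prompt. A minimal sketch of merging the windows follows, assuming each chunk carries `id`, `parent_doc_id`, and `chunk_index` as in the payload schema; the `Chunk` dataclass here is a stripped-down stand-in and `merge_expanded` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    """Minimal stand-in for the project's chunk model (illustration only)."""
    id: str
    text: str
    parent_doc_id: str
    chunk_index: int


def merge_expanded(windows: list[list[Chunk]]) -> list[Chunk]:
    """Merge expanded windows into one deduplicated, readable sequence.

    Documents keep the order in which they first appear (so the best-ranked
    hit's document comes first); chunks within a document follow chunk_index.
    """
    seen: dict[str, Chunk] = {}
    doc_rank: dict[str, int] = {}
    for window in windows:
        for chunk in window:
            seen.setdefault(chunk.id, chunk)
            doc_rank.setdefault(chunk.parent_doc_id, len(doc_rank))
    return sorted(
        seen.values(),
        key=lambda c: (doc_rank[c.parent_doc_id], c.chunk_index),
    )
```

Calling `expand_context` once per hit and passing the resulting lists to this helper gives a context block without repeated text, at the cost of losing the per-hit relevance order inside a document.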
### Full Document Retrieval
```python
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index."""
    points = []
    offset = None
    # Page through results so documents with more than 100 chunks are not truncated
    while True:
        page, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=Filter(must=[
                FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
            ]),
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        points.extend(page)
        if offset is None:
            break
    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
```
---
## Payload Schema
```python
from typing import Optional

from pydantic import BaseModel


class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion."""
    # Tenant isolation
    book_id: str
    # Content filters (indexed in create_collection, except lesson)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str
    # Display content
    text: str
    section_title: Optional[str] = None
    source_file: str
    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str] = None  # None for the first chunk
    next_chunk_id: Optional[str] = None  # None for the last chunk
    # Change detection
    content_hash: str
    source_file_hash: str
```
---
## Anti-Patterns
| Don't | Do Instead |
|-------|------------|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |
---
## Verification
Run: `python scripts/verify.py`
## Related Skills
- `scaffolding-fastapi-dapr` - API patterns for search endpoints
- `streaming-llm-responses` - Streaming RAG responses
## References
- [references/ingestion-patterns.md](references/ingestion-patterns.md) - Full ingestion pipeline
- [references/retrieval-patterns.md](references/retrieval-patterns.md) - Filter strategies, context expansion