palantir-core-workflow-a
Build Palantir Foundry data pipelines using Python transforms. Use when creating ETL pipelines, writing @transform decorators, or building dataset-to-dataset processing in Foundry. Trigger with phrases like "palantir pipeline", "foundry transform", "palantir ETL", "palantir data pipeline", "foundry python transform".
Best use case
palantir-core-workflow-a is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Build Palantir Foundry data pipelines using Python transforms. Use when creating ETL pipelines, writing @transform decorators, or building dataset-to-dataset processing in Foundry. Trigger with phrases like "palantir pipeline", "foundry transform", "palantir ETL", "palantir data pipeline", "foundry python transform".
Teams using palantir-core-workflow-a should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/palantir-core-workflow-a/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How palantir-core-workflow-a Compares
| Feature / Agent | palantir-core-workflow-a | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Build Palantir Foundry data pipelines using Python transforms. Use when creating ETL pipelines, writing @transform decorators, or building dataset-to-dataset processing in Foundry. Trigger with phrases like "palantir pipeline", "foundry transform", "palantir ETL", "palantir data pipeline", "foundry python transform".
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
Related Guides
Best AI Skills for Claude
Explore the best AI skills for Claude and Claude Code across coding, research, workflow automation, documentation, and agent operations.
ChatGPT vs Claude for Agent Skills
Compare ChatGPT and Claude for AI agent skills across coding, writing, research, and reusable workflow execution.
Cursor vs Codex for AI Workflows
Compare Cursor and Codex for AI coding workflows, repository assistance, debugging, refactoring, and reusable developer skills.
SKILL.md Source
# Palantir Core Workflow A — Data Pipelines with Transforms
## Overview
Build Foundry data pipelines using the `transforms-python` library. Covers the `@transform` and `@transform_df` decorators, input/output dataset wiring, incremental transforms, and `@configure` for Spark tuning. This is the primary workflow for all data processing in Foundry.
## Prerequisites
- Completed `palantir-install-auth` setup
- A Foundry Code Repository (Python Transforms type)
- Understanding of PySpark DataFrames (Foundry runs Spark under the hood)
## Instructions
### Step 1: Project Structure
```
my-transforms-repo/
├── src/
│ └── myproject/
│ ├── __init__.py
│ ├── pipeline.py # Main transforms
│ ├── utils.py # Shared logic
│ └── datasets.py # Dataset path constants
├── build.gradle # Foundry build config
├── conda_recipe/meta.yaml # Dependency declarations
└── settings.gradle
```
### Step 2: Basic Transform with @transform_df
```python
# src/myproject/pipeline.py
from transforms.api import transform_df, Input, Output
@transform_df(
Output("/Company/datasets/cleaned_orders"),
orders=Input("/Company/datasets/raw_orders"),
)
def clean_orders(orders):
"""Clean raw orders: drop nulls, normalize dates, filter test data."""
from pyspark.sql import functions as F
return (
orders
.filter(F.col("order_id").isNotNull())
.filter(~F.col("email").like("%@test.com"))
.withColumn("order_date", F.to_date("order_date_str", "yyyy-MM-dd"))
.withColumn("total_cents", (F.col("total") * 100).cast("long"))
.drop("order_date_str", "total")
)
```
### Step 3: Multi-Input Join Transform
```python
@transform_df(
Output("/Company/datasets/order_enriched"),
orders=Input("/Company/datasets/cleaned_orders"),
customers=Input("/Company/datasets/customers"),
)
def enrich_orders(orders, customers):
"""Join orders with customer data for analytics."""
from pyspark.sql import functions as F
return (
orders
.join(customers, orders.customer_id == customers.id, "left")
.select(
orders.order_id,
orders.order_date,
orders.total_cents,
customers.name.alias("customer_name"),
customers.segment,
customers.region,
)
.withColumn("processed_at", F.current_timestamp())
)
```
### Step 4: Low-Level @transform for File I/O
```python
from transforms.api import transform, Input, Output
@transform(
output=Output("/Company/datasets/report_summary"),
orders=Input("/Company/datasets/order_enriched"),
)
def generate_summary(orders, output):
"""Write aggregated summary using low-level FileSystem API."""
df = orders.dataframe()
summary = (
df.groupBy("region", "segment")
.agg(
{"total_cents": "sum", "order_id": "count"}
)
.withColumnRenamed("sum(total_cents)", "revenue_cents")
.withColumnRenamed("count(order_id)", "order_count")
)
output.write_dataframe(summary)
```
### Step 5: Incremental Transforms
```python
from transforms.api import transform_df, Input, Output, incremental
@incremental()
@transform_df(
Output("/Company/datasets/daily_events"),
events=Input("/Company/datasets/raw_events"),
)
def process_events_incrementally(events):
"""Only process new rows since last build — much faster for append-only data."""
from pyspark.sql import functions as F
return events.withColumn("ingested_at", F.current_timestamp())
```
### Step 6: Configure Spark Resources
```python
from transforms.api import transform_df, Input, Output, configure
@configure(profile=["DRIVER_MEMORY_LARGE"]) # 16GB driver
@transform_df(
Output("/Company/datasets/heavy_aggregation"),
data=Input("/Company/datasets/large_dataset"),
)
def heavy_compute(data):
"""Resource-intensive transform needing extra Spark memory."""
from pyspark.sql import functions as F
return (
data
.groupBy("category")
.agg(F.approx_count_distinct("user_id").alias("unique_users"))
)
```
## Output
- Dataset-to-dataset transforms wired with `@transform_df`
- Multi-input joins connecting datasets across projects
- Incremental processing for append-only sources
- Spark resource tuning with `@configure`
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `DatasetNotFound` | Wrong path string | Check dataset path in Foundry UI (right-click > Copy path) |
| `AnalysisException: cannot resolve` | Column name mismatch | Print `df.columns` to debug; Foundry columns are case-sensitive |
| `OutOfMemoryError` | Insufficient Spark memory | Add `@configure(profile=["DRIVER_MEMORY_LARGE"])` |
| `Transform is not incremental-compatible` | Using non-append operations | Only use `filter/select/withColumn` in incremental transforms |
| Build hangs | Circular dependency | Check that no two transforms reference each other's output |
## Examples
### Polars Transform (Lightweight)
```python
from transforms.api import transform_polars, Input, Output
@transform_polars(
Output("/Company/datasets/fast_summary"),
data=Input("/Company/datasets/small_table"),
)
def fast_polars(data):
"""Use Polars for small datasets — faster than Spark, no JVM overhead."""
import polars as pl
return data.group_by("category").agg(pl.col("amount").sum())
```
## Resources
- [Python Transforms Guide](https://www.palantir.com/docs/foundry/transforms-python/transforms)
- [Transforms API Reference](https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api)
- [@configure Reference](https://www.palantir.com/docs/foundry/api-reference/transforms-python-library/api-configure)
- [Incremental Transforms](https://www.palantir.com/docs/foundry/transforms-python/transforms-pipelines)
## Next Steps
- Query Ontology objects and actions: `palantir-core-workflow-b`
- Optimize pipeline performance: `palantir-performance-tuning`
- Deploy across environments: `palantir-multi-env-setup`Related Skills
calendar-to-workflow
Converts calendar events and schedules into Claude Code workflows, meeting prep documents, and standup notes. Use when the user mentions calendar events, meeting prep, standup generation, or scheduling workflows. Trigger with phrases like "prep for my meetings", "generate standup notes", "create workflow from calendar", or "summarize today's schedule".
workhuman-core-workflow-b
Workhuman core workflow b for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow b".
workhuman-core-workflow-a
Workhuman core workflow a for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow a".
wispr-core-workflow-b
Wispr Flow core workflow b for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow b".
wispr-core-workflow-a
Wispr Flow core workflow a for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow a".
windsurf-core-workflow-b
Execute Windsurf's secondary workflow: Workflows, Memories, and reusable automation. Use when creating reusable Cascade workflows, managing persistent memories, or automating repetitive development tasks. Trigger with phrases like "windsurf workflow", "windsurf automation", "windsurf memories", "cascade workflow", "windsurf slash command".
windsurf-core-workflow-a
Execute Windsurf's primary workflow: Cascade Write mode for multi-file agentic coding. Use when building features, refactoring across files, or performing complex code tasks. Trigger with phrases like "windsurf cascade write", "windsurf agentic coding", "windsurf multi-file edit", "cascade write mode", "windsurf build feature".
webflow-core-workflow-b
Execute Webflow secondary workflows — Sites management, Pages API, Forms submissions, Ecommerce (products/orders/inventory), and Custom Code via the Data API v2. Use when managing sites, reading pages, handling form data, or working with Webflow Ecommerce products and orders. Trigger with phrases like "webflow sites", "webflow pages", "webflow forms", "webflow ecommerce", "webflow products", "webflow orders".
webflow-core-workflow-a
Execute the primary Webflow workflow — CMS content management: list collections, CRUD items, publish items, and manage content lifecycle via the Data API v2. Use when working with Webflow CMS collections and items, managing blog posts, team members, or any dynamic content. Trigger with phrases like "webflow CMS", "webflow collections", "webflow items", "create webflow content", "manage webflow CMS", "webflow content management".
veeva-core-workflow-b
Veeva Vault core workflow b for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow b".
veeva-core-workflow-a
Veeva Vault core workflow a for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow a".
vastai-core-workflow-b
Execute Vast.ai secondary workflow: multi-instance orchestration, spot recovery, and cost optimization. Use when running distributed training, handling spot preemption, or optimizing GPU spend across multiple instances. Trigger with phrases like "vastai distributed training", "vastai spot recovery", "vastai multi-gpu", "vastai cost optimization".