From palantir-pack
Builds Palantir Foundry data pipelines using Python @transform decorators, PySpark DataFrames, and input/output wiring for ETL and dataset processing.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin palantir-pack
Build Foundry data pipelines using the `transforms-python` library. Covers the `@transform` and `@transform_df` decorators, input/output dataset wiring, incremental transforms, and `@configure` for Spark tuning. This is the primary workflow for all data processing in Foundry.
my-transforms-repo/
├── src/
│   └── myproject/
│       ├── __init__.py
│       ├── pipeline.py        # Main transforms
│       ├── utils.py           # Shared logic
│       └── datasets.py        # Dataset path constants
├── build.gradle               # Foundry build config
├── conda_recipe/meta.yaml     # Dependency declarations
└── settings.gradle
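The utils.py and datasets.py modules in this layout are ordinary Python modules that transforms import like any other code. A minimal sketch of that convention (the module contents and names below are illustrative, not part of the transforms-python API):

# src/myproject/datasets.py: one place for dataset path strings (illustrative)
RAW_ORDERS = "/Company/datasets/raw_orders"
CLEANED_ORDERS = "/Company/datasets/cleaned_orders"

# src/myproject/utils.py: shared PySpark helpers reused across transforms (illustrative)
from pyspark.sql import DataFrame, functions as F


def drop_test_rows(df: DataFrame, email_col: str = "email") -> DataFrame:
    """Remove rows whose email belongs to an internal test domain."""
    return df.filter(~F.col(email_col).like("%@test.com"))

A transform in pipeline.py can then reference Input(datasets.RAW_ORDERS) and call utils.drop_test_rows(orders) instead of repeating path literals and filter logic.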
# src/myproject/pipeline.py
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/Company/datasets/cleaned_orders"),
    orders=Input("/Company/datasets/raw_orders"),
)
def clean_orders(orders):
    """Clean raw orders: drop nulls, normalize dates, filter test data."""
    from pyspark.sql import functions as F

    return (
        orders
        .filter(F.col("order_id").isNotNull())
        .filter(~F.col("email").like("%@test.com"))
        .withColumn("order_date", F.to_date("order_date_str", "yyyy-MM-dd"))
        .withColumn("total_cents", (F.col("total") * 100).cast("long"))
        .drop("order_date_str", "total")
    )


@transform_df(
    Output("/Company/datasets/order_enriched"),
    orders=Input("/Company/datasets/cleaned_orders"),
    customers=Input("/Company/datasets/customers"),
)
def enrich_orders(orders, customers):
    """Join orders with customer data for analytics."""
    from pyspark.sql import functions as F

    return (
        orders
        .join(customers, orders.customer_id == customers.id, "left")
        .select(
            orders.order_id,
            orders.order_date,
            orders.total_cents,
            customers.name.alias("customer_name"),
            customers.segment,
            customers.region,
        )
        .withColumn("processed_at", F.current_timestamp())
    )
from transforms.api import transform, Input, Output


@transform(
    output=Output("/Company/datasets/report_summary"),
    orders=Input("/Company/datasets/order_enriched"),
)
def generate_summary(orders, output):
    """Write an aggregated summary using the low-level @transform API."""
    df = orders.dataframe()
    summary = (
        df.groupBy("region", "segment")
        .agg({"total_cents": "sum", "order_id": "count"})
        .withColumnRenamed("sum(total_cents)", "revenue_cents")
        .withColumnRenamed("count(order_id)", "order_count")
    )
    output.write_dataframe(summary)
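The low-level decorator also exposes a file-level view of the dataset for cases where the output is not a DataFrame. The sketch below writes a small CSV directly into the output dataset; it assumes the filesystem() accessor on the output object from the transforms-python API, and the output path is illustrative:

from transforms.api import transform, Input, Output


@transform(
    output=Output("/Company/datasets/report_files"),  # illustrative path
    orders=Input("/Company/datasets/order_enriched"),
)
def write_raw_report(orders, output):
    """Write a tiny CSV report through the file-level API (sketch)."""
    # collect() pulls rows to the driver, so keep this aggregation small
    rows = orders.dataframe().groupBy("region").count().collect()
    with output.filesystem().open("region_counts.csv", "w") as f:
        f.write("region,order_count\n")
        for row in rows:
            f.write(f"{row['region']},{row['count']}\n")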
from transforms.api import transform_df, Input, Output, incremental


@incremental()
@transform_df(
    Output("/Company/datasets/daily_events"),
    events=Input("/Company/datasets/raw_events"),
)
def process_events_incrementally(events):
    """Only process new rows since the last build; much faster for append-only data."""
    from pyspark.sql import functions as F

    return events.withColumn("ingested_at", F.current_timestamp())
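For finer control over what an incremental build reads and writes, @incremental can also be stacked on the low-level @transform. A sketch under the assumption that incremental inputs accept a read mode such as "added" and that outputs accept set_mode(), as described in the transforms-python incremental documentation; the output path is illustrative:

from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    output=Output("/Company/datasets/daily_events_lowlevel"),  # illustrative path
    events=Input("/Company/datasets/raw_events"),
)
def append_new_events(events, output):
    """Append only the rows added since the last build (sketch)."""
    from pyspark.sql import functions as F

    new_rows = events.dataframe("added")  # assumption: "added" returns only unprocessed rows
    output.set_mode("modify")             # assumption: "modify" appends instead of replacing
    output.write_dataframe(new_rows.withColumn("ingested_at", F.current_timestamp()))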
from transforms.api import transform_df, Input, Output, configure


@configure(profile=["DRIVER_MEMORY_LARGE"])  # 16GB driver
@transform_df(
    Output("/Company/datasets/heavy_aggregation"),
    data=Input("/Company/datasets/large_dataset"),
)
def heavy_compute(data):
    """Resource-intensive transform needing extra Spark memory."""
    from pyspark.sql import functions as F

    return (
        data
        .groupBy("category")
        .agg(F.approx_count_distinct("user_id").alias("unique_users"))
    )
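Profiles are passed as a list, so driver and executor settings can be combined when a job needs headroom on both sides. A sketch; EXECUTOR_MEMORY_LARGE is an assumed profile name, and the profiles you can reference depend on what is enabled for your repository:

from transforms.api import transform_df, Input, Output, configure


@configure(profile=["DRIVER_MEMORY_LARGE", "EXECUTOR_MEMORY_LARGE"])  # second profile name is assumed
@transform_df(
    Output("/Company/datasets/wide_join"),                  # illustrative path
    left=Input("/Company/datasets/large_dataset"),
    right=Input("/Company/datasets/other_large_dataset"),   # illustrative path
)
def wide_join(left, right):
    """Join two large datasets; the executors, not just the driver, need extra memory."""
    return left.join(right, "user_id", "inner")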
| Error | Cause | Solution |
|---|---|---|
| DatasetNotFound | Wrong path string | Check dataset path in Foundry UI (right-click > Copy path) |
| AnalysisException: cannot resolve | Column name mismatch | Print df.columns to debug; Foundry columns are case-sensitive |
| OutOfMemoryError | Insufficient Spark memory | Add @configure(profile=["DRIVER_MEMORY_LARGE"]) |
| Transform is not incremental-compatible | Using non-append operations | Only use filter/select/withColumn in incremental transforms (see the sketch below) |
| Build hangs | Circular dependency | Check that no two transforms reference each other's output |
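To illustrate the incremental-compatibility row above: row-wise operations such as filter, select, and withColumn keep a transform incremental, while aggregations and joins generally need the full input history and force a full recompute. A sketch using only constructs already shown in this skill; the dataset paths are illustrative:

from transforms.api import transform_df, Input, Output, incremental


@incremental()
@transform_df(
    Output("/Company/datasets/events_flagged"),  # illustrative path
    events=Input("/Company/datasets/raw_events"),
)
def flag_events(events):
    """Row-wise filter and withColumn keep this transform incremental."""
    from pyspark.sql import functions as F

    return (
        events
        .filter(F.col("event_type").isNotNull())
        .withColumn("is_error", F.col("event_type") == "error")
    )

# Adding a groupBy/agg or a join to flag_events would require the full history of
# the input, so the build would fall back to a full, non-incremental recompute.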
from transforms.api import transform_polars, Input, Output


@transform_polars(
    Output("/Company/datasets/fast_summary"),
    data=Input("/Company/datasets/small_table"),
)
def fast_polars(data):
    """Use Polars for small datasets; faster than Spark, no JVM overhead."""
    import polars as pl

    return data.group_by("category").agg(pl.col("amount").sum())