Implements PySpark jobs, DataFrame/RDD pipelines, Spark SQL queries, structured streaming, performance tuning, partitioning, and cluster configuration for big data ETL.
Install: `npx claudepluginhub codeape-7/ai-agent-workflowgroup`. This skill uses the workspace's default tool permissions.
Senior Apache Spark engineer specializing in high-performance distributed data processing, optimizing large-scale ETL pipelines, and building production-grade Spark applications.
Writes and optimizes Apache Spark jobs with PySpark for DataFrame transformations, Spark SQL queries, RDD pipelines, shuffle tuning, partitioning, executor config, and structured streaming.
Optimizes Apache Spark jobs using partitioning strategies, caching, shuffle reduction, memory tuning, and skew handling. Use it to improve performance, debug slow jobs, or scale data pipelines.
Verify the partition count with `df.rdd.getNumPartitions()`; if spill or skew is detected, return to step 4 (a quick skew check is sketched after the reference table below). Test with production-scale data, monitor resource usage, and verify performance targets.

Load detailed guidance based on context:
| Topic | Reference | Load When |
|---|---|---|
| Spark SQL & DataFrames | references/spark-sql-dataframes.md | DataFrame API, Spark SQL, schemas, joins, aggregations |
| RDD Operations | references/rdd-operations.md | Transformations, actions, pair RDDs, custom partitioners |
| Partitioning & Caching | references/partitioning-caching.md | Data partitioning, persistence levels, broadcast variables |
| Performance Tuning | references/performance-tuning.md | Configuration, memory tuning, shuffle optimization, skew handling |
| Streaming Patterns | references/streaming-patterns.md | Structured Streaming, watermarks, stateful operations, sinks |
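
One quick way to spot the skew mentioned above is to count rows per partition; a minimal sketch, assuming `df` is the DataFrame under inspection:

```python
from pyspark.sql import functions as F

# Rows per partition; a heavily lopsided top entry suggests skew
df.groupBy(F.spark_partition_id().alias("pid")) \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10)
```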
A baseline batch pipeline with an explicit schema, adaptive query execution enabled, and a partition-count check before the write:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

spark = SparkSession.builder \
    .appName("example-pipeline") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Always define explicit schemas in production
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_ts", LongType(), False),
    StructField("amount", DoubleType(), True),
])

df = spark.read.schema(schema).parquet("s3://bucket/events/")

result = df \
    .filter(F.col("amount").isNotNull()) \
    .groupBy("user_id") \
    .agg(F.sum("amount").alias("total_amount"), F.count("*").alias("event_count"))

# Verify partition count before writing
print(f"Partition count: {result.rdd.getNumPartitions()}")

result.write.mode("overwrite").parquet("s3://bucket/output/")
```
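
If the printed count is far from the target, adjust it before writing; a sketch, assuming roughly 200 output files suits the data volume (a common rule of thumb is ~128 MB per output file):

```python
# coalesce() narrows partitions without a full shuffle; use repartition() to increase them
result.coalesce(200).write.mode("overwrite").parquet("s3://bucket/output/")
```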
Broadcast joins eliminate the shuffle when joining a large fact table against a small dimension table:

```python
from pyspark.sql.functions import broadcast

# Spark auto-broadcasts tables under the broadcast threshold; the hint makes intent explicit
enriched = large_fact_df.join(broadcast(dim_df), on="product_id", how="left")
```
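
Auto-broadcast only applies below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default); a sketch of raising it for a somewhat larger dimension table:

```python
# Raise the auto-broadcast cutoff to ~50 MB (value is in bytes); -1 disables auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))
```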
For a join key with heavy skew, salting spreads the hot key across partitions:

```python
import pyspark.sql.functions as F

SALT_BUCKETS = 50

# Add a random salt to the skewed key on one side...
skewed_df = skewed_df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int")) \
    .withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))

# ...and replicate every salt value on the other side so all pairs still match
other_df = other_df.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(SALT_BUCKETS)]))) \
    .withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))

# Drop the duplicate skewed_key from one side to avoid ambiguous columns after the join
result = skewed_df.join(other_df.drop("skewed_key"), on="salted_key", how="inner") \
    .drop("salt", "salted_key")
```
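
Before hand-salting, it is worth checking whether AQE's built-in skew-join handling (Spark 3.0+) already resolves the problem; a minimal config sketch:

```python
# AQE splits oversized join partitions automatically when skew is detected
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```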
Cache intermediate results only when they are reused, and release them afterwards:

```python
# Cache ONLY when the DataFrame is reused multiple times
df_cleaned = df.filter(...).withColumn(...).cache()
df_cleaned.count()  # Materialize immediately; check the Spark UI for spill

report_a = df_cleaned.groupBy("region").agg(...)
report_b = df_cleaned.groupBy("product").agg(...)

df_cleaned.unpersist()  # Release when done
```
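
When the cached data may not fit in executor memory, an explicit persistence level avoids eviction-and-recompute cycles; a sketch using MEMORY_AND_DISK in place of `.cache()` above:

```python
from pyspark import StorageLevel

# Spill partitions to local disk under memory pressure instead of dropping and recomputing them
df_cleaned = df.filter(...).withColumn(...).persist(StorageLevel.MEMORY_AND_DISK)
```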
When implementing Spark solutions, this skill draws on: the Spark DataFrame API, Spark SQL, RDD transformations/actions, the Catalyst optimizer, the Tungsten execution engine, partitioning strategies, broadcast variables, accumulators, Structured Streaming, watermarks, checkpointing, Spark UI analysis, memory management, and shuffle optimization.
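
Since Structured Streaming with watermarks and checkpointing is in scope but not shown above, a minimal sketch (the broker address, topic name, and checkpoint path are hypothetical):

```python
from pyspark.sql import functions as F

events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

# Tolerate up to 10 minutes of late data, then aggregate per 5-minute window
counts = events \
    .selectExpr("CAST(value AS STRING) AS json", "timestamp") \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(F.window("timestamp", "5 minutes")) \
    .count()

# Checkpointing makes the query restartable with exactly-once state recovery
query = counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints/events") \
    .start()
```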