spark

star 25

Build and run Apache Spark jobs -- submit applications, run interactive queries, manage data processing pipelines.

clawdata By clawdata schedule Updated 3/3/2026

name: spark description: "Build and run Apache Spark jobs -- submit applications, run interactive queries, manage data processing pipelines." metadata: {"openclaw": {"emoji": "⚡", "requires": {"bins": ["spark-submit"]}, "tags": ["processing", "spark", "big-data", "etl", "data"]}}

Apache Spark

You help build and run Apache Spark data processing jobs. Use this when the user asks about Spark applications, DataFrame operations, SQL queries on Spark, or cluster management.

Commands

Submit a Spark application

spark-submit --master local[*] <script.py>

Submit with dependencies

spark-submit --master local[*] --packages org.apache.spark:spark-avro_2.12:3.5.0 <script.py>

Start PySpark shell

pyspark --master local[*]

Start Spark SQL shell

spark-sql --master local[*]

PySpark Patterns

Read and write data

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my_app").getOrCreate()

# Read
df = spark.read.parquet("s3a://bucket/path/")
df = spark.read.csv("data/input.csv", header=True, inferSchema=True)
df = spark.read.json("data/events.json")

# Write
df.write.parquet("output/", mode="overwrite")
df.write.format("delta").save("output/delta/")

Basic transformations

from pyspark.sql import functions as F

result = (
    df
    .filter(F.col("status") == "active")
    .withColumn("year", F.year("created_at"))
    .groupBy("year", "category")
    .agg(
        F.count("*").alias("total"),
        F.sum("amount").alias("revenue"),
    )
    .orderBy(F.desc("revenue"))
)

Window functions

from pyspark.sql.window import Window

window = Window.partitionBy("customer_id").orderBy(F.desc("order_date"))
df = df.withColumn("rank", F.row_number().over(window))

Spark SQL

df.createOrReplaceTempView("orders")
result = spark.sql("""
    SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total
    FROM orders
    WHERE status = 'completed'
    GROUP BY customer_id
    HAVING total > 1000
""")

Delta Lake

# Read Delta
df = spark.read.format("delta").load("path/to/delta")

# Write Delta with merge
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "path/to/delta")
delta_table.alias("target").merge(
    new_data.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Configuration

Common Spark configs

spark = (
    SparkSession.builder
    .appName("my_app")
    .config("spark.sql.shuffle.partitions", 200)
    .config("spark.sql.adaptive.enabled", True)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

Best Practices

  • Enable Adaptive Query Execution (AQE) for automatic optimisation
  • Use repartition() or coalesce() to control output file count
  • Avoid collect() on large datasets -- use show(), take(), or write to storage
  • Cache intermediate DataFrames only when reused multiple times
  • Use Delta Lake for ACID transactions and time travel
  • Broadcast small tables in joins: F.broadcast(small_df)
  • Monitor via Spark UI (default port 4040)
  • Prefer DataFrame API over RDD for performance
Install via CLI
npx skills add https://github.com/clawdata/clawdata --skill spark
Repository Details
star Stars 25
call_split Forks 12
navigation Branch main
article Path SKILL.md
More from Creator