spark-declarative-pipelines

star 1

Creates, configures, updates, reviews, and troubleshoots Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion. Use when: (1) building new data pipelines, (2) reviewing or auditing existing pipelines for best practices, (3) migrating legacy DLT Python to modern SDP, (4) troubleshooting pipeline failures. Triggers: SDP, LDP, DLT, Delta Live Tables, Lakeflow, streaming tables, materialized views, APPLY CHANGES, CDC, bronze/silver/gold, medallion architecture, @dlt.table, @dp.table, read_files, cloudFiles, pipeline review, pipeline audit, pipeline best practices.

malcolndandaro By malcolndandaro schedule Updated 2/5/2026

name: spark-declarative-pipelines description: "Creates, configures, updates, reviews, and troubleshoots Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion. Use when: (1) building new data pipelines, (2) reviewing or auditing existing pipelines for best practices, (3) migrating legacy DLT Python to modern SDP, (4) troubleshooting pipeline failures. Triggers: SDP, LDP, DLT, Delta Live Tables, Lakeflow, streaming tables, materialized views, APPLY CHANGES, CDC, bronze/silver/gold, medallion architecture, @dlt.table, @dp.table, read_files, cloudFiles, pipeline review, pipeline audit, pipeline best practices."

Lakeflow Spark Declarative Pipelines (SDP)

Official Documentation


Workflow Selection

Determine the task type and follow the appropriate workflow:

Task Workflow
Build new pipeline Follow "Development Workflow with MCP Tools" below
Review/audit existing pipelines Follow "Review Mode" below
Migrate legacy DLT to SDP See 6-dlt-migration.md
Troubleshoot pipeline failures Use get_pipeline_events(), see "Common Issues"

Review Mode

When asked to review, audit, check, or improve existing pipelines:

Step 1: Locate Pipeline Definitions

Search for pipeline configurations:

  • DAB pipelines: **/dlt_pipelines/*.yml or resources/pipelines in databricks.yml
  • Source files: .sql and .py files referenced in pipeline libraries

Step 2: Check Against 2025 Best Practices

Check Legacy Pattern Modern Pattern Reference
Python API import dlt from pyspark import pipelines as dp 5-python-api.md
Ingestion spark.readStream.format("cloudFiles") read_files() SQL syntax 1-ingestion-patterns.md
Clustering PARTITION BY or none CLUSTER BY (Liquid Clustering) 4-performance-tuning.md
File format Notebook (# Databricks notebook source) Raw .sql/.py files Best practice
SCD patterns dlt.apply_changes() AUTO CDC INTO SQL syntax 3-scd-patterns.md

Step 3: Review Checklist

  • Modern API: Using pyspark.pipelines (dp) not legacy dlt
  • SQL preferred: Transformations in SQL unless Python required
  • Liquid Clustering: CLUSTER BY on all tables, especially facts
  • Data quality: Expectations defined (EXPECT, @dp.expect_or_drop)
  • No hardcoded configs: Variables from pipeline configuration, not SET statements
  • Raw files: Not notebook format with # COMMAND ----------
  • Serverless: No classic cluster configuration unless required

Step 4: Provide Recommendations

For each issue found:

  1. Identify the file and line number
  2. Show the current (legacy) code
  3. Show the recommended (modern) code
  4. Reference the appropriate documentation file

Development Workflow with MCP Tools

Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline which handles the entire lifecycle.

IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if user explicitly requires R language, Spark RDD APIs, or JAR libraries.

Step 1: Write Pipeline Files Locally

Create .sql or .py files in a local folder:

my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql

SQL Example (bronze/ingest_orders.sql):

CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Python Example (bronze/ingest_events.py):

from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

Language Selection:

  • Default to SQL unless user specifies Python or task requires it
  • Use SQL for: Transformations, aggregations, filtering, joins (90% of use cases)
  • Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths
  • Generate ONE language per request unless user explicitly asks for mixed pipeline

Step 2: Upload to Databricks Workspace

# MCP Tool: upload_folder
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)

Step 3: Create/Update and Run Pipeline

Use create_or_update_pipeline - the main entry point. It:

  1. Searches for an existing pipeline with the same name (or uses id from extra_settings)
  2. Creates a new pipeline or updates the existing one
  3. Optionally starts a pipeline run
  4. Optionally waits for completion and returns detailed results
# MCP Tool: create_or_update_pipeline
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True,           # Start immediately
    wait_for_completion=True, # Wait and return final status
    full_refresh=True,        # Full refresh all tables
    timeout=1800              # 30 minute timeout
)

Result contains actionable information:

{
    "success": True,                    # Did the operation succeed?
    "pipeline_id": "abc-123",           # Pipeline ID for follow-up operations
    "pipeline_name": "my_orders_pipeline",
    "created": True,                    # True if new, False if updated
    "state": "COMPLETED",               # COMPLETED, FAILED, TIMEOUT, etc.
    "catalog": "my_catalog",            # Target catalog
    "schema": "my_schema",              # Target schema
    "duration_seconds": 45.2,           # Time taken
    "message": "Pipeline created and completed successfully in 45.2s. Tables written to my_catalog.my_schema",
    "error_message": None,              # Error summary if failed
    "errors": []                        # Detailed error list if failed
}

Step 4: Handle Results

On Success:

if result["success"]:
    # Verify output tables
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )

On Failure:

if not result["success"]:
    # Message includes suggested next steps
    print(result["message"])
    # "Pipeline created but run failed. State: FAILED. Error: Column 'amount' not found.
    #  Use get_pipeline_events(pipeline_id='abc-123') for full details."

    # Get detailed errors
    events = get_pipeline_events(pipeline_id=result["pipeline_id"], max_results=50)

Step 5: Iterate Until Working

  1. Review errors from result or get_pipeline_events
  2. Fix issues in local files
  3. Re-upload with upload_folder
  4. Run create_or_update_pipeline again (it will update, not recreate)
  5. Repeat until result["success"] == True

Quick Reference: MCP Tools

Primary Tool

Tool Description
create_or_update_pipeline Main entry point. Creates or updates pipeline, optionally runs and waits. Returns detailed status with success, state, errors, and actionable message.

Pipeline Management

Tool Description
find_pipeline_by_name Find existing pipeline by name, returns pipeline_id
get_pipeline Get pipeline configuration and current state
start_update Start pipeline run (validate_only=True for dry run)
get_update Poll update status (QUEUED, RUNNING, COMPLETED, FAILED)
stop_pipeline Stop a running pipeline
get_pipeline_events Get error messages for debugging failed runs
delete_pipeline Delete a pipeline

Supporting Tools

Tool Description
upload_folder Upload local folder to workspace (parallel)
get_table_details Verify output tables have expected schema and row counts
execute_sql Run ad-hoc SQL to inspect data

Reference Documentation (Local)

Load these for detailed patterns:


Best Practices (2025)

Language Selection

  • Default to SQL unless user specifies Python or task clearly requires it
  • Use SQL for: Transformations, aggregations, filtering, joins (most cases)
  • Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths (use modern pyspark.pipelines as dp)
  • Generate ONE language per request unless user explicitly asks for mixed pipeline

Modern Defaults

  • CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
  • Raw .sql/.py files, not notebooks
  • Serverless compute ONLY - Do not use classic clusters unless explicitly required
  • Unity Catalog (required for serverless)
  • read_files() for cloud storage ingestion - see 1-ingestion-patterns.md

Common Issues

Issue Solution
Empty output tables Use get_table_details to verify, check upstream sources
Pipeline stuck INITIALIZING Normal for serverless, wait a few minutes
"Column not found" Check schemaHints match actual data
Streaming reads fail Use FROM STREAM(table) for streaming sources
Timeout during run Increase timeout, or use wait_for_completion=False and poll with get_update
MV doesn't refresh Enable row tracking on source tables
SCD2 schema errors Let SDP infer START_AT/END_AT columns

For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.


Advanced Pipeline Configuration

For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.


Platform Constraints

Serverless Pipeline Requirements (Default)

Requirement Details
Unity Catalog Required - serverless pipelines always use UC
Workspace Region Must be in serverless-enabled region
Serverless Terms Must accept serverless terms of use
CDC Features Requires serverless (or Pro/Advanced with classic clusters)

Serverless Limitations (When Classic Clusters Required)

Limitation Workaround
R language Not supported - use classic clusters if required
Spark RDD APIs Not supported - use classic clusters if required
JAR libraries Not supported - use classic clusters if required
Maven coordinates Not supported - use classic clusters if required
DBFS root access Limited - must use Unity Catalog external locations
Global temp views Not supported

General Constraints

Constraint Details
Schema Evolution Streaming tables require full refresh for incompatible changes
SQL Limitations PIVOT clause unsupported
Sinks Python only, streaming only, append flows only

Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.

Install via CLI
npx skills add https://github.com/malcolndandaro/databricks_telecom_bundle --skill spark-declarative-pipelines
Repository Details
star Stars 1
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator
malcolndandaro
malcolndandaro Explore all skills →