name: spark-declarative-pipelines description: "Creates, configures, updates, reviews, and troubleshoots Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion. Use when: (1) building new data pipelines, (2) reviewing or auditing existing pipelines for best practices, (3) migrating legacy DLT Python to modern SDP, (4) troubleshooting pipeline failures. Triggers: SDP, LDP, DLT, Delta Live Tables, Lakeflow, streaming tables, materialized views, APPLY CHANGES, CDC, bronze/silver/gold, medallion architecture, @dlt.table, @dp.table, read_files, cloudFiles, pipeline review, pipeline audit, pipeline best practices."
Lakeflow Spark Declarative Pipelines (SDP)
Official Documentation
- Lakeflow Spark Declarative Pipelines Overview - Main documentation hub
- SQL Language Reference - SQL syntax for streaming tables and materialized views
- Python Language Reference -
pyspark.pipelinesAPI - Loading Data - Auto Loader, Kafka, Kinesis ingestion
- Change Data Capture (CDC) - AUTO CDC, SCD Type 1/2
- Developing Pipelines - File structure, testing, validation
- Liquid Clustering - Modern data layout optimization
Workflow Selection
Determine the task type and follow the appropriate workflow:
| Task | Workflow |
|---|---|
| Build new pipeline | Follow "Development Workflow with MCP Tools" below |
| Review/audit existing pipelines | Follow "Review Mode" below |
| Migrate legacy DLT to SDP | See 6-dlt-migration.md |
| Troubleshoot pipeline failures | Use get_pipeline_events(), see "Common Issues" |
Review Mode
When asked to review, audit, check, or improve existing pipelines:
Step 1: Locate Pipeline Definitions
Search for pipeline configurations:
- DAB pipelines:
**/dlt_pipelines/*.ymlorresources/pipelinesindatabricks.yml - Source files:
.sqland.pyfiles referenced in pipelinelibraries
Step 2: Check Against 2025 Best Practices
| Check | Legacy Pattern | Modern Pattern | Reference |
|---|---|---|---|
| Python API | import dlt |
from pyspark import pipelines as dp |
5-python-api.md |
| Ingestion | spark.readStream.format("cloudFiles") |
read_files() SQL syntax |
1-ingestion-patterns.md |
| Clustering | PARTITION BY or none |
CLUSTER BY (Liquid Clustering) |
4-performance-tuning.md |
| File format | Notebook (# Databricks notebook source) |
Raw .sql/.py files |
Best practice |
| SCD patterns | dlt.apply_changes() |
AUTO CDC INTO SQL syntax |
3-scd-patterns.md |
Step 3: Review Checklist
- Modern API: Using
pyspark.pipelines(dp) not legacydlt - SQL preferred: Transformations in SQL unless Python required
- Liquid Clustering:
CLUSTER BYon all tables, especially facts - Data quality: Expectations defined (
EXPECT,@dp.expect_or_drop) - No hardcoded configs: Variables from pipeline configuration, not
SETstatements - Raw files: Not notebook format with
# COMMAND ---------- - Serverless: No classic cluster configuration unless required
Step 4: Provide Recommendations
For each issue found:
- Identify the file and line number
- Show the current (legacy) code
- Show the recommended (modern) code
- Reference the appropriate documentation file
Development Workflow with MCP Tools
Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline which handles the entire lifecycle.
IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if user explicitly requires R language, Spark RDD APIs, or JAR libraries.
Step 1: Write Pipeline Files Locally
Create .sql or .py files in a local folder:
my_pipeline/
├── bronze/
│ ├── ingest_orders.sql # SQL (default for most cases)
│ └── ingest_events.py # Python (for complex logic)
├── silver/
│ └── clean_orders.sql
└── gold/
└── daily_summary.sql
SQL Example (bronze/ingest_orders.sql):
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
*,
current_timestamp() AS _ingested_at,
_metadata.file_path AS _source_file
FROM read_files(
'/Volumes/catalog/schema/raw/orders/',
format => 'json',
schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
Python Example (bronze/ingest_events.py):
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp
@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/Volumes/catalog/schema/raw/events/")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", col("_metadata.file_path"))
)
Language Selection:
- Default to SQL unless user specifies Python or task requires it
- Use SQL for: Transformations, aggregations, filtering, joins (90% of use cases)
- Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths
- Generate ONE language per request unless user explicitly asks for mixed pipeline
Step 2: Upload to Databricks Workspace
# MCP Tool: upload_folder
upload_folder(
local_folder="/path/to/my_pipeline",
workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)
Step 3: Create/Update and Run Pipeline
Use create_or_update_pipeline - the main entry point. It:
- Searches for an existing pipeline with the same name (or uses
idfromextra_settings) - Creates a new pipeline or updates the existing one
- Optionally starts a pipeline run
- Optionally waits for completion and returns detailed results
# MCP Tool: create_or_update_pipeline
result = create_or_update_pipeline(
name="my_orders_pipeline",
root_path="/Workspace/Users/user@example.com/my_pipeline",
catalog="my_catalog",
schema="my_schema",
workspace_file_paths=[
"/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
"/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
"/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
],
start_run=True, # Start immediately
wait_for_completion=True, # Wait and return final status
full_refresh=True, # Full refresh all tables
timeout=1800 # 30 minute timeout
)
Result contains actionable information:
{
"success": True, # Did the operation succeed?
"pipeline_id": "abc-123", # Pipeline ID for follow-up operations
"pipeline_name": "my_orders_pipeline",
"created": True, # True if new, False if updated
"state": "COMPLETED", # COMPLETED, FAILED, TIMEOUT, etc.
"catalog": "my_catalog", # Target catalog
"schema": "my_schema", # Target schema
"duration_seconds": 45.2, # Time taken
"message": "Pipeline created and completed successfully in 45.2s. Tables written to my_catalog.my_schema",
"error_message": None, # Error summary if failed
"errors": [] # Detailed error list if failed
}
Step 4: Handle Results
On Success:
if result["success"]:
# Verify output tables
stats = get_table_details(
catalog="my_catalog",
schema="my_schema",
table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
)
On Failure:
if not result["success"]:
# Message includes suggested next steps
print(result["message"])
# "Pipeline created but run failed. State: FAILED. Error: Column 'amount' not found.
# Use get_pipeline_events(pipeline_id='abc-123') for full details."
# Get detailed errors
events = get_pipeline_events(pipeline_id=result["pipeline_id"], max_results=50)
Step 5: Iterate Until Working
- Review errors from result or
get_pipeline_events - Fix issues in local files
- Re-upload with
upload_folder - Run
create_or_update_pipelineagain (it will update, not recreate) - Repeat until
result["success"] == True
Quick Reference: MCP Tools
Primary Tool
| Tool | Description |
|---|---|
create_or_update_pipeline |
Main entry point. Creates or updates pipeline, optionally runs and waits. Returns detailed status with success, state, errors, and actionable message. |
Pipeline Management
| Tool | Description |
|---|---|
find_pipeline_by_name |
Find existing pipeline by name, returns pipeline_id |
get_pipeline |
Get pipeline configuration and current state |
start_update |
Start pipeline run (validate_only=True for dry run) |
get_update |
Poll update status (QUEUED, RUNNING, COMPLETED, FAILED) |
stop_pipeline |
Stop a running pipeline |
get_pipeline_events |
Get error messages for debugging failed runs |
delete_pipeline |
Delete a pipeline |
Supporting Tools
| Tool | Description |
|---|---|
upload_folder |
Upload local folder to workspace (parallel) |
get_table_details |
Verify output tables have expected schema and row counts |
execute_sql |
Run ad-hoc SQL to inspect data |
Reference Documentation (Local)
Load these for detailed patterns:
- 1-ingestion-patterns.md - Auto Loader, Kafka, Event Hub, Kinesis, file formats
- 2-streaming-patterns.md - Deduplication, windowing, stateful operations, joins
- 3-scd-patterns.md - Querying SCD Type 2 history tables, temporal joins
- 4-performance-tuning.md - Liquid Clustering, optimization, state management
- 5-python-api.md - Modern
dpAPI vs legacydltAPI comparison - 6-dlt-migration.md - Migrating existing DLT pipelines to SDP
- 7-advanced-configuration.md -
extra_settingsparameter reference and examples
Best Practices (2025)
Language Selection
- Default to SQL unless user specifies Python or task clearly requires it
- Use SQL for: Transformations, aggregations, filtering, joins (most cases)
- Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths (use modern
pyspark.pipelines as dp) - Generate ONE language per request unless user explicitly asks for mixed pipeline
Modern Defaults
- CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
- Raw
.sql/.pyfiles, not notebooks - Serverless compute ONLY - Do not use classic clusters unless explicitly required
- Unity Catalog (required for serverless)
- read_files() for cloud storage ingestion - see 1-ingestion-patterns.md
Common Issues
| Issue | Solution |
|---|---|
| Empty output tables | Use get_table_details to verify, check upstream sources |
| Pipeline stuck INITIALIZING | Normal for serverless, wait a few minutes |
| "Column not found" | Check schemaHints match actual data |
| Streaming reads fail | Use FROM STREAM(table) for streaming sources |
| Timeout during run | Increase timeout, or use wait_for_completion=False and poll with get_update |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2 schema errors | Let SDP infer START_AT/END_AT columns |
For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.
Advanced Pipeline Configuration
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.
Platform Constraints
Serverless Pipeline Requirements (Default)
| Requirement | Details |
|---|---|
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |
Serverless Limitations (When Classic Clusters Required)
| Limitation | Workaround |
|---|---|
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |
General Constraints
| Constraint | Details |
|---|---|
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.