name: lhp
description: "Lakehouse Plumber (LHP): the tool that compiles declarative YAML into Databricks Lakeflow/DLT Python. Use whenever the user is working inside an LHP project. Dead-giveaway signals: the word flowgroup; the lhp CLI (init/validate/generate/dag/diff); or LHP files (lhp.yaml, substitutions/, presets/, templates/, blueprints/, pipelines/*.yaml). Covers scaffolding a new LHP project; authoring or fixing flowgroup load/transform/write/test actions; regenerating Python after editing YAML and checking the diff; reusing one pattern across regions/tenants/sites via templates or blueprints; setting up substitutions or Asset Bundle integration; and debugging any LHP validate/generate failure (unresolved ${token}, non-streaming source) including the SQL or YAML behind the generated Python. Fire even when the user only says flowgroup or lhp with little else. Skip for hand-written DLT/Spark notebooks, generic SQL, Genie, or CI/CD with no LHP project involved."
Lakehouse Plumber (LHP)
YAML-to-Python code generator for Databricks Lakeflow Declarative Pipelines.
Before You Start (read this first)
- Reuse before authoring. On any "create a flowgroup / pipeline / ingestion" request, scan templates/ and blueprints/ first. If an existing template (one parameterised flowgroup) or blueprint (a whole-flowgroup pattern repeated per site/region/tenant) already fits, propose reusing it via use_template: / use_blueprint: and confirm with the user before hand-writing new actions. Only author fresh YAML when nothing fits.
- Write-target rule of thumb: incremental/append/streaming ingest → streaming_table; full recompute, joins, aggregations, or dimensional rebuilds → materialized_view.
- New project from scratch? Load quickstart.md for the init → lhp.yaml/substitutions/pipeline_config → validate → generate path.
- Common mistakes to avoid:
  - Deprecated {token} braces: always use ${token} (only %{local_var} uses non-$ braces).
  - Missing stream(view) wrapper in a SQL transform that reads a streaming source.
  - Secrets inline in YAML: always use ${secret:scope/key}.
  - operational_metadata at the write level: apply it at the load/transform (action) level.
  - Suggesting lhp show (removed) or lhp deps / --force (deprecated). Use lhp dag and lhp diff; every generate is a full regenerate.
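The write-target rule of thumb above can be sketched as two write actions that differ only in the write_target type. This is a minimal illustration, not a complete flowgroup: the action names and the upstream views v_orders_cleaned and v_daily_sales are hypothetical, and the ${catalog} and ${schema} tokens are assumed to exist in the project's substitution files.

```yaml
actions:
  # Incremental/append ingest: the upstream view is read as a stream.
  - name: write_orders_bronze          # hypothetical action name
    type: write
    source: v_orders_cleaned           # hypothetical streaming view
    write_target:
      type: streaming_table
      database: "${catalog}.${schema}"
      table: "orders"

  # Full recompute (joins, aggregations): rebuilt from a batch view.
  - name: write_daily_sales            # hypothetical action name
    type: write
    source: v_daily_sales              # hypothetical batch view
    write_target:
      type: materialized_view
      database: "${catalog}.${schema}"
      table: "daily_sales"
```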
Read Project Context
Read the user's existing project files before generating new configurations:
- lhp.yaml: project config, operational metadata definitions
- substitutions/: environment tokens and secret scopes
- presets/: reusable defaults (action-type config)
- templates/: reusable action patterns (one parameterised flowgroup)
- blueprints/: reusable whole-flowgroup patterns, instantiated per site/region/tenant
- Existing pipelines/ YAML files: match naming/structure conventions
Core Architecture
Three main actions: Load, Transform, Write.
- Load: Load data from a source into a view.
- Transform: Transform data in a view into a new view.
- Write: Write data from a view into a table or sink.
One auxiliary action: Test.
- Test: Checks data with expectations; intended for test environments only.
- Pipeline: Logical grouping; generated files organized by pipeline name. All files in a pipeline run in a single Spark Declarative Pipeline.
- FlowGroup: One source entity; becomes one Python file. A flowgroup is a logical grouping of actions.
- Action: Individual operation (load, transform, write, test)
Minimal FlowGroup
pipeline: <pipeline_name>
flowgroup: <flowgroup_name>
actions:
  - name: <action_name>
    type: load
    readMode: stream  # stream or batch
    source:
      type: cloudfiles  # cloudfiles|delta|sql|jdbc|python|kafka|custom_datasource
      path: "${landing_volume}/folder/*.csv"
      format: csv
    target: v_raw_data
  - name: transform_data
    type: transform
    transform_type: sql  # sql|python|schema|data_quality|temp_table
    source: v_raw_data
    target: v_cleaned
    sql: |
      SELECT * FROM stream(v_raw_data)
  - name: write_table
    type: write
    source: v_cleaned
    write_target:
      type: streaming_table  # streaming_table|materialized_view
      database: "${catalog}.${schema}"
      table: "my_table"
Substitution Syntax (Processing Order)
| Order | Syntax | Type |
|---|---|---|
| 1st | %{var} | Local variable (flowgroup-scoped) |
| 2nd | {{ param }} | Template parameter (Jinja2) |
| 3rd | ${token} | Environment substitution (bare {token} is deprecated, LHP-DEPR-001) |
| 4th | ${secret:scope/key} | Secret -> dbutils.secrets.get() |
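As a rough illustration of the processing order in the table, the fragment below combines a local variable with an environment token. It is a sketch under assumptions: the variable entity and the token landing_volume are hypothetical, and the top-level variables: block is assumed to be where flowgroup-local values are declared.

```yaml
pipeline: orders_bronze                # hypothetical pipeline name
flowgroup: orders_raw_ingest           # hypothetical flowgroup name
variables:
  entity: orders                       # flowgroup-local, referenced as %{entity}
actions:
  - name: "load_%{entity}"             # %{entity} is resolved first
    type: load
    readMode: stream
    source:
      type: cloudfiles
      # After %{entity} is resolved, ${landing_volume} is filled in from the
      # environment's substitution file (3rd in the processing order).
      path: "${landing_volume}/%{entity}/*.csv"
      format: csv
    target: "v_%{entity}_raw"
```

The {{ param }} form only applies inside templates, and ${secret:scope/key} is resolved last, so neither appears in this plain flowgroup.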
Template and Flowgroup Naming Conventions
When creating reusable templates:
Template Files: TMPL<number>_<source_type>_<function>.yaml
- <number>: Sequential identifier (001, 002, etc.)
- <source_type>: Source type from the load action (delta, cloudfiles, jdbc, kafka, sql, etc.)
- <function>: What the template does (scd2, bronze, incremental, etc.)
- Examples:
  - TMPL001_delta_scd2.yaml - SCD Type 2 from Delta source
  - TMPL002_cloudfiles_bronze.yaml - Bronze ingestion from CloudFiles
  - TMPL003_jdbc_incremental.yaml - Incremental load from JDBC

Flowgroups Using Templates: <domain>_<final_table>_TMPL<number>
- <domain>: Business domain or subject area (billing, orders, customers, etc.)
- <final_table>: The final target table name
- TMPL<number>: Template number being used (must match template file)
- Examples:
  - billing_invoice_TMPL001 - Uses TMPL001 for invoice table
  - orders_customer_TMPL002 - Uses TMPL002 for customer table
  - analytics_fact_sales_TMPL003 - Uses TMPL003 for fact_sales table
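A flowgroup that follows these naming conventions might look like the sketch below. The file path, the pipeline name, and the parameter names source_table and natural_keys are hypothetical; a real flowgroup would use whatever parameters the referenced template declares.

```yaml
# pipelines/billing/silver/billing_invoice_TMPL001.yaml  (hypothetical path)
pipeline: billing_silver                 # hypothetical pipeline name
flowgroup: billing_invoice_TMPL001       # <domain>_<final_table>_TMPL<number>
use_template: TMPL001_delta_scd2         # matches the template file name
template_parameters:
  # Hypothetical parameter names, for illustration only.
  source_table: invoice_raw
  natural_keys:                          # natural YAML list, not a JSON string
    - invoice_id
```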
Quick Reference: Action Types
Each sub-type has its own leaf reference file. Load only the one(s) you need.
Key Rules
- stream(view_name) required in SQL transforms reading from streaming sources
- CloudFiles _metadata.* columns only available in views, not downstream transforms
- Preset lists are replaced, not merged; nested dicts are deep-merged
- All-or-nothing job_name: if any flowgroup has job_name, all must have it
- Never put secrets in YAML values; always use ${secret:scope/key}
- Validate before generating: lhp validate --env <env>
- readMode: stream -> spark.readStream, batch -> spark.read
- Monitoring requires event_log: monitoring: {} won't work without an event_log section
- catalog and schema are REQUIRED in pipeline_config.yaml; set them per-pipeline or in a top-level project_defaults block. Missing either fails lhp generate with BundleResourceError. See project-config.md and docs/how-to/configure-catalog-and-schema.rst.
- resources/lhp/ is exclusively managed by LHP: every lhp generate wipes it and rewrites it. Place custom resource YAMLs (hand-written jobs, dashboards, secret scopes) under resources/ at the top level or any non-lhp subdirectory.
- Every lhp generate is a full regenerate: there is no incremental mode and no --force flag (it was removed and is a no-op). Never suggest --force.
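The preset merge rule above (lists replaced, nested dicts deep-merged) can be pictured with the following sketch. It only illustrates the semantics: the three top-level keys and everything under them are hypothetical and are not LHP preset schema, and it assumes the later value overrides the preset.

```yaml
# Illustration of merge semantics only; all keys are hypothetical.
preset_value:
  options:
    header: "true"
    delimiter: ","
  tags: [bronze, raw]

overriding_value:
  options:
    header: "false"
  tags: [orders]

effective_value:
  options:                # nested dict: deep-merged key by key
    header: "false"       # taken from the overriding value
    delimiter: ","        # inherited from the preset
  tags: [orders]          # list: replaced wholesale, not concatenated
```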
Best Practice Defaults (apply unless the user overrides)
These defaults reflect LHP's published enterprise best practices. Load best-practices.md for full rules and rationale.
- Medallion defaults by layer:
  - Bronze → streaming_table + DQE warn + file metadata
  - Silver → materialized_view + DQE drop + updated_at
  - Gold → materialized_view + DQE fail on critical invariants
- CloudFiles bronze must set cloudFiles.schemaEvolutionMode: rescue and cloudFiles.rescuedDataColumn: _rescued_data. Silent data loss otherwise.
- Default transforms to SQL for silver/gold. Reserve Python for UDFs/ML/procedural logic only. Externalize SQL > ~5 lines into sql/<system>/<layer>/<name>.sql.
- Prefer cluster_columns (liquid clustering) over partition_columns on write targets.
- Every write target needs a comment (Unity Catalog description), and every action needs a description (generated-code comment).
- Keep each YAML file 50–200 lines, one pipeline per file, grouped by business domain (pipelines/<system>/<layer>/).
- Extract a template only after 3+ flowgroups share the pattern. Write concrete flowgroups first.
- Cap presets at ~15–20 files. Use extends for hierarchy (global_defaults → <layer>_standard → domain-specific).
- %{var} is flowgroup-local; ${TOKEN} is environment. Never put environment values in variables:.
- Treat preset edits as high-blast-radius. Run full-project lhp validate before merging preset changes.
- Templates/presets are flat: no subdirectory discovery. Use prefix naming (TMPLxxx_<layer>_<action>_<type>, <scope>_<layer>_<purpose>).
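Several of the bronze defaults above can be combined in one flowgroup, roughly as sketched here. The pipeline, flowgroup, action, and column names are hypothetical, and placing the Auto Loader settings under source.options is an assumption about where reader options belong; check the cloudfiles load reference before relying on it.

```yaml
pipeline: orders_bronze                      # hypothetical
flowgroup: orders_raw_ingest                 # hypothetical
actions:
  - name: load_orders_files
    type: load
    description: "Ingest raw order CSV files with Auto Loader"
    readMode: stream
    source:
      type: cloudfiles
      path: "${landing_volume}/orders/*.csv"
      format: csv
      options:                               # assumed location for reader options
        cloudFiles.schemaEvolutionMode: rescue
        cloudFiles.rescuedDataColumn: _rescued_data
    target: v_orders_raw
  - name: write_orders_bronze
    type: write
    description: "Append raw orders to the bronze table"
    source: v_orders_raw
    write_target:
      type: streaming_table                  # bronze default
      database: "${catalog}.${schema}"
      table: "orders"
      comment: "Raw orders as landed, one row per source record"
      cluster_columns: [order_date]          # hypothetical column
```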
CLI Quick Reference
lhp init <project> [--no-bundle] # Scaffold project (Asset Bundle ON by default)
lhp validate --env <env> # Validate configs
lhp generate --env <env> # Generate Python code (always a FULL regenerate)
lhp generate --env <env> --include-tests # With test actions included
lhp diff --env <env> # Show what generate would change on disk
lhp dag --format job --job-name <name> --bundle-output # Orchestration job
lhp list templates | presets | blueprints # List reusable artifacts
Reference Files
Load these based on the user's task:
Action references are split per sub-type: one leaf file per action sub-type. Load only the leaf for the sub-type you are writing or debugging. The full set is enumerated in the Quick Reference: Action Types table above. By category:
- Load: cloudfiles, delta, sql, python, jdbc, custom_datasource, kafka.
- Transform: sql, python, data_quality, temp_table, schema.
- Write: streaming_table standard, streaming_table cdc, streaming_table snapshot_cdc, materialized_view, sink delta, sink kafka, sink eventhubs, sink custom, sink foreachbatch.
- Test: row_count, uniqueness, referential_integrity, completeness, range, schema_match, all_lookups_found, custom_sql, custom_expectations. All 9 require the --include-tests flag.

- cdc-patterns.md: CDC and SCD2 patterns for Delta CDF, PostgreSQL WAL, and snapshot CDC. Load when implementing any CDC/SCD2 pattern.
- templates-presets.md: Template structure, naming conventions, parameter types (incl. inline SQL parametrized with Jinja), preset matching/merge behavior. Load when creating or editing templates or presets.
- blueprints.md: Blueprints: whole-flowgroup patterns expanded per instance (sites/regions/tenants), use_blueprint: syntax, %{var} resolution, lhp list blueprints / lhp dag --expand-blueprints. Load when the same flowgroup repeats across deployments, or to decide blueprint vs template vs preset.
- quickstart.md: First-project setup: lhp init, lhp.yaml, databricks.yml, substitutions/, config/pipeline_config.yaml, first flowgroup, validate + generate. Load when scaffolding a new project from scratch.
- project-config.md: lhp.yaml, substitutions, local variables, operational metadata, CLI commands, multi-flowgroup syntax. Load for project setup or config questions.
- advanced.md: Databricks bundles, pipeline/job configuration, dependency analysis, multi-job orchestration, CI/CD patterns. Load for deployment or orchestration tasks.
- monitoring.md: Event log injection, monitoring pipeline, materialized views, __eventlog_monitoring alias. Load when configuring event_log or monitoring in lhp.yaml.
- errors.md: All LHP error codes (LHP-CFG/VAL/IO/ACT/DEP) with causes and fixes. Load when troubleshooting any LHP error.
- best-practices.md: Enterprise best practices (BP-1 through BP-19, anti-patterns). Load when setting up a new project, reviewing/refactoring configs, designing templates/presets/substitutions, tiering data quality, choosing between streaming_table vs materialized_view, or answering "what's the right way to..." questions.
Instructions
- Read project files first: match existing patterns and conventions. Do not introduce a best-practice default that conflicts with an established project convention without flagging the trade-off.
- Reuse before authoring: for any flowgroup/pipeline creation request, scan templates/ and blueprints/ first. If an existing template or blueprint fits the pattern, propose reusing it (use_template: / use_blueprint:) and confirm with the user before hand-writing new actions. Author fresh YAML only when nothing fits.
- Apply best-practice defaults (see section above). When the project is new or silent on a choice, pick the BP default. Load best-practices.md when designing new structures or refactoring.
- Validate substitution tokens: ensure each ${token} has a corresponding entry in the substitution files.
- Apply presets: reference project presets where appropriate.
- Generate valid YAML: proper indentation, correct field nesting.
- Explain behavior: describe what the generated YAML will produce in Python/Spark Declarative Pipelines.
- Suggest validation: recommend lhp validate --env dev after changes.
- For CDC/SCD2 patterns:
  - Exclude CDC metadata columns (__START_AT, __END_AT) using * except in transforms
  - Use business timestamps (modified_at, created_at) for sequence_by
  - Add technical columns to except_column_list in cdc_config
  - Apply operational_metadata at action level, not write level
  - Use * except pattern for future-proof column selection and schema evolution
- For Kafka sources: remind the user that key/value are binary and need a deserialization transform.
- For operational_metadata:
  - Apply at action level (load, transform), not write level
  - For file sources: include file metadata (_source_file_path, _source_file_name, _processing_timestamp)
  - For non-file sources: include, for example, _processing_timestamp
- For creating templates:
  - Follow naming convention: TMPL<number>_<source_type>_<function>.yaml
  - Define clear, required parameters with descriptions
  - Use Jinja2 syntax for parameter substitution: {{ param_name }}, including inside inline sql: blocks
  - Quote array and string parameters in YAML: keys: "{{ natural_keys }}"
  - Provide defaults for optional parameters
  - Document the template purpose, parameters, and usage examples in templates/README.md
  - Test templates by creating example flowgroups before finalizing
- For using templates in flowgroups:
  - Name flowgroup: <domain>_<final_table>_TMPL<number>
  - Reference template with use_template: TMPL<number>_<source_type>_<function>
  - Provide all required parameters under template_parameters:
  - Use natural YAML for objects and arrays (not JSON strings)
  - Keep optional parameters only if overriding defaults
  - If multiple flowgroups use the same template, use multi-flowgroup syntax.
- For blueprints (same flowgroup repeated across sites/regions/tenants): define a blueprint under blueprints/ and one instance file per variant (use_blueprint: + nested parameters:). Use %{var} for parameters; never ${...} in pipeline: / flowgroup: fields. Load blueprints.md.
- For error troubleshooting: Load errors.md, find the error code, follow resolution. Always suggest lhp validate --env <env> --verbose.
- For monitoring/event log setup: Load monitoring.md. Require event_log before monitoring. Use __eventlog_monitoring alias in pipeline_config.yaml.
- Not all fields are required: When showing YAML examples, annotate fields as required/optional. Only required fields must be present; optional fields have sensible defaults.
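The CDC/SCD2 and operational_metadata guidance in the list above could come together roughly as follows. This is a sketch under several assumptions, not a verified configuration: all pipeline, table, column, and token names are hypothetical; the upstream table is assumed to carry __START_AT and __END_AT columns; and mode: cdc, keys, and scd_type are assumed key names, so confirm them against cdc-patterns.md before use.

```yaml
pipeline: customers_silver                   # hypothetical
flowgroup: customers_customer_scd2           # hypothetical
actions:
  - name: load_customer_bronze
    type: load
    readMode: stream
    # Action level, not write level; non-file source, so no file metadata.
    operational_metadata: ["_processing_timestamp"]
    source:
      type: delta
      database: "${catalog}.${bronze_schema}"   # hypothetical tokens
      table: customer
    target: v_customer_bronze
  - name: clean_customer
    type: transform
    transform_type: sql
    source: v_customer_bronze
    target: v_customer_clean
    sql: |
      -- Drop upstream CDC metadata; keep every other column as the schema evolves.
      SELECT * EXCEPT (__START_AT, __END_AT)
      FROM stream(v_customer_bronze)
  - name: write_customer_scd2
    type: write
    source: v_customer_clean
    write_target:
      type: streaming_table
      mode: cdc                              # assumed key for CDC writes
      database: "${catalog}.${silver_schema}"
      table: "customer"
      cdc_config:
        keys: [customer_id]                  # assumed key name, hypothetical column
        sequence_by: modified_at             # business timestamp
        scd_type: 2                          # assumed key name
        except_column_list: [_processing_timestamp]   # technical column
```

Running lhp validate --env dev after an edit like this is the quickest way to catch a wrong key or an unresolved token.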