name: create-transformation
description: Write dlthub transformation functions that map source tables to CDM entities. Use after generate-cdm to produce the transformation Python script.
argument-hint: "[pipeline-name]"
Create transformation
Write @dlt.hub.transformation functions that map annotated source tables to CDM entities, using SQL-first with optional ibis.
Requires:
- .schema/<cdm-name>/taxonomy.json: confirmed table→concept mappings and natural keys; read _name from this file to determine <cdm-name>
- .schema/<cdm-name>/<pipeline_name>.dbml: annotated source schemas
- .schema/<cdm-name>/CDM.dbml: target CDM schema
If any are missing, run the preceding skills first.
The _name value from taxonomy.json is also the dataset_name for the transformation pipeline — do not re-derive it.
Parse $ARGUMENTS:
- pipeline-name: the dlt pipeline to transform from (e.g. hubspot_crm_pipeline)
  - If omitted, check taxonomy.json for contributing pipelines and ask the user which one to target
Steps
1. Install dependencies
Determine the destination: if it is not already known from earlier in the conversation, ask the user which destination they are using before proceeding.
Install the matching ibis backend:
| Destination | Command |
|---|---|
| DuckDB | uv add "ibis-framework[duckdb]" |
| PostgreSQL | uv add "ibis-framework[postgres]" |
| Snowflake | uv add "ibis-framework[snowflake]" |
| BigQuery | uv add "ibis-framework[bigquery]" |
| Other backend | uv add "ibis-framework[<backend>]" |
Then install dlt[hub]:
uv add "dlt[hub]"
2. Read inputs
Read in parallel:
- .schema/annotated-sources.dbml: source columns and their concept mappings
- .schema/taxonomy.json: table mappings and natural keys
- .schema/CDM.dbml: CDM entity definitions and column specs
3. Get actual source schema
Prefer the relation schema from dlt dataset objects over the get_table_schema MCP tool to get actual column types.
The MCP tool may list untyped or null-only columns that were never materialized in the destination.
import dlt
pipeline = dlt.attach(pipeline_name="<pipeline_name>")
dataset = pipeline.dataset()
relation = dataset.<table_name>
schema = relation.schema() # authoritative column list
Cross-check the annotated columns in annotated-sources.dbml against the schema from relation.schema(). Note any discrepancies, such as annotated columns that were never materialized or types that differ.
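The cross-check reduces to a set difference once both sides are in the same shape. A minimal sketch, assuming you have already turned the DBML annotations and the relation schema into plain {column_name: type} dicts (parsing either is out of scope here). diff_columns is a hypothetical helper, not a dlt API, and it skips dlt's internal _dlt_* columns.

```python
from __future__ import annotations


def diff_columns(annotated: dict[str, str], actual: dict[str, str]) -> dict[str, list]:
    """Compare annotated columns with materialized columns (hypothetical helper)."""
    # dlt adds bookkeeping columns such as _dlt_id and _dlt_load_id; they are
    # never annotated, so leave them out of the comparison.
    actual = {name: typ for name, typ in actual.items() if not name.startswith("_dlt_")}
    missing = sorted(set(annotated) - set(actual))  # annotated but never materialized
    unannotated = sorted(set(actual) - set(annotated))  # materialized but not mapped
    # Type names are compared case-insensitively as given; map DBML types to
    # dlt types first if the two sides use different vocabularies.
    type_mismatch = sorted(
        (name, annotated[name], actual[name])
        for name in set(annotated) & set(actual)
        if annotated[name].lower() != actual[name].lower()
    )
    return {"missing": missing, "unannotated": unannotated, "type_mismatch": type_mismatch}


if __name__ == "__main__":
    annotated = {"id": "bigint", "email": "text", "lifecycle_stage": "text"}
    actual = {"id": "BIGINT", "email": "text", "phone": "text", "_dlt_id": "text"}
    print(diff_columns(annotated, actual))
```

Anything under "missing" is a candidate for the null-only columns mentioned above; anything under "type_mismatch" should be resolved before the key type contract in Step 4 is fixed.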
4. Plan transformation order
Always run dimensions before facts — facts join on dimension surrogate keys.
Build an execution order:
- All conformed dimensions (shared across multiple facts)
- Non-conformed dimensions
- Fact tables (after all their dimension FKs exist)
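The ordering rule is mechanical once you know which dimensions each fact joins. A sketch, assuming a hypothetical fact → dimensions mapping read off the FK references in CDM.dbml; the returned list is the order in which to yield resources from the @dlt.source. Within each tier the names are sorted only to keep the output deterministic.

```python
from __future__ import annotations


def execution_order(fact_dims: dict[str, list[str]], all_dims: list[str]) -> list[str]:
    """Order transformations: conformed dims, then other dims, then facts."""
    usage = {dim: 0 for dim in all_dims}
    for fact, dims in fact_dims.items():
        for dim in set(dims):  # count each dimension once per fact
            if dim not in usage:
                raise ValueError(f"{fact} references unknown dimension {dim!r}")
            usage[dim] += 1
    conformed = sorted(dim for dim, n in usage.items() if n > 1)  # shared by 2+ facts
    non_conformed = sorted(dim for dim, n in usage.items() if n <= 1)
    facts = sorted(fact_dims)  # every fact runs after all dimensions exist
    return conformed + non_conformed + facts


if __name__ == "__main__":
    plan = {
        "fact_event_attendance": ["dim_person", "dim_event"],
        "fact_interaction": ["dim_person", "dim_company"],
    }
    print(execution_order(plan, ["dim_company", "dim_event", "dim_person"]))
```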
Do not reference transformation outputs while building facts (default).
By default, derive fact SQL from source-side tables and logic (or explicit stage resources), not from the dim_* tables produced by the same pipeline.
This avoids circular dependencies and behavior that differs across runs and destinations.
Allowed exception:
- If the user explicitly requests output-to-output dependencies and the destination semantics are confirmed to support that pattern, document it in the plan before writing SQL.
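Because facts do not read dim_* outputs, each fact has to compute its foreign key from source data with exactly the same expression the dimension uses for its surrogate key. One way to enforce that is to define the expression once and interpolate it into both statements. In this sketch, company_sk, the domain/company_domain columns, and the hubspot__activities table are hypothetical, and the key derivation (a trimmed, lower-cased natural key cast to VARCHAR) is only an example; an md5 hash works the same way as long as both sides share it. SQLite stands in for the destination so the check runs locally.

```python
import sqlite3


def company_sk(natural_key: str) -> str:
    """Hypothetical shared SK expression: a normalized natural key cast to text."""
    return f"CAST(LOWER(TRIM({natural_key})) AS VARCHAR)"


# Both statements read source tables only; neither selects from dim_company.
DIM_COMPANY_SQL = f"""
SELECT {company_sk('c.domain')} AS company_sk, c.name
FROM hubspot__companies AS c
"""

FACT_ACTIVITY_SQL = f"""
SELECT a.id AS activity_id, {company_sk('a.company_domain')} AS company_sk
FROM hubspot__activities AS a
"""


def orphaned_fact_keys(conn: sqlite3.Connection) -> list:
    """Fact SK values that have no matching dimension SK."""
    dim_keys = {row[0] for row in conn.execute(DIM_COMPANY_SQL)}
    fact_keys = {row[1] for row in conn.execute(FACT_ACTIVITY_SQL)}
    return sorted(fact_keys - dim_keys)


def demo_connection() -> sqlite3.Connection:
    # Hypothetical source rows with inconsistent casing and whitespace.
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE hubspot__companies (domain TEXT, name TEXT);
        INSERT INTO hubspot__companies VALUES (' Acme.com', 'Acme'), ('globex.com', 'Globex');
        CREATE TABLE hubspot__activities (id INTEGER, company_domain TEXT);
        INSERT INTO hubspot__activities VALUES (1, 'ACME.COM'), (2, 'globex.com '), (3, 'initech.com');
        """
    )
    return conn


if __name__ == "__main__":
    print(orphaned_fact_keys(demo_connection()))
```

The normalization inside the shared expression is what lets ' Acme.com' and 'ACME.COM' resolve to the same key; the one remaining orphan is a company that genuinely has no dimension row.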
Define a key type contract before writing any SQL.
Pick one key type for this pipeline (text or bigint) and apply it consistently to:
- all surrogate/foreign key casts in SQL or ibis
- every corresponding columns= schema hint
Do not mix key representations (INT64 vs STRING) for related keys across dimensions/facts.
If source systems disagree on key type, normalize to the chosen contract in staging/CTEs first.
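A small lookup can tie the SQL cast and the columns= hint to the same decision so they cannot drift apart. A sketch with hypothetical names (KEY_CONTRACTS, KEY_CONTRACT, cast_key, key_hints); the hint dicts use the same {"data_type", "nullable"} shape as the columns= examples in this skill.

```python
from __future__ import annotations

# One decision per pipeline: "text" or "bigint". Each entry pairs the ANSI SQL
# type used in CAST with the dlt data_type used in the columns= hint.
KEY_CONTRACTS = {
    "text": {"sql_type": "VARCHAR", "dlt_type": "text"},
    "bigint": {"sql_type": "BIGINT", "dlt_type": "bigint"},
}
KEY_CONTRACT = "text"  # hypothetical choice for this pipeline


def cast_key(expr: str, contract: str = KEY_CONTRACT) -> str:
    """Wrap a key expression in an ANSI CAST to the contract type."""
    return f"CAST({expr} AS {KEY_CONTRACTS[contract]['sql_type']})"


def key_hints(columns: list[str], contract: str = KEY_CONTRACT, nullable: bool = False) -> dict:
    """Build columns= entries for key columns using the same contract."""
    dlt_type = KEY_CONTRACTS[contract]["dlt_type"]
    return {col: {"data_type": dlt_type, "nullable": nullable} for col in columns}


if __name__ == "__main__":
    sql = f"SELECT {cast_key('c.company_id')} AS company_sk, c.name FROM hubspot__companies AS c"
    print(sql)
    print(key_hints(["company_sk"]))
```

FKs that come from a LEFT JOIN can pass nullable=True while keeping the same data_type.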
5. Write transformation functions
One @dlt.hub.transformation function per CDM entity. Wrap all in a @dlt.source.
Dataset binding is required when yielding from @dlt.source.
When a transformation resource is returned or yielded from a source, pass the dataset argument explicitly (for example, yield dim_person(source_dataset)). Leaving the dataset unbound can raise IncompatibleDatasetsException.
import dlt

@dlt.source
def hubspot_activity_schema(source_dataset: dlt.Dataset):
    # Correct: dataset is explicitly bound
    yield dim_company(source_dataset)
    yield fact_activity(source_dataset)  # defined elsewhere in the script

@dlt.hub.transformation
def dim_company(dataset: dlt.Dataset):
    yield dataset("SELECT company_id, name FROM hubspot__companies")
Default to SQL transformation logic — pass a SQL string directly to dataset() (see "Writing your queries in SQL" in https://dlthub.com/docs/hub/transformations.md).
Use SQL first because it is easier for users to review, generally more reliable for LLM generation, and dlthub can transpile dialect differences when needed.
@dlt.hub.transformation
def dim_person(dataset: dlt.Dataset):
    yield dataset("SELECT email, first_name, last_name FROM hubspot__contacts ORDER BY email")
If a dataset variable is already in scope (for example slack_dataset), it is callable in the same way; pass SQL to it directly:
@dlt.hub.transformation
def dim_users(dataset: dlt.Dataset):
    yield dataset("SELECT user_id, email, created_at FROM users")
Write all transformation SQL in ANSI-standard SQL. This ensures transformations are portable across destinations without modification. dlthub uses SQLGlot to transpile queries, but transpilation can only bridge dialect gaps when the input SQL uses constructs that have mappings across dialects. ANSI SQL is the baseline that all supported destinations understand.
Concretely:
- Use CAST(x AS type), not x::type
- Use COALESCE(a, b), not IFNULL(a, b)
- Use standard type names: VARCHAR, BIGINT, BOOLEAN, TIMESTAMP
- Use CASE WHEN for conditional logic
- Use standard aggregates: SUM, AVG, COUNT, MIN, MAX
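A cheap pre-flight check can catch the two most common slips before SQL reaches dataset(). This is a hypothetical regex heuristic, not a parser: it can misfire on text inside string literals, and it only covers the x::type and IFNULL cases listed above.

```python
import re

# Each pattern maps a non-ANSI spelling to the ANSI form recommended above.
NON_ANSI_PATTERNS = {
    r"::\s*\w+": "use CAST(x AS type) instead of x::type",
    r"\bIFNULL\s*\(": "use COALESCE(a, b) instead of IFNULL(a, b)",
}


def non_ansi_findings(sql: str) -> list:
    """Return advice for each non-ANSI construct found (regex heuristic)."""
    return [
        advice
        for pattern, advice in NON_ANSI_PATTERNS.items()
        if re.search(pattern, sql, flags=re.IGNORECASE)
    ]


if __name__ == "__main__":
    print(non_ansi_findings("SELECT id::bigint, IFNULL(name, '') FROM hubspot__contacts"))
```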
When a transformation genuinely requires a dialect-specific function with no ANSI equivalent (e.g., EPOCH_MS, STRFTIME, array operations), pass query_dialect to dataset() so dlthub knows how to transpile it.
Cross-dataset SQL must use fully qualified source references.
When writing into <target_dataset> from a different source dataset, unqualified table names may resolve against the target dataset and fail with "table not found". For BigQuery, always use project.dataset.table for source-side refs.
@dlt.hub.transformation
def fact_activity(dataset: dlt.Dataset):
    yield dataset(
        """
        SELECT a.id, a.activity_type, a.created_at
        FROM `my_project.source_dataset_name.activities` AS a
        """
    )
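If several queries need the same fully qualified references, building them in one place avoids typos in the project or dataset part. bq_table_ref below is a hypothetical helper, and its identifier check is deliberately stricter than BigQuery's own naming rules.

```python
import re

# Conservative identifier check: letters, digits, underscores, hyphens only.
_IDENTIFIER = re.compile(r"[A-Za-z0-9_-]+")


def bq_table_ref(project: str, dataset: str, table: str) -> str:
    """Return a backtick-quoted project.dataset.table reference."""
    for part in (project, dataset, table):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"unexpected identifier: {part!r}")
    return f"`{project}.{dataset}.{table}`"


if __name__ == "__main__":
    activities = bq_table_ref("my_project", "source_dataset_name", "activities")
    print(f"SELECT a.id, a.activity_type, a.created_at FROM {activities} AS a")
```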
Association key check is mandatory before FK logic.
For nested association tables, verify join lineage first: association table _dlt_parent_id joins to parent row _dlt_id (not to parent business keys like id).
Do this verification before writing any JOIN used to derive foreign keys.
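The lineage check can be run as two counts: how many child rows join to the parent on _dlt_parent_id = _dlt_id, and how many would join on the business id instead. A sketch against an in-memory SQLite stand-in; the table name hubspot__contacts__companies and the sample rows are hypothetical. Lineage is confirmed when the _dlt_id join matches every child row.

```python
import sqlite3


def lineage_report(conn: sqlite3.Connection, parent: str, child: str) -> dict:
    """Count child rows that join to the parent on _dlt_id vs on the business id."""
    # Table names are interpolated directly: only pass trusted, known names.
    def count(sql: str) -> int:
        return conn.execute(sql).fetchone()[0]

    return {
        "child_rows": count(f"SELECT COUNT(*) FROM {child}"),
        "matched_on_dlt_id": count(
            f"SELECT COUNT(*) FROM {child} AS c "
            f"JOIN {parent} AS p ON c._dlt_parent_id = p._dlt_id"
        ),
        "matched_on_business_id": count(
            f"SELECT COUNT(*) FROM {child} AS c "
            f"JOIN {parent} AS p ON c._dlt_parent_id = CAST(p.id AS VARCHAR)"
        ),
    }


def demo_connection() -> sqlite3.Connection:
    # Hypothetical tables shaped like dlt's parent/nested-table output.
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE hubspot__contacts (id INTEGER, email TEXT, _dlt_id TEXT);
        INSERT INTO hubspot__contacts VALUES (101, 'a@x.io', 'k1'), (102, 'b@x.io', 'k2');
        CREATE TABLE hubspot__contacts__companies (company_id INTEGER, _dlt_parent_id TEXT);
        INSERT INTO hubspot__contacts__companies VALUES (9, 'k1'), (10, 'k1'), (9, 'k2');
        """
    )
    return conn


if __name__ == "__main__":
    print(lineage_report(demo_connection(), "hubspot__contacts", "hubspot__contacts__companies"))
```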
ibis remains supported as an option when SQL becomes too verbose for a specific step (complex programmatic expression building, reusable expression fragments, or existing ibis-heavy codebases). If ibis is chosen, keep everything lazy and never fall back to pandas.
Minimal ibis example (single-table transform), adapted from dlt docs:
@dlt.hub.transformation
def dim_person(dataset: dlt.Dataset):
    contacts = dataset.table("hubspot__contacts").to_ibis()
    yield contacts.select("email", "first_name", "last_name").order_by("email").limit(1000)
For more complex ibis patterns (joins, aggregations, unions, row_number, window functions, etc.) see the ibis Table expression API.
ibis requires a SQL-capable destination (BigQuery, Snowflake, DuckDB with file-based access, etc.). If the user requests DuckDB as the destination, check whether ibis can connect to it in the current environment; if not, keep SQL-first transformations or switch to a destination that supports the desired ibis workflow.
Decorator (default pattern):
@dlt.hub.transformation(
    write_disposition="replace",
)
def dim_person(dataset: dlt.Dataset):
    ...
For scheduled or high-volume pipelines, use the incremental-transformation skill to switch from replace to incremental loading, so only new or changed rows are processed on each run.
Cross-source transformations: use SQL-first where possible by selecting from available datasets in SQL; use ibis connections only when cross-dataset SQL composition is not practical in the current environment.
If ibis is needed for cross-source composition, initialise connections before the CDM pipeline starts — see the ibis Table expression API for join, union, and window function patterns.
columns= hint — REQUIRED for any column that may be NULL on first run, and for any computed or derived column:
@dlt.hub.transformation(
    write_disposition="replace",
    columns={
        "company_sk": {"data_type": "text", "nullable": False},
        "email_hash": {"data_type": "text", "nullable": True},  # md5()
        "month_bucket": {"data_type": "text", "nullable": True},  # strftime()
        "event_count": {"data_type": "bigint", "nullable": True},  # COUNT() alias
    },
)
def dim_person(dataset: dlt.Dataset):
    ...
columns= data_type values for keys must match the key type contract selected in Step 4.
When to add columns=:
- Any computed or derived column: scan the SELECT list; every md5(), strftime(), TRY_CAST, CASE WHEN, aggregate alias (COUNT(*) AS event_count), or function chain needs a hint. These are visible directly in the SQL.
- Any column from a LEFT JOIN (the lookup may return NULL)
- Any cast from string to typed value where source may be empty
- Any column that was NULL-only in a prior run
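For the first bullet, a rough scan of the SELECT list can suggest which aliases need a columns= entry. This is a hypothetical heuristic: it splits on top-level commas, treats any function call, CASE, CAST, or :: as computed, ignores unaliased expressions, and does not understand commas inside string literals. It cannot detect the LEFT JOIN, empty-string cast, or NULL-only cases, which still need a manual review.

```python
import re

COMPUTED = re.compile(r"\(|\bCASE\b|\bCAST\b|::", re.IGNORECASE)
ALIAS = re.compile(r"\s+AS\s+(\w+)\s*$", re.IGNORECASE)


def split_select_list(select_list: str) -> list:
    """Split on commas that are not nested inside parentheses."""
    items, depth, current = [], 0, []
    for ch in select_list:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            items.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    if current:
        items.append("".join(current).strip())
    return items


def columns_needing_hints(select_list: str) -> list:
    """Aliases whose expression is computed, i.e. candidates for columns=."""
    aliases = []
    for item in split_select_list(select_list):
        match = ALIAS.search(item)
        if match and COMPUTED.search(item):
            aliases.append(match.group(1))
    return aliases


if __name__ == "__main__":
    print(columns_needing_hints("c.id, md5(c.email) AS email_hash, COUNT(*) AS event_count"))
```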
Omitting columns= causes silent data loss — dlthub strips the column from the outer SELECT if its schema entry has no data_type.
Do NOT use execute_sql_query for cloud destinations — use dlthub transformations with SQL-first (or ibis when explicitly selected).
6. Write the script
Output file: transformations/<dataset_name>_to_cdm.py
Structure:
import dlt

@dlt.source
def <dataset_name>_to_cdm(dataset: dlt.Dataset):
    # dimensions first
    yield dim_person(dataset)
    yield dim_company(dataset)
    yield dim_event(dataset)
    # facts after
    yield fact_event_attendance(dataset)

@dlt.hub.transformation(write_disposition="replace")
def dim_person(dataset: dlt.Dataset):
    ...

# ... remaining functions

if __name__ == "__main__":
    source_pipeline = dlt.attach(pipeline_name="<source_pipeline_name>")
    source_dataset = source_pipeline.dataset()
    # Separate pipeline that writes the CDM tables; dataset_name is the _name value from taxonomy.json
    cdm_pipeline = dlt.pipeline(
        pipeline_name="<dataset_name>",
        destination="<destination>",
        dataset_name="<dataset_name>",
    )
    load_info = cdm_pipeline.run(<dataset_name>_to_cdm(source_dataset))
    print(load_info)
Naming convention: pipeline_name and dataset_name should reflect the business domain and central fact, not the source systems. Derive the name from:
- The central fact table in .schema/CDM.dbml (e.g. fact_interaction → interactions)
- The primary dimension in .schema/ontology.json (e.g. Person)
- The use cases in .schema/taxonomy.json
Name the dataset after the grain of the star schema — what the data mart is about: person_interactions, order_fulfillment, event_attendance. Never use source system names (hubspot_stripe_cdm) or generic names (combined_cdm, my_pipeline). A good name tells an analyst what business process lives in the dataset without reading the code.
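The naming rule can be turned into a quick guard. A sketch, assuming snake_case names; GENERIC_NAMES and the source_systems argument are illustrative lists you would adapt per project, and naming_problems is a hypothetical helper. It cannot judge whether a name describes the business process well, only catch the obvious anti-patterns.

```python
import re

# Illustrative list of names that say nothing about the business process.
GENERIC_NAMES = {"combined_cdm", "my_pipeline", "cdm", "pipeline", "dataset"}


def naming_problems(name: str, source_systems: list) -> list:
    """Return the naming-rule violations for a pipeline/dataset name."""
    problems = []
    if not re.fullmatch(r"[a-z][a-z0-9]*(_[a-z0-9]+)*", name):
        problems.append("not snake_case")
    if name in GENERIC_NAMES:
        problems.append("generic name")
    tokens = set(name.split("_"))
    hits = sorted(s for s in source_systems if s.lower() in tokens)
    if hits:
        problems.append("contains source system name(s): " + ", ".join(hits))
    return problems


if __name__ == "__main__":
    for candidate in ("person_interactions", "hubspot_stripe_cdm", "combined_cdm"):
        print(candidate, naming_problems(candidate, ["hubspot", "stripe"]))
```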
7. Get feedback before running
Show a summary of:
- Output tables being created
- Source tables used per output table
- Any columns added with explicit type hints and why
- Any source columns skipped and why
Ask user to confirm before running the transformation.
8. Run
Run the script from the project root so .dlt state resolves correctly. If needed, enforce root CWD in entrypoint:
import os
from pathlib import Path

# The script lives in transformations/, so parents[1] is the project root.
os.chdir(Path(__file__).resolve().parents[1])
If the run fails, read the error before deciding which skill to use next, and do not proceed to step 9:
- SQL syntax error, unsupported function, or dialect error → debug-transformation skill
- Pipeline state error, stale packages, schema drift, or connection error → debug-pipeline skill in the rest-api-pipeline toolkit (also use this for development iterations; it sets up dev_mode=True)
9. Validate output
After a successful run, verify the transformation produced the expected result using the MCP tools:
- list_tables: confirm all CDM tables are present in the target dataset
- get_row_counts: verify counts are non-zero and plausible relative to source table sizes
- get_table_schema: confirm column names and types match the CDM spec
- preview_table: inspect a sample of rows for unexpected NULLs, wrong grain, or type mismatches
What to check:
- All expected CDM tables exist (no silent skip due to empty resource)
- Row counts are non-zero and plausible relative to source table sizes
- Surrogate key columns are populated (not all NULL)
- Foreign keys in fact tables resolve to values present in dimension tables
- No unexpected duplicate rows (grain violation)
- Computed columns (md5, date buckets, etc.) are present and non-NULL where expected
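If you also have direct SQL access to the target dataset, the key, FK, and grain checks can be written as ANSI queries that each return a single count. A sketch with hypothetical helper names for one dimension/fact pair, run against an in-memory SQLite stand-in; the row counts should be non-zero and every other count should be zero. The demo data deliberately plants one orphaned FK and one duplicated grain.

```python
from __future__ import annotations

import sqlite3


def validation_queries(dim: str, dim_key: str, fact: str, fact_fk: str, grain: list[str]) -> dict[str, str]:
    """ANSI SQL checks for one dimension/fact pair; each query returns one count."""
    grain_cols = ", ".join(grain)
    return {
        "dim_rows": f"SELECT COUNT(*) FROM {dim}",  # should be > 0
        "fact_rows": f"SELECT COUNT(*) FROM {fact}",  # should be > 0
        "null_dim_keys": f"SELECT COUNT(*) FROM {dim} WHERE {dim_key} IS NULL",
        "orphan_fks": (  # fact FKs with no matching dimension row
            f"SELECT COUNT(*) FROM {fact} AS f LEFT JOIN {dim} AS d "
            f"ON f.{fact_fk} = d.{dim_key} "
            f"WHERE f.{fact_fk} IS NOT NULL AND d.{dim_key} IS NULL"
        ),
        "duplicate_grain": (  # grain combinations that appear more than once
            f"SELECT COUNT(*) FROM (SELECT {grain_cols} FROM {fact} "
            f"GROUP BY {grain_cols} HAVING COUNT(*) > 1) AS dupes"
        ),
    }


def run_checks(conn: sqlite3.Connection, queries: dict[str, str]) -> dict[str, int]:
    return {name: conn.execute(sql).fetchone()[0] for name, sql in queries.items()}


def demo_connection() -> sqlite3.Connection:
    # Hypothetical output tables: 'c' is an orphan FK, ('a', 'e1') is duplicated.
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dim_person (person_sk TEXT, email TEXT);
        INSERT INTO dim_person VALUES ('a', 'a@x.io'), ('b', 'b@x.io');
        CREATE TABLE fact_event_attendance (person_sk TEXT, event_sk TEXT);
        INSERT INTO fact_event_attendance VALUES ('a', 'e1'), ('a', 'e1'), ('c', 'e2');
        """
    )
    return conn


if __name__ == "__main__":
    queries = validation_queries(
        "dim_person", "person_sk", "fact_event_attendance", "person_sk", ["person_sk", "event_sk"]
    )
    print(run_checks(demo_connection(), queries))
```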
If any check fails, use the debug-transformation skill.
If all checks pass, ask the user what they'd like to do next:
Transformation validated successfully. What would you like to do next?
1. Deploy and schedule this transformation → dlthub-platform toolkit
2. Explore and visualise the CDM output → data-exploration toolkit
Output
transformations/<dataset_name>_to_cdm.py: dlthub transformation script