name: dlt-expert
description: Expert guidance for dlt (data load tool), the Python library for building data pipelines. Create pipelines from REST APIs, SQL databases, cloud storage, and Python data structures to destinations like DuckDB, BigQuery, Snowflake, and Postgres. Covers rest_api_source, RESTClient, authentication, pagination, incremental loading, schema evolution, and pipeline deployment. Use when working with dlt, data pipelines, ETL, ELT, data loading, REST API integration, or when user mentions dlt, data load tool, pipeline creation, or data extraction.
license: Apache-2.0
compatibility: Requires Python 3.9-3.14, pip or uv package manager
dlt Expert
Expert guidance for building data pipelines with dlt (data load tool), the Python library that automates tedious data loading tasks.
Core Concepts
What is dlt?
dlt (data load tool) is a mature, open-source Python library for building data pipelines. It's:
- A library, not a platform - Add it to your code; it respects existing workflows
- Lightweight - No backends, containers, or black boxes required
- Pythonic - Clean interfaces, human-readable formats, no side effects
- LLM-native - Built from the ground up to work with AI assistants
Key Features:
- Schema inference and evolution
- Automatic data normalization for nested structures
- Incremental loading with state management
- 40+ verified sources and destinations
- Supports Python 3.9-3.14
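To make "automatic data normalization for nested structures" more concrete, here is a plain-Python sketch of flattening nested dictionaries into column names joined by a double underscore, the naming style dlt's documentation describes for nested fields. The `flatten` helper and the sample record are hypothetical and are not dlt's implementation; dlt also moves nested lists into child tables, which this sketch does not attempt.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], parent: str = "", sep: str = "__") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining keys with `sep` (hypothetical helper)."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        column = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse so that {"stats": {"wins": 10}} becomes {"stats__wins": 10}
            flat.update(flatten(value, column, sep))
        else:
            flat[column] = value
    return flat


# Made-up sample record for illustration
player = {"id": 1, "name": "Item 1", "stats": {"rating": {"blitz": 2800}, "wins": 10}}
row = flatten(player)
print(row)
```

The point of the sketch is only the shape of the result: one flat row per record, with the nesting path preserved in the column name.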
dlt Principles
Core philosophy:
- Multiply, don't add - dlt does more work so users do less
- No black boxes - Everything is transparent and inspectable
- Respect existing workflows - Integrate seamlessly with existing code
- Mature and trusted - Used by thousands of engineers in production
Quick Start
Installation
pip install dlt
# or with uv
uv add dlt
Simple Example
Load chess player data from an API:
import dlt
from dlt.sources.helpers import requests

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Grab data from API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
    response = requests.get(f'https://api.chess.com/pub/player/{player}')
    response.raise_for_status()
    data.append(response.json())

# Extract, normalize, and load
pipeline.run(data, table_name='player')
Building Data Pipelines
Step 1: Choose Your Approach
dlt offers multiple patterns based on your data source:
For REST APIs:
- Use rest_api_source() for standard APIs (declarative)
- Use RESTClient for complex/custom APIs (programmatic)
- See REST_API.md for detailed patterns
For SQL Databases:
import dlt
from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql://user:password@localhost/db",
    table_names=["users", "orders"]
)
For Cloud Storage (S3, GCS, Azure):
from dlt.sources.filesystem import filesystem

resource = filesystem(
    bucket_url="s3://example-bucket",
    file_glob="*.csv"
)
For Python Data Structures:
import dlt

@dlt.resource(write_disposition="replace")
def my_data():
    yield [{"id": 1, "name": "Item 1"}]
Step 2: Configure Authentication
See AUTHENTICATION.md for complete patterns:
- API Keys (query parameter or header)
- Bearer Tokens
- HTTP Basic Auth
- OAuth 2.0 Client Credentials
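The first three auth styles above differ mainly in which HTTP header carries the credential. The following standard-library sketch shows that wire format only; the helper names and the `X-API-Key` header name are assumptions for illustration, and it does not use dlt's own auth classes (see AUTHENTICATION.md for those). OAuth 2.0 is left out because it needs a round trip to a token endpoint.

```python
import base64
from typing import Dict


def api_key_header(key: str, name: str = "X-API-Key") -> Dict[str, str]:
    """API key sent in a custom header (the header name varies by API)."""
    return {name: key}


def bearer_header(token: str) -> Dict[str, str]:
    """Bearer token sent in the Authorization header."""
    return {"Authorization": f"Bearer {token}"}


def basic_header(username: str, password: str) -> Dict[str, str]:
    """HTTP Basic auth: base64 of 'username:password'."""
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


print(api_key_header("your_api_key_here"))
print(bearer_header("your_token"))
print(basic_header("user", "pass"))
```

In a real pipeline the credential values would come from secrets management rather than string literals.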
Step 3: Handle Pagination
For APIs with multiple pages, dlt provides built-in paginators:
- JSONLinkPaginator - Follows "next" links
- PageNumberPaginator - Page-based pagination
- OffsetPaginator - Offset/limit pagination
- JSONResponseCursorPaginator - Cursor-based pagination
See PAGINATION.md for details.
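As a rough illustration of what "following next links" means, the sketch below walks a made-up in-memory API until the response no longer carries a next URL. It is plain Python, not dlt's paginator code; `FAKE_API`, `fetch`, `paginate_by_next_link`, and the `data`/`next` field names are all assumptions.

```python
from typing import Any, Callable, Dict, Iterator, List

# Hypothetical in-memory "API": URL -> JSON body
FAKE_API: Dict[str, Dict[str, Any]] = {
    "https://api.example.com/users": {
        "data": [{"id": 1}, {"id": 2}],
        "next": "https://api.example.com/users?page=2",
    },
    "https://api.example.com/users?page=2": {"data": [{"id": 3}], "next": None},
}


def fetch(url: str) -> Dict[str, Any]:
    """Stand-in for an HTTP GET that returns parsed JSON."""
    return FAKE_API[url]


def paginate_by_next_link(
    start_url: str,
    fetch_page: Callable[[str], Dict[str, Any]],
    next_key: str = "next",
    data_key: str = "data",
) -> Iterator[List[Dict[str, Any]]]:
    """Yield one page of records at a time until there is no next link."""
    url = start_url
    while url:
        body = fetch_page(url)
        yield body[data_key]
        url = body.get(next_key)


pages = list(paginate_by_next_link("https://api.example.com/users", fetch))
print(pages)
```

The other paginator styles follow the same loop and differ only in how the next request is derived: a page number, an offset, or a cursor value taken from the response.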
Step 4: Implement Incremental Loading
For large datasets, load only new/changed data:
import dlt

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # Only fetch records updated since last run
    params = {"updated_since": updated_at.last_value}
    # ... fetch and yield data
See INCREMENTAL_LOADING.md for advanced patterns.
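The idea behind cursor-based incremental loading can be sketched without dlt: remember the highest cursor value seen, and on the next run keep only rows beyond it. The helper `load_incrementally`, the `state` dict, and the sample rows below are hypothetical. The strict `>` comparison is a simplification; dlt's own boundary handling and deduplication are more careful.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def load_incrementally(
    records: List[Row],
    state: Dict[str, str],
    cursor_field: str = "updated_at",
    initial_value: str = "2024-01-01T00:00:00Z",
) -> List[Row]:
    """Return rows newer than the stored cursor and advance the cursor."""
    last_value = state.get("last_value", initial_value)
    # ISO 8601 timestamps in one format and timezone compare correctly as strings
    new_rows = [r for r in records if r[cursor_field] > last_value]
    if new_rows:
        state["last_value"] = max(r[cursor_field] for r in new_rows)
    return new_rows


state: Dict[str, str] = {}
source_rows = [
    {"id": 1, "updated_at": "2024-02-01T00:00:00Z"},
    {"id": 2, "updated_at": "2024-03-01T00:00:00Z"},
]
first = load_incrementally(source_rows, state)   # first run: both rows
source_rows.append({"id": 3, "updated_at": "2024-04-01T00:00:00Z"})
second = load_incrementally(source_rows, state)  # second run: only the new row
print([r["id"] for r in first], [r["id"] for r in second])
```

In the dlt resource above, `updated_at.last_value` plays the role of `state["last_value"]`, with dlt persisting it between runs.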
Step 5: Choose Destination
Common destinations:
- DuckDB - Local analytics, fast testing
- BigQuery - Google Cloud warehouse
- Snowflake - Enterprise data warehouse
- Postgres - PostgreSQL database
- Redshift - AWS data warehouse
See DESTINATIONS.md for configuration details.
Step 6: Run Pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="my_dataset"
)
info = pipeline.run(source)
print(f"Loaded {info}")
Write Dispositions
Control how data is loaded:
| Disposition | Behavior | Use Case |
|---|---|---|
| replace | Truncate and reload all data | Full snapshots, dimension tables |
| append | Add new records without deduplication | Immutable logs, events |
| merge | Upsert based on primary key | Updating records, CDC |
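The difference between the three dispositions is easiest to see on a small in-memory table. The sketch below imitates their behavior with plain Python lists; `apply_disposition` and the sample rows are hypothetical and say nothing about how dlt implements loading at a real destination.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_disposition(
    table: List[Row], incoming: List[Row], disposition: str, primary_key: str = "id"
) -> List[Row]:
    """Return the table contents after loading `incoming` with the given disposition."""
    if disposition == "replace":
        # Drop everything that was there and keep only the new load
        return list(incoming)
    if disposition == "append":
        # Keep old rows and add new ones, duplicates included
        return table + list(incoming)
    if disposition == "merge":
        # Upsert: rows with an existing key are overwritten, new keys are added
        merged = {row[primary_key]: row for row in table}
        for row in incoming:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown disposition: {disposition}")


existing = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
incoming = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]

replaced = apply_disposition(existing, incoming, "replace")
appended = apply_disposition(existing, incoming, "append")
merged = apply_disposition(existing, incoming, "merge")
print(len(replaced), len(appended), len(merged))
```

Note that append leaves two rows with `id` 2, which is why it suits immutable events rather than records that change.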
Secrets Management
Never hardcode credentials! Use .dlt/secrets.toml:
# .dlt/secrets.toml
api_key = "your_api_key_here"
[sources.my_api]
base_url = "https://api.example.com"
api_key = "source_specific_key"
[sources.my_api.credentials]
client_id = "your_client_id"
client_secret = "your_client_secret"
Access in code:
api_key = dlt.secrets["api_key"]
source_api_key = dlt.secrets["sources.my_api.api_key"]
Environment variables: dlt automatically maps env vars:
SOURCES__MY_API__API_KEY=your_key
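The naming rule visible in the example above (upper-case sections joined by double underscores) can be written down as a small helper. This is a sketch of the naming convention only, not dlt's configuration resolver; `secret_key_to_env_var`, `lookup`, and `fake_environ` are hypothetical names.

```python
import os
from typing import Mapping, Optional


def secret_key_to_env_var(key: str) -> str:
    """Turn a dotted key like 'sources.my_api.api_key' into an env var name."""
    return "__".join(part.upper() for part in key.split("."))


def lookup(key: str, environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Look up a dotted key in an environment mapping (None if it is not set)."""
    return environ.get(secret_key_to_env_var(key))


# A stand-in environment so the example does not depend on the real one
fake_environ = {"SOURCES__MY_API__API_KEY": "your_key"}

print(secret_key_to_env_var("sources.my_api.api_key"))  # SOURCES__MY_API__API_KEY
print(lookup("sources.my_api.api_key", fake_environ))   # your_key
```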
Testing and Inspection
Test Locally with DuckDB
pipeline = dlt.pipeline(
    pipeline_name="test_pipeline",
    destination="duckdb",
    dataset_name="test_data"
)
# Run with limited data (add_limit caps how many items each resource yields)
info = pipeline.run(source.add_limit(10))
Inspect Data
# Open a SQL client on the pipeline's destination
with pipeline.sql_client() as client:
    # Query data; execute_sql returns the result rows
    rows = client.execute_sql("SELECT COUNT(*) FROM users")
    print(f"Total users: {rows[0][0]}")
CLI Commands
# View schema
dlt pipeline <name> schema
# Show data
dlt pipeline <name> show
# Inspect pipeline
dlt pipeline <name> info
Common Patterns
Pattern 1: Simple REST API Pipeline
For standard REST APIs with predictable structure:
import dlt
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {
        "base_url": "https://api.example.com",
        "auth": {
            "type": "bearer",
            # Read the token from secrets; dlt.secrets.value only resolves
            # as a default argument of a dlt-decorated function
            "token": dlt.secrets["sources.my_api.api_key"]
        }
    },
    "resources": [
        {
            "name": "users",
            "endpoint": {
                "path": "users",
                "paginator": "json_link",
                "data_selector": "data"
            }
        }
    ]
})

pipeline = dlt.pipeline(
    pipeline_name="api_pipeline",
    destination="duckdb"
)
pipeline.run(source)
Pattern 2: Custom REST API with RESTClient
For complex APIs needing custom logic:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

@dlt.resource(write_disposition="replace")
def users():
    client = RESTClient(base_url="https://api.example.com")
    # Follow the next-page URL found at pagination.next in each response body
    for page in client.paginate(
        "/users",
        paginator=JSONLinkPaginator(next_url_path="pagination.next")
    ):
        yield page

@dlt.source
def my_api_source():
    return [users()]
Pattern 3: Incremental Loading with Merge
For updating existing records:
import dlt
from dlt.sources.helpers.rest_client import RESTClient

@dlt.resource(
    write_disposition="merge",
    primary_key="id"
)
def users(
    updated_at=dlt.sources.incremental("updated_at")
):
    client = RESTClient(base_url="https://api.example.com")
    # last_value is None on the first run because no initial_value is set
    for page in client.paginate(
        "/users",
        params={"updated_since": updated_at.last_value}
    ):
        yield page
Development Workflow
1. Start with DuckDB Destination
Fast local iteration:
pipeline = dlt.pipeline(destination="duckdb")
2. Test with Limited Data
source.add_limit(10) # Stop each resource after 10 yielded items (pages, if it yields pages)
3. Verify Schema
# Check inferred schema
dlt pipeline <name> schema
4. Test Incremental Loading
Run pipeline twice to verify state:
# First run
pipeline.run(source)
# Second run - should only load new data
pipeline.run(source)
5. Deploy to Production
Common deployment options:
- Airflow - Orchestration with DAGs
- AWS Lambda - Serverless functions
- Google Cloud Functions - Serverless on GCP
- Any Python environment - dlt runs anywhere Python runs
See DEPLOYMENT.md for deployment patterns.
Troubleshooting
Authentication Failures:
- Verify secrets in .dlt/secrets.toml
- Check token expiration for OAuth
- Ensure correct auth type (bearer vs api_key)
Pagination Issues:
- Inspect API response structure
- Verify paginator configuration matches API
- Test with add_limit(10) first
Schema Evolution:
- dlt handles schema changes automatically
- Use schema contracts for strict validation
- See SCHEMA_EVOLUTION.md
Performance:
- Use incremental loading for large datasets
- Configure appropriate batch sizes
- Consider parallelization for independent resources
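One place batch size shows up in practice is in how many rows a resource yields at a time: yielding pages or lists, as the examples in this guide do, avoids handling rows one by one. The chunking helper below is a generic standard-library sketch; `batched` and the batch size of 3 are illustrative assumptions, not a dlt setting.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


chunks = list(batched(range(7), 3))
print(chunks)
```

Inside a resource function, each `batch` could be yielded as one list of records instead of yielding the records individually.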
Best Practices
- Start Simple - Use rest_api_source() for standard APIs
- Use RESTClient for Custom Logic - When you need more control
- Always Use Secrets Management - Never hardcode credentials
- Test Locally with DuckDB - Fast iteration and debugging
- Implement Incremental Loading - For large or frequently updated datasets
- Choose Appropriate Write Disposition - Based on data mutability
- Specify Primary Keys - Required for merge disposition
- Use Test-Driven Development - Write tests for your pipelines
- Follow dlt Principles - No black boxes, respect existing workflows
- Leverage Built-ins - Use dlt's auth, paginators, and retry logic
See BEST_PRACTICES.md for comprehensive guidance.
Reference Files
Detailed documentation for specific topics:
- REST_API.md - REST API patterns (rest_api_source vs RESTClient)
- AUTHENTICATION.md - Authentication methods and patterns
- PAGINATION.md - Pagination classes and configuration
- INCREMENTAL_LOADING.md - Incremental loading strategies
- DESTINATIONS.md - Destination configuration and options
- SCHEMA_EVOLUTION.md - Schema inference and evolution
- DEPLOYMENT.md - Deployment patterns and orchestration
- BEST_PRACTICES.md - Comprehensive best practices
- TDD_WORKFLOW.md - Test-driven development for dlt pipelines
Additional Resources
- Official Docs: https://dlthub.com/docs
- GitHub Repo: https://github.com/dlt-hub/dlt
- REST API Source Guide: https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api
- RESTClient Guide: https://dlthub.com/docs/general-usage/http/rest-client
- Community Slack: https://dlthub.com/community
- Demo Examples: https://github.com/dlt-hub/dlt_demos
- REST API Recipes: https://github.com/dlt-hub/rest_api_source_recipes
- Verified Sources: https://dlthub.com/docs/dlt-ecosystem/verified-sources/
- LLM Workspace: https://dlthub.com/workspace (10,100+ sources)
Contributing
Improvements welcome! Check the dlt repository for contribution guidelines:
- Bugfixes and improvements are always welcome
- New destinations unlikely to be merged (high maintenance cost)
- Focus on SQLAlchemy destination for new database dialects