airflow-3x-migration

star 0

Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API.

MGPowerlytics By MGPowerlytics schedule Updated 1/21/2026

name: airflow-3x-migration description: Comprehensive guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import changes, deprecated features, and new paradigms like Asset scheduling and TaskFlow API. version: 1.0.0

Airflow 3.x Skills

Import Path Changes

Operators

# Airflow 2.x
from airflow.operators.python import PythonOperator

# Airflow 3.x
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator

Sensors

# Airflow 3.x
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.time import TimeSensor

Removed Features

Removed Replacement
SubDagOperator TaskGroup
packaged_dag_processor Use standard DAG loading
airflow.contrib.* Provider packages
schedule_interval param schedule param

DAG Definition Changes

# Airflow 3.x preferred
from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="my_dag",
    schedule="@daily",  # Not schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["betting", "sports"],
) as dag:
    ...

TaskFlow API (Preferred)

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def betting_workflow():

    @task
    def download_games(sport: str) -> list:
        # Returns are automatically passed via XCom
        return fetch_games(sport)

    @task
    def update_elo(games: list) -> dict:
        return calculate_elo(games)

    # Chain tasks
    games = download_games("nba")
    ratings = update_elo(games)

betting_dag = betting_workflow()

Asset-Based Scheduling (Replaces Dataset)

from airflow.sdk import Asset

# Define assets
games_data = Asset("games_data")
elo_ratings = Asset("elo_ratings")

# Producer DAG
@dag(schedule="@daily")
def download_dag():
    @task(outlets=[games_data])
    def download():
        ...

# Consumer DAG - triggers when asset updates
@dag(schedule=[games_data])
def process_dag():
    @task
    def process():
        ...

Setup/Teardown Tasks

@task
def setup_db_connection():
    return create_connection()

@task
def cleanup_connection(conn):
    conn.close()

@task
def process_data(conn):
    ...

# Define setup/teardown relationship
with dag:
    conn = setup_db_connection()
    process_data(conn) >> cleanup_connection(conn)

    # Or use context manager style
    conn.as_setup() >> process_data(conn) >> conn.as_teardown()

DAG Versioning

from airflow import DAG

with DAG(
    dag_id="betting_workflow",
    version="2.0.0",  # New in 3.x
    schedule="@daily",
) as dag:
    ...

Backfill Changes

# Airflow 3.x - use REST API
curl -X POST "http://localhost:8080/api/v1/dags/my_dag/dagRuns" \
  -H "Content-Type: application/json" \
  -d '{"logical_date": "2024-01-15T00:00:00Z"}'

# Or use new backfill command
airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-15

New REST API Endpoints

import requests

# Get DAG runs
response = requests.get(
    "http://localhost:8080/api/v1/dags/betting_workflow/dagRuns",
    auth=("admin", "admin")
)

# Trigger DAG
response = requests.post(
    "http://localhost:8080/api/v1/dags/betting_workflow/dagRuns",
    json={"conf": {"sport": "nba"}},
    auth=("admin", "admin")
)

Edge Labels

from airflow.utils.edgemodifier import Label

download >> Label("success") >> process
download >> Label("failure") >> alert

Migration Checklist

  • Update all operator imports to provider packages
  • Replace schedule_interval with schedule
  • Convert SubDags to TaskGroups
  • Replace Dataset with Asset
  • Test DAG parsing with python dags/my_dag.py
  • Update docker-compose to Airflow 3.x image

Files to Reference

Airflow 3.x CLI

  • Has changed significantly since Airflow 2.
  • Please look at latest docs before running CLI commands
Install via CLI
npx skills add https://github.com/MGPowerlytics/nhlstats --skill airflow-3x-migration
Repository Details
star Stars 0
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator
MGPowerlytics
MGPowerlytics Explore all skills →