name: durable-task-python
description: Build durable, fault-tolerant workflows in Python using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, entities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or stateful agents. Applies to any Python application requiring durable execution, state persistence, or distributed transactions without Azure Functions dependency.
Durable Task Python SDK with Durable Task Scheduler
Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.
Quick Start
Required Packages
pip install durabletask durabletask-azuremanaged azure-identity
Or add to requirements.txt:
durabletask
durabletask-azuremanaged
azure-identity
Minimal Worker + Client Setup
import os

from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.client import OrchestrationStatus
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Activity function
def hello(ctx: task.ActivityContext, name: str) -> str:
    return f"Hello {name}!"

# Orchestrator function
def my_orchestration(ctx: task.OrchestrationContext, input: str):
    result = yield ctx.call_activity(hello, input=input)
    return result

# Configuration - defaults to local emulator
taskhub = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
secure_channel = endpoint != "http://localhost:8080"
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()

# Start worker and run orchestration
with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub,
    token_credential=credential
) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(hello)
    worker.start()

    # Create client and schedule orchestration
    dts_client = DurableTaskSchedulerClient(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub,
        token_credential=credential
    )
    instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
    state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)
    if state and state.runtime_status == OrchestrationStatus.COMPLETED:
        print(f"Result: {state.serialized_output}")
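The serialized_output value printed in the Quick Start is a JSON-encoded string, so a string result is shown with its surrounding quotes. The following is a minimal sketch of decoding it, assuming only the runtime_status and serialized_output attributes used above. decode_output is a hypothetical helper, not part of the SDK, and it compares the status by name so the sketch runs without the SDK installed.

```python
import json
from typing import Any, Optional


def decode_output(state: Any) -> Optional[Any]:
    """Return the decoded orchestration output, or None if unavailable.

    Assumption: `state` exposes `runtime_status` (an enum member) and
    `serialized_output` (a JSON string or None), as in the Quick Start.
    """
    if state is None or state.runtime_status.name != "COMPLETED":
        return None
    if state.serialized_output is None:
        return None
    # The payload is JSON text, so decode it back into a Python value.
    return json.loads(state.serialized_output)
```

In the Quick Start, print(decode_output(state)) would then show the greeting without the JSON quotes.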
Pattern Selection Guide
| Pattern | Use When |
|---|---|
| Function Chaining | Sequential steps where each depends on the previous |
| Fan-Out/Fan-In | Parallel processing with aggregated results |
| Human Interaction | Workflow pauses for external input/approval |
| Durable Entities | Stateful objects with operations (counters, accounts) |
| Sub-Orchestrations | Reusable workflow components or version isolation |
| Eternal Orchestrations | Long-running background processes with continue_as_new |
| Monitoring | Periodic polling with configurable timeouts |
See references/patterns.md for detailed implementations.
Orchestration Structure
Basic Orchestrator
def my_orchestration(ctx: task.OrchestrationContext, input: str):
    """Orchestrator function - MUST be deterministic"""
    # Call activities sequentially
    step1 = yield ctx.call_activity(step1_activity, input=input)
    step2 = yield ctx.call_activity(step2_activity, input=step1)
    return step2
Basic Activity
def my_activity(ctx: task.ActivityContext, input: str) -> str:
    """Activity function - can have side effects, I/O, non-determinism"""
    # Perform actual work here
    print(f"Processing: {input}")
    return f"Processed: {input}"
Registering with Worker
with DurableTaskSchedulerWorker(...) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(step1_activity)
    worker.add_activity(step2_activity)
    worker.start()
Critical Rules
Orchestration Determinism
Orchestrations replay from history - all code MUST be deterministic. When an orchestration resumes, it replays all previous code to rebuild state. Non-deterministic code produces different results on replay, causing failures.
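The replay behavior described above can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not the SDK's replay engine: the replay function, the (request, result) history format, and NonDeterminismError are all hypothetical. The generator is re-run from the top, recorded results are fed back in, and a mismatch between what the code asks for and what the history recorded is treated as a failure.

```python
import itertools
from typing import Any, Callable, Generator, List, Tuple


class NonDeterminismError(Exception):
    """Raised when replayed code asks for something the history did not record."""


def replay(orchestrator: Callable[[], Generator],
           history: List[Tuple[Any, Any]]) -> Tuple[str, Any]:
    """Re-run a generator from the start, feeding it recorded results."""
    gen = orchestrator()
    try:
        request = next(gen)  # run to the first yield
        for recorded_request, recorded_result in history:
            if request != recorded_request:
                raise NonDeterminismError(
                    f"expected {recorded_request!r}, got {request!r}")
            request = gen.send(recorded_result)  # resume with the old result
    except StopIteration as done:
        return ("completed", done.value)
    return ("pending", request)


def deterministic():
    a = yield "step1"
    b = yield f"step2:{a}"
    return a + b


_calls = itertools.count()


def non_deterministic():
    # Reads process-global state, so every replay requests a different step.
    result = yield f"step-{next(_calls)}"
    return result
```

Replaying deterministic with a growing history always reaches the same yield points, while non_deterministic requests step-0 on the first run and step-1 on the replay, which the toy model rejects.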
NEVER do inside orchestrations:
- datetime.now(), datetime.utcnow() → Use ctx.current_utc_datetime
- uuid.uuid4() → Use ctx.new_uuid()
- random.random() → Pass random values from activities
- Direct I/O, HTTP calls, database access → Move to activities
- time.sleep(), asyncio.sleep() → Use ctx.create_timer()
- Environment variables that may change → Pass as input or use activities
- Global mutable state → Pass state through activity results
ALWAYS use:
- yield ctx.call_activity() - Call activities
- yield ctx.call_sub_orchestrator() - Call sub-orchestrations
- yield ctx.create_timer() - Durable delays
- yield ctx.wait_for_external_event() - Wait for events
- ctx.current_utc_datetime - Current time
- ctx.new_uuid() - Generate GUIDs
- ctx.set_custom_status() - Set status
Non-Determinism Patterns (WRONG vs CORRECT)
Getting Current Time
# WRONG - datetime.now() returns different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    current_time = datetime.now()  # Non-deterministic!
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)

# CORRECT - ctx.current_utc_datetime is replayed consistently
def good_orchestration(ctx: task.OrchestrationContext, _):
    current_time = ctx.current_utc_datetime  # Deterministic
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)
Generating UUIDs/Random Values
# WRONG - uuid4() generates different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(uuid.uuid4())  # Non-deterministic!
    yield ctx.call_activity(create_order, input=order_id)

# CORRECT - ctx.new_uuid() replays the same value
def good_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(ctx.new_uuid())  # Deterministic
    yield ctx.call_activity(create_order, input=order_id)
Random Numbers
# WRONG - random produces different values on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    delay = random.randint(1, 10)  # Non-deterministic!
    yield ctx.create_timer(timedelta(seconds=delay))

# CORRECT - generate random in activity, pass to orchestrator
def get_random_delay(ctx: task.ActivityContext, _) -> int:
    return random.randint(1, 10)  # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, _):
    delay = yield ctx.call_activity(get_random_delay)  # Deterministic
    yield ctx.create_timer(timedelta(seconds=delay))
Sleeping/Delays
# WRONG - time.sleep blocks and doesn't persist
def bad_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    time.sleep(60)  # Non-durable! Lost on restart
    yield ctx.call_activity(step2)

# CORRECT - ctx.create_timer is durable
def good_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    yield ctx.create_timer(timedelta(seconds=60))  # Durable timer
    yield ctx.call_activity(step2)
HTTP Calls and I/O
# WRONG - HTTP call in orchestrator is non-deterministic
def bad_orchestration(ctx: task.OrchestrationContext, url: str):
    import requests
    response = requests.get(url)  # Non-deterministic!
    return response.json()

# CORRECT - move I/O to activity
def fetch_data(ctx: task.ActivityContext, url: str) -> dict:
    import requests
    response = requests.get(url)  # OK in activity
    return response.json()

def good_orchestration(ctx: task.OrchestrationContext, url: str):
    data = yield ctx.call_activity(fetch_data, input=url)  # Deterministic
    return data
Database Access
# WRONG - database query in orchestrator
def bad_orchestration(ctx: task.OrchestrationContext, user_id: str):
    import sqlite3
    conn = sqlite3.connect('db.sqlite')  # Non-deterministic!
    cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
    user = cursor.fetchone()
    # ...

# CORRECT - database access in activity
def get_user(ctx: task.ActivityContext, user_id: str) -> dict:
    import sqlite3
    conn = sqlite3.connect('db.sqlite')  # OK in activity
    conn.row_factory = sqlite3.Row  # Rows become mapping-like, so dict(row) works
    try:
        cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
        row = cursor.fetchone()
        # fetchone() returns None when no row matches; return an empty dict then
        return dict(row) if row is not None else {}
    finally:
        conn.close()

def good_orchestration(ctx: task.OrchestrationContext, user_id: str):
    user = yield ctx.call_activity(get_user, input=user_id)
    # ...
Environment Variables
# WRONG - env var might change between replays
def bad_orchestration(ctx: task.OrchestrationContext, _):
    api_endpoint = os.getenv("API_ENDPOINT")  # Could change!
    yield ctx.call_activity(call_api, input=api_endpoint)

# CORRECT - pass config as input or read in activity
def good_orchestration(ctx: task.OrchestrationContext, config: dict):
    api_endpoint = config["api_endpoint"]  # From input, deterministic
    yield ctx.call_activity(call_api, input=api_endpoint)

# ALSO CORRECT - read env var in activity
def call_api(ctx: task.ActivityContext, _) -> str:
    api_endpoint = os.getenv("API_ENDPOINT")  # OK in activity
    # make the call...
Conditional Logic Based on External State
# WRONG - file existence can change between replays
def bad_orchestration(ctx: task.OrchestrationContext, path: str):
    if os.path.exists(path):  # Non-deterministic!
        yield ctx.call_activity(process_file, input=path)

# CORRECT - check in activity
def check_file_exists(ctx: task.ActivityContext, path: str) -> bool:
    return os.path.exists(path)  # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, path: str):
    exists = yield ctx.call_activity(check_file_exists, input=path)
    if exists:  # Deterministic - based on activity result
        yield ctx.call_activity(process_file, input=path)
Dictionary/Set Iteration Order
# POTENTIALLY WRONG - dicts only guarantee insertion order on Python 3.7+,
# and sets never guarantee a stable iteration order
def risky_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in items:  # Order might not be guaranteed
        yield ctx.call_activity(process, input=key)

# CORRECT - use sorted keys for deterministic order
def good_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in sorted(items.keys()):  # Guaranteed order
        yield ctx.call_activity(process, input=key)
Thread-Local or Global State
# WRONG - global state can change
counter = 0

def bad_orchestration(ctx: task.OrchestrationContext, _):
    global counter
    counter += 1  # Non-deterministic across replays!
    yield ctx.call_activity(process, input=counter)

# CORRECT - pass state through orchestration input/output
def good_orchestration(ctx: task.OrchestrationContext, counter: int):
    counter += 1  # Local variable, deterministic
    yield ctx.call_activity(process, input=counter)
    # If continuing, pass counter forward
    ctx.continue_as_new(counter)
Using yield
In Python, orchestrator functions use yield to await durable operations:
# CORRECT - use yield
result = yield ctx.call_activity(my_activity, input="data")
# WRONG - will not work
result = ctx.call_activity(my_activity, input="data") # Missing yield!
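To show why the missing yield matters, here is a small sketch that drives orchestrator-style generators with stand-in objects. FakeTask, FakeContext, and run_locally are hypothetical helpers written for this illustration, not SDK classes, and activities run inline rather than through a scheduler.

```python
class FakeTask:
    """Stand-in for the task object returned by ctx.call_activity."""

    def __init__(self, activity, input):
        self.activity = activity
        self.input = input


class FakeContext:
    """Minimal stand-in for an orchestration context (not the SDK class)."""

    def call_activity(self, activity, input=None):
        # Only describes the work; nothing runs until the task is yielded.
        return FakeTask(activity, input)


def run_locally(orchestrator, ctx, input):
    """Drive a generator orchestrator, executing each yielded task inline."""
    gen = orchestrator(ctx, input)
    try:
        pending = next(gen)
        while True:
            result = pending.activity(ctx, pending.input)
            pending = gen.send(result)  # the yield expression evaluates to this
    except StopIteration as done:
        return done.value


def shout(ctx, text):
    return text.upper()


def with_yield(ctx, text):
    result = yield ctx.call_activity(shout, input=text)
    return result


def without_yield(ctx, text):
    result = ctx.call_activity(shout, input=text)  # missing yield
    return result
```

In this toy model, run_locally(with_yield, FakeContext(), "hi") returns the activity result, while without_yield hands back the unexecuted task object because nothing was ever yielded to the driver.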
Error Handling
def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str):
    try:
        result = yield ctx.call_activity(risky_activity, input=input)
        return result
    except task.TaskFailedError as e:
        # Activity failed - implement compensation
        ctx.set_custom_status({"error": str(e)})
        yield ctx.call_activity(compensation_activity, input=input)
        return "Compensated"
Retry Policies
from datetime import timedelta
from durabletask.task import RetryPolicy

# Intervals and timeouts are timedelta values, not bare numbers of seconds
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=5),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=60),
    retry_timeout=timedelta(seconds=300)
)

def orchestrator(ctx: task.OrchestrationContext, _):
    result = yield ctx.call_activity(
        unreliable_activity,
        input="data",
        retry_policy=retry_policy
    )
    return result
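To make the backoff parameters concrete, the sketch below computes the wait before each retry under capped exponential backoff. It assumes the delay grows as first_retry_interval multiplied by backoff_coefficient raised to the retry index, is capped at max_retry_interval, and that max_number_of_attempts counts the initial attempt. The SDK's exact scheduling may differ, and retry_delays is a hypothetical helper.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    first_retry_interval: timedelta,
    max_number_of_attempts: int,
    backoff_coefficient: float = 1.0,
    max_retry_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Waits before each retry, assuming capped exponential backoff.

    With N total attempts there are at most N - 1 retries.
    """
    delays = []
    for retry in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry)
        if max_retry_interval is not None:
            delay = min(delay, max_retry_interval)  # apply the cap
        delays.append(delay)
    return delays
```

With a 5-second first interval, a coefficient of 2.0, a 60-second cap, and 3 attempts, this model yields waits of 5 and 10 seconds.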
Working with Custom Types
The SDK supports dataclasses, namedtuples, and custom classes:
from dataclasses import dataclass

@dataclass
class Order:
    product: str
    quantity: int
    cost: float

def process_order(ctx: task.ActivityContext, order: Order) -> str:
    return f"Processed {order.quantity}x {order.product}"

def order_workflow(ctx: task.OrchestrationContext, order: Order):
    result = yield ctx.call_activity(process_order, input=order)
    return result
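Inputs and outputs travel between orchestrations and activities as serialized payloads, so custom types should contain only JSON-friendly fields. If explicit control over the wire format is preferred, one option is to convert to and from plain JSON at the boundary. The sketch below shows that alternative with the standard library only; to_payload and from_payload are hypothetical helpers and do not reflect how the SDK serializes values internally.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Order:
    product: str
    quantity: int
    cost: float


def to_payload(order: Order) -> str:
    """Serialize an Order to a JSON string of its fields."""
    return json.dumps(asdict(order))


def from_payload(payload: str) -> Order:
    """Rebuild an Order from a JSON string produced by to_payload."""
    return Order(**json.loads(payload))
```

A round trip such as from_payload(to_payload(order)) returns an equal Order, which is a quick check that every field survives serialization.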
Connection & Authentication
Local Emulator (Default)
# No authentication required
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False
Azure with DefaultAzureCredential
from azure.identity import DefaultAzureCredential
taskhub = "my-taskhub"
endpoint = "https://my-scheduler.region.durabletask.io"
credential = DefaultAzureCredential()
secure_channel = True
Authentication Helper
def get_connection_config():
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
    taskhub = os.getenv("TASKHUB", "default")
    is_local = endpoint == "http://localhost:8080"
    return {
        "host_address": endpoint,
        "taskhub": taskhub,
        "secure_channel": not is_local,
        "token_credential": None if is_local else DefaultAzureCredential()
    }

config = get_connection_config()
worker = DurableTaskSchedulerWorker(**config)
client = DurableTaskSchedulerClient(**config)
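The helper above treats the endpoint as local only when it matches http://localhost:8080 exactly, so a different port, a trailing slash, or 127.0.0.1 would be handled as a remote endpoint. A possible variation, sketched here with the standard library only, checks the host name instead. is_local_endpoint and LOCAL_HOSTS are hypothetical names, and the set of hosts considered local is an assumption.

```python
from urllib.parse import urlparse

# Assumption: these hosts indicate a locally running emulator.
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def is_local_endpoint(endpoint: str) -> bool:
    """Return True when the endpoint URL points at a local host.

    The endpoint must include a scheme (for example "http://");
    without one, urlparse cannot identify the host and this returns False.
    """
    host = urlparse(endpoint).hostname
    return host in LOCAL_HOSTS
```

Inside get_connection_config, is_local = is_local_endpoint(endpoint) could then replace the exact string comparison.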
Local Development with Emulator
# Pull and run the emulator
docker pull mcr.microsoft.com/dts/dts-emulator:latest
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
# Dashboard available at http://localhost:8082
Client Operations
# Schedule new orchestration
instance_id = client.schedule_new_orchestration(my_orchestration, input="data")
# Schedule with custom instance ID
instance_id = client.schedule_new_orchestration(
    my_orchestration,
    input="data",
    instance_id="my-custom-id"
)
# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=60)
# Get current status
state = client.get_orchestration_state(instance_id)
# Raise external event
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)
# Terminate orchestration
client.terminate_orchestration(instance_id, output="User cancelled")
# Suspend/Resume
client.suspend_orchestration(instance_id)
client.resume_orchestration(instance_id)
References
- patterns.md - Detailed pattern implementations (Fan-Out/Fan-In, Human Interaction, Entities, Sub-Orchestrations)
- setup.md - Azure Durable Task Scheduler provisioning and deployment