pyo3-interop

star 32

Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.

microsoft By microsoft schedule Updated 2/10/2026

name: pyo3-interop description: Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.

PyO3 Interop Architecture

Overview

duroxide-python bridges Rust's duroxide runtime to Python via PyO3/maturin. The interop has two distinct paths — orchestrations (generator-based, synchronous blocking) and activities (synchronous GIL calls). Getting this wrong causes GIL deadlocks, silent replay corruption, or dropped futures.

File Map

File Role
src/handlers.rs Core interop — orchestration handler loop, activity invocation, global context maps, select/race/join, activity cancellation
src/types.rs ScheduledTask enum — the protocol between Python and Rust
src/lib.rs PyO3 module entry point, #[pyfunction] trace functions
src/runtime.rs PyRuntime — wraps duroxide::Runtime, global tokio runtime
src/client.rs PyClient — wraps duroxide::Client, all methods with py.allow_threads()
src/provider.rs PySqliteProvider
src/pg_provider.rs PyPostgresProvider
python/duroxide/__init__.py Python wrapper: SqliteProvider, PostgresProvider, Client, Runtime, decorators
python/duroxide/context.py OrchestrationContext, ActivityContext
python/duroxide/driver.py Generator driver: create_generator, next_step, dispose_generator

⚠️ Critical: GIL Deadlock Problem

This is the most important difference between duroxide-python and duroxide-node. Get this wrong and the process deadlocks.

The Problem

PyO3 holds the GIL when Python calls into Rust #[pymethods]. If that method calls TOKIO_RT.block_on(), it blocks the thread while holding the GIL. Meanwhile, orchestration handlers running on tokio threads need the GIL via Python::with_gil()deadlock.

Thread A (Python → Rust):
  client.wait_for_orchestration()
    → PyO3 holds GIL
    → TOKIO_RT.block_on(async { ... })   ← BLOCKS, holding GIL

Thread B (Tokio → Python):
  orchestration handler invoked
    → block_in_place + Python::with_gil()  ← BLOCKS, waiting for GIL

The Fix

EVERY method that calls block_on must use py.allow_threads() to release the GIL before blocking:

fn wait_for_orchestration(&self, py: Python<'_>, id: String, timeout: u64) -> PyResult<...> {
    py.allow_threads(|| {
        TOKIO_RT.block_on(async {
            self.client.wait_for_orchestration(&id, timeout).await
                .map_err(|e| format!("{e}"))
        })
    })
    .map_err(PyRuntimeError::new_err)
}

This pattern is applied to ALL 20+ methods in client.rs and runtime.rs.

Error Handling Across the Boundary

PyErr is not Send, so you can't return PyResult from inside allow_threads. Pattern:

  1. Inside allow_threads: map errors to String via .map_err(|e| format!("{e}"))
  2. Outside allow_threads: map String to PyErr via .map_err(PyRuntimeError::new_err)

Rules for ANY New Method

  1. Add py: Python<'_> parameter to the method signature
  2. Wrap TOKIO_RT.block_on() in py.allow_threads(|| { ... })
  3. Map errors to String inside, to PyErr outside
  4. Never hold the GIL while blocking on tokio

Orchestration Interop (Blocking Generator Loop)

The replay engine calls poll_once() on the handler future. If the future isn't ready in one poll, it's dropped.

Solution: block_in_place + with_gil

fn call_create_blocking(&self, payload: String) -> Result<GeneratorStepResult, String> {
    tokio::task::block_in_place(|| {
        Python::with_gil(|py| {
            let result = self.create_fn.call1(py, (payload,))?;
            // parse result...
        })
    })
}

Orchestration Handler Sequence

Rust (tokio thread)                         Python (GIL)
───────────────────                         ────────────────────
1. invoke(ctx, input)
   ├─ Store ctx in ORCHESTRATION_CTXS[instance_id]
   ├─ call_create_blocking(payload) ──────► create_generator(payload)
   │   (block_in_place + with_gil)            ├─ Create OrchestrationContext
   │                                          ├─ Create generator: fn(ctx, input)
   │                                          ├─ gen.send(None) → first yield
   │                                          └─ Return {"status": "yielded", "task": ...}
   │◄────────────────────────────────────────┘
   ├─ Loop:
   │   ├─ execute_task(ctx, task)           // Real DurableFuture or replay
   │   ├─ call_next_blocking(result) ──────► next_step(result)
   │   │   (block_in_place + with_gil)        ├─ gen.send(value) or gen.throw(exc)
   │   │                                      └─ Return next task or completion
   │   │◄────────────────────────────────────┘
   │   └─ If completed/error: break
   └─ Remove ctx from ORCHESTRATION_CTXS

Activity Interop (Synchronous GIL Call)

Activities in duroxide-python are synchronous functions (unlike duroxide-node's async activities). They run on tokio threads via block_in_place + with_gil:

Rust                                        Python
────                                        ──────
invoke(ctx, input)
  ├─ Generate unique token (act-0, act-1, ...)
  ├─ Store ctx in ACTIVITY_CTXS[token]
  ├─ block_in_place + with_gil ─────────────► wrapped_fn(payload)
  │                                            ├─ Parse ctx, create ActivityContext
  │                                            ├─ Call user's function (synchronous)
  │                                            └─ Return JSON result
  │◄───────────────────────────────────────────┘
  └─ Remove token from ACTIVITY_CTXS

Users can use asyncio.run() inside activities if they need async I/O.

Cross-Thread Tracing

Python callbacks acquire the GIL on tokio threads. Rust contexts live on tokio threads. Global HashMaps bridge the two:

static ACTIVITY_CTXS: LazyLock<Mutex<HashMap<String, ActivityContext>>>
static ORCHESTRATION_CTXS: LazyLock<Mutex<HashMap<String, OrchestrationContext>>>

Python calls PyO3 functions that look up the Rust context:

# In OrchestrationContext (fire-and-forget, no yield)
def trace_info(self, message):
    orchestration_trace_log(self.instance_id, "info", str(message))

# In ActivityContext (fire-and-forget)
def trace_info(self, message):
    activity_trace_log(self._trace_token, "info", str(message))

Rules for Tracing

  1. Never expose is_replaying to Python — Rust OrchestrationContext.trace() handles suppression
  2. Always use global maps, not thread-locals — Python runs on a different thread
  3. Clean up map entries on ALL exit paths — leaked entries cause stale traces
  4. Use atomic tokens for activities (not instance_id) — multiple activities for the same instance can run concurrently

ScheduledTask Protocol

Python yields plain dicts. Rust deserializes them via serde_json into ScheduledTask enum variants:

#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ScheduledTask {
    Activity { name: String, input: String },
    ActivityWithRetry { name: String, input: String, retry: RetryPolicyConfig },
    Timer { delay_ms: u64 },
    WaitEvent { name: String },
    SubOrchestration { name: String, input: String },
    SubOrchestrationWithId { name: String, instance_id: String, input: String },
    SubOrchestrationVersioned { name: String, version: String, input: String },
    Orchestration { name: String, instance_id: String, input: String },
    OrchestrationVersioned { name: String, version: String, instance_id: String, input: String },
    NewGuid,
    UtcNow,
    ContinueAsNew { input: String },
    ContinueAsNewVersioned { input: String, version: String },
    Join { tasks: Vec<ScheduledTask> },
    Select { tasks: Vec<ScheduledTask> },
}

Adding a New ScheduledTask Type

  1. Add variant to ScheduledTask in src/types.rs with correct serde attributes
  2. Add execution branch in execute_task() in src/handlers.rs
  3. If it should work in select/race, add branch in make_select_future()
  4. If it should work in join/all, add branch in make_join_future()
  5. Add Python method to OrchestrationContext in python/duroxide/context.py
  6. Add test in tests/test_e2e.py
  7. Rebuild: maturin develop

Global Tokio Runtime

static TOKIO_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime")
});

All async operations go through TOKIO_RT.block_on() (with GIL released via py.allow_threads()). No pyo3-async-runtimes needed.

Provider Polymorphism

PyO3 doesn't support trait objects in constructors. Python wrapper detects provider type:

class Runtime:
    def __init__(self, provider, options=None):
        if getattr(provider, "_type", None) == "postgres":
            self._native = PyRuntime.from_postgres(provider._native, options)
        else:
            self._native = PyRuntime.from_sqlite(provider._native, options)

PyO3 Object Mutability

#[pyclass(get_all)] makes fields readable but NOT writable from Python. Even with set_all, setting a Python dict to an Option<String> field fails with a type mismatch.

Solution: Create pure Python wrapper objects (e.g., OrchestrationResult) instead of mutating PyO3 objects:

class OrchestrationResult:
    def __init__(self, status, output=None, error=None):
        self.status = status
        self.output = output  # parsed from JSON
        self.error = error

def _parse_status(raw):
    output = raw.output
    if output is not None:
        output = json.loads(output)
    return OrchestrationResult(status=raw.status, output=output, error=raw.error)

Common Pitfalls

Pitfall What Happens Fix
Missing py.allow_threads() around block_on GIL deadlock — process hangs forever Wrap ALL TOKIO_RT.block_on() calls
Returning PyErr from inside allow_threads Compile error — PyErr is not Send Map to String inside, PyErr outside
Thread-local for cross-thread context Lookup returns None — traces silently fail Use global HashMap
Mutating PyO3 #[pyclass] fields from Python TypeError or silently ignored Use Python wrapper objects
cargo build instead of maturin develop Python imports stale .so — changes don't take effect Always use maturin develop
Missing serde(rename_all) on new ScheduledTask variants Deserialization fails — task type not recognized Match Python naming convention (camelCase in JSON)

Build Requirements

# MUST use maturin (not cargo build) for the Python extension
source .venv/bin/activate
maturin develop           # Debug build + install
maturin develop --release # Release build + install
maturin build --release   # Build wheel for distribution

# cargo build alone produces a .dylib/.so that Python can't find
Install via CLI
npx skills add https://github.com/microsoft/duroxide-python --skill pyo3-interop
Repository Details
star Stars 32
call_split Forks 17
navigation Branch main
article Path SKILL.md
More from Creator