langgraph-streaming

star 3

Streaming real-time updates from LangGraph: stream modes (values, updates, messages, custom, debug) for responsive UX

christian-bromann By christian-bromann schedule Updated 2/11/2026

name: langgraph-streaming description: Streaming real-time updates from LangGraph: stream modes (values, updates, messages, custom, debug) for responsive UX language: python

langgraph-streaming (Python)


name: langgraph-streaming description: Streaming real-time updates from LangGraph - stream modes (values, updates, messages, custom, debug) for responsive UX

Overview

LangGraph's streaming system surfaces real-time updates during graph execution, crucial for responsive LLM applications. Stream graph state, LLM tokens, or custom data as it's generated.

Decision Table: Stream Modes

Mode What it Streams Use Case
values Full state after each step Monitor complete state changes
updates State deltas after each step Track incremental updates
messages LLM tokens + metadata Chat UIs, token streaming
custom User-defined data Progress indicators, logs
debug All execution details Debugging, detailed tracing

Code Examples

Stream State Values

from langgraph.graph import StateGraph, START, END

def process(state):
    return {"count": state["count"] + 1}

graph = StateGraph(State).add_node("process", process).add_edge(START, "process").add_edge("process", END).compile()

# Stream full state after each step
for chunk in graph.stream(
    {"count": 0},
    stream_mode="values"
):
    print(chunk)  # {'count': 0}, then {'count': 1}

Stream State Updates (Deltas)

# Stream only the changes
for chunk in graph.stream(
    {"count": 0},
    stream_mode="updates"
):
    print(chunk)  # {"process": {"count": 1}}

Stream LLM Tokens

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4")

def llm_node(state):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

graph = StateGraph(State).add_node("llm", llm_node).compile()

# Stream LLM tokens as they're generated
for chunk in graph.stream(
    {"messages": [HumanMessage("Hello")]},
    stream_mode="messages"
):
    token, metadata = chunk
    if hasattr(token, "content"):
        print(token.content, end="", flush=True)

Stream Custom Data

from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    
    # Emit custom updates
    writer("Processing step 1...")
    # Do work
    writer("Processing step 2...")
    # More work
    writer("Complete!")
    
    return {"result": "done"}

graph = StateGraph(State).add_node("work", my_node).compile()

for chunk in graph.stream(
    {"data": "test"},
    stream_mode="custom"
):
    print(chunk)  # "Processing step 1...", etc.

Multiple Stream Modes

# Stream multiple modes simultaneously
for mode, chunk in graph.stream(
    {"messages": [HumanMessage("Hi")]},
    stream_mode=["updates", "messages", "custom"]
):
    print(f"{mode}: {chunk}")

Async Streaming

async for chunk in graph.astream(
    {"count": 0},
    stream_mode="values"
):
    print(chunk)

Stream with Subgraphs

# Include subgraph outputs
for chunk in graph.stream(
    {"data": "test"},
    stream_mode="updates",
    subgraphs=True  # Stream from nested graphs too
):
    print(chunk)

Stream with Interrupts

async for metadata, mode, chunk in graph.astream(
    {"query": "test"},
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config={"configurable": {"thread_id": "1"}}
):
    if mode == "messages":
        # Handle streaming LLM content
        msg, _ = chunk
        if hasattr(msg, "content"):
            print(msg.content, end="")
    
    elif mode == "updates":
        # Check for interrupts
        if "__interrupt__" in chunk:
            # Handle interrupt
            interrupt_info = chunk["__interrupt__"][0].value
            user_input = input(f"Approve? {interrupt_info}: ")
            # Resume
            break

Boundaries

What You CAN Configure

✅ Choose stream modes ✅ Stream multiple modes simultaneously ✅ Emit custom data from nodes ✅ Stream from subgraphs ✅ Combine streaming with interrupts

What You CANNOT Configure

❌ Modify streaming protocol ❌ Change when checkpoints are created ❌ Alter token streaming format

Gotchas

1. Messages Mode Requires LLM Invocation

# ❌ WRONG - No LLM called, nothing streamed
def node(state):
    return {"output": "static text"}

for chunk in graph.stream({}, stream_mode="messages"):
    print(chunk)  # Nothing!

# ✅ CORRECT - LLM invoked
def node(state):
    response = model.invoke(state["messages"])  # LLM call
    return {"messages": [response]}

2. Custom Mode Needs Stream Writer

# ❌ WRONG - No writer, nothing streamed
def node(state):
    print("Processing...")  # Not streamed!
    return {"data": "done"}

# ✅ CORRECT
from langgraph.config import get_stream_writer

def node(state):
    writer = get_stream_writer()
    writer("Processing...")  # Streamed!
    return {"data": "done"}

3. Stream Modes Are Lists

# ❌ WRONG - Single string
graph.stream({}, stream_mode="updates, messages")

# ✅ CORRECT - List
graph.stream({}, stream_mode=["updates", "messages"])

4. Async Stream Requires Await

# ❌ WRONG
for chunk in graph.astream({}):  # SyntaxError!
    print(chunk)

# ✅ CORRECT
async for chunk in graph.astream({}):
    print(chunk)

Links

Install via CLI
npx skills add https://github.com/christian-bromann/langchain-skills --skill langgraph-streaming
Repository Details
star Stars 3
call_split Forks 1
navigation Branch main
article Path SKILL.md
More from Creator
christian-bromann
christian-bromann Explore all skills →