Last Updated: 3/11/2026
# Streaming

Stream real-time updates from your LangGraph agents.
## Why Streaming?
Streaming improves UX by:
- Showing progress as the agent works
- Displaying LLM tokens as they’re generated
- Providing immediate feedback on long-running tasks
## Stream Modes
LangGraph supports multiple streaming modes:
| Mode | What It Streams |
|---|---|
| `values` | Full state after each step |
| `updates` | State updates from each node |
| `messages` | LLM tokens as they're generated |
| `custom` | Custom data from your nodes |
| `debug` | Detailed execution information |
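The difference between `updates` and `values` can be sketched without LangGraph at all (the node names and plain-dict state below are illustrative, not part of the API): `updates` surfaces each node's delta, while `values` surfaces the full state after that delta has been merged in.

```python
def replay(node_updates):
    """Merge a sequence of per-node deltas into an evolving full state,
    yielding what "updates" and "values" modes would each surface per step."""
    state = {}
    for node, delta in node_updates:
        state.update(delta)                # apply the node's update
        yield {node: delta}, dict(state)   # ("updates" view, "values" view)

steps = [("node_a", {"count": 1}), ("node_b", {"count": 2})]
for update_view, value_view in replay(steps):
    print("updates:", update_view, "| values:", value_view)
```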
## Stream Updates
Get state changes from each node:
**Python:**

```python
for chunk in app.stream(inputs, stream_mode="updates"):
    print(chunk)

# Output:
# {'node_a': {'count': 1}}
# {'node_b': {'count': 2}}
```

**JavaScript:**

```javascript
for await (const chunk of await app.stream(inputs, { streamMode: "updates" })) {
  console.log(chunk);
}
```

## Stream Full State
Get the complete state after each step:
**Python:**

```python
for chunk in app.stream(inputs, stream_mode="values"):
    print(chunk)

# Output:
# {'count': 1, 'messages': [...]}
# {'count': 2, 'messages': [...]}
```

## Stream LLM Tokens
Stream tokens as the LLM generates them:
**Python:**

```python
for chunk in app.stream(inputs, stream_mode="messages"):
    message_chunk, metadata = chunk
    if message_chunk.content:
        print(message_chunk.content, end="", flush=True)
```

**JavaScript:**

```javascript
for await (const [msg, metadata] of await app.stream(
  inputs,
  { streamMode: "messages" }
)) {
  if (msg.content) {
    process.stdout.write(msg.content);
  }
}
```

### Filter by Node
Stream tokens from specific nodes only:
```python
for chunk in app.stream(inputs, stream_mode="messages"):
    msg, metadata = chunk
    if metadata["langgraph_node"] == "my_llm_node":
        print(msg.content, end="", flush=True)
```

### Filter by Tag
Tag LLM calls and filter by tag:
```python
from langchain.chat_models import init_chat_model

model_a = init_chat_model("gpt-4o-mini", tags=["joke"])
model_b = init_chat_model("gpt-4o-mini", tags=["poem"])

for chunk in app.stream(inputs, stream_mode="messages"):
    msg, metadata = chunk
    if "joke" in metadata["tags"]:
        print(msg.content, end="", flush=True)
```

## Stream Custom Data
Send custom progress updates from nodes:
**Python:**

```python
from langgraph.config import get_stream_writer

def my_node(state: State):
    writer = get_stream_writer()
    for i in range(100):
        # Do work
        process_item(i)
        # Send progress
        writer({"progress": i + 1, "total": 100})
    return {"result": "done"}

# Stream custom data
for chunk in app.stream(inputs, stream_mode="custom"):
    print(f"Progress: {chunk['progress']}/{chunk['total']}")
```

**JavaScript:**
```javascript
const myNode = (state, config) => {
  for (let i = 0; i < 100; i++) {
    // Do work
    processItem(i);
    // Send progress
    config.writer({ progress: i + 1, total: 100 });
  }
  return { result: "done" };
};

for await (const chunk of await app.stream(inputs, { streamMode: "custom" })) {
  console.log(`Progress: ${chunk.progress}/${chunk.total}`);
}
```

## Stream Multiple Modes
Combine multiple stream modes:
**Python:**

```python
for chunk in app.stream(inputs, stream_mode=["updates", "messages"]):
    mode, data = chunk
    if mode == "updates":
        print(f"Node update: {data}")
    elif mode == "messages":
        msg, metadata = data
        print(f"Token: {msg.content}")
```

**JavaScript:**
```javascript
for await (const [mode, data] of await app.stream(
  inputs,
  { streamMode: ["updates", "messages"] }
)) {
  if (mode === "updates") {
    console.log(`Node update: ${data}`);
  } else if (mode === "messages") {
    console.log(`Token: ${data[0].content}`);
  }
}
```

## Stream from Subgraphs
Include outputs from nested subgraphs:
```python
for chunk in app.stream(
    inputs,
    stream_mode="updates",
    subgraphs=True,
):
    namespace, data = chunk
    if namespace == ():
        print(f"Root graph: {data}")
    else:
        print(f"Subgraph {namespace}: {data}")
```

## Best Practices
- Use `updates` for node progress: see which nodes have completed
- Use `messages` for LLM tokens: show a typing effect to users
- Use `custom` for progress bars: send percentage complete from long operations
- Filter by node/tag: reduce noise by streaming only relevant data
- Combine modes carefully: Too many modes can overwhelm output
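The filtering and multi-mode practices combine naturally into a small helper. The sketch below is LangGraph-independent: it only assumes chunks shaped like the `(mode, data)` tuples shown above, and the `wanted_node` parameter and fake chunk list are illustrative.

```python
from types import SimpleNamespace

def tokens_from_node(stream, wanted_node):
    """From a multi-mode stream of (mode, data) tuples, yield only
    LLM token text produced by the given node."""
    for mode, data in stream:
        if mode != "messages":
            continue  # skip "updates", "custom", etc.
        msg, metadata = data
        if metadata.get("langgraph_node") == wanted_node and msg.content:
            yield msg.content

# Fake chunks standing in for app.stream(..., stream_mode=["updates", "messages"])
fake_stream = [
    ("updates", {"node_a": {"count": 1}}),
    ("messages", (SimpleNamespace(content="Hel"), {"langgraph_node": "llm"})),
    ("messages", (SimpleNamespace(content="nope"), {"langgraph_node": "other"})),
    ("messages", (SimpleNamespace(content="lo!"), {"langgraph_node": "llm"})),
]
print("".join(tokens_from_node(fake_stream, "llm")))  # Hello!
```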
## Next Steps
- Human-in-the-Loop: Add approval workflows
- Graph API Reference: Explore streaming APIs