Langgraph V1 2 Deltachannel Per Node Timeout Sandbox Poc 2026
Date: 2026-05-22
Content track: sandbox-poc
Slug: langgraph-v1-2-deltachannel-per-node-timeout-sandbox-poc-2026
Goal
Verify whether the current LangGraph 1.2 line can support three production-agent patterns in a local sandbox:
- Per-node timeout on an async node.
- Node-level
error_handlerrecovery afterNodeTimeoutError. DeltaChannelaccumulation for a growing message list.
The backlog also asked for checkpoint delta size verification. This run did not complete a durable checkpoint byte comparison, so the article treats physical storage reduction as source-verified from LangChain documentation/blog material, not as a locally measured benchmark.
Environment
Host: macOS local Codex workspace
Python: 3.12.8
Sandbox directory: /tmp/effloow-langgraph-poc-WbaNFI
Virtualenv: /tmp/effloow-langgraph-poc-WbaNFI/.venv
Date: 2026-05-22
Network: enabled for package install and web research
Secrets: none used
Installed packages:
langchain-core==1.4.0
langgraph==1.2.1
langgraph-checkpoint==4.1.0
langgraph-checkpoint-sqlite==3.1.0
langgraph-prebuilt==1.1.0
langgraph-sdk==0.3.14
pydantic==2.13.4
xxhash==3.7.0
Commands
tmpdir=$(mktemp -d /tmp/effloow-langgraph-poc-XXXXXX)
python3 -m venv "$tmpdir/.venv"
"$tmpdir/.venv/bin/python" -m pip install --upgrade pip
"$tmpdir/.venv/bin/python" -m pip install 'langgraph>=1.2,<1.3'
"$tmpdir/.venv/bin/python" -m pip show langgraph
Relevant output:
Name: langgraph
Version: 1.2.1
Summary: Building stateful, multi-actor applications with LLMs
License-Expression: MIT
Requires: langchain-core, langgraph-checkpoint, langgraph-prebuilt, langgraph-sdk, pydantic, xxhash
API inspection:
"$tmpdir/.venv/bin/python" - <<'PY'
import inspect
from langgraph.graph import StateGraph
from langgraph.types import TimeoutPolicy, RetryPolicy, Command
from langgraph.errors import NodeTimeoutError, NodeError
from langgraph.channels import DeltaChannel
print('StateGraph.add_node:', inspect.signature(StateGraph.add_node))
print('TimeoutPolicy:', inspect.signature(TimeoutPolicy))
print('RetryPolicy:', inspect.signature(RetryPolicy))
print('NodeTimeoutError MRO:', [c.__name__ for c in NodeTimeoutError.__mro__])
print('NodeError:', inspect.signature(NodeError))
print('DeltaChannel:', inspect.signature(DeltaChannel))
PY
Relevant output:
StateGraph.add_node: (..., retry_policy=..., cache_policy=..., error_handler=..., destinations=..., timeout=...) -> Self
TimeoutPolicy: (*, run_timeout: float | timedelta | None = None, idle_timeout: float | timedelta | None = None, refresh_on: Literal['auto', 'heartbeat'] = 'auto') -> None
RetryPolicy: (..., max_attempts: int = 3, ..., retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool] = <function default_retry_on ...>)
NodeTimeoutError MRO: ['NodeTimeoutError', 'Exception', 'BaseException', 'object']
NodeError: (node: str, error: BaseException) -> None
DeltaChannel: (reducer: Callable[[Any, Sequence[Any]], Any], typ: type[Value] | None = None, *, snapshot_frequency: int = 1000) -> None
PoC 1: Async Node Timeout and Error Handler
Command:
"$tmpdir/.venv/bin/python" - <<'PY'
import asyncio
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import TimeoutPolicy, RetryPolicy, Command
from langgraph.errors import NodeError
class State(TypedDict):
events: list[str]
status: str
async def slow_node(state: State) -> State:
await asyncio.sleep(0.2)
return {"events": state["events"] + ["slow finished"], "status": "ok"}
def timeout_handler(state: State, error: NodeError) -> Command:
err = error.error
return Command(
update={
"events": state["events"] + [f"handled {type(err).__name__}"],
"status": "recovered",
},
goto="finalize",
)
def finalize(state: State) -> State:
return {"events": state["events"] + ["finalized"], "status": state["status"]}
builder = StateGraph(State)
builder.add_node(
"slow",
slow_node,
timeout=TimeoutPolicy(run_timeout=0.05),
retry_policy=RetryPolicy(max_attempts=1),
error_handler=timeout_handler,
)
builder.add_node("finalize", finalize)
builder.add_edge(START, "slow")
builder.add_edge("slow", "finalize")
builder.add_edge("finalize", END)
app = builder.compile()
async def main():
result = await app.ainvoke({"events": ["start"], "status": "new"})
print(result)
asyncio.run(main())
PY
Output:
{'events': ['start', 'handled NodeTimeoutError', 'finalized'], 'status': 'recovered'}
Interpretation:
TimeoutPolicy(run_timeout=0.05)cancelled the async node before the0.2second sleep completed.- The node-level
error_handlerreceived aNodeErrorwhose underlyingerrorwasNodeTimeoutError. - Returning
Command(update=..., goto="finalize")recovered the graph instead of aborting the run.
PoC 2: Sync Node Timeout Guardrail
Command:
"$tmpdir/.venv/bin/python" - <<'PY'
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
status: str
def sync_node(state: State) -> State:
return {"status": "done"}
builder = StateGraph(State)
builder.add_node("sync", sync_node, timeout=0.01)
builder.add_edge(START, "sync")
builder.add_edge("sync", END)
try:
builder.compile()
except Exception as exc:
print(type(exc).__name__)
print(str(exc).splitlines()[0])
PY
Output:
ValueError
Node timeouts are only supported for async nodes because sync Python execution cannot be safely cancelled in-process. Node 'sync' is sync.
Interpretation:
- The timeout API is intentionally async-only in this package version.
- Blocking sync SDK calls need async wrappers, provider-level timeouts, thread/process isolation, or a separate job boundary.
PoC 3: DeltaChannel Accumulation
Command:
"$tmpdir/.venv/bin/python" - <<'PY'
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.channels import DeltaChannel
def list_reducer(current: list[str], writes: Sequence[list[str]]) -> list[str]:
base = current or []
return [*base, *(item for write in writes for item in write)]
class DeltaState(TypedDict):
messages: Annotated[list[str], DeltaChannel(list_reducer, snapshot_frequency=2)]
count: int
def append_one(state: DeltaState) -> DeltaState:
n = state.get("count", 0) + 1
return {"messages": [f"event-{n}"], "count": n}
def route(state: DeltaState) -> str:
return "append" if state["count"] < 3 else END
builder = StateGraph(DeltaState)
builder.add_node("append", append_one)
builder.add_edge(START, "append")
builder.add_conditional_edges("append", route, {"append": "append", END: END})
app = builder.compile()
print(app.invoke({"messages": [], "count": 0}))
PY
Output:
{'messages': ['event-1', 'event-2', 'event-3'], 'count': 3}
Interpretation:
DeltaChannelimported and compiled successfully.- A batch reducer receiving
currentplus a sequence of writes produced the expected accumulated message list. - This confirms API usability and state semantics in a small in-memory graph, but it does not measure durable checkpoint byte savings.
Checkpoint Size Attempt and Limitation
langgraph-checkpoint-sqlite==3.1.0 installed successfully:
Name: langgraph-checkpoint-sqlite
Version: 3.1.0
Summary: Library with a SQLite implementation of LangGraph checkpoint saver.
License-Expression: MIT
Requires: aiosqlite, langgraph-checkpoint, sqlite-vec
A first durable-checkpoint comparison attempt using SqliteSaver(sqlite3.connect(path)) failed because LangGraph writes checkpoints from worker threads and default SQLite connections are thread-bound:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.
A follow-up attempt using sqlite3.connect(path, check_same_thread=False) did not produce a stable byte-comparison result during this run. The article therefore does not claim a locally measured checkpoint reduction. It only says the local PoC verified the DeltaChannel API and reducer behavior, while checkpoint storage reduction remains source-verified and should be measured in the reader's own checkpointer backend.
Sources Used for Article Research
- GitHub releases:
https://github.com/langchain-ai/langgraph/releases - PyPI project page:
https://pypi.org/project/langgraph/ - LangGraph fault tolerance docs:
https://docs.langchain.com/oss/python/langgraph/timeout-and-error-handling - LangGraph durable execution docs:
https://docs.langchain.com/oss/python/langgraph/durable-execution - DeltaChannel Python reference:
https://reference.langchain.com/python/langgraph/channels/delta/DeltaChannel - NodeTimeoutError Python reference:
https://reference.langchain.com/python/langgraph/errors/NodeTimeoutError - Delta Channels blog:
https://www.langchain.com/blog/delta-channels-evolving-agent-runtime
Result
PASS for a bounded sandbox PoC:
- Per-node async timeout verified.
NodeTimeoutErrorrecovery through node-levelerror_handlerverified.- Sync-node timeout rejection verified.
DeltaChannelreducer behavior verified.
Not verified locally:
- Durable checkpoint byte savings.
- Graceful shutdown behavior.
- Postgres/SQLite production checkpointer behavior under concurrent graph load.
- LLM/tool provider cancellation semantics.
Read the article
This note supports the public article and records what was actually checked.