Guides architectural decisions for LangGraph applications. Use when deciding between LangGraph vs alternatives, choosing state management strategies, designing multi-agent systems, or selecting persistence and streaming approaches.
```shell
npx skill4agent add existential-birds/beagle langgraph-architecture
```

## When Not to Use LangGraph

| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
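For the linear-pipeline row above, plain function composition is often all you need. A minimal sketch, where `retrieve`, `summarize`, and `format_reply` are hypothetical stand-ins for the real steps (e.g. an LCEL chain or direct API calls):

```python
from functools import reduce

# Hypothetical pipeline steps standing in for real retrieval/LLM calls.
def retrieve(query: str) -> str:
    return f"docs for: {query}"

def summarize(docs: str) -> str:
    return docs.upper()

def format_reply(summary: str) -> str:
    return f"[reply] {summary}"

def pipeline(query: str) -> str:
    # Equivalent in spirit to LCEL's `retrieve | summarize | format_reply`:
    # thread the value through each step in order.
    return reduce(lambda value, step: step(value), (retrieve, summarize, format_reply), query)
```

No state schema, no checkpointing, no graph compile step: when the flow is a straight line, the graph machinery adds only overhead.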
## State Schema: TypedDict vs Pydantic

| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
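A quick illustration of the TypedDict side of the trade-off (the schema fields here are invented for the example): it is a plain dict at runtime, so access is fast and serialization trivial, but nothing validates the values you put in.

```python
from typing import TypedDict

class State(TypedDict):
    messages: list
    attempts: int

# TypedDict is just a dict at runtime: dict-style access, no overhead.
state: State = {"messages": ["hi"], "attempts": 0}
state["attempts"] += 1

# A type checker flags this, but at runtime it passes silently --
# exactly the validation Pydantic would have caught.
bad: State = {"messages": "oops", "attempts": "1"}
```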
## Choosing a Reducer

| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | `add_messages` | Handles IDs, `RemoveMessage` |
| Simple append | `operator.add` | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | Plain, unannotated field |
| Custom merge | Lambda | `lambda a, b: {**a, **b}` |
| Overwrite list | | Bypass reducer |
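To make the reducer semantics concrete, here is a toy re-implementation of the merge step (illustrative only, not the library's actual code): keys with a reducer accumulate across updates, keys without one behave as LastValue.

```python
import operator

def merge(state: dict, update: dict, reducers: dict) -> dict:
    """Apply a node's update to state, reducer-aware (toy version)."""
    out = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        # With a reducer: combine old and new. Without: overwrite (LastValue).
        out[key] = reducer(out.get(key), value) if reducer else value
    return out

reducers = {"logs": operator.add}  # append semantics for this key only
state = {"logs": ["start"], "step": 1}
state = merge(state, {"logs": ["done"], "step": 2}, reducers)
# "logs" accumulated; "step" overwritten
```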
## State Size

```python
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore

# SMALL STATE (< 1MB) - put in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str

# LARGE DATA - keep only a reference in state, the payload in a Store
class LargeState(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # reference into the store

def node(state: LargeState, *, store: BaseStore):
    doc = store.get(namespace, state["document_ref"])
    # process without bloating checkpoints
```

## Routing: Conditional Edges vs Command

| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
```python
# Conditional edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    return "a" if condition else "b"

builder.add_conditional_edges("node", router)

# Command - when combining routing with updates
def node(state) -> Command:
    return Command(goto="next", update={"step": state["step"] + 1})
```

Routing primitives: `add_edge`, `add_conditional_edges`, `Command`, `Send`.

## Checkpointer Selection

| Checkpointer | Use Case | Characteristics |
|---|---|---|
| `InMemorySaver` | Testing only | Lost on restart |
| `SqliteSaver` | Development | Single file, local |
| `PostgresSaver` | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
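What any checkpointer boils down to: state snapshots keyed by thread id so a run can resume where it left off. A toy sketch of that contract (illustrative only; in practice you would use one of the savers above, or implement `BaseCheckpointSaver`):

```python
class ToyCheckpointer:
    """Minimal illustration of the checkpointer contract: not the real API."""

    def __init__(self) -> None:
        self._store: dict[str, list[dict]] = {}

    def put(self, thread_id: str, state: dict) -> None:
        # Append a snapshot to this thread's history.
        self._store.setdefault(thread_id, []).append(dict(state))

    def latest(self, thread_id: str):
        # Resuming a thread starts from its most recent snapshot.
        history = self._store.get(thread_id)
        return history[-1] if history else None

saver = ToyCheckpointer()
saver.put("thread-1", {"step": 1})
saver.put("thread-1", {"step": 2})
```

The real savers add serialization, pending writes, and concurrency, but the thread-keyed snapshot history is the core idea.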
```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options (pick one):
subgraph = sub_builder.compile(checkpointer=None)   # inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # no checkpointing (runs atomically)
```

## Multi-Agent Topologies

Supervisor:

```
         ┌─────────────┐
         │ Supervisor  │
         └──────┬──────┘
   ┌────────┬───┴────┬────────┐
   ▼        ▼        ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
```

Network:

```
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
```

Pipeline:

```
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
```

## Streaming Modes

| Mode | Use Case | Data |
|---|---|---|
| `updates` | UI updates | Node outputs only |
| `values` | State inspection | Full state each step |
| `messages` | Chat UX | LLM tokens |
| `custom` | Progress/logs | Your data via StreamWriter |
| `debug` | Debugging | Tasks + checkpoints |
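The `updates` vs `values` distinction, simulated with a plain generator (illustrative only; real code consumes `graph.stream(...)` or `graph.astream(...)`):

```python
def run(state: dict, nodes):
    """Run nodes in order, emitting both stream flavors per step (toy model)."""
    for name, node in nodes:
        update = node(state)
        state = {**state, **update}
        yield ("updates", {name: update})  # "updates": this node's output only
        yield ("values", dict(state))      # "values": the full state each step

# Two stub nodes standing in for real graph nodes.
nodes = [("a", lambda s: {"x": 1}), ("b", lambda s: {"y": 2})]
events = list(run({}, nodes))
```

`updates` keeps payloads small for UI patches; `values` trades bandwidth for a complete picture of state at every step.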
```python
# Stream from subgraphs
async for chunk in graph.astream(
    input,
    stream_mode="updates",
    subgraphs=True,  # include subgraph events
):
    namespace, data = chunk  # namespace indicates depth
```

## Human-in-the-Loop

| Strategy | Use Case |
|---|---|
| `interrupt_before` | Approval before action |
| `interrupt_after` | Review after completion |
| `interrupt()` | Dynamic, contextual pauses |
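The dynamic-pause idea, sketched with a plain exception (illustrative only; the real `interrupt()` pauses through the checkpointer and is resumed with `Command(resume=...)`, not an exception you catch yourself):

```python
class Interrupt(Exception):
    """Toy stand-in for a graph interrupt carrying a payload for the human."""

    def __init__(self, payload):
        self.payload = payload

def approval_node(state: dict, resume_value=None) -> dict:
    if resume_value is None:
        # First pass: pause and surface a question to the human.
        raise Interrupt({"question": "Apply this change?"})
    # Second pass: the node re-runs with the human's answer injected.
    return {**state, "approved": resume_value == "approved"}

try:
    approval_node({"step": 1})
except Interrupt as pause:
    answer = "approved"  # in reality, supplied later via Command(resume=...)
    state = approval_node({"step": 1}, resume_value=answer)
```

Note the key behavior this models: the interrupted node runs again from its start on resume, so code before the pause point must be safe to repeat.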
```python
# Simple resume (same thread)
graph.invoke(None, config)

# Resume with value
graph.invoke(Command(resume="approved"), config)

# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)

# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```

## Retries and Fallbacks

```python
# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)

# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])

def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END
```

## Bounding Execution

```python
# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```
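In miniature, here is how a shrinking step budget forces a loop to wrap up (illustrative only; in LangGraph the `remaining_steps` value is derived from `recursion_limit` for you):

```python
def run_with_budget(recursion_limit: int) -> list[str]:
    """Toy loop: route to wrap_up once fewer than 5 steps remain."""
    visited = []
    remaining = recursion_limit
    node = "continue"
    while node != "wrap_up":
        visited.append(node)
        remaining -= 1  # each node execution consumes one step
        node = "wrap_up" if remaining < 5 else "continue"
    visited.append(node)
    return visited

trace = run_with_budget(8)
# With a budget of 8 and a threshold of 5, the loop runs a few times
# and then deterministically routes to wrap_up instead of hitting the limit.
```

Checking the budget inside the graph like this lets you finish gracefully, rather than letting the runtime raise a recursion-limit error mid-task.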