LangGraph Integration
Full observability and governance for LangGraph stateful agents with automatic node tracing.
Requires langgraph >= 0.0.20. Works with both StateGraph and MessageGraph.
Installation
Terminal
pip install turingpulse langgraph
Quick Start
1. Initialize TuringPulse
config.py
from turingpulse import init
init(
    api_key="sk_live_your_api_key",
    project_id="my-project",
)
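In production you'd typically load the key from the environment rather than hard-coding it. A minimal sketch; the TURINGPULSE_API_KEY variable name is our own choice here, not one the SDK reads automatically:
import os

from turingpulse import init

init(
    # Hypothetical variable name; use whatever your deployment defines
    api_key=os.environ["TURINGPULSE_API_KEY"],
    project_id="my-project",
)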
2. Instrument Your Graph
graph.py
from langgraph.graph import StateGraph, END
from turingpulse.integrations.langgraph import instrument_graph
from typing import TypedDict
# Define your state
class AgentState(TypedDict):
    messages: list
    next_step: str
# Create your graph
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
# Set the entry point and add edges (compile() fails without an entry point)
graph.set_entry_point("agent")
graph.add_edge("agent", "tools")
graph.add_conditional_edges("tools", should_continue)
# Compile the graph
compiled = graph.compile()
# Instrument with TuringPulse
instrumented_graph = instrument_graph(
    compiled,
    agent_id="langgraph-agent",
    labels={"framework": "langgraph"},
)
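The example assumes agent_node, tool_node, and should_continue are defined elsewhere. As a rough sketch, with placeholder bodies that are not part of the integration:
from langgraph.graph import END

def agent_node(state: AgentState) -> dict:
    # Ask for a tool on the first pass, then finish
    done = any(m.get("role") == "tool" for m in state["messages"])
    return {"next_step": "end" if done else "tools"}

def tool_node(state: AgentState) -> dict:
    # Run a tool and append its output to the conversation
    return {"messages": state["messages"] + [{"role": "tool", "content": "result"}]}

def should_continue(state: AgentState) -> str:
    # Return the name of the next node, or END to stop
    return END if state["next_step"] == "end" else "agent"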
3. Run Your Agent
run.py
# Run the instrumented graph
result = instrumented_graph.invoke({
    "messages": [{"role": "user", "content": "What's the weather?"}],
})
# Traces are automatically sent to TuringPulse
# Each node execution is captured as a span
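Since invoke returns the final graph state, the agent's reply is just the last entry in the state's message list:
# The result is the final AgentState dict
print(result["messages"][-1])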
What Gets Traced
The LangGraph integration automatically captures:
- Graph Execution — Full trace for each invoke/stream
- Node Spans — Each node execution as a child span
- State Transitions — State before and after each node
- Conditional Edges — Which path was taken
- Tool Calls — Tool invocations within nodes
- LLM Calls — Nested LLM calls with tokens
- Errors — Exceptions with stack traces
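Error capture is passive. A sketch of what that looks like from the caller's side, assuming the wrapper records the exception on the span and then re-raises (verify this against your SDK version):
try:
    instrumented_graph.invoke({
        "messages": [{"role": "user", "content": "Hello"}],
    })
except Exception:
    # The exception and stack trace are already on the trace;
    # handle the failure exactly as you would without instrumentation
    raise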
ℹ️ Automatic LLM Tracing
If you're using LangChain LLMs within your nodes, they're automatically traced as nested spans.
With Governance
governance.py
from turingpulse import GovernanceDirective
from turingpulse.integrations.langgraph import instrument_graph
instrumented_graph = instrument_graph(
    compiled,
    agent_id="langgraph-agent",
    governance=GovernanceDirective(
        # Require approval for certain actions
        hitl=True,
        hitl_condition=lambda state: "delete" in str(state.get("messages", [])),
        reviewers=["admin@company.com"],
        # Review all executions
        hatl=True,
        hatl_sample_rate=1.0,
    ),
)
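A lambda is fine for simple checks; a named predicate is easier to test and reuse. For instance, with an illustrative keyword list:
DESTRUCTIVE_KEYWORDS = ("delete", "drop", "truncate")

def needs_approval(state: dict) -> bool:
    # Pause for human approval whenever the conversation
    # mentions a destructive action
    text = str(state.get("messages", [])).lower()
    return any(keyword in text for keyword in DESTRUCTIVE_KEYWORDS)

Pass it as hitl_condition=needs_approval in the directive above.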
With KPIs
kpis.py
from turingpulse import KPIConfig
from turingpulse.integrations.langgraph import instrument_graph
instrumented_graph = instrument_graph(
    compiled,
    agent_id="langgraph-agent",
    kpis=[
        KPIConfig(
            kpi_id="latency_ms",
            use_duration=True,
            alert_threshold=10000,
            comparator="gt",
        ),
        KPIConfig(
            kpi_id="node_count",
            value=lambda ctx: len(ctx.metadata.get("nodes_executed", [])),
            alert_threshold=20,
            comparator="gt",
        ),
    ],
)
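The value callback can derive any number from the run context. For example, a token-budget KPI; the total_tokens metadata key is an assumption here, not a documented field, so check what your traces actually record:
KPIConfig(
    kpi_id="total_tokens",
    # Assumes token usage is aggregated into run metadata under this key
    value=lambda ctx: ctx.metadata.get("total_tokens", 0),
    alert_threshold=50000,
    comparator="gt",
),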
Streaming Support
streaming.py
# Streaming is fully supported
for event in instrumented_graph.stream({
    "messages": [{"role": "user", "content": "Hello"}],
}):
    print(event)

# Each event is captured in the trace
# Async streaming (must be awaited inside an async function)
async for event in instrumented_graph.astream({
    "messages": [{"role": "user", "content": "Hello"}],
}):
    print(event)
Node-Level Configuration
node-config.py
from turingpulse import GovernanceDirective
from turingpulse.integrations.langgraph import instrument_graph, NodeConfig
instrumented_graph = instrument_graph(
    compiled,
    agent_id="langgraph-agent",
    node_configs={
        "agent": NodeConfig(
            capture_input=True,
            capture_output=True,
            labels={"type": "reasoning"},
        ),
        "tools": NodeConfig(
            capture_input=True,
            capture_output=True,
            labels={"type": "action"},
            governance=GovernanceDirective(hitl=True),
        ),
    },
)
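The same per-node controls can keep sensitive payloads out of traces, assuming capture_input=False simply skips recording that node's input:
node_configs={
    "tools": NodeConfig(
        # Don't record raw tool inputs (they may contain credentials)
        capture_input=False,
        capture_output=True,
    ),
},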