Python SDK
Complete instrumentation for Python AI agents — tracing, custom metrics, governance, drift detection, and more.
Installation
pip install turingpulse
Configuration
Initialize TuringPulse at application startup. Get your API key from the TuringPulse dashboard.
Basic Configuration
from turingpulse import init
init(
api_key="sk_live_your_api_key", # Or set TURINGPULSE_API_KEY env var
project_id="my-project",
environment="production",
)
Full Configuration Options
from turingpulse import init, TuringPulseConfig
config = TuringPulseConfig(
# Required
api_key="sk_live_...", # API key (or TURINGPULSE_API_KEY env var)
project_id="my-project", # Project identifier
# Environment
environment="production", # Environment: production, staging, development
service_name="my-service", # Service name for grouping
service_version="1.0.0", # Version tag
# Behavior
enabled=True, # Enable/disable SDK globally
debug=False, # Enable debug logging
# Batching & Performance
batch_size=100, # Traces per batch (default: 100)
flush_interval=5.0, # Seconds between flushes (default: 5.0)
max_queue_size=10000, # Max queued traces before dropping
# Sampling
sample_rate=1.0, # 1.0 = 100% of traces, 0.1 = 10%
# Data Capture
capture_inputs=True, # Log function inputs
capture_outputs=True, # Log function outputs
max_payload_size=65536, # Max bytes per payload (truncates if exceeded)
# Sensitive Data
redact_keys=[ # Keys to redact from logs
"password", "api_key", "secret", "token", "authorization"
],
)
init(config)
Tip: set TURINGPULSE_API_KEY as an environment variable instead of hardcoding it.
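A minimal sketch of initialization without an inline key, assuming (per the comment in Basic Configuration) that the SDK falls back to the TURINGPULSE_API_KEY environment variable when api_key is omitted:
# In your shell or deployment config (illustrative):
#   export TURINGPULSE_API_KEY=sk_live_your_api_key
from turingpulse import init
init(
    project_id="my-project",   # api_key omitted; assumed to be read from TURINGPULSE_API_KEY
    environment="production",
)
Basic Instrumentation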
Use the @instrument decorator to capture traces from any function.
from turingpulse import instrument
@instrument(agent_id="my-agent")
def process_query(query: str) -> str:
response = llm.chat(query)
return response
# Call normally - traces are captured automatically
result = process_query("What's the weather today?")
Decorator Options
@instrument(
agent_id="customer-support-agent", # Required: Unique workflow identifier
operation="handle_query", # Operation name (defaults to function name)
labels={ # Custom labels for filtering
"team": "support",
"channel": "web",
"priority": "high"
},
trace=True, # Enable/disable tracing (default: True)
capture_input=True, # Log function inputs (default: True)
capture_output=True, # Log function outputs (default: True)
)
def my_agent(query: str, context: dict) -> dict:
return {"response": "..."}Custom Metrics & KPIs
Track custom metrics and set up automatic alerts when thresholds are breached.
Defining KPIs
from turingpulse import instrument, KPIConfig
@instrument(
agent_id="document-processor",
kpis=[
# Track latency from execution duration
KPIConfig(
kpi_id="latency_ms",
name="Response Latency",
use_duration=True, # Use function execution time
unit="ms",
alert_threshold=5000, # Alert if > 5 seconds
comparator="gt",
),
# Track token count from result
KPIConfig(
kpi_id="token_count",
name="Token Usage",
value=lambda ctx: ctx.result.get("tokens", 0),
unit="tokens",
alert_threshold=8000,
comparator="gt",
),
# Track accuracy score
KPIConfig(
kpi_id="accuracy",
name="Accuracy Score",
value=lambda ctx: ctx.result.get("score", 0),
alert_threshold=0.85,
comparator="lt", # Alert if < 0.85
),
# Track cost
KPIConfig(
kpi_id="cost_usd",
name="Execution Cost",
value=lambda ctx: ctx.metadata.get("cost", 0),
unit="USD",
alert_threshold=0.50,
comparator="gt",
),
],
)
def process_document(doc: str) -> dict:
result = llm.analyze(doc)
return {
"analysis": result.text,
"tokens": result.usage.total_tokens,
"score": result.confidence,
    }
KPI Comparators
| Comparator | Description | Alert When |
|---|---|---|
| gt | Greater than | value > threshold |
| gte | Greater than or equal | value >= threshold |
| lt | Less than | value < threshold |
| lte | Less than or equal | value <= threshold |
| eq | Equal | value == threshold |
| neq | Not equal | value != threshold |
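As a mental model (not SDK code), each comparator maps onto the corresponding Python operator, and an alert fires when the comparison of the KPI value against the threshold is true:
import operator
# Illustrative mapping of KPI comparators to Python operators; the actual
# evaluation happens inside TuringPulse when KPI values are ingested.
COMPARATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "eq": operator.eq,
    "neq": operator.ne,
}
def breaches_threshold(value, threshold, comparator):
    return COMPARATORS[comparator](value, threshold)
breaches_threshold(6200, 5000, "gt")   # True  -> latency alert fires
breaches_threshold(0.90, 0.85, "lt")   # False -> accuracy is above the floor, no alert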
Manual Metric Recording
from turingpulse import get_current_trace, record_metric
@instrument(agent_id="my-agent")
def process_request(request: dict):
trace = get_current_trace()
# Record metrics at any point during execution
record_metric("input_size", len(request.get("text", "")))
result = llm.process(request)
# Record computed metrics
record_metric("output_tokens", result.usage.completion_tokens)
record_metric("confidence", result.confidence)
record_metric("cost", calculate_cost(result))
    return result
Governance & Human Oversight
Configure human-in-the-loop (HITL), human-after-the-loop (HATL), and human-on-the-loop (HOTL) workflows.
Human-in-the-Loop (HITL)
Require human approval before execution completes.
from turingpulse import instrument, GovernanceDirective
@instrument(
agent_id="high-risk-action",
governance=GovernanceDirective(
hitl=True, # Require approval before execution
reviewers=["manager@company.com"], # Who can approve
escalation_timeout=3600, # Escalate after 1 hour (seconds)
auto_approve_after=None, # Never auto-approve (or set seconds)
review_policy="require_all", # require_all, require_any, require_majority
),
)
def execute_trade(symbol: str, amount: float):
# Execution pauses until human approves in TuringPulse dashboard
    return trading_api.execute(symbol, amount)
Human-after-the-Loop (HATL)
Execute immediately, queue for review after completion.
@instrument(
agent_id="content-generator",
governance=GovernanceDirective(
hatl=True, # Review after execution
reviewers=["qa@company.com"],
sample_rate=0.1, # Review 10% of executions
priority_condition=lambda ctx: ctx.result.get("flagged", False),
),
)
def generate_content(topic: str):
# Executes immediately, flagged for post-review
    return content_llm.generate(topic)
Human-on-the-Loop (HOTL)
Real-time monitoring with alerts, no blocking.
@instrument(
agent_id="realtime-monitor",
governance=GovernanceDirective(
hotl=True, # Monitor in real-time
alert_channels=["slack://alerts"], # Alert destinations
alert_condition=lambda ctx: ctx.kpis.get("latency_ms", 0) > 5000,
),
)
def process_transaction(tx: dict):
# Monitored in real-time, alerts on anomalies
    return payment_processor.process(tx)
Creating Configurations via SDK
Create and manage TuringPulse configurations programmatically.
Register Workflows
from turingpulse import register_workflow, WorkflowConfig
# Register a workflow with full configuration
register_workflow(
WorkflowConfig(
workflow_id="customer-support-agent",
name="Customer Support Agent",
description="Handles customer queries via chat",
# Default labels for all runs
default_labels={
"team": "support",
"product": "chat-bot",
},
# KPI configurations
kpis=[
{"kpi_id": "latency_ms", "threshold": 5000, "comparator": "gt"},
{"kpi_id": "cost_usd", "threshold": 0.10, "comparator": "gt"},
],
# Governance defaults
governance={
"hatl": True,
"sample_rate": 0.05,
"reviewers": ["qa@company.com"],
},
# Alert channels
alert_channels=["slack://support-alerts", "email://oncall@company.com"],
)
)
Configure Alert Channels
from turingpulse import configure_alert_channel, AlertChannelConfig
# Slack integration
configure_alert_channel(
AlertChannelConfig(
channel_id="slack-alerts",
type="slack",
webhook_url="https://hooks.slack.com/services/...",
name="Support Alerts",
severity_filter=["warning", "critical"],
)
)
# Email alerts
configure_alert_channel(
AlertChannelConfig(
channel_id="email-oncall",
type="email",
recipients=["oncall@company.com", "team-lead@company.com"],
name="On-Call Email",
severity_filter=["critical"],
)
)
# PagerDuty integration
configure_alert_channel(
AlertChannelConfig(
channel_id="pagerduty-critical",
type="pagerduty",
routing_key="your-routing-key",
name="PagerDuty Critical",
severity_filter=["critical"],
)
)
Define Baselines & Anomaly Rules
from turingpulse import create_baseline, create_anomaly_rule
# Create a baseline from historical data
create_baseline(
workflow_id="customer-support-agent",
baseline_id="latency-baseline",
metric="latency_ms",
method="percentile", # percentile, stddev, fixed
percentile=95, # For percentile method
lookback_days=14, # Days of history to analyze
auto_update=True, # Recalculate periodically
)
# Create anomaly detection rule
create_anomaly_rule(
workflow_id="customer-support-agent",
rule_id="latency-spike",
metric="latency_ms",
condition="exceeds_baseline", # exceeds_baseline, below_baseline, outside_range
multiplier=2.0, # Alert if 2x baseline
min_samples=5, # Minimum samples before alerting
alert_channels=["slack-alerts"],
)
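To make the multiplier concrete, here is a worked example with hypothetical numbers (the evaluation itself happens in TuringPulse, not in your process):
# Hypothetical: the 14-day p95 latency baseline is 800 ms.
baseline_ms = 800
threshold_ms = baseline_ms * 2.0             # multiplier=2.0 -> alert threshold of 1600 ms
recent_samples = [1700, 1820, 1655, 1900, 1610]
# min_samples=5: the rule only fires once at least 5 samples breach the threshold.
rule_fires = len(recent_samples) >= 5 and all(s > threshold_ms for s in recent_samples)
Nested Spans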
Create child spans within an instrumented function to trace sub-operations.
from turingpulse import instrument, span
@instrument(agent_id="multi-step-agent")
def complex_workflow(query: str):
with span("retrieve_context") as s:
context = vector_db.search(query)
s.set_attribute("doc_count", len(context))
with span("generate_response") as s:
response = llm.chat(query, context=context)
s.set_attribute("tokens", response.usage.total_tokens)
with span("validate_output"):
validated = validator.check(response)
    return validated
Custom Metadata & Tags
from turingpulse import instrument, get_current_trace
@instrument(agent_id="enriched-agent")
def process_with_metadata(query: str, user_id: str):
trace = get_current_trace()
# Set metadata (key-value pairs for analysis)
trace.set_metadata("user_id", user_id)
trace.set_metadata("query_length", len(query))
trace.set_metadata("model_version", "gpt-4-turbo")
# Add tags (for filtering in dashboard)
trace.add_tag("priority", "high")
trace.add_tag("department", "sales")
trace.add_tag("experiment", "v2-prompt")
result = llm.chat(query)
# Update metadata after execution
trace.set_metadata("response_tokens", result.usage.completion_tokens)
trace.set_metadata("finish_reason", result.finish_reason)
    return result.content
Deploy Tracking
Register deployments to correlate behavior changes with code releases.
from turingpulse import register_deploy
# Auto-detect from CI/CD environment
register_deploy(
workflow_id="my-agent",
auto_detect=True, # Detects GitHub Actions, GitLab CI, etc.
)
# Or provide explicit values
register_deploy(
workflow_id="my-agent",
version="v1.2.3",
git_sha="abc123def",
git_branch="main",
commit_message="Improve prompt template",
deployed_by="ci-bot",
changelog=["Fixed hallucination issue", "Added context window"],
)
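In CI, the explicit fields usually come from the provider's environment; a hedged sketch using GitHub Actions variable names (auto_detect=True should normally make this unnecessary, so treat it as a manual fallback):
import os
from turingpulse import register_deploy
# GITHUB_SHA and GITHUB_REF_NAME are standard GitHub Actions variables;
# substitute your CI provider's equivalents as needed.
register_deploy(
    workflow_id="my-agent",
    version=os.environ.get("GITHUB_REF_NAME", "unknown"),  # tag or branch name
    git_sha=os.environ.get("GITHUB_SHA", ""),
    git_branch=os.environ.get("GITHUB_REF_NAME", ""),
    deployed_by="ci-bot",
)
Drift Detection & Fingerprinting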
from turingpulse import init, FingerprintConfig
init(
api_key="sk_live_...",
project_id="my-project",
fingerprint=FingerprintConfig(
enabled=True,
capture_prompts=True, # Hash prompts for change detection
capture_configs=True, # Hash model configurations
capture_structure=True, # Track agent DAG structure
sensitive_keys=[ # Keys to redact before hashing
"api_key", "password", "secret", "token"
],
)
)
# When prompts/configs change, TuringPulse will:
# 1. Detect the change automatically
# 2. Correlate with any metric changes
# 3. Alert if drift is detected
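Conceptually, a fingerprint is a stable hash of the prompt or config taken after sensitive keys are redacted; the sketch below illustrates the idea and is not the SDK's internal algorithm:
import hashlib
import json
# Illustrative only: redact sensitive keys, canonicalize, then hash, so only a digest
# (never the raw prompt or secrets) needs to leave your process.
def fingerprint(payload: dict, sensitive_keys: set) -> str:
    redacted = {k: ("[REDACTED]" if k in sensitive_keys else v) for k, v in payload.items()}
    return hashlib.sha256(json.dumps(redacted, sort_keys=True).encode("utf-8")).hexdigest()
# The same prompt/config always yields the same digest; any change (e.g. a new
# temperature value) produces a new digest, which is what gets flagged as drift.
fingerprint({"model": "gpt-4-turbo", "temperature": 0.2, "api_key": "sk_live_..."},
            {"api_key", "password", "secret", "token"})
Async Support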
from turingpulse import instrument, span
import asyncio
@instrument(agent_id="async-agent")
async def async_workflow(queries: list[str]):
async with span("parallel_processing"):
tasks = [process_query(q) for q in queries]
results = await asyncio.gather(*tasks)
return results
async def process_query(query: str):
response = await llm.achat(query)
return response
# Run with asyncio
results = asyncio.run(async_workflow(["q1", "q2", "q3"]))
Error Handling
from turingpulse import instrument, get_current_trace
@instrument(agent_id="error-aware-agent")
def risky_operation(data: dict):
trace = get_current_trace()
try:
result = external_api.call(data)
return result
except RateLimitError as e:
trace.set_error("rate_limited", str(e), recoverable=True)
raise
except ValidationError as e:
trace.set_error("validation_failed", str(e), recoverable=True)
return {"error": "Invalid input", "details": str(e)}
except Exception as e:
# Unknown errors are captured automatically with stack trace
        raise
Next Steps
- LangGraph Integration — Auto-instrumentation for LangGraph agents
- LangChain Integration — Auto-instrumentation for LangChain
- OpenAI Integration — Instrument OpenAI API calls
- Quickstart Guide — End-to-end setup walkthrough