Python SDK

Complete instrumentation for Python AI agents — tracing, custom metrics, governance, drift detection, and more.

Python 3.8+ · Async Support

Installation

Terminal
pip install turingpulse

Configuration

Initialize TuringPulse at application startup. Get your API key from the TuringPulse dashboard.

Basic Configuration

config.py
from turingpulse import init

init(
    api_key="sk_live_your_api_key",  # Or set TURINGPULSE_API_KEY env var
    project_id="my-project",
    environment="production",
)

Full Configuration Options

full-config.py
from turingpulse import init, TuringPulseConfig

config = TuringPulseConfig(
    # Required
    api_key="sk_live_...",           # API key (or TURINGPULSE_API_KEY env var)
    project_id="my-project",         # Project identifier
    
    # Environment
    environment="production",        # Environment: production, staging, development
    service_name="my-service",       # Service name for grouping
    service_version="1.0.0",         # Version tag
    
    # Behavior
    enabled=True,                    # Enable/disable SDK globally
    debug=False,                     # Enable debug logging
    
    # Batching & Performance
    batch_size=100,                  # Traces per batch (default: 100)
    flush_interval=5.0,              # Seconds between flushes (default: 5.0)
    max_queue_size=10000,            # Max queued traces before dropping
    
    # Sampling
    sample_rate=1.0,                 # 1.0 = 100% of traces, 0.1 = 10%
    
    # Data Capture
    capture_inputs=True,             # Log function inputs
    capture_outputs=True,            # Log function outputs
    max_payload_size=65536,          # Max bytes per payload (truncates if exceeded)
    
    # Sensitive Data
    redact_keys=[                    # Keys to redact from logs
        "password", "api_key", "secret", "token", "authorization"
    ],
)

init(config)
💡
Environment Variable
For security, set TURINGPULSE_API_KEY as an environment variable instead of hardcoding.

Basic Instrumentation

Use the @instrument decorator to capture traces from any function.

basic.py
from turingpulse import instrument

@instrument(agent_id="my-agent")
def process_query(query: str) -> str:
    response = llm.chat(query)
    return response

# Call normally - traces are captured automatically
result = process_query("What's the weather today?")

Decorator Options

decorator-options.py
@instrument(
    agent_id="customer-support-agent",    # Required: Unique workflow identifier
    operation="handle_query",              # Operation name (defaults to function name)
    labels={                               # Custom labels for filtering
        "team": "support",
        "channel": "web",
        "priority": "high"
    },
    trace=True,                            # Enable/disable tracing (default: True)
    capture_input=True,                    # Log function inputs (default: True)
    capture_output=True,                   # Log function outputs (default: True)
)
def my_agent(query: str, context: dict) -> dict:
    return {"response": "..."}

Custom Metrics & KPIs

Track custom metrics and set up automatic alerts when thresholds are breached.

Defining KPIs

kpis.py
from turingpulse import instrument, KPIConfig

@instrument(
    agent_id="document-processor",
    kpis=[
        # Track latency from execution duration
        KPIConfig(
            kpi_id="latency_ms",
            name="Response Latency",
            use_duration=True,           # Use function execution time
            unit="ms",
            alert_threshold=5000,        # Alert if > 5 seconds
            comparator="gt",
        ),
        
        # Track token count from result
        KPIConfig(
            kpi_id="token_count",
            name="Token Usage",
            value=lambda ctx: ctx.result.get("tokens", 0),
            unit="tokens",
            alert_threshold=8000,
            comparator="gt",
        ),
        
        # Track accuracy score
        KPIConfig(
            kpi_id="accuracy",
            name="Accuracy Score",
            value=lambda ctx: ctx.result.get("score", 0),
            alert_threshold=0.85,
            comparator="lt",             # Alert if < 0.85
        ),
        
        # Track cost
        KPIConfig(
            kpi_id="cost_usd",
            name="Execution Cost",
            value=lambda ctx: ctx.metadata.get("cost", 0),
            unit="USD",
            alert_threshold=0.50,
            comparator="gt",
        ),
    ],
)
def process_document(doc: str) -> dict:
    result = llm.analyze(doc)
    return {
        "analysis": result.text,
        "tokens": result.usage.total_tokens,
        "score": result.confidence,
    }

KPI Comparators

| Comparator | Description           | Alert When           |
| ---------- | --------------------- | -------------------- |
| `gt`       | Greater than          | value > threshold    |
| `gte`      | Greater than or equal | value >= threshold   |
| `lt`       | Less than             | value < threshold    |
| `lte`      | Less than or equal    | value <= threshold   |
| `eq`       | Equal                 | value == threshold   |
| `neq`      | Not equal             | value != threshold   |

Manual Metric Recording

manual-metrics.py
from turingpulse import instrument, record_metric

@instrument(agent_id="my-agent")
def process_request(request: dict):
    # Record metrics at any point during execution
    record_metric("input_size", len(request.get("text", "")))
    
    result = llm.process(request)
    
    # Record computed metrics
    record_metric("output_tokens", result.usage.completion_tokens)
    record_metric("confidence", result.confidence)
    record_metric("cost", calculate_cost(result))
    
    return result

Governance & Human Oversight

Configure human-in-the-loop (HITL), human-after-the-loop (HATL), and human-on-the-loop (HOTL) workflows.

Human-in-the-Loop (HITL)

Require human approval before execution completes.

hitl.py
from turingpulse import instrument, GovernanceDirective

@instrument(
    agent_id="high-risk-action",
    governance=GovernanceDirective(
        hitl=True,                          # Require approval before execution
        reviewers=["manager@company.com"],  # Who can approve
        escalation_timeout=3600,            # Escalate after 1 hour (seconds)
        auto_approve_after=None,            # Never auto-approve (or set seconds)
        review_policy="require_all",        # require_all, require_any, require_majority
    ),
)
def execute_trade(symbol: str, amount: float):
    # Execution pauses until human approves in TuringPulse dashboard
    return trading_api.execute(symbol, amount)

Human-after-the-Loop (HATL)

Execute immediately, queue for review after completion.

hatl.py
@instrument(
    agent_id="content-generator",
    governance=GovernanceDirective(
        hatl=True,                          # Review after execution
        reviewers=["qa@company.com"],
        sample_rate=0.1,                    # Review 10% of executions
        priority_condition=lambda ctx: ctx.result.get("flagged", False),
    ),
)
def generate_content(topic: str):
    # Executes immediately, flagged for post-review
    return content_llm.generate(topic)

Human-on-the-Loop (HOTL)

Real-time monitoring with alerts, no blocking.

hotl.py
@instrument(
    agent_id="realtime-monitor",
    governance=GovernanceDirective(
        hotl=True,                          # Monitor in real-time
        alert_channels=["slack://alerts"],  # Alert destinations
        alert_condition=lambda ctx: ctx.kpis.get("latency_ms", 0) > 5000,
    ),
)
def process_transaction(tx: dict):
    # Monitored in real-time, alerts on anomalies
    return payment_processor.process(tx)

Creating Configurations via SDK

Create and manage TuringPulse configurations programmatically.

Register Workflows

register-workflow.py
from turingpulse import register_workflow, WorkflowConfig

# Register a workflow with full configuration
register_workflow(
    WorkflowConfig(
        workflow_id="customer-support-agent",
        name="Customer Support Agent",
        description="Handles customer queries via chat",
        
        # Default labels for all runs
        default_labels={
            "team": "support",
            "product": "chat-bot",
        },
        
        # KPI configurations
        kpis=[
            {"kpi_id": "latency_ms", "threshold": 5000, "comparator": "gt"},
            {"kpi_id": "cost_usd", "threshold": 0.10, "comparator": "gt"},
        ],
        
        # Governance defaults
        governance={
            "hatl": True,
            "sample_rate": 0.05,
            "reviewers": ["qa@company.com"],
        },
        
        # Alert channels
        alert_channels=["slack://support-alerts", "email://oncall@company.com"],
    )
)

Configure Alert Channels

alert-channels.py
from turingpulse import configure_alert_channel, AlertChannelConfig

# Slack integration
configure_alert_channel(
    AlertChannelConfig(
        channel_id="slack-alerts",
        type="slack",
        webhook_url="https://hooks.slack.com/services/...",
        name="Support Alerts",
        severity_filter=["warning", "critical"],
    )
)

# Email alerts
configure_alert_channel(
    AlertChannelConfig(
        channel_id="email-oncall",
        type="email",
        recipients=["oncall@company.com", "team-lead@company.com"],
        name="On-Call Email",
        severity_filter=["critical"],
    )
)

# PagerDuty integration
configure_alert_channel(
    AlertChannelConfig(
        channel_id="pagerduty-critical",
        type="pagerduty",
        routing_key="your-routing-key",
        name="PagerDuty Critical",
        severity_filter=["critical"],
    )
)

Define Baselines & Anomaly Rules

baselines.py
from turingpulse import create_baseline, create_anomaly_rule

# Create a baseline from historical data
create_baseline(
    workflow_id="customer-support-agent",
    baseline_id="latency-baseline",
    metric="latency_ms",
    method="percentile",           # percentile, stddev, fixed
    percentile=95,                 # For percentile method
    lookback_days=14,              # Days of history to analyze
    auto_update=True,              # Recalculate periodically
)

# Create anomaly detection rule
create_anomaly_rule(
    workflow_id="customer-support-agent",
    rule_id="latency-spike",
    metric="latency_ms",
    condition="exceeds_baseline",  # exceeds_baseline, below_baseline, outside_range
    multiplier=2.0,                # Alert if 2x baseline
    min_samples=5,                 # Minimum samples before alerting
    alert_channels=["slack-alerts"],
)

Nested Spans

Create child spans within an instrumented function to trace sub-operations.

spans.py
from turingpulse import instrument, span

@instrument(agent_id="multi-step-agent")
def complex_workflow(query: str):
    with span("retrieve_context") as s:
        context = vector_db.search(query)
        s.set_attribute("doc_count", len(context))
    
    with span("generate_response") as s:
        response = llm.chat(query, context=context)
        s.set_attribute("tokens", response.usage.total_tokens)
    
    with span("validate_output"):
        validated = validator.check(response)
    
    return validated

Custom Metadata & Tags

metadata.py
from turingpulse import instrument, get_current_trace

@instrument(agent_id="enriched-agent")
def process_with_metadata(query: str, user_id: str):
    trace = get_current_trace()
    
    # Set metadata (key-value pairs for analysis)
    trace.set_metadata("user_id", user_id)
    trace.set_metadata("query_length", len(query))
    trace.set_metadata("model_version", "gpt-4-turbo")
    
    # Add tags (for filtering in dashboard)
    trace.add_tag("priority", "high")
    trace.add_tag("department", "sales")
    trace.add_tag("experiment", "v2-prompt")
    
    result = llm.chat(query)
    
    # Update metadata after execution
    trace.set_metadata("response_tokens", result.usage.completion_tokens)
    trace.set_metadata("finish_reason", result.finish_reason)
    
    return result.content

Deploy Tracking

Register deployments to correlate behavior changes with code releases.

deploy.py
from turingpulse import register_deploy

# Auto-detect from CI/CD environment
register_deploy(
    workflow_id="my-agent",
    auto_detect=True,  # Detects GitHub Actions, GitLab CI, etc.
)

# Or provide explicit values
register_deploy(
    workflow_id="my-agent",
    version="v1.2.3",
    git_sha="abc123def",
    git_branch="main",
    commit_message="Improve prompt template",
    deployed_by="ci-bot",
    changelog=["Fixed hallucination issue", "Added context window"],
)
💡
Supported CI/CD
Auto-detection works with GitHub Actions, GitLab CI, CircleCI, Jenkins, Azure DevOps, and Bitbucket Pipelines.

Drift Detection & Fingerprinting

fingerprint.py
from turingpulse import init, FingerprintConfig

init(
    api_key="sk_live_...",
    project_id="my-project",
    fingerprint=FingerprintConfig(
        enabled=True,
        capture_prompts=True,       # Hash prompts for change detection
        capture_configs=True,       # Hash model configurations
        capture_structure=True,     # Track agent DAG structure
        sensitive_keys=[            # Keys to redact before hashing
            "api_key", "password", "secret", "token"
        ],
    )
)

# When prompts/configs change, TuringPulse will:
# 1. Detect the change automatically
# 2. Correlate with any metric changes
# 3. Alert if drift is detected

Async Support

async.py
from turingpulse import instrument, span
from typing import List
import asyncio

@instrument(agent_id="async-agent")
async def async_workflow(queries: List[str]):
    async with span("parallel_processing"):
        tasks = [process_query(q) for q in queries]
        results = await asyncio.gather(*tasks)
    return results

async def process_query(query: str):
    response = await llm.achat(query)
    return response

# Run with asyncio
results = asyncio.run(async_workflow(["q1", "q2", "q3"]))

Error Handling

errors.py
from turingpulse import instrument, get_current_trace

@instrument(agent_id="error-aware-agent")
def risky_operation(data: dict):
    trace = get_current_trace()
    
    try:
        result = external_api.call(data)
        return result
    except RateLimitError as e:
        trace.set_error("rate_limited", str(e), recoverable=True)
        raise
    except ValidationError as e:
        trace.set_error("validation_failed", str(e), recoverable=True)
        return {"error": "Invalid input", "details": str(e)}
    except Exception:
        # Unknown errors are captured automatically with stack trace
        raise

Next Steps