Production Deployment & Best Practices

🚀 Production-Ready Python SDK

AgentRouter is a lightweight, production-ready Python SDK for building reliable multi-agent applications. The SDK itself focuses on agent orchestration and workflow management, and it is designed to integrate with your existing infrastructure and scaling solutions.

SDK Architecture

[Figure: Enterprise Architecture & Scalability. Components shown: Load Balancer (10M+ req/day); API Gateways (rate limiting, authentication); Agent Pool (auto-scaling, 100-1000 agents, health monitoring); Cache Layer (Redis Cluster, 99.9% hit rate, sub-ms latency); Message Queue (Kafka/RabbitMQ, async processing, event streaming); Monitoring (Prometheus, Grafana, AlertManager); Database Cluster (PostgreSQL with replicas, auto-failover); Object Storage (S3-compatible). Performance Metrics panel: P50 < 100ms, P95 < 500ms, P99 < 1s, uptime 99.99%.]

📈 SDK Capabilities

Configuration Management

AgentRouter validates its configuration values. The comments below give the accepted range for each parameter:

from agentrouter import ManagerAgent

manager = ManagerAgent(
    name="production_manager",
    api_key="your-api-key",
    model="usf-mini",
    # Network configuration
    api_timeout=60.0,       # 5-300 seconds
    max_retries=3,          # 0-10 attempts
    retry_delay=1.0,        # 0.1-60 seconds
    retry_multiplier=2.0,   # 1.0-10.0
    retry_max_wait=60.0,    # 1-300 seconds
    # Execution configuration
    max_iterations=30,      # 3-50 iterations
    worker_timeout=60.0,    # 5-300 seconds
)
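
The ranges in the comments above can also be checked in application code before a manager is constructed, for example when the values come from a config file. A minimal sketch, assuming the documented ranges are inclusive; `ALLOWED_RANGES` and `check_ranges` are hypothetical helpers, not part of the SDK:

```python
# Hypothetical pre-flight check. The ranges are copied from the comments above
# and assumed to be inclusive; the SDK's own validation remains authoritative.
ALLOWED_RANGES = {
    "api_timeout": (5.0, 300.0),
    "max_retries": (0, 10),
    "retry_delay": (0.1, 60.0),
    "retry_multiplier": (1.0, 10.0),
    "retry_max_wait": (1.0, 300.0),
    "max_iterations": (3, 50),
    "worker_timeout": (5.0, 300.0),
}


def check_ranges(settings):
    """Return a list of human-readable problems; an empty list means no issues."""
    problems = []
    for key, value in settings.items():
        if key not in ALLOWED_RANGES:
            continue  # parameters without a documented range are not checked
        low, high = ALLOWED_RANGES[key]
        if not low <= value <= high:
            problems.append(f"{key}={value} is outside {low}-{high}")
    return problems
```

Calling `check_ranges({"api_timeout": 1.0})` reports the out-of-range timeout, so a bad value can be surfaced in a deployment check before any agent is created.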

Built-in Retry Mechanisms

The SDK retries transient failures automatically, using exponential backoff:

# Automatic retry on transient failures
# - Network timeouts
# - Temporary API errors
# - Rate limit responses (respects retry-after)

# Configurable retry behavior
config = {
    "max_retries": 3,
    "retry_delay": 1.0,
    "retry_multiplier": 2.0,  # Exponential backoff
    "retry_max_wait": 60.0
}
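
To see what these four values mean in practice, the wait before each retry can be computed by hand. This is a sketch under one assumption: that the delay grows as `retry_delay * retry_multiplier ** attempt` and is capped at `retry_max_wait`. The SDK's exact formula is not stated here, and jitter or a server-supplied retry-after value may change the real waits. `backoff_delays` is a hypothetical helper:

```python
def backoff_delays(max_retries=3, retry_delay=1.0,
                   retry_multiplier=2.0, retry_max_wait=60.0):
    """Wait (in seconds) before each retry, assuming capped exponential growth."""
    return [
        min(retry_delay * retry_multiplier ** attempt, retry_max_wait)
        for attempt in range(max_retries)
    ]


# With the configuration shown above: waits of 1s, 2s, then 4s.
print(backoff_delays())
```

Under this assumption the default settings add at most 7 seconds of waiting per call, and raising `max_retries` past 6 only adds waits at the 60-second cap.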

🔐 API Key Management

Basic API Key Configuration

from agentrouter import ManagerAgent, WorkerAgent

# Direct API key configuration
manager = ManagerAgent(
    name="secure_manager",
    api_key="your-api-key",  # Required for manager agents
    model="usf-mini"
)

# Worker agents can inherit from parent
worker = WorkerAgent(
    name="worker",
    role="Analyzes data inputs, executes validation checks, and generates formatted reports",  # REQUIRED: Clear role
    # api_key inherited from parent manager
)

# Attach the worker so the manager becomes its parent
manager.attach_worker(worker)

Environment Variable Configuration

import os

# Recommended: Store API keys in environment variables
api_key = os.environ.get("AGENTROUTER_API_KEY")

manager = ManagerAgent(
    name="manager",
    api_key=api_key,
    model="usf-mini"
)
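
Note that `os.environ.get` returns `None` when the variable is missing, so a misconfigured deployment may only fail later, at the first API call. A small fail-fast sketch; `require_env` is a hypothetical helper, not part of the SDK:

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise if unset or empty."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

With this helper, `api_key = require_env("AGENTROUTER_API_KEY")` stops the process at startup with a clear message instead of passing `None` to the manager.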

🎯 Performance Characteristics

Actual Performance Metrics

Based on the SDK's architecture:

| Metric | Value | Description |
| --- | --- | --- |
| SDK Overhead | < 10ms | Agent orchestration overhead |
| Memory Footprint | < 50MB | Base SDK memory usage |
| Configuration Validation | < 1ms | Pydantic validation |
| Message Processing | < 5ms | Message transformation |

Note: End-to-end latency depends on API response times and network conditions.

🚀 Deployment Best Practices

1. Configuration Management

# Use environment-specific configurations
import os

class Config:
    API_KEY = os.environ.get("AGENTROUTER_API_KEY")
    MODEL = os.environ.get("AGENTROUTER_MODEL", "usf-mini")
    MAX_ITERATIONS = int(os.environ.get("MAX_ITERATIONS", "30"))
    API_TIMEOUT = float(os.environ.get("API_TIMEOUT", "60"))

# Apply configuration
manager = ManagerAgent(
    name="manager",
    api_key=Config.API_KEY,
    model=Config.MODEL,
    max_iterations=Config.MAX_ITERATIONS,
    api_timeout=Config.API_TIMEOUT
)

2. Error Handling

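import logging

from agentrouter.exceptions import (
    ExecutionError,
    MaxIterationsError,
    ValidationError,
    APIError
)

logger = logging.getLogger(__name__)

# manager.run() is a coroutine, so it must be awaited inside an async function
async def run_task():
    try:
        result = await manager.run("Process this task")
        return result
    except MaxIterationsError as e:
        # Handle max iterations reached
        logger.warning(f"Task exceeded max iterations: {e}")
    except APIError as e:
        # Handle API failures
        logger.error(f"API error: {e}")
    except ValidationError as e:
        # Handle validation failures
        logger.error(f"Validation failed: {e}")
    except ExecutionError as e:
        # Handle general execution errors
        logger.error(f"Execution failed: {e}")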
3. Logging Configuration

import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# SDK uses standard Python logging
logger = logging.getLogger('agentrouter')
logger.setLevel(logging.DEBUG)  # For detailed debugging
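
In production the DEBUG level is usually too verbose to leave on permanently, so it can help to choose the SDK logger's level at startup. A sketch using only the standard `logging` module; `apply_sdk_log_level` is a hypothetical helper, and the assumption is that the SDK logs under the `agentrouter` logger name used above:

```python
import logging


def apply_sdk_log_level(level_name, default=logging.INFO):
    """Set the 'agentrouter' logger level from a name such as 'DEBUG'."""
    level = getattr(logging, str(level_name).upper(), None)
    if not isinstance(level, int):
        level = default  # unknown names fall back to the default level
    sdk_logger = logging.getLogger("agentrouter")
    sdk_logger.setLevel(level)
    return sdk_logger
```

The level name could come from an environment variable of your choosing, so debugging can be switched on for one deployment without a code change. Loggers named below `agentrouter` in the hierarchy inherit the level unless they set their own.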

4. Worker Timeout Management

# Set worker_timeout higher than api_timeout
manager = ManagerAgent(
    name="manager",
    api_key="key",
    api_timeout=60.0,      # API calls timeout
    worker_timeout=120.0   # Worker execution timeout (should be higher)
)
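
The reason for this ordering can be shown with plain `asyncio`. The sketch below assumes the worker timeout wraps the API timeout, which is an assumption about how the two limits nest rather than a description of the SDK's internals. `call_api` and `run_worker` are hypothetical stand-ins:

```python
import asyncio


async def call_api(api_timeout, latency):
    """Stand-in for one API call that takes `latency` seconds to answer."""
    try:
        await asyncio.wait_for(asyncio.sleep(latency), timeout=api_timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "api_timeout"  # the inner limit fired; the caller can retry


async def run_worker(worker_timeout, api_timeout, latency):
    """Stand-in for a worker whose whole execution has its own time limit."""
    try:
        return await asyncio.wait_for(
            call_api(api_timeout, latency), timeout=worker_timeout
        )
    except asyncio.TimeoutError:
        return "worker_timeout"  # the outer limit fired first


# Outer limit larger than inner: the API timeout is reported.
print(asyncio.run(run_worker(0.5, 0.05, 1.0)))
# Outer limit smaller than inner: the worker is cut off before the API timeout.
print(asyncio.run(run_worker(0.05, 0.5, 1.0)))
```

When the outer limit is the smaller one, the more specific API timeout never gets a chance to fire, so there is no room to handle or retry it.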

📊 Monitoring & Observability

Using the Built-in Tracer

AgentRouter includes a visualization tracer for debugging:

from agentrouter.visualization import PipelineTracer

# Enable tracing
tracer = PipelineTracer()
manager.set_tracer(tracer)

# Execute with tracing (run this inside an async function)
result = await manager.run("Your task")

# Visualize execution
tracer.display()

Integration with External Monitoring

The SDK can be integrated with your existing monitoring solutions:

# Example: Custom metrics collection
# send_metric() is a placeholder for your metrics client; it is not part of the SDK
import time

class MetricsCollector:
    def __init__(self, manager):
        self.manager = manager

    async def execute_with_metrics(self, task):
        start_time = time.perf_counter()
        try:
            result = await self.manager.run(task)
            duration = time.perf_counter() - start_time
            # Send to your metrics system
            send_metric("agent.execution.success", 1)
            send_metric("agent.execution.duration", duration)
            return result
        except Exception:
            send_metric("agent.execution.error", 1)
            raise
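
Durations collected this way are usually summarized as percentiles before they are charted or alerted on. A minimal sketch using the standard library's `statistics.quantiles`; `latency_summary` is a hypothetical helper, and most metrics backends can compute the same figures for you:

```python
import statistics


def latency_summary(durations):
    """Summarize durations (seconds) as p50/p95/max; needs at least two samples."""
    # n=100 yields 99 cut points; index 49 is the 50th percentile, 94 the 95th.
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "max": max(durations)}
```

Tracking p95 alongside the median makes slow outliers visible that an average would hide, which matters here because end-to-end latency is dominated by API response times.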

🔄 Scaling Patterns

Horizontal Scaling with Your Infrastructure

AgentRouter SDK can be deployed in any scalable architecture:

# Example: Running tasks concurrently with asyncio
import asyncio
import os

API_KEY = os.environ["AGENTROUTER_API_KEY"]

async def process_task(task_data):
    manager = ManagerAgent(
        name=f"manager_{task_data['id']}",
        api_key=API_KEY,
        model="usf-mini"
    )
    return await manager.run(task_data['prompt'])

# Process multiple tasks concurrently
async def process_batch(tasks):
    results = await asyncio.gather(
        *[process_task(task) for task in tasks]
    )
    return results
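
By default `asyncio.gather` raises the first exception it sees, so one failed task hides the results of the rest of the batch. A sketch of collecting successes and failures separately with `return_exceptions=True`; `run_all` is a hypothetical helper that works with any coroutine function, such as the `process_task` shown above:

```python
import asyncio


async def run_all(coroutine_fn, items):
    """Run coroutine_fn over items concurrently; split successes from failures."""
    outcomes = await asyncio.gather(
        *(coroutine_fn(item) for item in items),
        return_exceptions=True,  # failures are returned instead of raised
    )
    succeeded, failed = [], []
    for item, outcome in zip(items, outcomes):
        if isinstance(outcome, BaseException):
            failed.append((item, outcome))
        else:
            succeeded.append((item, outcome))
    return succeeded, failed
```

The caller can then log or retry the `failed` pairs while still returning the results that did complete.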

Rate Limiting Considerations

# Cap in-flight requests at the application level
# (a semaphore limits concurrency, not requests per second)
from asyncio import Semaphore

class RateLimitedManager:
    def __init__(self, manager, max_concurrent=10):
        self.manager = manager
        self.semaphore = Semaphore(max_concurrent)

    async def run(self, task):
        async with self.semaphore:
            return await self.manager.run(task)
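
If the API enforces a requests-per-second quota, limiting concurrency alone may not be enough, because short calls can still be issued in rapid succession. A sketch of spacing calls by a minimum interval; `IntervalLimiter` is a hypothetical helper, and the interval you need depends on your provider's quota:

```python
import asyncio


class IntervalLimiter:
    """Spaces calls at least `min_interval` seconds apart.

    Create it inside a running event loop (for example inside your main
    coroutine) so the lock belongs to that loop on older Python versions.
    """

    def __init__(self, min_interval):
        self.min_interval = min_interval
        self._lock = asyncio.Lock()
        self._next_slot = 0.0  # loop time at which the next call may start

    async def wait(self):
        async with self._lock:
            loop = asyncio.get_running_loop()
            delay = self._next_slot - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            # Reserve the following slot relative to whichever is later.
            self._next_slot = max(loop.time(), self._next_slot) + self.min_interval
```

Calling `await limiter.wait()` immediately before `await manager.run(task)` spaces out request starts, and it can be combined with the semaphore above to bound both rate and concurrency.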

🎯 Production Checklist

Before Deployment

  • ✅ API keys stored securely (environment variables or secrets manager)
  • ✅ Appropriate timeout configurations set
  • ✅ Error handling implemented
  • ✅ Logging configured
  • ✅ Configuration validation enabled
  • ✅ Max iterations set appropriately for your use case

Runtime Monitoring

  • ✅ Monitor API response times
  • ✅ Track iteration counts per task
  • ✅ Log error rates and types
  • ✅ Monitor memory usage
  • ✅ Track worker timeout occurrences

💡 Best Practices Summary

  1. Configuration: Use environment variables for configuration
  2. Error Handling: Implement comprehensive error handling
  3. Timeouts: Set worker_timeout > api_timeout
  4. Logging: Enable appropriate logging levels
  5. Monitoring: Integrate with your monitoring stack
  6. Testing: Test with realistic iteration limits
  7. Security: Never hardcode API keys

🚀 Getting Started in Production

# production_example.py
import os
import logging
from agentrouter import ManagerAgent, WorkerAgent

# Setup logging
logging.basicConfig(level=logging.INFO)

# Load configuration
API_KEY = os.environ["AGENTROUTER_API_KEY"]

# Create production-ready manager
manager = ManagerAgent(
    name="production_manager",
    api_key=API_KEY,
    model="usf-mini",
    max_iterations=30,
    api_timeout=60.0,
    worker_timeout=120.0,
    max_retries=3
)

# Add specialized workers
analyst = WorkerAgent(
    name="data_analyst",
    role="Processes datasets, calculates metrics, and generates statistical reports with visualizations",  # REQUIRED: Concise role
    goal="Analyze data and provide insights"
)

manager.attach_worker(analyst)

# Ready for production use
async def process_request(user_task):
    try:
        result = await manager.run(user_task)
        return result
    except Exception as e:
        logging.error(f"Task failed: {e}")
        raise

Production Ready

AgentRouter SDK provides:

  • ✅ Robust error handling
  • ✅ Configurable retry mechanisms
  • ✅ Comprehensive validation
  • ✅ Production-grade logging
  • ✅ Flexible configuration
  • ✅ Lightweight footprint
