Execution Engine & Retry Mechanisms
⚡ The Reliable Core of AgentRouter
AgentRouter's execution engine provides robust, fault-tolerant execution with built-in retry mechanisms and comprehensive error handling. This document describes how the SDK implements retry logic and error handling, and lists the configuration options that control them.
🔄 Intelligent Retry System
Exponential Backoff Implementation
The SDK implements exponential backoff with configurable parameters:
# Retry configuration with validation
manager = ManagerAgent(
    name="resilient_manager",
    api_key="key",
    max_retries=3,         # 0-10 attempts
    retry_delay=1.0,       # 0.1-60 seconds
    retry_multiplier=2.0,  # 1.0-10.0
    retry_max_wait=60.0,   # 1-300 seconds
)
Retry Calculation
The retry mechanism uses exponential backoff:
- First retry: wait retry_delay seconds
- Second retry: wait retry_delay * retry_multiplier seconds
- Third retry: wait retry_delay * (retry_multiplier^2) seconds
- Every wait is capped at retry_max_wait seconds
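The schedule above reduces to one formula: the wait before retry n is min(retry_delay * retry_multiplier^(n-1), retry_max_wait). The following is a minimal sketch of that calculation, not the SDK's internal code. The function name backoff_delay is hypothetical, and its defaults simply mirror the example values used in this document.

```python
def backoff_delay(retry_number, retry_delay=1.0, retry_multiplier=2.0,
                  retry_max_wait=60.0):
    """Return the seconds to wait before the given retry (1 = first retry)."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    # Exponential growth: the delay is multiplied once per earlier retry.
    raw_delay = retry_delay * retry_multiplier ** (retry_number - 1)
    # The cap keeps late retries from waiting arbitrarily long.
    return min(raw_delay, retry_max_wait)


schedule = [backoff_delay(n) for n in range(1, 4)]
print(schedule)           # [1.0, 2.0, 4.0]
print(backoff_delay(10))  # 60.0 (1.0 * 2.0**9 = 512.0, capped at retry_max_wait)
```

With the default values the cap only takes effect from the seventh retry onward (1.0 * 2.0^6 = 64.0), so it matters mainly when max_retries or retry_multiplier is raised.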
🛡️ Error Handling in AgentRouter
Error Types
The SDK provides specific exception types for different error scenarios:
from agentrouter.exceptions import (
    ExecutionError,      # General execution failures
    MaxIterationsError,  # Max iterations exceeded
    ValidationError,     # Input validation failures
    APIError,            # API communication errors
    ConfigurationError,  # Configuration issues
)
Error Handling Flow
- Error Occurs → Exception raised with context
- Error Type → Specific exception class for different scenarios
- Error Context → Includes agent name, field details, and error message
- Recovery → Retry for transient errors, fail fast for validation errors
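The recovery rule in the last step (retry transient errors, fail fast on validation errors) can be illustrated with a small wrapper. This is a sketch under stated assumptions, not the SDK's retry code: TransientError, InvalidInputError, and call_with_retry are hypothetical stand-ins defined here so the example runs on its own, and the sleep parameter is injectable only so the demo does not actually wait.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as a timeout."""


class InvalidInputError(Exception):
    """Hypothetical stand-in for a failure that retrying cannot fix."""


def call_with_retry(func, max_retries=3, retry_delay=1.0,
                    retry_multiplier=2.0, retry_max_wait=60.0,
                    sleep=time.sleep):
    """Call func, retrying transient failures with exponential backoff."""
    retries_done = 0
    while True:
        try:
            return func()
        except InvalidInputError:
            raise  # Fail fast: the same input would fail again.
        except TransientError:
            if retries_done >= max_retries:
                raise  # Retry budget exhausted: surface the last error.
            wait = min(retry_delay * retry_multiplier ** retries_done,
                       retry_max_wait)
            sleep(wait)
            retries_done += 1


calls = {"count": 0}


def flaky():
    """Fail twice, then succeed, to imitate a brief outage."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary outage")
    return "ok"


waits = []
outcome = call_with_retry(flaky, sleep=waits.append)
print(outcome, waits)  # ok [1.0, 2.0]
```

Passing waits.append as the sleep function records the delays instead of blocking, which makes the backoff schedule easy to inspect in tests.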
Example Error Handling
import logging

logger = logging.getLogger(__name__)

# manager.run() is a coroutine, so this block belongs inside an async function.
try:
    result = await manager.run("Process this task")
except MaxIterationsError as e:
    # Task exceeded configured max_iterations
    logger.warning(f"Task too complex: {e}")
    # Consider increasing max_iterations or breaking down the task
except ValidationError as e:
    # Input validation failed
    logger.error(f"Invalid input: {e.field} - {e.details}")
except APIError as e:
    # API communication failed after retries
    logger.error(f"API error: {e.api_name} - {e}")
except ExecutionError as e:
    # General execution failure
    logger.error(f"Execution failed: {e.agent_name} - {e}")
⚡ Configuration Management
Execution Configuration
All configuration parameters are validated using Pydantic:
from agentrouter.config import AgentConfiguration

# Create configuration with overrides
config = AgentConfiguration.create_with_overrides(
    max_iterations=30,     # 3-50 iterations
    api_timeout=60.0,      # 5-300 seconds
    worker_timeout=120.0,  # 5-300 seconds (should be > api_timeout)
    max_retries=3,         # 0-10 attempts
    retry_delay=1.0,       # 0.1-60 seconds
    retry_multiplier=2.0,  # 1.0-10.0
    retry_max_wait=60.0,   # 1-300 seconds
)
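To make the documented ranges concrete, the sketch below checks values against them in plain Python. It is deliberately not a Pydantic model and does not reproduce AgentConfiguration: RANGES and check_ranges are hypothetical names, and the bounds are copied from the comments in the configuration example above.

```python
# Bounds copied from the comments in the configuration example.
RANGES = {
    "max_iterations": (3, 50),
    "api_timeout": (5.0, 300.0),
    "worker_timeout": (5.0, 300.0),
    "max_retries": (0, 10),
    "retry_delay": (0.1, 60.0),
    "retry_multiplier": (1.0, 10.0),
    "retry_max_wait": (1.0, 300.0),
}


def check_ranges(**values):
    """Return a list of range violations; an empty list means all values fit."""
    problems = []
    for name, value in values.items():
        if name not in RANGES:
            problems.append(f"{name}: unknown parameter")
            continue
        low, high = RANGES[name]
        if not low <= value <= high:
            problems.append(f"{name}={value} is outside {low}-{high}")
    return problems


print(check_ranges(max_iterations=30, api_timeout=60.0))
# []
print(check_ranges(max_retries=11, retry_delay=0.05))
# ['max_retries=11 is outside 0-10', 'retry_delay=0.05 is outside 0.1-60.0']
```

A pre-flight check like this can be useful when configuration comes from environment variables or files, since every violation is reported at once rather than one per attempt.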
Configuration Inheritance
Workers inherit configuration from their parent agents:
# Manager configuration
manager = ManagerAgent(
    name="manager",
    api_key="key",
    max_iterations=30,
    api_timeout=60.0,
)

# Worker inherits configuration
worker = WorkerAgent(
    name="worker",
    # REQUIRED: every worker must have a role
    role="Executes assigned tasks, processes data, and returns structured results",
    # Inherits: api_key, max_iterations, api_timeout, etc.
)

manager.attach_worker(worker)
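One way to picture inheritance is as a merge in which the parent's settings form the base and any value set explicitly on the worker wins. The sketch below illustrates that idea with plain dictionaries. It is an assumption about the behaviour, not the SDK's implementation: resolve_worker_config is a hypothetical helper, and whether a worker may override an inherited value is not confirmed by this document.

```python
def resolve_worker_config(parent_config, worker_overrides):
    """Start from the parent's settings; explicit worker values take priority."""
    resolved = dict(parent_config)  # Copy so the parent is never mutated.
    for key, value in worker_overrides.items():
        if value is not None:       # None means "not set on the worker".
            resolved[key] = value
    return resolved


manager_config = {"api_key": "key", "max_iterations": 30, "api_timeout": 60.0}
worker_settings = {"api_timeout": 30.0, "max_iterations": None}

resolved = resolve_worker_config(manager_config, worker_settings)
print(resolved)
# {'api_key': 'key', 'max_iterations': 30, 'api_timeout': 30.0}
```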
🔐 Message Flow Validation
Validation Points
The SDK validates message flow at critical points:
- Before Plan API: Ensures no consecutive plan messages
- Before Tool Call API: Validates plan response indicates tool needed
- Tool Response: Ensures proper tool response format
- Final Response: Validates message structure for OpenAI compatibility
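As an illustration of the first check, the sketch below detects two plan messages in a row. The message shape is an assumption made for this example: each message is a dictionary with a "type" key, which is not taken from the SDK, and has_consecutive_plan_messages is a hypothetical helper rather than part of MessageFlowValidator.

```python
def has_consecutive_plan_messages(messages):
    """Return True if any plan message directly follows another plan message."""
    # Pair every message with its successor and compare their types.
    for previous, current in zip(messages, messages[1:]):
        if previous.get("type") == "plan" and current.get("type") == "plan":
            return True
    return False


valid_flow = [
    {"type": "user"},
    {"type": "plan"},
    {"type": "tool_call"},
    {"type": "plan"},
]
broken_flow = [{"type": "user"}, {"type": "plan"}, {"type": "plan"}]

print(has_consecutive_plan_messages(valid_flow))   # False
print(has_consecutive_plan_messages(broken_flow))  # True
```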
Validation Example
from agentrouter.exceptions import ValidationError
from agentrouter.validators import MessageFlowValidator

# Automatic validation before API calls
try:
    MessageFlowValidator.validate_for_plan_api(messages)
except ValidationError as e:
    logger.error(f"Invalid message flow: {e}")
🔄 Execution Workflow
Main Execution Loop
The execution engine follows this workflow:
- Initialize → Set up context and configuration
- Plan API Call → Get strategy from Plan API
- Decision Point → Tool needed or final response?
- Tool Execution → If needed, call Tool Call API and execute
- Iterate → Continue until complete or max iterations reached
- Final Response → Generate OpenAI-compatible response
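The workflow above can be sketched as a bounded loop. This is a simplified model under stated assumptions, not the engine's code: run_loop, demo_plan, and demo_tool are hypothetical, the Plan API and tool execution are replaced by plain callables, and the decision dictionaries use an invented shape. When the iteration budget runs out, the sketch returns a forced answer built from the results gathered so far.

```python
def run_loop(plan_step, execute_tool, max_iterations=30):
    """Alternate planning and tool execution until a final answer or the limit."""
    history = []
    for iteration in range(1, max_iterations + 1):
        decision = plan_step(history)        # Plan API call (stubbed)
        if decision["action"] == "final":    # Decision point
            return {"answer": decision["content"],
                    "iterations": iteration, "forced": False}
        result = execute_tool(decision["tool"], decision["arguments"])
        history.append({"tool": decision["tool"], "result": result})
    # Budget exhausted: answer with whatever has been gathered.
    return {"answer": f"Partial answer from {len(history)} tool result(s)",
            "iterations": max_iterations, "forced": True}


def demo_plan(history):
    """Stub planner: request the 'add' tool twice, then finish."""
    if len(history) < 2:
        return {"action": "tool", "tool": "add",
                "arguments": {"a": len(history), "b": 1}}
    total = sum(entry["result"] for entry in history)
    return {"action": "final", "content": f"total={total}"}


def demo_tool(name, arguments):
    """Stub tool: only addition is supported in this demo."""
    return arguments["a"] + arguments["b"]


completed = run_loop(demo_plan, demo_tool, max_iterations=5)
cut_short = run_loop(demo_plan, demo_tool, max_iterations=2)
print(completed["answer"], completed["iterations"])  # total=3 3
print(cut_short["answer"], cut_short["forced"])
# Partial answer from 2 tool result(s) True
```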
Max Iterations Handling
When the max_iterations limit is reached, the SDK:
- Executes any pending tool calls
- Forces generation of final response
- Returns best available answer based on gathered information
# Configure max iterations
manager = ManagerAgent(
    name="manager",
    api_key="key",
    max_iterations=30,  # Default: 30, Range: 3-50
)
📊 Execution Context
Context Management
The SDK maintains execution context throughout the workflow:
# ExecutionContext tracks:
- messages: List[Message] # Conversation history
- tools: List[ToolDefinition] # Available tools
- current_iteration: int # Current iteration number
- max_iterations: int # Maximum allowed iterations
- agent_name: str # Executing agent name
- agent_status: AgentStatus # Current status
Agent Status States
from enum import Enum

class AgentStatus(Enum):
    IDLE = "idle"
    RUNNING = "running"
    PREPARING_FINAL_RESPONSE = "preparing_final_response"
    COMPLETED = "completed"
    FAILED = "failed"
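The tracked fields and status values fit naturally into a dataclass. The sketch below is an approximation for illustration only: it uses plain dictionaries in place of the SDK's Message and ToolDefinition types, and the start_iteration method is a hypothetical addition that shows how the iteration counter and status could move together.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List


class AgentStatus(Enum):
    IDLE = "idle"
    RUNNING = "running"
    PREPARING_FINAL_RESPONSE = "preparing_final_response"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class ExecutionContext:
    agent_name: str
    max_iterations: int = 30
    current_iteration: int = 0
    # Plain dicts stand in for the SDK's Message and ToolDefinition types.
    messages: List[Dict[str, Any]] = field(default_factory=list)
    tools: List[Dict[str, Any]] = field(default_factory=list)
    agent_status: AgentStatus = AgentStatus.IDLE

    def start_iteration(self) -> bool:
        """Advance the counter; return False once the budget is spent."""
        if self.current_iteration >= self.max_iterations:
            self.agent_status = AgentStatus.PREPARING_FINAL_RESPONSE
            return False
        self.current_iteration += 1
        self.agent_status = AgentStatus.RUNNING
        return True


context = ExecutionContext(agent_name="manager", max_iterations=2)
steps = [context.start_iteration() for _ in range(3)]
print(steps)                       # [True, True, False]
print(context.agent_status.value)  # preparing_final_response
```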
🔍 Debugging & Tracing
Built-in Pipeline Tracer
AgentRouter includes a visualization tracer for debugging:
from agentrouter.visualization import PipelineTracer
# Enable tracing
tracer = PipelineTracer()
manager.set_tracer(tracer)
# Execute with tracing
result = await manager.run("Analyze this data")
# Display execution trace
tracer.display()
# Shows: API calls, tool executions, agent delegations, timing
Trace Information
The tracer captures:
- API call sequences (Plan API, Tool Call API)
- Tool executions with arguments
- Worker agent delegations
- Execution timing
- Token usage
- Error occurrences
💡 Best Practices for Reliability
1. Appropriate Timeout Configuration
# Ensure worker timeout > api timeout
manager = ManagerAgent(
    name="manager",
    api_key="key",
    api_timeout=60.0,      # Individual API calls
    worker_timeout=120.0,  # Worker agent execution
)
2. Error Recovery Strategy
async def execute_with_fallback(manager, task):
    try:
        # Try with optimal configuration
        return await manager.run(task)
    except MaxIterationsError:
        # Fallback: Increase iterations for complex tasks
        manager.config.max_iterations = 50
        return await manager.run(task)
    except APIError as e:
        # Log and potentially retry later
        logger.error(f"API failure: {e}")
        raise
3. Monitoring Execution
# Track execution metrics
import logging
import time

logger = logging.getLogger(__name__)

async def monitored_execution(manager, task):
    def current_iteration():
        # _execution_context is a private attribute and may be None
        context = manager._execution_context
        return context.current_iteration if context else 0

    start_time = time.perf_counter()
    iterations_before = current_iteration()
    result = await manager.run(task)
    duration = time.perf_counter() - start_time
    iterations_used = current_iteration() - iterations_before
    logger.info(f"Task completed in {duration:.2f}s using {iterations_used} iterations")
    return result
🎯 Real-World Configuration Examples
Development Configuration
dev_config = {
    "max_iterations": 10,  # Faster feedback
    "max_retries": 1,      # Fail fast
    "api_timeout": 30.0,   # Shorter timeouts
    "retry_delay": 0.5,    # Quick retries
}
Production Configuration
prod_config = {
    "max_iterations": 30,     # Handle complex tasks
    "max_retries": 3,         # Resilient to transients
    "api_timeout": 60.0,      # Allow for API latency
    "retry_delay": 1.0,       # Standard backoff
    "retry_multiplier": 2.0,  # Exponential backoff
}
High-Reliability Configuration
high_reliability_config = {
    "max_iterations": 50,     # Maximum allowed
    "max_retries": 5,         # More retry attempts
    "api_timeout": 120.0,     # Longer timeouts
    "worker_timeout": 240.0,  # Extended worker time
    "retry_max_wait": 120.0,  # Longer max wait
}
🔧 Timeout Management
Timeout Hierarchy
The SDK implements a hierarchical timeout system:
# Timeout configuration
timeouts = {
    "api_timeout": 60,      # Individual API call timeout
    "worker_timeout": 120,  # Worker agent execution timeout
    # Note: worker_timeout should be >= api_timeout
}

manager = ManagerAgent(
    name="manager",
    api_key="key",
    **timeouts
)
Timeout Behavior
- API Timeout: Applied to Plan API and Tool Call API requests
- Worker Timeout: Applied when executing worker agents as tools
- No Global Timeout: Tasks continue until completion or max iterations
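The two timeout levels can be modelled with nested asyncio.wait_for calls: an inner limit around each API request and an outer limit around the whole worker. The sketch below shows only that nesting and makes no claim about how the SDK enforces its timeouts. fake_api_call, guarded_api_call, and run_worker are hypothetical, and the durations are shortened so the demo finishes in well under a second.

```python
import asyncio


async def fake_api_call(delay):
    """Stand-in for a Plan API or Tool Call API request."""
    await asyncio.sleep(delay)
    return "response"


async def guarded_api_call(delay, api_timeout):
    """Apply the per-call limit and label the failure clearly."""
    try:
        return await asyncio.wait_for(fake_api_call(delay), timeout=api_timeout)
    except asyncio.TimeoutError:
        raise RuntimeError("api_timeout exceeded") from None


async def run_worker(delays, api_timeout):
    """A worker that makes several API calls in sequence."""
    return [await guarded_api_call(delay, api_timeout) for delay in delays]


async def demo():
    outcomes = {}
    # Case 1: every call and the worker as a whole finish in time.
    outcomes["fast"] = await asyncio.wait_for(
        run_worker([0.01, 0.01], api_timeout=1.0), timeout=2.0)
    # Case 2: one slow call trips the inner (API) limit.
    try:
        await asyncio.wait_for(run_worker([5.0], api_timeout=0.05), timeout=2.0)
    except RuntimeError as exc:
        outcomes["slow_call"] = str(exc)
    # Case 3: each call is fast enough, but together they exceed the outer limit.
    try:
        await asyncio.wait_for(
            run_worker([0.2] * 5, api_timeout=1.0), timeout=0.3)
    except asyncio.TimeoutError:
        outcomes["slow_worker"] = "worker_timeout exceeded"
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

Case 3 shows why the worker limit should leave room for several API calls: a worker can stay within api_timeout on every request and still run out of time overall.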
🚀 Performance Considerations
Iteration Efficiency
To optimize performance:
- Set appropriate max_iterations based on task complexity
- Use specialized workers to reduce iterations
- Provide clear, specific prompts to minimize planning cycles
- Monitor iteration usage to tune configuration
Memory Management
The SDK maintains message history throughout execution:
- Each iteration adds messages (plan, tool calls, responses)
- Long conversations may accumulate significant history
- Consider task decomposition for very long workflows