So, I've been working on some Python code using PydanticAI, and I think I've confused myself to the point where I'm no longer sure where the bug is occurring. I was hoping someone more versed in Python (or anyone — I'll take any ideas/tips at this point, lol) might be able to help out.
Goal: Create a generic base class that generates Agents from the URL of an API's documentation.
My Code:
Logfire Import - helps debug, but also lets us monitor and log what goes on for legal compliance and model tuning
# Logfire is mandatory here: if the AI's actions cannot be logged, the program
# must not run at all, so a missing package terminates the process.
has_logfire = False
try:
    import logfire
except ImportError:
    # Only a genuinely missing package is reported as "not available".
    print("⚠️ Warning: Logfire not available, exiting")
    # Equivalent to sys.exit(1) without relying on `sys` being imported.
    raise SystemExit(1)
else:
    has_logfire = True
    # Configuration lives outside the `try` so that a failure in one of these
    # calls surfaces with its real traceback instead of being mistaken for a
    # missing logfire installation.
    logfire.configure(token=retoolContext.configVars.LOGFIRE_API_KEY)
    logfire.instrument_openai(openai_client)
    logfire.instrument_pydantic_ai(event_mode='logs')
    logfire.instrument_httpx()
Custom Printer - helps with debugging
class Printer:
    """Live console status board: one line per tracked item.

    Items that are still running are rendered as a spinner; finished items are
    rendered as plain text, prefixed with a checkmark unless the checkmark was
    hidden for that item.
    """

    def __init__(self, console: Console):
        """Start a live display on ``console`` with no items."""
        self.live = Live(console=console)
        # item_id -> (text to display, whether the item is finished)
        self.items: dict[str, tuple[str, bool]] = {}
        # ids whose finished line is shown without the checkmark prefix
        self.hide_done_ids: set[str] = set()
        self.live.start()

    def end(self) -> None:
        """Stop the live display."""
        self.live.stop()

    def hide_done_checkmark(self, item_id: str) -> None:
        """Suppress the checkmark prefix for ``item_id`` once it is done."""
        self.hide_done_ids.add(item_id)

    def update_item(
        self, item_id: str, content: str, is_done: bool = False, hide_checkmark: bool = False
    ) -> None:
        """Create or replace the line for ``item_id`` and redraw."""
        self.items[item_id] = (content, is_done)
        if hide_checkmark:
            self.hide_done_ids.add(item_id)
        self.flush()

    def mark_item_done(self, item_id: str) -> None:
        """Mark an existing item as finished and redraw.

        Raises:
            KeyError: if ``item_id`` was never added via ``update_item``.
        """
        self.items[item_id] = (self.items[item_id][0], True)
        self.flush()

    def flush(self) -> None:
        """Rebuild every line from ``self.items`` and push it to the display."""
        renderables: list[Any] = []
        for item_id, (content, is_done) in self.items.items():
            if is_done:
                # The checkmark literal must stay on one line; the pasted source
                # had it split in two by a mis-decoded emoji.
                prefix = "✅ " if item_id not in self.hide_done_ids else ""
                renderables.append(prefix + content)
            else:
                renderables.append(Spinner("dots", text=content))
        self.live.update(Group(*renderables))
Structured I/O Types:
class ApiEndpoint(BaseModel):
    """A single HTTP call that forms one step of an API plan."""

    # Descriptions are passed through Field(...) — the same way ApiPlan does —
    # so they become part of the model's JSON schema. Bare strings placed after
    # a field are ignored by Pydantic unless `use_attribute_docstrings` is on.
    url: str = Field(..., description="The complete URL endpoint to call")
    method: str = Field(..., description="HTTP method (GET, POST, PUT, DELETE)")
    headers: Dict[str, str] = Field(
        default_factory=dict,
        description="Headers to include in the request",
    )
    body: Dict[str, Any] = Field(
        default_factory=dict,
        description="Body data for POST/PUT requests",
    )
    description: str = Field(..., description="Description of what this endpoint call accomplishes")
    order: int = Field(
        ...,
        description="The order in which this endpoint should be called (1, 2, 3, etc.)",
    )
class ApiPlan(BaseModel):
    # Structured planner output: the calls to make, in order, plus the
    # reasoning behind the sequence. Both fields are required.
    endpoints: List[ApiEndpoint] = Field(
        description="Ordered list of API endpoints to call",
    )
    explanation: str = Field(
        description="Overall explanation of the API call sequence",
    )
class TriageResult(BaseModel):
    """Classification of an incoming request into a task category."""

    # Descriptions go through Field(...) so they are included in the JSON
    # schema; bare strings after a field are ignored by Pydantic by default.
    task_type: str = Field(
        ...,
        description="The type of task to perform: 'research' or 'security' or 'unknown'",
    )
    confidence: float = Field(..., description="Confidence level from 0.0 to 1.0")
    reasoning: str = Field(..., description="Explanation for the classification decision")
API Prompts - Base Class for lookup/planning/execution
class BaseAPIPrompts:
    """Base class for generating API-specific prompts.

    Every method is a pure string builder: it takes the API's display name
    (and, for the planner, its base URL and optional examples) and returns the
    instruction text for the corresponding agent.
    """

    @staticmethod
    def create_api_lookup_prompt(api_name: str) -> str:
        """Return the instructions for the documentation-lookup agent."""
        return (
            f"You search the {api_name} API documentation for information about the API endpoints, "
            f"parameters, and usage to find the best endpoint to use for the user's request. "
            f"Return only the relevant information about the API endpoint."
        )

    @staticmethod
    def create_api_planner_prompt(
        api_name: str, base_url: str, examples: Optional[List[str]] = None
    ) -> str:
        """Return the instructions for the planning agent.

        Args:
            api_name: Display name of the API, interpolated into the prompt.
            base_url: Base URL advertised to the model.
            examples: Optional example lines; each is rendered as a ``- item``
                bullet under an ``Examples:`` heading. ``None`` or an empty
                list omits the section. (Defaults to ``None`` rather than a
                shared mutable ``[]``.)
        """
        examples_text = ""
        if examples:
            examples_text = "\n\nExamples:\n" + "\n".join(f"- {example}" for example in examples)
        return (
            f"You are a {api_name} API planning specialist. Given a user query and the {api_name} API documentation, "
            f"create an ordered sequence of API calls to accomplish the user's request.\n\n"
            f"Return your response in the following structured format:\n\n"
            f"EXPLANATION: [Overall explanation of the API call sequence]\n\n"
            f"ENDPOINTS:\n"
            f"1. METHOD: [GET/POST/PUT/DELETE] URL: [complete URL] DESCRIPTION: [what this accomplishes] HEADERS: [any required headers] BODY: [request body if needed]\n"
            f"2. METHOD: [GET/POST/PUT/DELETE] URL: [complete URL] DESCRIPTION: [what this accomplishes] HEADERS: [any required headers] BODY: [request body if needed]\n"
            f"...\n\n"
            f"Base URL: {base_url}\n"
            f"{examples_text}\n\n"
            f"Always order calls logically - if one call depends on data from another, put the dependency first."
        )

    @staticmethod
    def create_execution_prompt(api_name: str) -> str:
        """Return the instructions for the execution/summary agent."""
        return (
            f"You are a {api_name} API execution assistant. Given API call results and a user query, "
            f"analyze the data and provide a comprehensive response to the user.\n\n"
            f"Your tasks:\n"
            f"1. Parse and interpret the API response data\n"
            f"2. Extract relevant information based on the user's query\n"
            f"3. Present the information in a clear, user-friendly format\n"
            f"4. Handle errors gracefully and suggest alternatives if needed\n"
            f"5. Provide context and explanations for {api_name.lower()}-related data\n\n"
            f"Format your response with:\n"
            f"- Clear headings and organization\n"
            f"- Relevant data points highlighted\n"
            f"- Explanations of terms when appropriate\n"
            f"- Next steps or recommendations if applicable\n\n"
            f"If there are errors in the API response, explain what went wrong and suggest solutions."
        )
API Config - Base Class
class BaseAPIConfig:
    """Connection settings for one API, including where its key is sent."""

    def __init__(
        self,
        name: str,
        base_url: str,
        docs_url: str,
        api_key_header: Optional[str] = None,
        api_key_value: Optional[str] = None,
        api_key_in_query: bool = False,
    ):
        """Store the settings unchanged.

        ``api_key_header`` doubles as the query-parameter name when
        ``api_key_in_query`` is true.
        """
        self.name = name
        self.base_url = base_url
        self.docs_url = docs_url
        self.api_key_header = api_key_header
        self.api_key_value = api_key_value
        # True -> the key travels as a query parameter instead of a header.
        self.api_key_in_query = api_key_in_query

    def _has_api_key(self) -> bool:
        """Tell whether both the key name and the key value are set."""
        return bool(self.api_key_header and self.api_key_value)

    def get_headers(self) -> Dict[str, str]:
        """Return the default request headers.

        The API key is included only when it is configured and is not meant to
        be sent through the query string.
        """
        default_headers = {"Content-Type": "application/json"}
        if self.api_key_in_query or not self._has_api_key():
            return default_headers
        default_headers[self.api_key_header] = self.api_key_value
        return default_headers

    def get_api_key_query_param(self) -> Dict[str, str]:
        """Return ``{name: key}`` when the key belongs in the query string, else ``{}``."""
        if not (self._has_api_key() and self.api_key_in_query):
            return {}
        return {self.api_key_header: self.api_key_value}

    def get_base_url(self) -> str:
        """Return the configured base URL."""
        return self.base_url
API Manager - Base Class to handle the full process of planning/execution
class BaseAPIManager:
    """Base class for API managers with planning + execution flow.

    ``run`` asks the planner agent for a sequence of calls, hands each call to
    the execution agent as a text instruction, then asks the execution agent to
    turn the collected responses into the final answer. Subclasses may override
    ``_execute_api_calls`` and ``_parse_plan_text``.
    """

    def __init__(self, config: BaseAPIConfig, planner_agent: Agent, execution_agent: Agent):
        """Store the collaborators and start the live status printer."""
        self.config = config
        self.planner_agent = planner_agent
        self.execution_agent = execution_agent
        self.console = Console()
        self.printer = Printer(self.console)
        self.retoolContext = retoolContext
        # Set by _execute_api_calls(). Initialised here so run() cannot fail
        # with AttributeError if an override forgets to assign it.
        self.final_output: str = ""

    async def run(self, query: str) -> str:
        """Plan, execute and summarise the API calls needed to answer ``query``.

        Returns:
            The execution agent's final output (``self.final_output``).

        Raises:
            Exception: anything raised along the way is re-raised after the
                live display has been stopped and the error printed.
        """
        try:
            trace_id = gen_trace_id()
            with trace(f"{self.config.name} API Flow", trace_id=trace_id):
                self.printer.update_item(
                    "trace_id",
                    f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}",
                    is_done=True,
                    hide_checkmark=True,
                )
                self.printer.update_item(
                    "starting",
                    f"Starting {self.config.name} API operations for: '{query}'",
                    is_done=True,
                    hide_checkmark=True,
                )

                # Step 1: Planning - create the API call sequence.
                self.printer.update_item("planning", f"Planning {self.config.name} API sequence...", is_done=False)
                planning_result = await self.planner_agent.run(query)
                plan_output = planning_result.output
                # The planner may return a structured plan object or plain text.
                # Keep a text rendering either way: it is embedded in prompts
                # and must be a real string for the text-parsing fallback.
                api_plan_text = plan_output if isinstance(plan_output, str) else str(plan_output)
                self.printer.update_item("planning", "API plan created successfully", is_done=True)

                # Step 2: Extract the endpoints from the plan.
                self.printer.update_item("parsing", "Parsing API plan...", is_done=False)
                structured_endpoints = getattr(plan_output, "endpoints", None)
                if structured_endpoints is not None:
                    endpoints = list(structured_endpoints)
                    self.printer.update_item("parsing", f"Found {len(endpoints)} endpoints to execute", is_done=True)
                else:
                    # Fallback: the plan is free text in the numbered format.
                    endpoints = self._parse_plan_text(api_plan_text)
                    self.printer.update_item("parsing", f"Parsed {len(endpoints)} endpoints from text plan", is_done=True)

                # Step 3: Execute the API calls in sequence.
                await self._execute_api_calls(endpoints, query, api_plan_text)
                return self.final_output
        except Exception as e:
            self.printer.end()
            print(f"❌ Error during {self.config.name} API operations: {e}")
            raise

    def _resolve_url(self, url: str) -> str:
        """Return an absolute URL, prefixing the base URL only for relative paths."""
        if url.lower().startswith(("http://", "https://")):
            return url
        base = self.config.base_url.rstrip("/")
        path = url.lstrip("/")
        return f"{base}/{path}" if path else base

    async def _execute_api_calls(self, endpoints: List[ApiEndpoint], query: str, api_plan_text: str):
        """Execute API calls and process responses - can be overridden by subclasses.

        Each endpoint is sent to the execution agent as a text instruction. The
        agent's output is stored as parsed JSON when possible, otherwise wrapped
        as ``{"response": ...}``; a failed call is stored as ``{"error": ...}``
        and does not stop the remaining calls. The final answer is written to
        ``self.final_output``.
        """
        self.printer.update_item("execution", f"Executing {self.config.name} API calls...", is_done=False)
        api_responses = []
        total = len(endpoints)
        for i, endpoint in enumerate(endpoints, 1):
            # The planner is instructed to emit complete URLs, so blindly
            # prepending base_url would yield "https://hosthttps://host/...".
            target_url = self._resolve_url(endpoint.url)
            self.printer.update_item("execution", f"Executing call {i}/{total}: {endpoint.method} {target_url}", is_done=False)
            # Config defaults first; endpoint-specific headers override them.
            headers = {**self.config.get_headers(), **endpoint.headers}
            try:
                headers_json = json.dumps(headers)
                body_json = json.dumps(endpoint.body)
                print("!!! --- API ENDPOINT: ", endpoint)
                print("!!! --- API CALL ENDPOINT URL: ", target_url)
                print("!!! --- CONFIG: ", self.config)
                # NOTE(review): headers_json may contain the API key from the
                # config, which is thereby sent to the model as prompt text —
                # confirm this is intended.
                call_instruction = f"Execute API call: {endpoint.method} {target_url} with headers: {headers_json} and body: {body_json}"
                execution_result = await self.execution_agent.run(call_instruction)
                response = execution_result.output
                # Keep structured data when the agent returned JSON text.
                try:
                    api_responses.append(json.loads(response))
                except (json.JSONDecodeError, TypeError) as e:
                    print("API EXECUTION FAILED: ", e)
                    print("!!!!!RESPONSE: ", response)
                    api_responses.append({"response": response})
            except Exception as e:
                # Best effort: record the failure and continue with the next call.
                print(f"API call failed: {e}")
                api_responses.append({"error": str(e)})
        self.printer.update_item("execution", f"All {total} API calls completed", is_done=True)

        # Step 4: Process and format the final response.
        self.printer.update_item("processing", "Processing API responses...", is_done=False)
        final_context = f"Original Query: {query}\n\nAPI Plan:\n{api_plan_text}\n\nAPI Responses:\n" + "".join(
            f"\nResponse {i}:\n{response}\n" for i, response in enumerate(api_responses, 1)
        )
        final_result = await self.execution_agent.run(final_context)
        self.final_output = final_result.output
        self.printer.update_item("processing", "Response processed successfully", is_done=True)
        self.printer.end()
        print(f"\n\n====={self.config.name.upper()} API RESULTS=====\n\n")
        print(f"Query: {query}")
        print(f"Plan: {api_plan_text}")
        print(f"Final Result: {self.final_output}")

    def _parse_plan_text(self, plan_text: str) -> List[ApiEndpoint]:
        """Generic plan text parser - can be overridden by subclasses.

        Expects the numbered format requested by the planner prompt, e.g.
        ``1. METHOD: GET URL: https://... DESCRIPTION: ... HEADERS: ... BODY: ...``.
        A line starting with ``<number>.`` begins a new endpoint; METHOD, URL
        and DESCRIPTION may sit on that line or on following lines. HEADERS and
        BODY text is not parsed, so those fields keep their empty defaults.
        """
        field_markers = ("METHOD:", "URL:", "DESCRIPTION:", "HEADERS:", "BODY:")

        def field_value(line: str, key: str) -> Optional[str]:
            """Return the text after ``key`` up to the next field marker, or None."""
            if key not in line:
                return None
            value = line.split(key, 1)[1]
            cut = len(value)
            for marker in field_markers:
                position = value.find(" " + marker)
                if position != -1:
                    cut = min(cut, position)
            return value[:cut].strip()

        def build(fields: dict, order: int) -> ApiEndpoint:
            """Turn the collected fields into an ApiEndpoint, applying defaults."""
            return ApiEndpoint(
                url=fields.get('url', ''),
                method=fields.get('method', 'GET'),
                headers=fields.get('headers', {}),
                body=fields.get('body', {}),
                description=fields.get('description', ''),
                order=order,
            )

        endpoints: List[ApiEndpoint] = []
        current_endpoint: dict = {}
        print("GENERIC PLAN LINE PARSER")
        for raw_line in plan_text.split('\n'):
            line = raw_line.strip()
            # Any "<digits>." prefix starts a new item, including 10 and above.
            number, dot, _ = line.partition('.')
            if dot and number.isdigit():
                if current_endpoint:
                    endpoints.append(build(current_endpoint, len(endpoints) + 1))
                current_endpoint = {}
            method_text = field_value(line, 'METHOD:')
            if method_text is not None:
                tokens = method_text.split()
                current_endpoint['method'] = tokens[0].strip('`') if tokens else 'GET'
            url_text = field_value(line, 'URL:')
            if url_text is not None:
                # The value is already cut at the next marker, so stripping
                # backticks here leaves only the URL itself.
                current_endpoint['url'] = url_text.strip('`').strip()
            description_text = field_value(line, 'DESCRIPTION:')
            if description_text is not None:
                current_endpoint['description'] = description_text
        # Flush the last endpoint.
        if current_endpoint:
            endpoints.append(build(current_endpoint, len(endpoints) + 1))
        return endpoints
Global Function - used to generate three agents (lookup, planner, execution) per API docs URL
def create_api_agents(api_config: BaseAPIConfig, examples: Optional[List[str]] = None) -> Dict[str, Agent]:
    """Factory function to create a complete set of agents for any API.

    Args:
        api_config: Settings of the target API (name, URLs, key placement).
        examples: Optional example lines for the planner prompt. Defaults to
            ``None`` (no examples) rather than a shared mutable ``[]``.

    Returns:
        A dict with the keys ``'lookup'``, ``'planner'`` and ``'execution'``.
        The lookup and planner agents get the ``get_api_specs`` tool; the
        execution agent gets the ``execute_api_call`` tool.
    """
    # Create prompts
    lookup_prompt = BaseAPIPrompts.create_api_lookup_prompt(api_config.name)
    planner_prompt = BaseAPIPrompts.create_api_planner_prompt(api_config.name, api_config.base_url, examples or [])
    execution_prompt = BaseAPIPrompts.create_execution_prompt(api_config.name)

    async def get_api_specs() -> str:
        """Get API specifications for this specific API"""
        try:
            url = api_config.docs_url
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                response.raise_for_status()
                api_specs = response.text
            # Keep only the first 15000 characters to stay within context limits.
            max_chars = 15000
            if len(api_specs) > max_chars:
                api_specs = api_specs[:max_chars] + "\n\n[Documentation truncated to fit context window. This excerpt contains the main API endpoints and parameters.]"
            return f"\n{api_config.name} API Specifications (Excerpt):\nSource: {url}\n{api_specs}\n"
        except Exception as e:
            # Tool boundary: report the failure to the model as text.
            return f"Error fetching {api_config.name} API specifications: {str(e)}"

    async def execute_api_call(
        url: str,
        method: str = "GET",
        headers_json: str = "{}",
        body_json: str = "{}"
    ) -> str:
        """Execute an API call for this specific API"""
        print("EXECUTE API CALL")
        try:
            # Parse parameters
            headers = json.loads(headers_json) if headers_json else {}
            body = json.loads(body_json) if body_json else {}
            # Config defaults first; caller-supplied headers override them.
            merged_headers = api_config.get_headers()
            if headers:
                merged_headers.update(headers)
            print("API URL: ", url)

            # Handle API key in query parameters for APIs like Polygon.
            final_url = url
            api_key_params = api_config.get_api_key_query_param()
            if api_key_params:
                parsed_url = urlparse(url)
                # keep_blank_values: otherwise "?flag=" style parameters would
                # be silently dropped when the URL is rebuilt.
                query_params = parse_qs(parsed_url.query, keep_blank_values=True)
                for key, value in api_key_params.items():
                    query_params[key] = [value]
                final_url = urlunparse((
                    parsed_url.scheme,
                    parsed_url.netloc,
                    parsed_url.path,
                    parsed_url.params,
                    urlencode(query_params, doseq=True),
                    parsed_url.fragment,
                ))
            # final_url is deliberately not printed: it may carry the API key.

            verb = method.upper()
            if verb not in ("GET", "POST", "PUT", "DELETE"):
                return json.dumps({"error": f"Unsupported HTTP method: {method}"})
            request_kwargs: Dict[str, Any] = {"headers": merged_headers}
            if verb in ("POST", "PUT"):
                # Only POST/PUT send the JSON body, as before.
                request_kwargs["json"] = body
            async with httpx.AsyncClient() as client:
                response = await client.request(verb, final_url, **request_kwargs)
            print("API CALL RESPONSE: ", response)
            response.raise_for_status()
            # Try to parse as JSON, fall back to text.
            try:
                ret = json.dumps(response.json())
                print("GENERIC RET: ", ret)
                return ret
            except ValueError:
                # json.JSONDecodeError is a ValueError: the body was not JSON.
                ret = json.dumps({"response": response.text})
                print("GENERIC RET2: ", ret)
                return ret
        except httpx.HTTPStatusError as e:
            return json.dumps({"error": f"HTTP Error {e.response.status_code}: {e.response.text}"})
        except Exception as e:
            # Tool boundary: always answer the model with a JSON error payload.
            return json.dumps({"error": f"Request failed: {str(e)}"})

    # Create agents
    agents = {
        'lookup': Agent(
            name=f"{api_config.name}API",
            instructions=lookup_prompt,
            tools=cast(Sequence[Tool], [Tool(get_api_specs)]),
            model=model
        ),
        'planner': Agent(
            name=f"{api_config.name}APIPlanner",
            instructions=planner_prompt,
            output_type=ApiPlan,
            tools=cast(Sequence[Tool], [Tool(get_api_specs)]),
            model=model
        ),
        'execution': Agent(
            name=f"{api_config.name}Execution",
            instructions=execution_prompt,
            tools=cast(Sequence[Tool], [Tool(execute_api_call)]),
            model=model
        ),
    }
    return agents