PydanticAI - Generic API class to auto generate Agent

So, I've been working on some python using PydanticAI and I think I've gone and confused myself to where I'm not sure where the bug is occurring so I was hoping someone more versed in Python (or anyone, I'll take any ideas/tips at this point lol) might be able to help out.

Goal: Create a generic/base class to generate Agents based off of the url to API docs.

My Code:

Logfire Import - helps debug, but also lets us monitor and log what goes on for legal compliance and model tuning

# Try to import logfire, giveup if not possible (we must do nothing if we can not log what the ai does)
has_logfire = False
try:
  import logfire
  has_logfire = True

  # Configure Logfire if available
  logfire.configure(token=retoolContext.configVars.LOGFIRE_API_KEY)
  logfire.instrument_openai(openai_client)
  logfire.instrument_pydantic_ai(event_mode='logs')
  logfire.instrument_httpx()
except ImportError:
  print("āš ļø  Warning:Logfire not available, exiting")
  sys.exit(1)

Custom Printer - helps with debugging

class Printer:
    def __init__(self, console: Console):
        self.live = Live(console=console)
        self.items: dict[str, tuple[str, bool]] = {}
        self.hide_done_ids: set[str] = set()
        self.live.start()

    def end(self) -> None:
        self.live.stop()

    def hide_done_checkmark(self, item_id: str) -> None:
        self.hide_done_ids.add(item_id)

    def update_item(
        self, item_id: str, content: str, is_done: bool = False, hide_checkmark: bool = False
    ) -> None:
        self.items[item_id] = (content, is_done)
        if hide_checkmark:
            self.hide_done_ids.add(item_id)
        self.flush()

    def mark_item_done(self, item_id: str) -> None:
        self.items[item_id] = (self.items[item_id][0], True)
        self.flush()

    def flush(self) -> None:
        renderables: list[Any] = []
        for item_id, (content, is_done) in self.items.items():
            if is_done:
                prefix = "āœ… " if item_id not in self.hide_done_ids else ""
                renderables.append(prefix + content)
            else:
                renderables.append(Spinner("dots", text=content))
        self.live.update(Group(*renderables))

Structured I/O Types:

class ApiEndpoint(BaseModel):
    url: str
    """The complete URL endpoint to call"""
    
    method: str
    """HTTP method (GET, POST, PUT, DELETE)"""
    
    headers: Dict[str, str] = {}
    """Headers to include in the request"""
    
    body: Dict[str, Any] = {}
    """Body data for POST/PUT requests"""
    
    description: str
    """Description of what this endpoint call accomplishes"""
    
    order: int
    """The order in which this endpoint should be called (1, 2, 3, etc.)"""

class ApiPlan(BaseModel):
    endpoints: List[ApiEndpoint] = Field(..., description="Ordered list of API endpoints to call")
    explanation: str = Field(..., description="Overall explanation of the API call sequence")

class TriageResult(BaseModel):
    task_type: str
    """The type of task to perform: 'research' or 'security' or 'unknown'"""
    
    confidence: float
    """Confidence level from 0.0 to 1.0"""
    
    reasoning: str
    """Explanation for the classification decision"""

API Prompts - Base Class for lookup/planning/execution

class BaseAPIPrompts:
    """Base class for generating API-specific prompts"""
    
    @staticmethod
    def create_api_lookup_prompt(api_name: str) -> str:
        """Create a simple API lookup prompt"""
        return (
            f"You search the {api_name} API documentation for information about the API endpoints, "
            f"parameters, and usage to find the best endpoint to use for the user's request. "
            f"Return only the relevant information about the API endpoint."
        )
    
    @staticmethod
    def create_api_planner_prompt(api_name: str, base_url: str, examples: List[str] = []) -> str:
        """Create an API planning prompt"""
        examples_text = ""
        if examples:
            examples_text = "\n\nExamples:\n" + "\n".join(f"- {example}" for example in examples)
        
        return (
            f"You are a {api_name} API planning specialist. Given a user query and the {api_name} API documentation, "
            f"create an ordered sequence of API calls to accomplish the user's request.\n\n"
            f"Return your response in the following structured format:\n\n"
            f"EXPLANATION: [Overall explanation of the API call sequence]\n\n"
            f"ENDPOINTS:\n"
            f"1. METHOD: [GET/POST/PUT/DELETE] URL: [complete URL] DESCRIPTION: [what this accomplishes] HEADERS: [any required headers] BODY: [request body if needed]\n"
            f"2. METHOD: [GET/POST/PUT/DELETE] URL: [complete URL] DESCRIPTION: [what this accomplishes] HEADERS: [any required headers] BODY: [request body if needed]\n"
            f"...\n\n"
            f"Base URL: {base_url}\n"
            f"{examples_text}\n\n"
            f"Always order calls logically - if one call depends on data from another, put the dependency first."
        )
    
    @staticmethod
    def create_execution_prompt(api_name: str) -> str:
        """Create an API execution prompt"""
        return (
            f"You are a {api_name} API execution assistant. Given API call results and a user query, "
            f"analyze the data and provide a comprehensive response to the user.\n\n"
            f"Your tasks:\n"
            f"1. Parse and interpret the API response data\n"
            f"2. Extract relevant information based on the user's query\n"
            f"3. Present the information in a clear, user-friendly format\n"
            f"4. Handle errors gracefully and suggest alternatives if needed\n"
            f"5. Provide context and explanations for {api_name.lower()}-related data\n\n"
            f"Format your response with:\n"
            f"- Clear headings and organization\n"
            f"- Relevant data points highlighted\n"
            f"- Explanations of terms when appropriate\n"
            f"- Next steps or recommendations if applicable\n\n"
            f"If there are errors in the API response, explain what went wrong and suggest solutions."
        )

API Config - Base Class

class BaseAPIConfig:
    """Base configuration for API management"""
    def __init__(self, 
                 name: str,
                 base_url: str,
                 docs_url: str,
                 api_key_header: Optional[str] = None,
                 api_key_value: Optional[str] = None,
                 api_key_in_query: bool = False):
        self.name = name
        self.base_url = base_url
        self.docs_url = docs_url
        self.api_key_header = api_key_header
        self.api_key_value = api_key_value
        self.api_key_in_query = api_key_in_query  # Flag to indicate if API key goes in query params
    
    def get_headers(self) -> Dict[str, str]:
        """Get default headers for API calls"""
        headers = {"Content-Type": "application/json"}
        # Only add API key to headers if it's not supposed to be in query params
        if self.api_key_header and self.api_key_value and not self.api_key_in_query:
            headers[self.api_key_header] = self.api_key_value
        return headers
    
    def get_api_key_query_param(self) -> Dict[str, str]:
        """Get API key as query parameter if needed"""
        if self.api_key_header and self.api_key_value and self.api_key_in_query:
            return {self.api_key_header: self.api_key_value}
        return {}

    def get_base_url(self) -> str:
        return self.base_url

API Manager - Base Class to handle the full process of planning/execution

class BaseAPIManager:
    """Base class for API managers with planning + execution flow"""
    
    def __init__(self, config: BaseAPIConfig, planner_agent: Agent, execution_agent: Agent):
        self.config = config
        self.planner_agent = planner_agent
        self.execution_agent = execution_agent
        self.console = Console()
        self.printer = Printer(self.console)
        self.retoolContext = retoolContext
    
    async def run(self, query: str) -> str:
        """Generic run method with planning + execution flow"""
        try:
            trace_id = gen_trace_id()
            with trace(f"{self.config.name} API Flow", trace_id=trace_id):
                self.printer.update_item(
                    "trace_id",
                    f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}",
                    is_done=True,
                    hide_checkmark=True,
                )

                self.printer.update_item(
                    "starting",
                    f"Starting {self.config.name} API operations for: '{query}'",
                    is_done=True,
                    hide_checkmark=True,
                )

                # Step 1: Planning - Create API call sequence
                self.printer.update_item("planning", f"Planning {self.config.name} API sequence...", is_done=False)
                
                planning_result = await self.planner_agent.run(query)
                api_plan_text = planning_result.output
                
                self.printer.update_item("planning", "API plan created successfully", is_done=True)
                
                # Step 2: Parse the plan to extract endpoints
                self.printer.update_item("parsing", "Parsing API plan...", is_done=False)
                
                # Try to parse the structured plan
                try:
                    api_plan = planning_result.output
                    endpoints = api_plan.endpoints
                    self.printer.update_item("parsing", f"Found {len(endpoints)} endpoints to execute", is_done=True)
                except Exception:
                    # Fallback: parse text-based plan
                    endpoints = self._parse_plan_text(api_plan_text)
                    self.printer.update_item("parsing", f"Parsed {len(endpoints)} endpoints from text plan", is_done=True)
                  # Step 3: Execute API calls in sequence
                await self._execute_api_calls(endpoints, query, api_plan_text)
                
                return self.final_output
                
        except Exception as e:
            self.printer.end()
            print(f"āŒ Error during {self.config.name} API operations: {e}")
            raise
    
    async def _execute_api_calls(self, endpoints: List[ApiEndpoint], query: str, api_plan_text: str):
        """Execute API calls and process responses - can be overridden by subclasses"""
        self.printer.update_item("execution", f"Executing {self.config.name} API calls...", is_done=False)
        
        api_responses = []
        for i, endpoint in enumerate(endpoints, 1):
            self.printer.update_item("execution", f"Executing call {i}/{len(endpoints)}: {endpoint.method} {endpoint.url}", is_done=False)
            # Add default headers from config
            headers = endpoint.headers.copy()
            config_headers = self.config.get_headers()
            headers = {**config_headers, **headers}  # Endpoint headers override config
            
            try:
                # Convert headers and body to JSON strings as required by the function
                headers_json = json.dumps(headers)
                body_json = json.dumps(endpoint.body)
                #   # Use the execution agent to make the API call
                # # The execution agent has execute_api_call tool, but we need to call it directly
                # # since the agent's execute_api_call is a function tool within the factory
                
                # # Get the execute_api_call function from the agent's tools
                # execute_api_call_tool = None
                
                # for tool in self.execution_agent.tools:
                #     if hasattr(tool, '__name__') and tool.__name__ == 'execute_api_call':
                #         execute_api_call_tool = tool
                #         break
                
                # if execute_api_call_tool:
                #     # Call the function tool directly
                #     response = await execute_api_call_tool(
                #         url=endpoint.url,
                #         method=endpoint.method,
                #         headers_json=headers_json,
                #         body_json=body_json
                #     )
                # else:
                    # Fallback: use text instruction (original method)
                print("!!! --- API ENDPOINT: ", endpoint)
                print("!!! --- API CALL ENDPOINT URL: ", endpoint.url)
                print("!!! --- CONFIG: ", self.config)
                call_instruction = f"Execute API call: {endpoint.method} {self.config.base_url}{endpoint.url} with headers: {headers_json} and body: {body_json}"
                execution_result = await self.execution_agent.run(call_instruction)
                response = execution_result.output
                # Parse response as JSON if possible
                try:
                    response_data = json.loads(response)
                    api_responses.append(response_data)
                except json.JSONDecodeError as e:
                    print("API EXECUTION FAILED: ", e)
                    print("!!!!!RESPONSE: ", response)
                    api_responses.append({"response": response})
                    
            except Exception as e:
                print(f"API call failed: {e}")
                api_responses.append({"error": str(e)})
        
        self.printer.update_item("execution", f"All {len(endpoints)} API calls completed", is_done=True)
        
        # Step 4: Process and format final response
        self.printer.update_item("processing", "Processing API responses...", is_done=False)
        
        final_context = f"Original Query: {query}\n\nAPI Plan:\n{api_plan_text}\n\nAPI Responses:\n"
        for i, response in enumerate(api_responses, 1):
            final_context += f"\nResponse {i}:\n{response}\n"
        
        final_result = await self.execution_agent.run(final_context)
        self.final_output = final_result.output
        
        self.printer.update_item("processing", "Response processed successfully", is_done=True)
        self.printer.end()
        
        print(f"\n\n====={self.config.name.upper()} API RESULTS=====\n\n")
        print(f"Query: {query}")
        print(f"Plan: {api_plan_text}")
        print(f"Final Result: {self.final_output}")
    
    def _parse_plan_text(self, plan_text: str) -> List[ApiEndpoint]:
        """Generic plan text parser - can be overridden by subclasses"""
        endpoints = []
        lines = plan_text.split('\n')
        
        current_endpoint = {}
        order = 1
        print("GENERIC PLAN LINE PARSER")
        for line in lines:
            line = line.strip()
            if line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')):
                # Process previous endpoint if exists
                if current_endpoint:
                    endpoints.append(ApiEndpoint(
                        url=current_endpoint.get('url', ''),
                        method=current_endpoint.get('method', 'GET'),
                        headers=current_endpoint.get('headers', {}),
                        body=current_endpoint.get('body', {}),
                        description=current_endpoint.get('description', ''),
                        order=order
                    ))
                    order += 1
                
                # Start new endpoint
                current_endpoint = {}
                if 'METHOD:' in line:
                    parts = line.split('METHOD:')[1].strip().split()
                    current_endpoint['method'] = parts[0] if parts else 'GET'
                if 'URL:' in line:
                    url_part = line.split('URL:')[1].strip()
                    # Remove backticks if present and extract the URL properly
                    if '`' in url_part:
                        # Handle backtick-wrapped URLs
                        url_part = url_part.strip('`').strip()
                    else:
                        # For non-backtick URLs, take everything until the first space followed by a description keyword
                        # This handles cases where URL is followed by DESCRIPTION: or HEADERS:
                        if ' DESCRIPTION:' in url_part:
                            url_part = url_part.split(' DESCRIPTION:')[0].strip()
                        elif ' HEADERS:' in url_part:
                            url_part = url_part.split(' HEADERS:')[0].strip()
                        elif ' BODY:' in url_part:
                            url_part = url_part.split(' BODY:')[0].strip()
                        # If no description keywords, take the whole remaining part
                    current_endpoint['url'] = url_part
                if 'DESCRIPTION:' in line:
                    desc_part = line.split('DESCRIPTION:')[1].strip()
                    current_endpoint['description'] = desc_part
        
        # Add the last endpoint
        if current_endpoint:
            endpoints.append(ApiEndpoint(
                url=current_endpoint.get('url', ''),
                method=current_endpoint.get('method', 'GET'),
                headers=current_endpoint.get('headers', {}),
                body=current_endpoint.get('body', {}),
                description=current_endpoint.get('description', ''),
                order=order
            ))
        
        return endpoints

Global Function - used to generate 3 agents per api doc url

def create_api_agents(api_config: BaseAPIConfig, examples: List[str] = []) -> Dict[str, Agent]:
    """Factory function to create a complete set of agents for any API"""
      # Create prompts
    lookup_prompt = BaseAPIPrompts.create_api_lookup_prompt(api_config.name)
    planner_prompt = BaseAPIPrompts.create_api_planner_prompt(api_config.name, api_config.base_url, examples)
    execution_prompt = BaseAPIPrompts.create_execution_prompt(api_config.name)
    
    # Create function tools with proper decorators
    async def get_api_specs() -> str:
        """Get API specifications for this specific API"""
        try:
            url = api_config.docs_url
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                response.raise_for_status()
                
                api_specs = response.text
                
                # Limit the API specs to prevent context window issues
                # Keep only the first 15000 characters to stay within context limits
                max_chars = 15000
                if len(api_specs) > max_chars:
                    api_specs = api_specs[:max_chars] + "\n\n[Documentation truncated to fit context window. This excerpt contains the main API endpoints and parameters.]"
                
                return f"""
{api_config.name} API Specifications (Excerpt):

Source: {url}

{api_specs}
"""
        except Exception as e:
            return f"Error fetching {api_config.name} API specifications: {str(e)}"
    
    async def execute_api_call(
        url: str,
        method: str = "GET",
        headers_json: str = "{}",
        body_json: str = "{}"
    ) -> str:
        """Execute an API call for this specific API"""
        print("EXECUTE API CALL")
        try:
            # Parse parameters
            headers = json.loads(headers_json) if headers_json else {}
            body = json.loads(body_json) if body_json else {}
            
            # Merge with default headers from api_config
            merged_headers = api_config.get_headers()
            if headers:
                merged_headers.update(headers)
            print("API URL: ", url)
            # Handle API key in query parameters for APIs like Polygon
            final_url = url
            api_key_params = api_config.get_api_key_query_param()
            if api_key_params:
                # Parse the URL to add query parameters
                parsed_url = urlparse(url)
                query_params = parse_qs(parsed_url.query)
                
                # Add API key parameters
                for key, value in api_key_params.items():
                    query_params[key] = [value]
                
                # Reconstruct the URL with the new query parameters
                new_query = urlencode(query_params, doseq=True)
                final_url = urlunparse((
                    parsed_url.scheme,
                    parsed_url.netloc,
                    parsed_url.path,
                    parsed_url.params,
                    new_query,
                    parsed_url.fragment
                ))
                print("API FINAL URL: ", final_url)
            async with httpx.AsyncClient() as client:
                if method.upper() == "GET":
                    response = await client.get(final_url, headers=merged_headers)
                elif method.upper() == "POST":
                    response = await client.post(final_url, headers=merged_headers, json=body)
                elif method.upper() == "PUT":
                    response = await client.put(final_url, headers=merged_headers, json=body)
                elif method.upper() == "DELETE":
                    response = await client.delete(final_url, headers=merged_headers)
                else:
                    return json.dumps({"error": f"Unsupported HTTP method: {method}"})

                print("API CALL RESPONSE: ", response)
                response.raise_for_status()
                
                # Try to parse as JSON, fall back to text
                try:
                    ret = json.dumps(response.json())
                    print("GENERIC RET: ", ret)
                    return ret
                except:
                    ret = json.dumps({"response": response.text})
                    print("GENERIC RET2: ", ret)
                    return ret
                    
        except httpx.HTTPStatusError as e:
            return json.dumps({"error": f"HTTP Error {e.response.status_code}: {e.response.text}"})
        except Exception as e:
            return json.dumps({"error": f"Request failed: {str(e)}"})
    
        # Create agents
    agents = {
      'lookup': Agent(
          name=f"{api_config.name}API",
          instructions=lookup_prompt,
          tools=cast(Sequence[Tool], [Tool(get_api_specs)]),
          model=model
        ),
        'planner': Agent(
          name=f"{api_config.name}APIPlanner",
          instructions=planner_prompt,
          output_type=ApiPlan,
          tools=cast(Sequence[Tool], [Tool(get_api_specs)]),
          model=model
        ),
        'execution': Agent(
          name=f"{api_config.name}Execution",
          instructions=execution_prompt,
          tools=cast(Sequence[Tool], [Tool(execute_api_call)]),
          model=model
        )
    }
    
    return agents

Cont...

Specific API Managers - these extend the BaseAPIManager class and are either directly used when the user specifies to use a specific market (i.e 'Get the current BTC price from Coingecko')

# API Configurations
COINGECKO_CONFIG = BaseAPIConfig(
    name="CoinGecko",
    base_url="https://api.coingecko.com/api/v3",
    docs_url="https://docs.coingecko.com/v3.0.1/reference/endpoint-overview",
    api_key_header="x-cg-demo-api-key",
    api_key_value=retoolContext.configVars.COINGECKO_API_KEY,
    api_key_in_query=False
)

POLYGON_CONFIG = BaseAPIConfig(
    name="Polygon",
    base_url="https://api.polygon.io",
    docs_url="https://polygon.io/docs/rest",
    api_key_header="apikey",
    api_key_value=retoolContext.configVars.POLYGON_API_KEY,
    api_key_in_query=True  # Polygon API expects API key as query parameter
)

#######################
# INDIVIDUAL API MANAGERS
#######################

class CoingeckoAPIManagerGeneric(BaseAPIManager):
    """CoinGecko API Manager using the generic base system"""
    
    def __init__(self):
        # Create agents using the factory
        agents: Dict[str, Agent] = create_api_agents(
            COINGECKO_CONFIG, 
            examples=[
                "Get Bitcoin price: GET /simple/price?ids=bitcoin&vs_currencies=usd",
                "Get coin details: GET /coins/bitcoin",
                "Get market data: GET /coins/markets?vs_currency=usd&order=market_cap_desc"
            ]
        )

        super().__init__(COINGECKO_CONFIG, agents['planner'], agents['execution'])

class PolygonManagerGeneric(BaseAPIManager):
    """Polygon.io API Manager using the generic base system"""
    
    def __init__(self):
        # Create agents using the factory
        agents: Dict[str, Agent] = create_api_agents(
            POLYGON_CONFIG,
            examples=[
                "Get AAPL stock price: GET /v2/aggs/ticker/AAPL/prev",
                "Get Bitcoin price: GET /v2/aggs/ticker/X:BTCUSD/prev", 
                "Get market status: GET /v1/marketstatus/now"
            ]
        )
        
        super().__init__(POLYGON_CONFIG, agents['planner'], agents['execution'])

Cryptocurrency Manager - can create plans using all individual managers above (if the market isn't specified we can use this to check all of them we have access to. example input would be - 'Get the current price of BTC')

class CryptocurrencyManager:
    def __init__(self):
        self.console = Console()
        self.printer = Printer(self.console)
          # Initialize the API managers for planning
        self.coingecko_manager_generic = CoingeckoAPIManagerGeneric()
        self.polygon_manager_generic = PolygonManagerGeneric()

    async def run(self, query: str) -> str:
        """Run enhanced cryptocurrency analysis with planning and execution stages"""
        print("RUNNING CRYPTOCURRENCY MANAGER")
        try:
            trace_id = gen_trace_id()
            with trace("Enhanced Cryptocurrency Analysis", trace_id=trace_id):
                self.printer.update_item(
                    "trace_id",
                    f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}",
                    is_done=True,
                    hide_checkmark=True,
                )

                self.printer.update_item(
                    "starting",
                    f"Starting enhanced cryptocurrency analysis for: '{query}'",
                    is_done=True,
                    hide_checkmark=True,
                )

                # Step 1: Planning Phase - Determine what data sources to use
                self.printer.update_item("planning", "Planning cryptocurrency analysis strategy...", is_done=False)

                print("START CRYPTO PLANNING")
                # Analyze the query to determine if we need market data, trading data, or both
                analysis_plan = await self._plan_analysis(query)
                print("FINISHED CRYPTO PLANS: ", analysis_plan)
              
                self.printer.update_item("planning", f"Analysis plan: {analysis_plan['strategy']}", is_done=True)

                print("START DATA COLLECTION")
                # Step 2: Data Collection Phase
                collected_data = await self._collect_data(query, analysis_plan)
                print("FINISHED DATA COLLECTION: ", collected_data)
                
                # Step 3: Execution Phase - Synthesize and analyze all collected data
                self.printer.update_item("synthesis", "Synthesizing cryptocurrency analysis...", is_done=False)

                print("START CRYPTO EXECUTION")
                final_output = await self._synthesize_analysis(query, analysis_plan, collected_data)
                print("FINISHED CRYPTO EXECUTION: ", final_output)
                
                self.printer.update_item("synthesis", "Analysis complete", is_done=True)
                self.printer.end()
                
            print("\n\n=====ENHANCED CRYPTOCURRENCY ANALYSIS=====\n\n")
            print(f"Query: {query}")
            print(f"Strategy: {analysis_plan['strategy']}")
            print(f"Data Sources: {', '.join(analysis_plan['sources'])}")
            print(f"\nAnalysis:\n{final_output}")
            
            return final_output
            
        except Exception as e:
            self.printer.end()
            print(f"āŒ Error during cryptocurrency analysis: {e}")
            raise

    async def _plan_analysis(self, query: str) -> Dict[str, Any]:
        """Plan the analysis strategy based on the query"""
        print("CREATING CRYPTO PLAN ANALYSIS")
        # Create a planning agent specifically for cryptocurrency analysis
        crypto_planner_prompt = (
            "You are a cryptocurrency analysis planner. Given a user query, determine what data sources and analysis approach to use.\n\n"
            "Available data sources:\n"
            "- CoinGecko API: Market data, prices, historical data, trending coins, market cap info\n"
            "- TradingView API: Trading data, broker integration, order book info, technical analysis\n"
            "- Basic cryptocurrency knowledge: General crypto information\n\n"
            "Determine:\n"
            "1. Which data sources are needed\n"
            "2. What type of analysis (market, trading, educational, comparative)\n"
            "3. Whether real-time data is required\n"
            "4. If trading-related functionality is needed\n\n"
            "Respond in this format:\n"
            "STRATEGY: [Brief description of analysis approach]\n"
            "SOURCES: [Comma-separated list: coingecko, tradingview, knowledge]\n"
            "TYPE: [market/trading/educational/comparative]\n"
            "REALTIME: [yes/no]\n"
            "TRADING: [yes/no]"
        )

        print("CREATING CRYPTO PLANNER AGENT")
        crypto_planner = Agent(
            name="CryptoPlannerAgent",
            instructions=crypto_planner_prompt,
            model=model
        )

        print("RUNNING CRYPTO PLANNER AGENT")
        planning_result = await crypto_planner.run(f"User query: {query}")
        print("CRYPTO PLANNER RESULT: ", planning_result.output)
        plan_text = planning_result.output
        
        # Parse the planning result
        plan = {
            'strategy': 'comprehensive analysis',
            'sources': ['coingecko', 'knowledge'],
            'type': 'market',
            'realtime': False,
            'trading': False
        }
        print("PARSING PLANNING RESULT")
        for line in plan_text.split('\n'):
            line = line.strip()
            if line.startswith('STRATEGY:'):
                plan['strategy'] = line.split('STRATEGY:')[1].strip()
            elif line.startswith('SOURCES:'):
                sources_text = line.split('SOURCES:')[1].strip()
                plan['sources'] = [s.strip() for s in sources_text.split(',')]
            elif line.startswith('TYPE:'):
                plan['type'] = line.split('TYPE:')[1].strip()
            elif line.startswith('REALTIME:'):
                plan['realtime'] = 'yes' in line.split('REALTIME:')[1].strip().lower()
            elif line.startswith('TRADING:'):
                plan['trading'] = 'yes' in line.split('TRADING:')[1].strip().lower()

        print("FINISHED CRYPTO PLAN ANALYSIS")
        return plan

    async def _collect_data(self, query: str, analysis_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Collect data from determined sources"""
        collected_data = {}
        print("GETTING DATA COLLECTION")
        # Collect CoinGecko market data if needed
        if 'coingecko' in analysis_plan['sources']:
            self.printer.update_item("coingecko", "Collecting CoinGecko market data...", is_done=False)
            try:
                coingecko_result = await self.coingecko_manager_generic.run(query)
                collected_data['coingecko'] = coingecko_result
                self.printer.update_item("coingecko", "CoinGecko data collected", is_done=True)
            except Exception as e:
                collected_data['coingecko'] = f"Error collecting CoinGecko data: {str(e)}"
                self.printer.update_item("coingecko", "CoinGecko data failed", is_done=True)
        
        # Collect TradingView trading data if needed
        if 'tradingview' in analysis_plan['sources']:
            self.printer.update_item("tradingview", "Collecting TradingView trading data...", is_done=False)
            try:
                tradingview_result = await self.polygon_manager_generic.run(query)
                collected_data['tradingview'] = tradingview_result
                self.printer.update_item("tradingview", "TradingView data collected", is_done=True)
            except Exception as e:
                collected_data['tradingview'] = f"Error collecting TradingView data: {str(e)}"
                self.printer.update_item("tradingview", "TradingView data failed", is_done=True)
        
        # Collect basic cryptocurrency knowledge if needed
        if 'knowledge' in analysis_plan['sources']:
            self.printer.update_item("knowledge", "Gathering cryptocurrency knowledge...", is_done=False)
            try:
                knowledge_result = await cryptocurrency_agent.run(f"Provide cryptocurrency information for: {query}")
                collected_data['knowledge'] = knowledge_result.output
                self.printer.update_item("knowledge", "Knowledge gathered", is_done=True)
            except Exception as e:
                collected_data['knowledge'] = f"Error gathering knowledge: {str(e)}"
                self.printer.update_item("knowledge", "Knowledge gathering failed", is_done=True)

        print("FINISHED DATA COLLECTION")
        return collected_data

    async def _synthesize_analysis(self, query: str, analysis_plan: Dict[str, Any], collected_data: Dict[str, Any]) -> str:
        """Synthesize all collected data into a comprehensive analysis"""
        print("GETTING SYNTHESIZE ANALYSIS")
        # Create synthesis prompt
        synthesis_prompt = (
            "You are a cryptocurrency analysis expert. Synthesize the provided data sources into a comprehensive, "
            "accurate, and insightful response to the user's query.\n\n"
            "Guidelines:\n"
            "- Combine information from multiple sources when available\n"
            "- Prioritize real-time data when present\n"
            "- Provide market context and insights\n"
            "- Include trading recommendations if trading data is available\n"
            "- Explain technical concepts clearly\n"
            "- Highlight any data limitations or uncertainties\n"
            "- Structure the response with clear sections and headings\n"
            "- Include relevant metrics, prices, and percentages when available\n\n"
            "Focus on providing actionable insights based on the analysis strategy."
        )

        print("CREATING SYNTHESIS AGENT")
        synthesis_agent = Agent(
            name="CryptoSynthesisAgent",
            instructions=synthesis_prompt,
            model=model
        )
        
        # Prepare synthesis input
        synthesis_input = f"User Query: {query}\n\n"
        synthesis_input += f"Analysis Strategy: {analysis_plan['strategy']}\n"
        synthesis_input += f"Analysis Type: {analysis_plan['type']}\n\n"
        print("SYNTHESIS INPUT: ", synthesis_input)
      
        # Add collected data
        for source, data in collected_data.items():
            synthesis_input += f"{source.upper()} Data:\n{data}\n\n"
        
        synthesis_input += "Please provide a comprehensive analysis based on all available data sources."
        
        # Generate final synthesis
        print("GENERATING FINAL SYNTHESIS")
        synthesis_result = await synthesis_agent.run(synthesis_input)
        print("FINAL SYNTHESIS RESULT: ", synthesis_result)
        return synthesis_result.output

Research Manager - Used as an alternative or to supplement results

class ResearchManager:
    def __init__(self):
        self.console = Console()
        self.printer = Printer(self.console)

    async def run(self, query: str) -> str:
        trace_id = gen_trace_id()
        with trace("Research trace", trace_id=trace_id):
            self.printer.update_item(
                "trace_id",
                f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}",
                is_done=True,
                hide_checkmark=True,
            )

            self.printer.update_item(
                "starting",
                "Starting research...",
                is_done=True,
                hide_checkmark=True,
            )
            print("DETECTING LANGUAGE")
            # Detect language first
            detected_language = await self._detect_language(query)
            print("FOUND LANGUAGE: ", detected_language)

            print("CREATING SEARCH PLAN")
            search_plan = await self._plan_searches(query)
            print("SEARCH PLAN", search_plan)

            print("EXECUTING SEARCHES")
            search_results = await self._perform_searches(search_plan)
            print("SEARCH RESULTS: ", search_results)

            print("CREATING REPORT")
            report = await self._write_report(query, search_results)
            print("REPORT: ", report)

            print("CREATING FINAL REPORT")
            final_report = f"Report summary\n\n{report.short_summary}"
            print("FINAL REPORT: ", final_report)
          
            self.printer.update_item("report", final_report, is_done=True)

            # Translate the report if not in English
            if detected_language.language.lower() != "english":
                translated_report = await self._translate_report(detected_language.language, report.markdown_report)
                self.printer.update_item("final_report", "Translation completed", is_done=True)
                final_output = translated_report.translated_report
            else:
                final_output = report.markdown_report
                
            self.printer.end()
            
        print("\n\n=====REPORT=====\n\n")
        print(f"Report: {final_output}")
        print("\n\n=====FOLLOW UP QUESTIONS=====\n\n")
        follow_up_questions = "\n".join(report.follow_up_questions)
        print(f"Follow up questions: {follow_up_questions}")

        return final_output

    async def _translate_report(self, target_language: str, report: str) -> TranslationResult:
        self.printer.update_item("translation", "Translating report...")
        translation_input = f"Target language: {target_language}\n\nMarkdown report to translate:\n{report}"
        result = await translation_agent.run(translation_input)
        self.printer.mark_item_done("translation")
        return TranslationResult(**result.output.model_dump())

    async def _detect_language(self, query: str) -> DetectLanguageResult:
        self.printer.update_item("language_detection", "Detecting language...")
        result = await language_agent.run(f"Detect the language of this message: {query}")
        self.printer.mark_item_done("language_detection")
        return DetectLanguageResult(**result.output.model_dump())
  
    async def _plan_searches(self, query: str) -> WebSearchPlan:
        self.printer.update_item("planning", "Planning searches...")
        result = await planner_agent.run(
            f"Query: {query}"
        )
        self.printer.update_item(
            "planning",
            f"Will perform {len(result.output.searches)} searches",
            is_done=True,
        )
        return WebSearchPlan(**result.output.model_dump())

    async def _perform_searches(self, search_plan: WebSearchPlan) -> "list[str]":
        with custom_span("Search the web"):
            self.printer.update_item("searching", "Searching...")
            num_completed = 0
            tasks = [asyncio.create_task(self._search(item)) for item in search_plan.searches]
            results = []
            for task in asyncio.as_completed(tasks):
                result = await task
                if result is not None:
                    results.append(result)
                num_completed += 1
                self.printer.update_item(
                    "searching", f"Searching... {num_completed}/{len(tasks)} completed"
                )
            self.printer.mark_item_done("searching")
            return results

    async def _search(self, item: WebSearchItem) -> Union[str, None]:
        input = f"Search term: {item.query}\nReason for searching: {item.reason}"
        try:
            result = await search_agent.run(
                input
            )
            return str(result.output)
        except Exception:
            return None

    async def _write_report(self, query: str, search_results: "list[str]") -> Union[ReportData, None]:
        self.printer.update_item("writing", "Thinking about report...")
        input = f"Original query: {query}\nSummarized search results: {search_results}"
        print("WRITTING REPORT");
        try:
          result = await writer_agent.run(input)

Triage Agent/Prompt

TRIAGE_PROMPT = (
    "You are a triage agent that determines the appropriate specialist for handling user requests.\n"
    "Classify the input as:\n"
    "- 'research': For general research questions, topics to investigate, information gathering\n"
    "- 'security': For URL security scanning, malware detection, phishing analysis, security assessments\n"
    "- 'insurance': For insurance-related questions, policy inquiries, claims information, coverage details\n"
    "- 'cryptocurrency': For crypto price queries, token information, blockchain analysis, crypto historical data\n"
    "- 'trading': For stock trading, broker APIs, portfolio management, order placement, STOCK market data only\n"
    "- 'crypto_api': For CoinGecko API planning, crypto API endpoint creation\n"
    "- 'legal': For legal questions, rights, statutes, regulations, court cases, legal definitions, or anything requiring legal expertise\n"
    "- 'unknown': If the intent is unclear or doesn't fit any category\n\n"
    "IMPORTANT DISTINCTIONS:\n"
    "- Cryptocurrency queries (Bitcoin, Ethereum, altcoins, etc.) should go to 'cryptocurrency' or 'crypto_api'\n"
    "- Stock trading queries (AAPL, TSLA, NYSE, NASDAQ stocks) should go to 'trading'\n"
    "- TradingView requests for crypto data should go to 'cryptocurrency' (TradingView Broker API doesn't provide crypto market data)\n"
    "- Legal questions (Miranda rights, constitutional law, statutes, court cases, legal definitions, etc.) should go to 'legal'\n\n"
    "Examples:\n"
    "- 'What is machine learning?' → research\n"
    "- 'Is this URL safe: https://example.com' → security\n"
    "- 'Scan this website for malware' → security\n"
    "- 'Tell me about climate change' → research\n"
    "- 'Check if this link is phishing' → security\n"
    "- 'What is health insurance?' → insurance\n"
    "- 'How do car insurance claims work?' → insurance\n"
    "- 'What types of life insurance are available?' → insurance\n"
    "- 'Get Bitcoin price' → cryptocurrency\n"
    "- 'Get Ethereum historical data' → cryptocurrency\n"
    "- 'Historical price data for Ethereum using tradingview' → cryptocurrency\n"
    "- 'Buy 100 shares of AAPL' → trading\n"
    "- 'Check my portfolio balance' → trading\n"
    "- 'Place a limit order for Tesla stock' → trading\n"
    "- 'Get AAPL historical data' → trading\n"
    "- 'Create CoinGecko API sequence for trending coins' → crypto_api\n"
    "- 'What are the Miranda rights?' → legal\n"
    "- 'Explain the Fourth Amendment' → legal\n"
    "- 'What is the difference between a felony and a misdemeanor?' → legal\n"
    "- 'Summarize Roe v. Wade' → legal\n\n"
    "Provide your classification with confidence level and reasoning."
)

triage_agent = Agent(
    name="TriageAgent",
    instructions=TRIAGE_PROMPT,
    model=model,
    output_type=TriageResult,
    instrument=True
)

Triage Manager - Actual Entry Point

class TriageManager:
    def __init__(self):
        self.console = Console()
        self.printer = Printer(self.console)

    async def classify_input(self, user_input: str) -> TriageResult:
        """Classify user input to determine which manager to use"""
        try:
            self.printer.update_item("triage", "Analyzing input...", is_done=False)
            
            result = await triage_agent.run(f"User input: {user_input}")
            triage_result = result.output
            
            self.printer.update_item(
                "triage", 
                f"Classified as: {triage_result.task_type} (confidence: {triage_result.confidence:.2f})",
                is_done=True
            )
            
            return triage_result
            
        except Exception as e:
            self.printer.update_item("triage", f"Classification failed: {e}", is_done=True)
            # Default to research if classification fails
            return TriageResult(
                task_type="research",
                confidence=0.0,
                reasoning=f"Classification failed, defaulting to research: {e}"
            )

    async def route_and_execute(self, user_input: str, message_history: List) -> str:
        """Route input to appropriate manager and execute"""
        try:
            trace_id = gen_trace_id()
            with trace("Triage and Execution", trace_id=trace_id):
                self.printer.update_item(
                    "trace_id",
                    f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}",
                    is_done=True,
                    hide_checkmark=True,
                )                # Classify the input
                triage_result = await self.classify_input(user_input)
                
                self.printer.update_item(
                    "reasoning", 
                    f"Reasoning: {triage_result.reasoning}",
                    is_done=True,
                    hide_checkmark=True
                )

                # We always try to use specific managers based on the task, but we also want to conduct some research using other methods involving web searches
                research_manager = ResearchManager()
                result = ""   
                # Route to appropriate manager                
                #if triage_result.task_type == "security":
                 #   self.printer.update_item("routing", "šŸ”’ Routing to Security Manager...", is_done=True)
                  #  security_manager = SecurityManager()
                  #  result = await security_manager.run(user_input, message_history=message_history)
                #elif triage_result.task_type == "insurance":
                    #self.printer.update_item("routing", "šŸ„ Routing to Insurance Manager...", is_done=True)
                   # insurance_manager = InsuranceManager()
                    #result = await insurance_manager.run(user_input, message_history=message_history)
                    #return result
                #elif triage_result.task_type == "legal":
                    #self.printer.update_item("routing", "āš–ļø Routing to Legal Manager...", is_done=True)
                    #legal_manager = LegalManager()
                    #result = await legal_manager.run(user_input, message_history=message_history)
                #elif triage_result.task_type == "research":
                    #self.printer.update_item("routing", "šŸ” Routing to Research Manager...", is_done=True)                    
                    #result = await research_manager.run(user_input)
                #elif triage_result.task_type == "cryptocurrency":

               
               # we don't know what market to check, but we know it's a cryptocurrency
                if triage_result.task_type == "cryptocurrency":
                    self.printer.update_item("routing", "šŸ’° Routing to Cryptocurrency Manager...", is_done=True)
                    print("CREATING CRYPTO MANAGER")
                    cryptocurrency_manager = CryptocurrencyManager()
                    print("CALLING CRYPTO MANAGER")
                    result = await cryptocurrency_manager.run(user_input)
                    print("CRYPTO MANAGER RESUTL: ", result)
                # To conduct trades and/or query stocks we use Tradingview
                elif triage_result.task_type == "trading":
                    self.printer.update_item("routing", "šŸ“ˆ Routing to Polygon/Tradingview Manager...", is_done=True)
                    polygon_manager = PolygonManagerGeneric()
                    result = await polygon_manager.run(user_input)
                # Used to specifically query Coingecko about a cryptocurrency
                elif triage_result.task_type == "crypto_api":
                    self.printer.update_item("routing", "šŸŖ™ Routing to CoinGecko API Manager...", is_done=True)
                    coingecko_api_manager = CoingeckoAPIManagerGeneric()
                    result = await coingecko_api_manager.run(user_input)
                else:
                    self.printer.update_item("routing", "ā“ Input unclear, defaulting to Research Manager...", is_done=True)
                    
                print("RUNNING RESEARCH MANAGER")
                research_result = await research_manager.run(user_input)
                print("RESEARCH RESULTS: ", research_result)
              
                 # Compare result with research_result and use an Agent to synthesize final output
                self.printer.update_item("finalizing", "Finalizing result...", is_done=False)
                # Define a simple synthesis agent for combining results
                # from agents import Agent
                print("CREATING SYNTHESIS AGENT")
                synthesis_agent = Agent(
                    name="SynthesisAgent",
                    instructions="Combine the following results into a single, clear, and comprehensive report. Do not repeat information. Present the best insights from both results.",
                    model=model
                )
                
                syn_input = f"Combine the following results into a final report:\n\nResearch Result:\n{research_result}\n\nManager Result:\n{result}"
                print("RUNNING SYNTHESIS AGENT: ", syn_input)
                final_result = await synthesis_agent.run(syn_input)

                print("SYNTHESIS RESULT: ", final_result)
              
                self.printer.update_item("finalizing", "Result finalized successfully", is_done=True)
                self.printer.update_item("completed", "Triage and execution completed successfully", is_done=True)
                self.printer.update_item("final_output", final_result.output, is_done=True)           
                
                self.printer.end()
                return final_result.output

        except Exception as e:
            self.printer.end()
            raise e

Main Function

async def main():
    query = startTrigger.data.content
    
    try:
        triageManager = TriageManager()
        result = await triageManager.route_and_execute(query, message_history)
        console.print("\nāœ… Process completed successfully!", style="bold green")
        console.print(f"Final Result: {result}", style="green")
        return result
    except ModelHTTPError as e:
        console.print(f"\nāŒ Process failed: {e}", style="bold red")
        error_data = {
          "status_code": e.status_code,
          "message": e.message,
          "body": e.body # If available
        }
        return error_data

return await main()

Problems:

  • I'm getting 404 errors when the CoingeckoAPIManagerGeneric or PolygonManagerGeneric classes are used, but I can't tell if the endpoint urls are being misformed (missing http/https or missing the whole base url, so it could be trying to send requests to '/api/v3/tokens' instead of 'https://api.coingecko.com/api/v3')
  • While any API calls made from the BaseAPIManager always fail with a 404, any calls the ResearchManager makes work just fine.... in fact if I ask it 'what is current price of BTC on binance', after the API calls using CoingeckoAPIManagerGeneric and PolygonManagerGeneric fail the ResearchManager finds and makes a call to https://www.binance.com/en/trade/BTC_USDT using web searches... which works just fine, and since it sorta combines answers from the generic managers and the research manager it still gives a proper answer, but it's using extra api calls and tokens that are costing me in the long run

random side thought.... anybody know if using the PydanticAI Graphs would be a better option for this? I'm kinda wondering if the extra bit of control over what agent/tool gets used and when that the Graphs offer might be a better/cheaper option, or at least make debugging a bit easier?