import os from typing import Dict, Any, List, Optional from tenacity import retry, stop_after_attempt, wait_exponential from dotenv import load_dotenv import json from llm_client import LLMConfig, LLMProvider, create_llm_client from config.config_loader import load_config load_dotenv() class LLMExecutor: def __init__(self, model: str = None): # Load configuration config_dict = load_config() llm_config = config_dict["llm"] # Get provider-specific settings provider = LLMProvider(llm_config["provider"]) api_base = ( llm_config["api"].get("api_base") or llm_config.get(provider.value, {}).get("api_base") ) # Initialize LLM client with configuration config = LLMConfig( provider=provider, api_key=llm_config["api_key"], api_base=api_base, model=model or llm_config["model"], max_tokens=llm_config.get("max_tokens", 2000), temperature=llm_config.get("temperature", 0.7), timeout=llm_config.get("timeout", 30), ) self.llm_client = create_llm_client(config) self.conversation_history = [] @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def execute_step(self, step_instruction: str, step_input: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Execute a single step using the LLM. Args: step_instruction: The instruction for the current step step_input: Input data for the step context: Optional context information Returns: Dict containing the LLM's response and any generated artifacts """ try: # Format the prompt messages = [ {"role": "system", "content": """You are an AI assistant helping with task execution. Please process the input according to the instruction and return a JSON response. The input text may contain Chinese characters. Please handle them correctly. For example, if asked to validate text, return: {"valid": true/false, "reason": "explanation"} For text preprocessing, return: {"preprocessed_text": "cleaned text"} For summary generation, return: {"summary": "generated summary"} For keyword extraction, return: {"keywords": ["keyword1", "keyword2", ...]} For final analysis, return: {"analysis": "final analysis text"} Please ensure all Chinese characters are preserved and handled correctly."""}, {"role": "user", "content": f"""Instruction: {step_instruction} Input: {json.dumps(step_input, indent=2, ensure_ascii=False)}"""} ] # Call LLM try: response = await self.llm_client.generate(messages) if "error" in response: raise Exception(f"LLM API error: {response['error']}") if "choices" not in response or not response["choices"]: raise Exception("No response from LLM API") # Extract and parse response content = response["choices"][0]["message"]["content"] try: # Try to extract JSON from markdown code block if present if "```json" in content: json_str = content.split("```json")[1].split("```")[0].strip() output = json.loads(json_str) else: output = json.loads(content) except json.JSONDecodeError: # If response is not JSON, wrap it in a result field output = {"result": content} return { "success": True, "output": output } except Exception as api_error: raise Exception(f"LLM API call failed: {str(api_error)}") except Exception as e: return { "success": False, "error": str(e) } def _build_messages(self, instruction: str, input_data: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> List[Dict[str, str]]: """Build the message list for the LLM conversation.""" messages = [ { "role": "system", "content": """You are an AI agent tasked with executing specific steps in a larger task. Follow these guidelines: 1. Focus only on the current step's instruction 2. Use the provided input data and context 3. Return results in a clear, structured format 4. If you need additional information, specify it in your response 5. If you encounter errors, provide detailed error messages""" } ] # Add conversation history messages.extend(self.conversation_history) # Add context if provided if context: messages.append({ "role": "system", "content": f"Context for current step: {context}" }) # Add current instruction and input messages.append({ "role": "user", "content": f""" Instruction: {instruction} Input Data: {input_data} Please execute this step and provide your response in the following format: - Result: (main output) - Additional Info: (any additional information or explanations) - Required Next: (any information required for next steps) """ }) return messages def _parse_response(self, response: Dict[str, Any]) -> Dict[str, Any]: """Parse the LLM response into a structured format.""" content = response["content"] # Parse the response into sections sections = { "result": "", "additional_info": "", "required_next": "" } current_section = None for line in content.split('\n'): line = line.strip() if line.startswith('- Result:'): current_section = "result" sections[current_section] = line.replace('- Result:', '').strip() elif line.startswith('- Additional Info:'): current_section = "additional_info" sections[current_section] = line.replace('- Additional Info:', '').strip() elif line.startswith('- Required Next:'): current_section = "required_next" sections[current_section] = line.replace('- Required Next:', '').strip() elif current_section and line: sections[current_section] += f"\n{line}" return sections def _update_conversation_history(self, user_message: Dict[str, str], assistant_response: Dict[str, Any]): """Update the conversation history with the latest interaction.""" self.conversation_history.append(user_message) self.conversation_history.append({ "role": "assistant", "content": f""" Result: {assistant_response['result']} Additional Info: {assistant_response['additional_info']} Required Next: {assistant_response['required_next']} """ }) # Keep only the last 10 messages to prevent context window from growing too large if len(self.conversation_history) > 10: self.conversation_history = self.conversation_history[-10:] def clear_history(self): """Clear the conversation history.""" self.conversation_history = [] async def close(self): """Clean up resources.""" await self.llm_client.close()