200 lines
8.0 KiB
Python
200 lines
8.0 KiB
Python
import os
|
|
from typing import Dict, Any, List, Optional
|
|
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
from dotenv import load_dotenv
|
|
import json
|
|
from .llm_client import LLMConfig, LLMProvider, create_llm_client
|
|
from config.config_loader import load_config
|
|
|
|
# Runs at import time: python-dotenv loads variables from a .env file (if one
# is found) into the process environment.
load_dotenv()
|
|
|
|
class LLMExecutor:
    """Execute individual task steps against a configured LLM client.

    The client is built from the ``"llm"`` section returned by
    ``load_config()``.  The executor also keeps a bounded conversation
    history that ``_build_messages`` replays into a prompt.
    """

    # Maximum number of history messages kept by _update_conversation_history.
    _MAX_HISTORY_MESSAGES = 10

    # System prompt sent by execute_step.
    _STEP_SYSTEM_PROMPT = (
        'You are an AI assistant helping with task execution.\n'
        'Please process the input according to the instruction and return a JSON response.\n'
        'The input text may contain Chinese characters. Please handle them correctly.\n'
        'For example, if asked to validate text, return: {"valid": true/false, "reason": "explanation"}\n'
        'For text preprocessing, return: {"preprocessed_text": "cleaned text"}\n'
        'For summary generation, return: {"summary": "generated summary"}\n'
        'For keyword extraction, return: {"keywords": ["keyword1", "keyword2", ...]}\n'
        'For final analysis, return: {"analysis": "final analysis text"}\n'
        'Please ensure all Chinese characters are preserved and handled correctly.'
    )

    # System prompt used by _build_messages.
    _AGENT_SYSTEM_PROMPT = (
        "You are an AI agent tasked with executing specific steps in a larger task.\n"
        "Follow these guidelines:\n"
        "1. Focus only on the current step's instruction\n"
        "2. Use the provided input data and context\n"
        "3. Return results in a clear, structured format\n"
        "4. If you need additional information, specify it in your response\n"
        "5. If you encounter errors, provide detailed error messages"
    )

    # (line prefix, section key) pairs recognised by _parse_response.
    _SECTION_MARKERS = (
        ("- Result:", "result"),
        ("- Additional Info:", "additional_info"),
        ("- Required Next:", "required_next"),
    )

    def __init__(self, model: Optional[str] = None):
        """Build the LLM client from the loaded configuration.

        Args:
            model: Optional model name. When falsy, ``llm_config["model"]``
                from the configuration is used instead.

        Raises:
            KeyError: If the configuration lacks ``"llm"``, or the ``"llm"``
                section lacks ``"provider"``, ``"api_key"`` or (when ``model``
                is not given) ``"model"``.
        """
        config_dict = load_config()
        llm_config = config_dict["llm"]

        provider = LLMProvider(llm_config["provider"])

        # Prefer the generic "api" section; fall back to the provider-specific
        # section.  Either section may be absent, in which case api_base
        # ends up as None.
        api_section = llm_config.get("api") or {}
        provider_section = llm_config.get(provider.value) or {}
        api_base = api_section.get("api_base") or provider_section.get("api_base")

        config = LLMConfig(
            provider=provider,
            api_key=llm_config["api_key"],
            api_base=api_base,
            model=model or llm_config["model"],
            max_tokens=llm_config.get("max_tokens", 2000),
            temperature=llm_config.get("temperature", 0.7),
            timeout=llm_config.get("timeout", 30),
        )
        self.llm_client = create_llm_client(config)
        self.conversation_history: List[Dict[str, str]] = []

    async def execute_step(self,
                           step_instruction: str,
                           step_input: Dict[str, Any],
                           context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute a single step using the LLM.

        This method never raises for ordinary failures: any exception is
        converted into a failure dict.  Because of that, retrying is done
        inside ``_request_completion`` (which does raise) rather than on
        this method, where a retry decorator would never observe an error.

        Args:
            step_instruction: The instruction for the current step.
            step_input: Input data for the step; it is serialised with
                ``json.dumps`` into the user message.
            context: Accepted for interface compatibility.
                NOTE(review): it is currently not included in the prompt.

        Returns:
            ``{"success": True, "output": <parsed JSON or {"result": text}>}``
            on success, or ``{"success": False, "error": <message>}`` on
            failure.
        """
        try:
            payload = json.dumps(step_input, indent=2, ensure_ascii=False)
            messages = [
                {"role": "system", "content": self._STEP_SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": f"Instruction: {step_instruction}\nInput: {payload}",
                },
            ]
            content = await self._request_completion(messages)
            return {
                "success": True,
                "output": self._extract_json_output(content),
            }
        except Exception as e:
            # Deliberate boundary: callers receive a failure dict instead of
            # an exception.
            return {
                "success": False,
                "error": str(e),
            }

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=4, max=10),
           reraise=True)
    async def _request_completion(self, messages: List[Dict[str, str]]) -> str:
        """Call the LLM client and return the first choice's text content.

        Retried by tenacity for up to three attempts with exponential
        backoff; ``reraise=True`` makes the last underlying error surface
        unchanged once the attempts are exhausted.

        Raises:
            RuntimeError: If the call fails, the response carries an
                ``"error"`` entry, has no choices, or has no text content.
                The message is prefixed with ``"LLM API call failed: "``.
        """
        try:
            response = await self.llm_client.generate(messages)
            if "error" in response:
                raise RuntimeError(f"LLM API error: {response['error']}")

            if "choices" not in response or not response["choices"]:
                raise RuntimeError("No response from LLM API")

            content = response["choices"][0]["message"]["content"]
            if not isinstance(content, str):
                raise RuntimeError("LLM response contained no text content")
            return content
        except Exception as api_error:
            raise RuntimeError(f"LLM API call failed: {str(api_error)}") from api_error

    @staticmethod
    def _extract_json_output(content: str) -> Any:
        """Parse the model's reply as JSON, tolerating markdown code fences.

        Candidates are tried in order: the body of a ```` ```json ```` fence
        (if present), the whole reply, then the body of the first plain
        ```` ``` ```` fence.  The first candidate that parses wins.

        Returns:
            The decoded JSON value, or ``{"result": content}`` when no
            candidate is valid JSON.
        """
        candidates = []
        if "```json" in content:
            candidates.append(content.split("```json", 1)[1].split("```", 1)[0])
        candidates.append(content)
        if "```" in content:
            candidates.append(content.split("```", 2)[1])

        for candidate in candidates:
            try:
                return json.loads(candidate.strip())
            except json.JSONDecodeError:
                continue

        # If response is not JSON, wrap it in a result field.
        return {"result": content}

    def _build_messages(self,
                        instruction: str,
                        input_data: Dict[str, Any],
                        context: Optional[Dict[str, Any]] = None) -> List[Dict[str, str]]:
        """Build the message list for the LLM conversation.

        The list consists of the agent system prompt, the stored
        conversation history, an optional system message carrying
        ``context``, and finally the user message with the instruction,
        the input data and the expected response format.

        Args:
            instruction: The instruction for the current step.
            input_data: Input data, interpolated with its ``str()`` form.
            context: Optional context; added only when truthy.

        Returns:
            A new list of ``{"role", "content"}`` message dicts.
        """
        messages = [{"role": "system", "content": self._AGENT_SYSTEM_PROMPT}]

        # Replay earlier turns before the current request.
        messages.extend(self.conversation_history)

        if context:
            messages.append({
                "role": "system",
                "content": f"Context for current step: {context}",
            })

        messages.append({
            "role": "user",
            "content": (
                "\n"
                f"Instruction: {instruction}\n"
                f"Input Data: {input_data}\n"
                "\n"
                "Please execute this step and provide your response in the following format:\n"
                "- Result: (main output)\n"
                "- Additional Info: (any additional information or explanations)\n"
                "- Required Next: (any information required for next steps)\n"
            ),
        })

        return messages

    def _parse_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """Parse the LLM response into a structured format.

        Lines starting with ``- Result:``, ``- Additional Info:`` or
        ``- Required Next:`` open the matching section; following non-empty
        lines are appended to the open section, each preceded by a newline.

        Args:
            response: Mapping whose ``"content"`` entry is the reply text.

        Returns:
            Dict with the keys ``"result"``, ``"additional_info"`` and
            ``"required_next"``; sections that never appear stay ``""``.

        Raises:
            KeyError: If ``response`` has no ``"content"`` key.
        """
        content = response["content"]

        sections = {
            "result": "",
            "additional_info": "",
            "required_next": "",
        }

        current_section = None
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            for marker, key in self._SECTION_MARKERS:
                if line.startswith(marker):
                    current_section = key
                    # Slice off only the leading marker; str.replace would
                    # also delete later occurrences inside the text.
                    sections[key] = line[len(marker):].strip()
                    break
            else:
                if current_section and line:
                    sections[current_section] += f"\n{line}"

        return sections

    def _update_conversation_history(self, user_message: Dict[str, str],
                                     assistant_response: Dict[str, Any]):
        """Update the conversation history with the latest interaction.

        Args:
            user_message: The user message dict to record as-is.
            assistant_response: Dict providing ``"result"``,
                ``"additional_info"`` and ``"required_next"``, which are
                rendered into a single assistant message.
        """
        self.conversation_history.append(user_message)
        self.conversation_history.append({
            "role": "assistant",
            "content": (
                "\n"
                f"Result: {assistant_response['result']}\n"
                f"Additional Info: {assistant_response['additional_info']}\n"
                f"Required Next: {assistant_response['required_next']}\n"
            ),
        })

        # Keep only the most recent messages so the replayed history cannot
        # grow without bound.
        if len(self.conversation_history) > self._MAX_HISTORY_MESSAGES:
            self.conversation_history = (
                self.conversation_history[-self._MAX_HISTORY_MESSAGES:]
            )

    def clear_history(self):
        """Clear the conversation history."""
        self.conversation_history = []

    async def close(self):
        """Clean up resources by closing the underlying LLM client."""
        await self.llm_client.close()
|