diff --git a/README.md b/README.md index 7ea67f5..bace2df 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Agent Task Executor -A flexible task execution framework that uses LLMs (Large Language Models). +A flexible task execution framework that uses LLMs (Large Language Models).The framework provides a robust foundation for building task-specific executors while handling common execution concerns. ## Features diff --git a/core/task_executor.py b/core/task_executor.py index f83ec87..64baa5f 100644 --- a/core/task_executor.py +++ b/core/task_executor.py @@ -8,38 +8,41 @@ import logging import asyncio from .llm_executor import LLMExecutor +# 任务状态枚举 class TaskStatus(Enum): - PENDING = "pending" - IN_PROGRESS = "in_progress" - COMPLETED = "completed" - FAILED = "failed" + PENDING = "pending" # 等待中 + IN_PROGRESS = "in_progress" # 进行中 + COMPLETED = "completed" # 已完成 + FAILED = "failed" # 已失败 +# 步骤状态数据类 @dataclass class StepState: - step_id: str - status: TaskStatus - required_info: List[str] - available_info: List[str] - missing_info: List[str] - resources_used: Dict[str, Any] + step_id: str # 步骤ID + status: TaskStatus # 当前状态 + required_info: List[str] # 所需信息 + available_info: List[str] # 可用信息 + missing_info: List[str] # 缺失信息 + resources_used: Dict[str, Any] # 已用资源 +# 任务执行器主类 class TaskExecutor: - MAX_RETRIES = 3 - TIMEOUT = 300 # seconds - CHECKPOINT_INTERVAL = 5 # steps + MAX_RETRIES = 3 # 最大重试次数 + TIMEOUT = 300 # 超时时间(秒) + CHECKPOINT_INTERVAL = 5 # 检查点间隔(步骤数) def __init__(self, llm_model: str = None): - """Initialize TaskExecutor.""" - self.task_id = str(uuid.uuid4()) - self.start_time = time.time() - self.checkpoints = [] - self.execution_path = [] - self.current_step = None - self.retry_count = 0 - self.llm_executor = LLMExecutor(model=llm_model) - self.task_input = None + """初始化任务执行器""" + self.task_id = str(uuid.uuid4()) # 生成唯一任务ID + self.start_time = time.time() # 记录开始时间 + self.checkpoints = [] # 检查点列表 + self.execution_path = [] # 执行路径记录 + self.current_step = None # 当前步骤 + self.retry_count = 0 # 重试计数器 + self.llm_executor = LLMExecutor(model=llm_model) # LLM执行器 + self.task_input = None # 任务输入 - # Configure logging + # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' @@ -47,144 +50,150 @@ class TaskExecutor: self.logger = logging.getLogger(f"TaskExecutor-{self.task_id}") def get_status_update(self) -> dict: - """Generate a status update for the current execution state.""" + """生成当前执行状态的状态更新""" return { - "task_id": self.task_id, - "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), - "current_step": self.current_step, - "checkpoints": self.checkpoints, + "task_id": self.task_id, # 任务ID + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), # 时间戳 + "current_step": self.current_step, # 当前步骤 + "checkpoints": self.checkpoints, # 检查点列表 "resources": { "used": { - "time": time.time() - self.start_time, - "memory": "N/A" # To be implemented + "time": time.time() - self.start_time, # 已用时间 + "memory": "N/A" # 待实现:内存使用 }, "available": { - "time": self.TIMEOUT - (time.time() - self.start_time), - "memory": "N/A" # To be implemented + "time": self.TIMEOUT - (time.time() - self.start_time), # 剩余时间 + "memory": "N/A" # 待实现:可用内存 } }, - "execution_path": self.execution_path, - "status": TaskStatus.COMPLETED.value if self.current_step and self.current_step.get("status") == TaskStatus.COMPLETED.value else TaskStatus.IN_PROGRESS.value + "execution_path": self.execution_path, # 执行路径 + "status": TaskStatus.COMPLETED.value if self.current_step and self.current_step.get("status") == TaskStatus.COMPLETED.value else TaskStatus.IN_PROGRESS.value # 当前状态 } async def execute_step(self, step_id: str, step_data: Dict[str, Any]) -> bool: - """Execute a single step of the task using LLM.""" + """使用LLM执行任务的单个步骤""" try: + # 初始化当前步骤状态 self.current_step = { - "id": step_id, - "name": step_data.get("name", "Unknown"), - "status": TaskStatus.IN_PROGRESS.value, - "progress": 0 + "id": step_id, # 步骤ID + "name": step_data.get("name", "Unknown"), # 步骤名称 + "status": TaskStatus.IN_PROGRESS.value, # 状态设为进行中 + "progress": 0 # 进度初始化为0 } - # Check if execution should continue + # 检查是否超时 if time.time() - self.start_time > self.TIMEOUT: - raise TimeoutError("Task execution timeout") + raise TimeoutError("任务执行超时") - # Execute step using LLM + # 使用LLM执行步骤 step_result = await self.llm_executor.execute_step( - step_instruction=step_data.get("instruction", ""), - step_input=step_data.get("input", {}), - context=step_data.get("context", {}) + step_instruction=step_data.get("instruction", ""), # 步骤指令 + step_input=step_data.get("input", {}), # 步骤输入 + context=step_data.get("context", {}) # 上下文信息 ) + # 检查步骤是否成功 if not step_result["success"]: - raise Exception(f"Step failed: {step_result['error']}") + raise Exception(f"步骤失败: {step_result['error']}") - # Update step status + # 更新执行路径 self.execution_path.append({ - "step_id": step_id, - "result": step_result["output"] + "step_id": step_id, # 步骤ID + "result": step_result["output"] # 步骤结果 }) - self.current_step["status"] = TaskStatus.COMPLETED.value - self.current_step["progress"] = 100 + # 更新步骤状态 + self.current_step["status"] = TaskStatus.COMPLETED.value # 状态设为已完成 + self.current_step["progress"] = 100 # 进度设为100% - # Create checkpoint if needed + # 如果需要创建检查点 if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0: self.create_checkpoint() return True except Exception as e: - self.current_step["status"] = TaskStatus.FAILED.value - return self.handle_error(e) + # 处理异常 + self.current_step["status"] = TaskStatus.FAILED.value # 状态设为失败 + return self.handle_error(e) # 调用错误处理 def validate_input(self, input_data: Dict[str, Any]) -> bool: - """Validate the input data for the task.""" - return True + """验证任务输入数据""" + return True # 默认实现,子类可重写 def create_checkpoint(self): - """Create a checkpoint of the current execution state.""" + """创建当前执行状态的检查点""" checkpoint = { - "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), - "task_id": self.task_id, - "current_step": self.current_step, - "execution_path": self.execution_path.copy(), + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), # 时间戳 + "task_id": self.task_id, # 任务ID + "current_step": self.current_step, # 当前步骤 + "execution_path": self.execution_path.copy(), # 执行路径副本 "resources": { "used": { - "time": time.time() - self.start_time, - "memory": "N/A" # To be implemented + "time": time.time() - self.start_time, # 已用时间 + "memory": "N/A" # 待实现:内存使用 } } } - self.checkpoints.append(checkpoint) - self.logger.info(f"Created checkpoint: {json.dumps(checkpoint, indent=2)}") + self.checkpoints.append(checkpoint) # 添加到检查点列表 + self.logger.info(f"创建检查点: {json.dumps(checkpoint, indent=2)}") # 记录日志 def rollback_to_checkpoint(self, checkpoint_index: int): - """Rollback the execution to a specific checkpoint.""" + """回滚到指定检查点""" if 0 <= checkpoint_index < len(self.checkpoints): - checkpoint = self.checkpoints[checkpoint_index] - # Implement state restoration logic - self.logger.info(f"Rolling back to checkpoint: {checkpoint['timestamp']}") + checkpoint = self.checkpoints[checkpoint_index] # 获取检查点 + # 实现状态恢复逻辑 + self.logger.info(f"回滚到检查点: {checkpoint['timestamp']}") # 记录日志 return True - return False + return False # 检查点索引无效 def get_next_actions(self) -> List[str]: - """Determine the next possible actions based on current state.""" + """根据当前状态确定下一步可能的操作""" actions = [] if self.current_step and self.current_step["status"] == TaskStatus.FAILED.value: - actions.extend(["retry", "rollback", "abort"]) + actions.extend(["retry", "rollback", "abort"]) # 失败时可重试、回滚或中止 elif self.current_step and self.current_step["status"] == TaskStatus.COMPLETED.value: - actions.extend(["continue", "checkpoint"]) + actions.extend(["continue", "checkpoint"]) # 完成时可继续或创建检查点 return actions def handle_error(self, error: Exception): - """Handle execution errors.""" - self.logger.error(f"Error occurred: {str(error)}") - self.retry_count += 1 + """处理执行错误""" + self.logger.error(f"发生错误: {str(error)}") # 记录错误日志 + self.retry_count += 1 # 增加重试计数 if self.retry_count >= self.MAX_RETRIES: - self.logger.error("Max retries reached. Terminating execution.") + self.logger.error("达到最大重试次数。终止执行。") # 记录终止日志 return False - # Implement error recovery logic - return True + # 实现错误恢复逻辑 + return True # 允许重试 async def execute(self, task_input: Dict[str, Any]) -> Dict[str, Any]: - """Execute the complete task.""" + """执行完整任务""" try: - # Store task input + # 存储任务输入 self.task_input = task_input + # 验证输入 if not self.validate_input(task_input): - raise ValueError("Invalid task input") + raise ValueError("无效的任务输入") - self.logger.info(f"Starting task execution: {self.task_id}") + self.logger.info(f"开始任务执行: {self.task_id}") # 记录开始日志 - # Execute each step in sequence + # 按顺序执行每个步骤 if hasattr(self, 'task_steps'): for step in self.task_steps: if not await self.execute_step(step["id"], step): - raise Exception(f"Step {step['id']} failed") + raise Exception(f"步骤 {step['id']} 失败") + # 按间隔创建检查点 if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0: self.create_checkpoint() else: - raise ValueError("No task steps defined") + raise ValueError("未定义任务步骤") - return self.get_status_update() + return self.get_status_update() # 返回最终状态 except Exception as e: - self.logger.error(f"Task failed: {str(e)}") - return self.get_status_update() + self.logger.error(f"任务失败: {str(e)}") # 记录失败日志 + return self.get_status_update() # 返回失败状态 diff --git a/tasksamples/text_analysis_task.py b/tasksamples/text_analysis_task.py index 6a23405..520a5a8 100644 --- a/tasksamples/text_analysis_task.py +++ b/tasksamples/text_analysis_task.py @@ -4,9 +4,12 @@ import time import json import re +# 中文文本分析执行器 class TextAnalysisExecutor(TaskExecutor): def __init__(self): + """初始化中文文本分析执行器""" super().__init__(llm_model="deepseek-chat") + # 定义任务步骤 self.task_steps = [ { "id": "input_validation", @@ -41,30 +44,31 @@ class TextAnalysisExecutor(TaskExecutor): ] def validate_input(self, input_data: Dict[str, Any]) -> bool: - """Validate specific input requirements for the text analysis task.""" + """验证文本分析任务的输入数据""" if "text" not in input_data: return False text = input_data.get("text", "") return isinstance(text, str) and len(text.strip()) > 0 async def execute_step(self, step_id: str, step_data: Dict[str, Any]) -> bool: - """Execute a specific step of the text analysis task.""" + """执行文本分析任务的特定步骤""" try: + # 初始化当前步骤状态 self.current_step = { - "id": step_id, - "name": step_data["name"], - "status": TaskStatus.IN_PROGRESS.value, - "progress": 0 + "id": step_id, # 步骤ID + "name": step_data["name"], # 步骤名称 + "status": TaskStatus.IN_PROGRESS.value, # 状态设为进行中 + "progress": 0 # 进度初始化为0 } - # Get step instruction and input + # 获取步骤指令和输入 instruction = step_data.get("instruction", "") step_input = {} - # Prepare step-specific input + # 准备步骤特定的输入 if step_id == "input_validation": text = self.task_input.get("text", "") - # Validate Chinese text encoding + # 验证中文文本编码 try: text.encode('utf-8').decode('utf-8') except UnicodeError: @@ -73,28 +77,28 @@ class TextAnalysisExecutor(TaskExecutor): elif step_id == "text_preprocessing": text = self.task_input.get("text", "") - # Basic Chinese text preprocessing - # Normalize whitespace while preserving Chinese text structure + # 中文文本预处理 + # 1. 规范化空白字符,同时保留中文文本结构 text = re.sub(r'\s+', ' ', text).strip() - # Normalize Chinese punctuation (simple example) + # 2. 规范化中文标点符号(简单示例) text = text.replace(',', ',').replace('。', '.').replace(':', ':') step_input = {"text": text} elif step_id == "generate_summary": - # Get the preprocessed text from previous step + # 从上一步获取预处理后的文本 prev_result = next( (step["result"] for step in self.execution_path if step["step_id"] == "text_preprocessing"), {} ) step_input = {"text": prev_result.get("preprocessed_text", self.task_input.get("text", ""))} elif step_id == "extract_keywords": - # Get the preprocessed text from previous step + # 从上一步获取预处理后的文本 prev_result = next( (step["result"] for step in self.execution_path if step["step_id"] == "text_preprocessing"), {} ) step_input = {"text": prev_result.get("preprocessed_text", self.task_input.get("text", ""))} elif step_id == "final_analysis": - # Get summary and keywords from previous steps + # 从之前的步骤获取摘要和关键词 summary_result = next( (step["result"] for step in self.execution_path if step["step_id"] == "generate_summary"), {} @@ -108,7 +112,7 @@ class TextAnalysisExecutor(TaskExecutor): "keywords": keywords_result.get("keywords", []) } - # Execute step using LLM + # 使用LLM执行步骤 step_result = await self.llm_executor.execute_step( step_instruction=instruction, step_input=step_input @@ -117,17 +121,17 @@ class TextAnalysisExecutor(TaskExecutor): if not step_result.get("success", False): raise Exception(f"Step failed: {step_result.get('error', 'Unknown error')}") - # Update execution path + # 更新执行路径 self.execution_path.append({ "step_id": step_id, "result": step_result.get("output", {}) }) - # Update step status + # 更新步骤状态 self.current_step["status"] = TaskStatus.COMPLETED.value self.current_step["progress"] = 100 - # Create checkpoint if needed + # 如果需要创建检查点 if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0: self.create_checkpoint() @@ -139,32 +143,33 @@ class TextAnalysisExecutor(TaskExecutor): return False def main(): - # Set console encoding to UTF-8 + """主函数:执行中文文本分析示例""" + # 设置控制台编码为UTF-8 import sys, io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') - # Create the executor + # 创建执行器实例 executor = TextAnalysisExecutor() - # Sample Chinese text for analysis + # 示例中文文本 sample_text = """ 从 ChatGPT 到 Devin:AI 编程的四个发展阶段与范式转变 Koji:我们再聊一聊 AI 编程。编程领域今年取得了非常令人兴奋的进展。雨森一直有很强的框架归纳和总结能力。前不久你跟我分享过你提炼出来的 AI 编程发展四段论,要不要在播客里和大家分享一下? 雨森:这其实是和很多朋友一起探讨得出的结果,是大家智慧的结晶。AI 编程从 ChatGPT 出现到现在也就两年出头的时间,但已经经历了四个阶段。 第一个阶段是让 AI 直接写代码,典型代表是早期的 ChatGPT、Claude。我们给它一个需求,比如「帮我写个贪吃蛇」,它就给出一段代码。在这个过程中,它既不知道我为什么要写贪吃蛇,也不知道代码运行情况如何。可能要我去本地编译运行后发现报错,再把错误告诉它,它才能给出调试后的结果。这时的 AI 完全就像一个只能通过邮件交流的笔友,是简单的问答模式。 第二阶段是以 GitHub Copilot 为代表,AI 开始拥有上下文,它可以把整个组织的代码库作为 context。这样 AI 就获得了大量新的背景信息。但这时用户还是需要手动把代码贴到 IDE 里面进行调试。我觉得这是 2.0 阶段,就是我们让 AI 拥有了 codebase 作为上下文。 2024 年一个非常大的进步是以 Cursor 为代表的编程 Copilot 的出现。它的核心理念是预测用户未来要写什么代码。根据你的代码库以及刚才写的代码,它预测你接下来要写什么代码、创建什么文件、做什么操作。这里面对于生成代码的质量和数量,以及文件的创建和修改都有很大提升。后来 Windsurf 还加入了对命令行操作的自动化,这样 AI 就能很好地使用我的电脑。原来的 AI 是在一张纸上写代码,我把代码抄走运行;现在 AI 可以在我的电脑上创建文件、执行命令行操作,进入到「我为你写」的阶段。 当我们觉得这已经很令人兴奋时,Devin 的出现带来了几个重要突破:首先,它可以异步工作。Cursor、Windsurf 这些工具虽然一步操作做的事情比较多,但仍然需要持续的注意力,即「我说一步它做一步」。而 Devin 可以持续工作,把用户的注意力释放出来。这是因为它多了一个 Planner,可以规划任务。 其次,它可以通过虚拟机执行更多操作,做更多调试工作。比如你写个网站,它可以自己用虚拟机去访问这个网站,检查前端后端的业务逻辑是否正确,并且可以随时打断和调整。大家用 Cursor 或者 ChatGPT 都知道,你无法在它输出的中间做调整,必须等它输出完后才能修改。但 Devin 就像真人一样,你可以在它完成任务时给出新指令,它会把这个结合到已有的 Planner 里调整计划。这就从「为你写」进化到了「为你做」。 总结一下这四个阶段:第一阶段是让 AI 写代码,代表是 ChatGPT;第二阶段是 AI 开放代码库,代表是 GitHub Copilot;第三阶段是 AI 可以自动写代码并执行,代表是 Cursor 和 Windsurf;第四阶段是 AI 虚拟员工,Devin 开创了一个很好的先例。 """ - # Prepare input + # 准备输入数据 task_input = { "text": sample_text } - # Execute the task + # 执行任务 import asyncio result = asyncio.run(executor.execute(task_input)) - # Print results with proper formatting + # 格式化输出结果 print("\n 文本分析结果:") print("=" * 50) - # Print each step's result with proper formatting + # 打印每个步骤的结果 for step in result.get("execution_path", []): step_id = step["step_id"] result_data = step["result"]