agent-task-executor/sample_task.py
zhukang dd797ab5e4 feat: initial commit of agent task executor framework
- Add core task execution framework
- Add LLM integration with DeepSeek
- Add text analysis task implementation
- Add configuration management
- Add tests and documentation
2025-01-14 20:53:09 +08:00

111 lines
3.7 KiB
Python

from task_executor import TaskExecutor, TaskStatus
from typing import Dict, Any
import time
class SampleTaskExecutor(TaskExecutor):
def __init__(self):
super().__init__()
self.task_steps = [
{
"id": "input_validation",
"name": "Validate Input",
"required_info": ["input_data"]
},
{
"id": "resource_check",
"name": "Check Resources",
"required_info": ["system_resources"]
},
{
"id": "task_processing",
"name": "Process Task",
"required_info": ["processed_data"]
},
{
"id": "result_validation",
"name": "Validate Results",
"required_info": ["validation_criteria"]
}
]
def validate_input(self, input_data: Dict[str, Any]) -> bool:
"""Validate specific input requirements for the sample task."""
required_fields = ["input_data", "validation_criteria"]
return all(field in input_data for field in required_fields)
def execute_step(self, step_id: str, step_data: Dict[str, Any]) -> bool:
"""Execute a specific step of the sample task."""
try:
# Simulate step execution
time.sleep(1) # Simulate work being done
step_state = {
"step_id": step_id,
"status": TaskStatus.IN_PROGRESS,
"required_info": step_data.get("required_info", []),
"available_info": [],
"missing_info": [],
"resources_used": {
"time": time.time() - self.start_time,
"memory": "100MB" # Simulated memory usage
}
}
# Update execution state
self.current_step = {
"id": step_id,
"name": step_data["name"],
"status": TaskStatus.COMPLETED.value,
"progress": 100
}
return True
except Exception as e:
self.logger.error(f"Step {step_id} failed: {str(e)}")
return False
def execute(self, task_input: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the sample task with all its steps."""
try:
if not self.validate_input(task_input):
raise ValueError("Invalid task input")
self.logger.info(f"Starting sample task execution: {self.task_id}")
# Execute each step in sequence
for step in self.task_steps:
if not self.execute_step(step["id"], step):
raise Exception(f"Step {step['id']} failed")
if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0:
self.create_checkpoint()
return self.get_status_update()
except Exception as e:
self.logger.error(f"Sample task failed: {str(e)}")
return self.get_status_update()
def main():
# Example usage
executor = SampleTaskExecutor()
# Sample task input
task_input = {
"input_data": "sample data",
"validation_criteria": ["criteria1", "criteria2"],
"system_resources": {
"memory": "1GB",
"cpu": "2 cores"
}
}
# Execute the task
result = executor.execute(task_input)
print(f"Task execution completed with status: {result['current_step']['status']}")
print(f"Execution path: {result['execution_path']}")
if __name__ == "__main__":
main()