from task_executor import TaskExecutor, TaskStatus from typing import Dict, Any import time class SampleTaskExecutor(TaskExecutor): def __init__(self): super().__init__() self.task_steps = [ { "id": "input_validation", "name": "Validate Input", "required_info": ["input_data"] }, { "id": "resource_check", "name": "Check Resources", "required_info": ["system_resources"] }, { "id": "task_processing", "name": "Process Task", "required_info": ["processed_data"] }, { "id": "result_validation", "name": "Validate Results", "required_info": ["validation_criteria"] } ] def validate_input(self, input_data: Dict[str, Any]) -> bool: """Validate specific input requirements for the sample task.""" required_fields = ["input_data", "validation_criteria"] return all(field in input_data for field in required_fields) def execute_step(self, step_id: str, step_data: Dict[str, Any]) -> bool: """Execute a specific step of the sample task.""" try: # Simulate step execution time.sleep(1) # Simulate work being done step_state = { "step_id": step_id, "status": TaskStatus.IN_PROGRESS, "required_info": step_data.get("required_info", []), "available_info": [], "missing_info": [], "resources_used": { "time": time.time() - self.start_time, "memory": "100MB" # Simulated memory usage } } # Update execution state self.current_step = { "id": step_id, "name": step_data["name"], "status": TaskStatus.COMPLETED.value, "progress": 100 } return True except Exception as e: self.logger.error(f"Step {step_id} failed: {str(e)}") return False def execute(self, task_input: Dict[str, Any]) -> Dict[str, Any]: """Execute the sample task with all its steps.""" try: if not self.validate_input(task_input): raise ValueError("Invalid task input") self.logger.info(f"Starting sample task execution: {self.task_id}") # Execute each step in sequence for step in self.task_steps: if not self.execute_step(step["id"], step): raise Exception(f"Step {step['id']} failed") if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0: self.create_checkpoint() return self.get_status_update() except Exception as e: self.logger.error(f"Sample task failed: {str(e)}") return self.get_status_update() def main(): # Example usage executor = SampleTaskExecutor() # Sample task input task_input = { "input_data": "sample data", "validation_criteria": ["criteria1", "criteria2"], "system_resources": { "memory": "1GB", "cpu": "2 cores" } } # Execute the task result = executor.execute(task_input) print(f"Task execution completed with status: {result['current_step']['status']}") print(f"Execution path: {result['execution_path']}") if __name__ == "__main__": main()