- Add core task execution framework - Add LLM integration with DeepSeek - Add text analysis task implementation - Add configuration management - Add tests and documentation
111 lines
3.7 KiB
Python
111 lines
3.7 KiB
Python
from task_executor import TaskExecutor, TaskStatus
|
|
from typing import Dict, Any
|
|
import time
|
|
|
|
class SampleTaskExecutor(TaskExecutor):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.task_steps = [
|
|
{
|
|
"id": "input_validation",
|
|
"name": "Validate Input",
|
|
"required_info": ["input_data"]
|
|
},
|
|
{
|
|
"id": "resource_check",
|
|
"name": "Check Resources",
|
|
"required_info": ["system_resources"]
|
|
},
|
|
{
|
|
"id": "task_processing",
|
|
"name": "Process Task",
|
|
"required_info": ["processed_data"]
|
|
},
|
|
{
|
|
"id": "result_validation",
|
|
"name": "Validate Results",
|
|
"required_info": ["validation_criteria"]
|
|
}
|
|
]
|
|
|
|
def validate_input(self, input_data: Dict[str, Any]) -> bool:
|
|
"""Validate specific input requirements for the sample task."""
|
|
required_fields = ["input_data", "validation_criteria"]
|
|
return all(field in input_data for field in required_fields)
|
|
|
|
def execute_step(self, step_id: str, step_data: Dict[str, Any]) -> bool:
|
|
"""Execute a specific step of the sample task."""
|
|
try:
|
|
# Simulate step execution
|
|
time.sleep(1) # Simulate work being done
|
|
|
|
step_state = {
|
|
"step_id": step_id,
|
|
"status": TaskStatus.IN_PROGRESS,
|
|
"required_info": step_data.get("required_info", []),
|
|
"available_info": [],
|
|
"missing_info": [],
|
|
"resources_used": {
|
|
"time": time.time() - self.start_time,
|
|
"memory": "100MB" # Simulated memory usage
|
|
}
|
|
}
|
|
|
|
# Update execution state
|
|
self.current_step = {
|
|
"id": step_id,
|
|
"name": step_data["name"],
|
|
"status": TaskStatus.COMPLETED.value,
|
|
"progress": 100
|
|
}
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Step {step_id} failed: {str(e)}")
|
|
return False
|
|
|
|
def execute(self, task_input: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute the sample task with all its steps."""
|
|
try:
|
|
if not self.validate_input(task_input):
|
|
raise ValueError("Invalid task input")
|
|
|
|
self.logger.info(f"Starting sample task execution: {self.task_id}")
|
|
|
|
# Execute each step in sequence
|
|
for step in self.task_steps:
|
|
if not self.execute_step(step["id"], step):
|
|
raise Exception(f"Step {step['id']} failed")
|
|
|
|
if len(self.execution_path) % self.CHECKPOINT_INTERVAL == 0:
|
|
self.create_checkpoint()
|
|
|
|
return self.get_status_update()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Sample task failed: {str(e)}")
|
|
return self.get_status_update()
|
|
|
|
def main():
|
|
# Example usage
|
|
executor = SampleTaskExecutor()
|
|
|
|
# Sample task input
|
|
task_input = {
|
|
"input_data": "sample data",
|
|
"validation_criteria": ["criteria1", "criteria2"],
|
|
"system_resources": {
|
|
"memory": "1GB",
|
|
"cpu": "2 cores"
|
|
}
|
|
}
|
|
|
|
# Execute the task
|
|
result = executor.execute(task_input)
|
|
print(f"Task execution completed with status: {result['current_step']['status']}")
|
|
print(f"Execution path: {result['execution_path']}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|