agent-task-executor/config/secure_config.py
zhukang dd797ab5e4 feat: initial commit of agent task executor framework
- Add core task execution framework
- Add LLM integration with DeepSeek
- Add text analysis task implementation
- Add configuration management
- Add tests and documentation
2025-01-14 20:53:09 +08:00

132 lines
4.8 KiB
Python

import json
import os
from base64 import b64decode, b64encode
from pathlib import Path
from typing import Any, Dict, Optional

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class SecureConfig:
    """Encrypt and persist sensitive configuration values with Fernet.

    The symmetric key is stored in a key file (``secure.key`` next to this
    module unless ``config_path`` is given). The key file and every saved
    configuration file are restricted to the current user.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Load the encryption key, generating the key file if it is missing.

        Args:
            config_path: Location of the key file. Defaults to ``secure.key``
                in the directory containing this module.
        """
        self.config_path = Path(config_path or Path(__file__).parent / "secure.key")
        self._ensure_key_exists()
        self.fernet = self._get_fernet()

    @staticmethod
    def _restrict_permissions(path) -> None:
        """Limit access to ``path`` to the current user.

        On Unix the mode is set to ``0o600``. On Windows the file's DACL is
        replaced by one holding a single full-access entry for the current
        user; this branch needs the pywin32 modules, imported lazily so that
        other platforms do not depend on them.
        """
        if os.name != 'nt':  # Unix
            os.chmod(path, 0o600)
            return

        # Windows
        import win32security
        import win32api
        import ntsecuritycon as con

        # Get current user's SID
        username = win32api.GetUserName()
        sid = win32security.LookupAccountName(None, username)[0]

        # Create a new DACL with full access for the current user
        dacl = win32security.ACL()
        dacl.AddAccessAllowedAce(
            win32security.ACL_REVISION,
            con.FILE_ALL_ACCESS,
            sid
        )

        # Create and set the security descriptor
        security_desc = win32security.SECURITY_DESCRIPTOR()
        security_desc.SetSecurityDescriptorDacl(1, dacl, 0)
        win32security.SetFileSecurity(
            str(path),
            win32security.DACL_SECURITY_INFORMATION,
            security_desc
        )

    def _ensure_key_exists(self) -> None:
        """Ensure encryption key exists, generate if not."""
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        if self.config_path.exists():
            return

        # Generate a new encryption key
        key = Fernet.generate_key()
        try:
            # Create the file owner-only from the start instead of tightening
            # it after the key has been written. O_EXCL refuses to overwrite
            # a key that appeared after the exists() check above.
            fd = os.open(
                self.config_path,
                os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                0o600,
            )
        except FileExistsError:
            # Something else created the key in the meantime; keep that one.
            return
        with os.fdopen(fd, "wb") as key_file:
            key_file.write(key)

        # Still needed: the umask may have altered the mode passed to
        # os.open, and on Windows the restriction is applied via the ACL.
        self._restrict_permissions(self.config_path)

    def _get_fernet(self) -> "Fernet":
        """Get Fernet instance for encryption/decryption."""
        with open(self.config_path, "rb") as key_file:
            key = key_file.read()
        return Fernet(key)

    def encrypt_value(self, value: str) -> Optional[str]:
        """Encrypt a string value.

        Returns:
            The encrypted token as text, or ``None`` when ``value`` is empty
            or ``None``.
        """
        if not value:
            return None
        return self.fernet.encrypt(value.encode()).decode()

    def decrypt_value(self, encrypted_value: str) -> Optional[str]:
        """Decrypt an encrypted string value.

        Returns:
            The decrypted text, or ``None`` when ``encrypted_value`` is empty
            or cannot be decrypted (invalid token, wrong key, or a value that
            is not usable text).
        """
        if not encrypted_value:
            return None
        try:
            return self.fernet.decrypt(encrypted_value.encode()).decode()
        except (InvalidToken, ValueError, TypeError, AttributeError):
            # Best effort by design: an unreadable value is reported as
            # "no value" rather than raised to the caller.
            return None

    def save_secure_config(self, config: Dict[str, Any], file_path: str) -> None:
        """Save configuration with encrypted sensitive values.

        Only the ``api_key`` entry of ``config`` is written, in encrypted
        form; all other entries are ignored.

        Args:
            config: Configuration mapping to read ``api_key`` from.
            file_path: Destination JSON file; parent directories are created.
        """
        target = Path(file_path)
        target.parent.mkdir(parents=True, exist_ok=True)

        secure_config: Dict[str, Any] = {}

        # Encrypt sensitive values
        if "api_key" in config:
            secure_config["api_key"] = self.encrypt_value(config["api_key"])

        # Save to file; a newly created file is owner-only from the start.
        fd = os.open(target, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as config_file:
            json.dump(secure_config, config_file, indent=2)

        # Set restrictive permissions (also covers a pre-existing file whose
        # permissions were looser).
        self._restrict_permissions(target)

    def load_secure_config(self, file_path: str) -> Dict[str, Any]:
        """Load and decrypt configuration.

        Returns:
            A dict with the decrypted ``api_key`` when the file contains one,
            otherwise an empty dict. A missing file also yields an empty dict.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as config_file:
                secure_config = json.load(config_file)
        except (FileNotFoundError, NotADirectoryError):
            return {}

        # Decrypt sensitive values
        config: Dict[str, Any] = {}
        if "api_key" in secure_config:
            config["api_key"] = self.decrypt_value(secure_config["api_key"])
        return config