- Add core task execution framework - Add LLM integration with DeepSeek - Add text analysis task implementation - Add configuration management - Add tests and documentation
132 lines
4.8 KiB
Python
132 lines
4.8 KiB
Python
import os
|
|
import json
|
|
from pathlib import Path
|
|
from base64 import b64encode, b64decode
|
|
from cryptography.fernet import Fernet
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
from typing import Dict, Any, Optional
|
|
|
|
class SecureConfig:
    """Encrypt, persist and reload sensitive configuration values.

    A Fernet key is stored in a key file (created on first use) and is used
    to encrypt values before they are written to a JSON configuration file.
    Only the ``api_key`` entry of a configuration is persisted; any other
    entries passed to :meth:`save_secure_config` are not written.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Load the encryption key, generating the key file if it is missing.

        Args:
            config_path: Location of the key file. When omitted (or empty),
                ``secure.key`` next to this module is used.
        """
        self.config_path = Path(config_path or Path(__file__).parent / "secure.key")
        self._ensure_key_exists()
        self.fernet = self._get_fernet()

    @staticmethod
    def _restrict_permissions(path) -> None:
        """Limit access to *path* to the current user (Windows ACL or Unix mode)."""
        if os.name == 'nt':  # Windows
            # pywin32 modules are imported lazily so they are only needed on Windows.
            import win32security
            import win32api
            import ntsecuritycon as con

            # Get current user's SID
            username = win32api.GetUserName()
            sid = win32security.LookupAccountName(None, username)[0]

            # Create a new DACL with full access for the current user
            dacl = win32security.ACL()
            dacl.AddAccessAllowedAce(
                win32security.ACL_REVISION,
                con.FILE_ALL_ACCESS,
                sid
            )

            # Create and set the security descriptor
            security_desc = win32security.SECURITY_DESCRIPTOR()
            security_desc.SetSecurityDescriptorDacl(1, dacl, 0)
            win32security.SetFileSecurity(
                str(path),
                win32security.DACL_SECURITY_INFORMATION,
                security_desc
            )
        else:  # Unix
            os.chmod(path, 0o600)

    def _write_private_file(self, path, data: bytes, exclusive: bool = False) -> None:
        """Write *data* to *path*, requesting owner-only access at creation time.

        Args:
            path: Destination file.
            data: Raw bytes to store.
            exclusive: When true, fail with ``FileExistsError`` instead of
                overwriting an existing file.
        """
        flags = os.O_WRONLY | os.O_CREAT | getattr(os, "O_BINARY", 0)
        flags |= os.O_EXCL if exclusive else os.O_TRUNC
        # Passing the mode to os.open avoids the window in which a freshly
        # created file would carry default permissions before being tightened.
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        # Still needed: a pre-existing file keeps its old mode, and on Windows
        # the restriction is applied through the ACL.
        self._restrict_permissions(path)

    def _ensure_key_exists(self):
        """Ensure encryption key exists, generate if not."""
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        if self.config_path.exists():
            return

        # Generate a new encryption key
        key = Fernet.generate_key()
        try:
            self._write_private_file(self.config_path, key, exclusive=True)
        except FileExistsError:
            # The file appeared between the existence check and the create;
            # keep it rather than overwrite a key that may already be in use.
            pass

    def _get_fernet(self) -> "Fernet":
        """Get Fernet instance for encryption/decryption."""
        with open(self.config_path, "rb") as f:
            key = f.read()
        return Fernet(key)

    def encrypt_value(self, value: str) -> Optional[str]:
        """Encrypt a string value.

        Args:
            value: Plain text to encrypt.

        Returns:
            The encrypted token as text, or ``None`` when *value* is empty
            or ``None``.
        """
        if not value:
            return None
        return self.fernet.encrypt(value.encode()).decode()

    def decrypt_value(self, encrypted_value: str) -> Optional[str]:
        """Decrypt an encrypted string value.

        Args:
            encrypted_value: Token previously produced by :meth:`encrypt_value`.

        Returns:
            The decrypted text, or ``None`` when the input is empty or
            decryption fails for any reason.
        """
        if not encrypted_value:
            return None
        try:
            return self.fernet.decrypt(encrypted_value.encode()).decode()
        except Exception:
            # Deliberately best-effort: callers receive None instead of an
            # exception whenever the value cannot be decrypted.
            return None

    def save_secure_config(self, config: Dict[str, Any], file_path: str):
        """Save configuration with encrypted sensitive values.

        Args:
            config: Configuration mapping. Only its ``api_key`` entry, if
                present, is encrypted and written.
            file_path: Destination JSON file; missing parent directories are
                created.
        """
        file_path = Path(file_path)
        file_path.parent.mkdir(parents=True, exist_ok=True)

        secure_config = {}

        # Encrypt sensitive values
        if "api_key" in config:
            secure_config["api_key"] = self.encrypt_value(config["api_key"])

        # Save to file with restrictive permissions
        payload = json.dumps(secure_config, indent=2).encode("utf-8")
        self._write_private_file(file_path, payload)

    def load_secure_config(self, file_path: str) -> Dict[str, Any]:
        """Load and decrypt configuration.

        Args:
            file_path: JSON file written by :meth:`save_secure_config`.

        Returns:
            A dict holding the decrypted ``api_key`` when the file contains
            one (``None`` if it cannot be decrypted); an empty dict when the
            file does not exist or has no ``api_key`` entry.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                secure_config = json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            # Opening directly instead of checking first avoids a
            # check-then-open race.
            return {}

        # Decrypt sensitive values
        config = {}
        if "api_key" in secure_config:
            config["api_key"] = self.decrypt_value(secure_config["api_key"])

        return config
|