poc/project/llmclipboard/llmclipboard/document_processor.py

348 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""文档处理器模块"""
import os
import re
import jieba
import jieba.analyse
import json
import sys
from datetime import datetime
from collections import Counter
from pathlib import Path
import logging
from .ai_processor import AIProcessor
# 配置日志记录
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# 确保 stdout 使用 UTF-8 编码
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
# 创建控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
# 将处理器添加到记录器
logger.addHandler(console_handler)
class DocumentProcessor:
def __init__(self, config_path=None, ai_config=None):
# 确定配置文件路径
if config_path:
self.config_path = config_path
else:
# 如果没有提供配置路径,使用默认路径
if getattr(sys, 'frozen', False):
# 打包环境 - 使用可执行文件所在目录
config_dir = os.path.dirname(sys.executable)
else:
# 开发环境 - 使用项目根目录
config_dir = os.path.dirname(os.path.dirname(__file__))
self.config_path = os.path.join(config_dir, 'categories.json')
logger.info(f"使用分类配置文件路径: {self.config_path}")
self.load_categories()
# 初始化AI处理器
self.ai_config = ai_config or {}
self.ai_processor = AIProcessor(self.ai_config)
self.ai_enabled = self.ai_config and self.ai_config.get('model_type', 'none') != 'none'
if self.ai_enabled:
self.ai_processor.initialize()
def load_categories(self):
"""加载预定义的分类规则"""
try:
logger.info(f"尝试从 {self.config_path} 加载分类规则")
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
self.categories = json.load(f)
logger.info(f"成功加载分类规则,包含 {len(self.categories)} 个分类")
for category, keywords in self.categories.items():
logger.debug(f"分类 '{category}' 包含 {len(keywords)} 个关键词")
else:
logger.warning(f"分类规则文件 {self.config_path} 不存在,使用默认分类")
self.categories = {
"技术": ["编程", "开发", "代码", "框架", "算法", "数据库", "API"],
"学习": ["教程", "课程", "学习", "笔记", "知识", "总结"],
"工作": ["会议", "项目", "计划", "报告", "任务", "进度"],
"想法": ["想法", "创意", "思考", "灵感", "观点", "建议"],
"资源": ["工具", "资源", "链接", "参考", "文档", "书籍"]
}
self._save_categories()
except Exception as e:
logger.error(f"加载分类规则时出错: {e}")
import traceback
traceback.print_exc()
# 使用默认分类
self.categories = {
"技术": ["编程", "开发", "代码", "框架", "算法", "数据库", "API"],
"学习": ["教程", "课程", "学习", "笔记", "知识", "总结"],
"工作": ["会议", "项目", "计划", "报告", "任务", "进度"],
"想法": ["想法", "创意", "思考", "灵感", "观点", "建议"],
"资源": ["工具", "资源", "链接", "参考", "文档", "书籍"]
}
def _save_categories(self):
"""保存分类规则"""
try:
logger.info(f"保存分类规则到 {self.config_path}")
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.categories, f, ensure_ascii=False, indent=2)
logger.info(f"分类规则已保存,包含 {len(self.categories)} 个分类")
except Exception as e:
logger.error(f"保存分类规则时出错: {e}")
import traceback
traceback.print_exc()
def update_categories(self, new_categories):
"""更新分类规则"""
try:
logger.info(f"更新分类规则,新分类包含 {len(new_categories)} 个分类")
self.categories = new_categories
self._save_categories()
logger.info("分类规则已更新")
except Exception as e:
logger.error(f"更新分类规则时出错: {e}")
import traceback
traceback.print_exc()
def update_ai_config(self, ai_config):
"""更新AI配置"""
self.ai_config = ai_config or {}
self.ai_processor = AIProcessor(self.ai_config)
self.ai_enabled = self.ai_config and self.ai_config.get('model_type', 'none') != 'none'
if self.ai_enabled:
self.ai_processor.initialize()
def extract_title(self, content):
"""从内容中提取标题"""
# 尝试使用AI提取标题
if self.ai_enabled:
try:
ai_title = self.ai_processor.extract_title(content)
if ai_title:
logger.info(f"AI提取的标题: {ai_title}")
return ai_title
except Exception as e:
logger.error(f"AI提取标题失败: {e}")
# 如果AI提取失败或未启用使用传统方法
if not content or content.isspace():
logger.debug("收到空内容,返回默认标题")
logger.debug("收到空内容或纯空白内容")
return "无标题文档"
logger.debug("提取标题的原始内容长度: %d", len(content))
logger.debug("提取标题的原始内容前100个字符: %s", content[:100])
logger.debug("提取标题的原始内容包含的换行符数量: %d", content.count('\n'))
# 清理内容
content = content.strip()
logger.debug("清理后的内容开头100个字符: %s", content[:100])
# 按行分割内容
lines = content.splitlines()
logger.debug("分割后的行数: %d", len(lines))
# 获取第一行
first_line = lines[0].strip()
logger.debug("第一行内容: %s", first_line)
logger.debug("第一行长度: %d", len(first_line))
# 如果第一行是代码块标记,尝试使用下一个非空行
if first_line.startswith('```'):
logger.debug("第一行是代码块标记,寻找下一个有效行")
for line in lines[1:]:
line = line.strip()
if line and not line.startswith('```'): # 找到第一个非代码块标记的非空行
first_line = line
logger.debug("使用非代码块行作为标题: %s", first_line)
break
if first_line.startswith('```'): # 如果没有找到其他行,使用默认标题
logger.debug("未找到合适的标题行,使用默认标题")
return "代码片段"
# 如果第一行以数字编号开头(如 "1." 或 "1、"),尝试使用下一行
elif re.match(r'^\d+[.、]', first_line):
logger.debug("第一行以数字编号开头,寻找下一个有效行")
for line in lines[1:]:
line = line.strip()
if line: # 找到第一个非空行
first_line = line
logger.debug("使用非编号行作为标题: %s", first_line)
break
# 如果第一行以 Markdown 列表标记开头(如 "- " 或 "* "),移除标记
elif first_line.startswith(('- ', '* ')):
logger.debug("移除 Markdown 列表标记")
first_line = first_line[2:].strip()
# 如果第一行以常见标记词开头(如"标题:"),尝试使用下一行
elif any(first_line.startswith(mark) for mark in ["标题:", "标题:", "题目:", "题目:", "主题:", "主题:", "概要:", "概要:"]):
logger.debug("第一行包含标记词,寻找下一个非空行")
for line in lines[1:]:
line = line.strip()
if line: # 找到第一个非空行
first_line = line
logger.debug("使用下一个非空行作为标题: %s", first_line)
break
# 移除 Markdown 格式标记
first_line = re.sub(r'\*\*|\*|`|__', '', first_line)
# 如果包含标点符号,尝试提取第一句话
if any(char in first_line for char in ',。!?;,!?;'):
logger.debug("标题包含标点符号,尝试提取第一句话")
sentences = re.split(r'([,。!?;,!?;])', first_line)
logger.debug("按标点分割后得到 %d 个部分", len(sentences))
logger.debug("分割后的句子: %s", sentences)
# 获取第一句话(包括标点符号)
first_sentence = sentences[0]
if len(sentences) > 1:
first_sentence += sentences[1] # 添加标点符号
logger.debug("使用第一个句子: %s", first_sentence)
first_line = first_sentence
# 如果标题仍然太长,进行截断
if len(first_line) > 50:
logger.debug("标题过长(%d字符),进行截断", len(first_line))
first_line = first_line[:47] + "..."
logger.debug("截断后的标题: %s", first_line)
logger.debug("最终标题: %s", first_line)
logger.debug("最终标题长度: %d", len(first_line))
return first_line
def extract_tags(self, content):
"""提取内容的标签"""
# 尝试使用AI生成标签
if self.ai_enabled:
try:
ai_tags = self.ai_processor.generate_tags(content)
if ai_tags:
logger.info(f"AI生成的标签: {ai_tags}")
return ai_tags
except Exception as e:
logger.error(f"AI生成标签失败: {e}")
# 如果AI生成失败或未启用使用传统方法
# 使用结巴分词提取关键词作为标签
tags = set(jieba.analyse.extract_tags(content, topK=5))
# 添加基于规则的标签
for category, keywords in self.categories.items():
if any(keyword in content for keyword in keywords):
tags.add(category)
return list(tags)
def categorize(self, content, tags):
"""对内容进行分类"""
# 尝试使用AI进行分类
if self.ai_enabled:
try:
ai_category = self.ai_processor.categorize(content)
if ai_category:
logger.info(f"AI分类结果: {ai_category}")
return ai_category
except Exception as e:
logger.error(f"AI分类失败: {e}")
# 如果AI分类失败或未启用使用传统方法
# 基于标签和内容关键词进行分类
category_scores = Counter()
# 根据标签评分
for tag in tags:
for category, keywords in self.categories.items():
if tag in keywords or tag == category:
category_scores[category] += 2
# 根据内容关键词评分
for category, keywords in self.categories.items():
for keyword in keywords:
if keyword in content:
category_scores[category] += 1
# 返回得分最高的分类,如果没有匹配则返回"未分类"
return category_scores.most_common(1)[0][0] if category_scores else "未分类"
def process_document(self, content, base_save_path):
"""处理文档内容并返回保存信息"""
# 提取标题
title = self.extract_title(content)
# 清理内容
content = self.clean_content(content)
# 提取标签
tags = self.extract_tags(content)
# 确定分类
category = self.categorize(content, tags)
# 生成文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_title = re.sub(r'[\\/:*?"<>|]', '_', title) # 移除不安全的文件名字符
filename = f"{timestamp}_{safe_title[:30]}.md"
# 构建保存路径
save_dir = os.path.join(base_save_path, category)
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, filename)
# 构建完整的markdown内容
full_content = f"""---
title: {title}
date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
tags: {', '.join(tags)}
category: {category}
---
{content}
"""
return {
'path': save_path,
'content': full_content,
'title': title,
'tags': tags,
'category': category
}
def clean_content(self, content):
"""清理内容,移除多余的换行符和空白"""
if not content or content.isspace():
logger.debug("收到空内容或纯空白内容")
return ""
logger.debug("原始内容长度: %d", len(content))
logger.debug("原始内容前100个字符: %s", content[:100])
logger.debug("原始内容包含的换行符数量: %d", content.count('\n'))
# 移除文件名
content = re.sub(r'captured_text_\d{8}_\d{6}', '', content)
logger.debug("已移除文件名")
logger.debug("移除文件名后的内容长度: %d", len(content))
# 清理内容,保持原有的换行格式
lines = content.splitlines()
cleaned_lines = []
for line in lines:
line = line.strip()
if line or cleaned_lines: # 如果当前行非空或已经有内容,保留这一行
cleaned_lines.append(line)
# 使用原始换行符重新组合内容
content = '\n'.join(cleaned_lines)
logger.debug("清理后内容长度: %d", len(content))
logger.debug("清理后内容包含的换行符数量: %d", content.count('\n'))
logger.debug("清理后内容的前100个字符: %s", content[:100])
return content