智能邮件分类系统
一、实际应用场景与痛点
应用场景
现代职场人士每天收到大量邮件:
- 工作邮件(60-70%):项目沟通、会议邀请、工作报告、任务分配
- 生活邮件(20-30%):个人购物、社交通知、账单、订阅信息
- 垃圾邮件(10-20%):广告推销、诈骗信息、恶意链接
- 紧急邮件:需要立即处理的重要邮件
- 待办邮件:需要稍后处理的邮件
痛点分析
1. 信息过载:每天处理大量邮件,效率低下
2. 分类混乱:工作与生活邮件混杂,难以区分
3. 重要信息遗漏:紧急邮件被淹没
4. 时间浪费:手动分类消耗大量时间
5. 安全风险:垃圾邮件中隐藏恶意内容
6. 个性化需求:不同用户分类标准不同
7. 多语言处理:处理中文、英文、混合语言邮件
二、核心逻辑讲解
系统架构
采用"多层智能分类+自适应学习"的混合架构:
数据接入层 → 预处理层 → 特征提取层 → 多级分类层 → 后处理层 → 执行层
│ │ │ │ │ │
IMAP/POP3 文本清洗 TF-IDF/NN 贝叶斯/ 规则过滤 移动/标记
邮件客户端 编码转换 Word2Vec SVM/深度学习 优先级排序 自动回复
语言检测 BERT嵌入 强化学习 垃圾隔离
核心算法原理
1. 特征工程
- 文本特征:TF-IDF、词袋模型
- 语义特征:Word2Vec、BERT嵌入
- 结构特征:发件人、主题、时间
- 行为特征:用户历史操作
2. 多级分类策略
第一级:垃圾邮件识别(高召回率)
第二级:工作/生活分类(高准确率)
第三级:子类别细分(个性化)
第四级:优先级判断(紧急程度)
3. 自适应学习机制
- 在线学习:用户反馈实时调整
- 增量学习:新样本逐步更新模型
- 迁移学习:跨用户知识迁移
- 联邦学习:隐私保护学习
优化目标函数
准确率 = 正确分类数 / 总数
召回率 = 正确识别的正样本 / 实际正样本数
F1分数 = 2 * (准确率 * 召回率) / (准确率 + 召回率)
用户满意度 = α*准确率 + β*时效性 + γ*个性化程度
三、模块化代码实现
项目结构
smart_email_classifier/
│
├── main.py # 主程序入口
├── config.py # 配置文件
├── requirements.txt # 依赖包
├── email_samples/ # 邮件样本数据
│ ├── work/
│ ├── personal/
│ └── spam/
│
├── core/ # 核心算法模块
│ ├── __init__.py
│ ├── email_processor.py # 邮件预处理
│ ├── feature_extractor.py # 特征提取
│ ├── classifier_engine.py # 分类引擎
│ ├── adaptive_learner.py # 自适应学习
│ └── priority_analyzer.py # 优先级分析
│
├── models/ # 数据模型
│ ├── __init__.py
│ ├── email_data.py
│ ├── classification_result.py
│ └── user_profile.py
│
├── email_client/ # 邮件客户端接口
│ ├── __init__.py
│ ├── imap_client.py
│ ├── outlook_integration.py
│ └── local_email_reader.py
│
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── text_utils.py
│ ├── language_detector.py
│ ├── performance_evaluator.py
│ └── visualization.py
│
├── database/ # 数据存储
│ ├── __init__.py
│ ├── email_database.py
│ └── model_storage.py
│
├── tests/ # 测试文件
│ ├── __init__.py
│ ├── test_classifier.py
│ └── test_email_processor.py
│
└── web_interface/ # Web管理界面
├── app.py
├── templates/
└── static/
核心代码实现
main.py
#!/usr/bin/env python3
"""
智能邮件分类系统主程序
基于机器学习和自然语言处理的智能邮件分类
"""
import os
import sys
import time
import logging
import json
import threading
import schedule
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from queue import Queue
import numpy as np
import pandas as pd
# 导入自定义模块
from config import SystemConfig
from models.email_data import EmailData
from models.classification_result import ClassificationResult
from models.user_profile import UserProfile
from core.email_processor import EmailProcessor
from core.feature_extractor import FeatureExtractor
from core.classifier_engine import ClassifierEngine
from core.adaptive_learner import AdaptiveLearner
from core.priority_analyzer import PriorityAnalyzer
from email_client.imap_client import EmailClient
from utils.performance_evaluator import PerformanceEvaluator
from utils.visualization import ResultVisualizer
from database.email_database import EmailDatabase
from database.model_storage import ModelStorage
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('email_classifier.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class SmartEmailClassifier:
"""智能邮件分类器主类"""
def __init__(self, config_path: str = 'config.yaml'):
"""
初始化智能邮件分类器
Args:
config_path: 配置文件路径
"""
# 加载配置
self.config = SystemConfig(config_path)
# 初始化数据库
self.database = EmailDatabase(self.config.DATABASE_PATH)
self.model_storage = ModelStorage(self.config.MODEL_STORAGE_PATH)
# 初始化用户配置
self.user_profile = UserProfile()
# 初始化核心模块
self.email_processor = EmailProcessor(self.config)
self.feature_extractor = FeatureExtractor(self.config)
self.classifier_engine = ClassifierEngine(self.config)
self.adaptive_learner = AdaptiveLearner(self.config)
self.priority_analyzer = PriorityAnalyzer(self.config)
# 初始化邮件客户端
self.email_client = EmailClient(self.config)
# 初始化工具模块
self.performance_evaluator = PerformanceEvaluator()
self.visualizer = ResultVisualizer()
# 消息队列
self.email_queue = Queue()
self.result_queue = Queue()
# 线程控制
self.running = False
self.fetch_thread = None
self.process_thread = None
self.action_thread = None
# 统计数据
self.stats = {
'total_processed': 0,
'classified_work': 0,
'classified_personal': 0,
'classified_spam': 0,
'classified_urgent': 0,
'accuracy_work': 0.0,
'accuracy_personal': 0.0,
'accuracy_spam': 0.0,
'avg_processing_time': 0.0,
'user_feedback_positive': 0,
'user_feedback_negative': 0
}
# 加载模型
self._load_models()
# 加载用户习惯
self._load_user_habits()
logger.info("智能邮件分类系统初始化完成")
def _load_models(self):
"""加载预训练模型"""
try:
# 尝试加载现有模型
models = self.model_storage.load_models()
if models:
# 加载分类器
self.classifier_engine.load_models(models)
logger.info("预训练模型加载成功")
else:
# 训练新模型
self._train_initial_models()
logger.info("初始模型训练完成")
except Exception as e:
logger.error(f"加载模型失败: {e}")
# 使用默认模型
self._train_default_models()
def _train_initial_models(self):
"""训练初始模型"""
try:
# 从数据库加载训练数据
training_data = self.database.get_training_data(limit=1000)
if training_data and len(training_data) >= self.config.MIN_TRAINING_SAMPLES:
# 预处理数据
processed_data = []
labels = []
for email in training_data:
processed = self.email_processor.process_email(email)
processed_data.append(processed)
labels.append(email['category'])
# 提取特征
features = self.feature_extractor.extract_batch(processed_data)
# 训练模型
self.classifier_engine.train(features, labels)
# 保存模型
models = self.classifier_engine.get_models()
self.model_storage.save_models(models)
logger.info(f"使用 {len(training_data)} 条数据训练初始模型")
else:
# 使用内置样本训练
self._train_with_sample_data()
except Exception as e:
logger.error(f"训练初始模型失败: {e}")
self._train_default_models()
def _train_with_sample_data(self):
"""使用样本数据训练"""
try:
# 加载样本数据
sample_data = self._load_sample_data()
if sample_data:
emails = []
labels = []
for category, data_list in sample_data.items():
for data in data_list:
emails.append(data)
labels.append(category)
# 预处理
processed_emails = []
for email in emails:
processed = self.email_processor.process_email(email)
processed_emails.append(processed)
# 提取特征
features = self.feature_extractor.extract_batch(processed_emails)
# 训练模型
self.classifier_engine.train(features, labels)
logger.info(f"使用样本数据训练模型: {len(emails)} 条样本")
else:
self._train_default_models()
except Exception as e:
logger.error(f"使用样本数据训练失败: {e}")
self._train_default_models()
def _load_sample_data(self) -> Dict:
"""加载样本数据"""
# 这里加载内置的样本邮件
# 实际应用中可以从文件或数据库加载
sample_data = {
'work': [
{
'subject': '项目会议邀请',
'body': '请参加明天上午10点的项目进度会议',
'sender': 'manager@company.com',
'recipient': 'user@company.com',
'date': '2024-01-15 09:00:00',
'has_attachment': False
},
{
'subject': '季度报告提交',
'body': '请在本周五前提交季度工作报告',
'sender': 'hr@company.com',
'recipient': 'user@company.com',
'date': '2024-01-15 14:30:00',
'has_attachment': True
}
],
'personal': [
{
'subject': '周末聚餐邀请',
'body': '这周末有空一起吃饭吗?',
'sender': 'friend@personal.com',
'recipient': 'user@personal.com',
'date': '2024-01-15 19:00:00',
'has_attachment': False
},
{
'subject': '亚马逊订单确认',
'body': '您的订单已确认,预计明天送达',
'sender': 'orders@amazon.com',
'recipient': 'user@personal.com',
'date': '2024-01-15 16:20:00',
'has_attachment': False
}
],
'spam': [
{
'subject': '限时优惠!立即购买!',
'body': '特别优惠!点击链接获取折扣',
'sender': 'promotion@spam.com',
'recipient': 'user@personal.com',
'date': '2024-01-15 03:15:00',
'has_attachment': False
},
{
'subject': '紧急通知:账户异常',
'body': '您的账户存在异常,请立即验证',
'sender': 'security@fakebank.com',
'recipient': 'user@personal.com',
'date': '2024-01-15 11:45:00',
'has_attachment': False
}
]
}
return sample_data
def _train_default_models(self):
"""训练默认模型(基于规则)"""
# 创建基于规则的默认分类器
logger.info("训练基于规则的默认模型")
# 这里可以添加一些基本规则
# 实际实现中,可以训练一个简单的模型
def _load_user_habits(self):
"""加载用户习惯"""
try:
habits = self.database.get_user_habits(self.config.USER_ID)
if habits:
self.user_profile.load_from_dict(habits)
logger.info("用户习惯加载成功")
except Exception as e:
logger.warning(f"加载用户习惯失败: {e}")
def _fetch_email_thread(self):
"""邮件获取线程"""
logger.info("邮件获取线程启动")
while self.running:
try:
# 检查邮件
new_emails = self.email_client.fetch_new_emails()
for email in new_emails:
# 放入处理队列
self.email_queue.put(email)
logger.debug(f"获取到新邮件: {email.get('subject', '无主题')}")
# 定时获取
time.sleep(self.config.FETCH_INTERVAL)
except Exception as e:
logger.error(f"获取邮件失败: {e}")
time.sleep(30) # 出错等待30秒
def _process_email_thread(self):
"""邮件处理线程"""
logger.info("邮件处理线程启动")
while self.running:
try:
if not self.email_queue.empty():
email = self.email_queue.get_nowait()
# 记录开始时间
start_time = time.time()
# 处理邮件
result = self.process_single_email(email)
# 计算处理时间
processing_time = time.time() - start_time
# 更新统计
self.stats['total_processed'] += 1
self.stats['avg_processing_time'] = (
0.9 * self.stats['avg_processing_time'] +
0.1 * processing_time
)
# 放入结果队列
self.result_queue.put(result)
# 记录到数据库
self._save_email_result(email, result)
else:
time.sleep(0.1) # 队列为空时短暂等待
except Exception as e:
logger.error(f"处理邮件失败: {e}")
time.sleep(1)
def _action_thread(self):
"""执行动作线程(移动邮件、标记等)"""
logger.info("执行动作线程启动")
while self.running:
try:
if not self.result_queue.empty():
result = self.result_queue.get_nowait()
# 执行分类动作
self._execute_classification_action(result)
else:
time.sleep(0.1)
except Exception as e:
logger.error(f"执行动作失败: {e}")
time.sleep(1)
def process_single_email(self, email_data: Dict) -> ClassificationResult:
"""
处理单封邮件
Args:
email_data: 原始邮件数据
Returns:
分类结果
"""
try:
# 1. 预处理邮件
processed_email = self.email_processor.process_email(email_data)
# 2. 提取特征
features = self.feature_extractor.extract(processed_email)
# 3. 分类预测
classification = self.classifier_engine.predict(features)
# 4. 优先级分析
priority = self.priority_analyzer.analyze_priority(
processed_email, classification
)
# 5. 创建结果对象
result = ClassificationResult(
email_id=email_data.get('id', ''),
subject=processed_email.subject,
sender=processed_email.sender,
date=processed_email.date,
predicted_category=classification['category'],
predicted_subcategory=classification.get('subcategory', ''),
confidence=classification['confidence'],
priority_level=priority['level'],
priority_reason=priority['reason'],
features=features,
raw_email=email_data
)
# 6. 应用用户个性化规则
self._apply_user_rules(result)
# 7. 更新统计
self._update_classification_stats(result)
logger.info(f"邮件分类完成: [{result.predicted_category}] "
f"{result.subject[:50]}... "
f"置信度: {result.confidence:.2f}")
return result
except Exception as e:
logger.error(f"处理邮件失败: {e}")
# 返回默认分类结果
return ClassificationResult(
email_id=email_data.get('id', ''),
subject=email_data.get('subject', ''),
sender=email_data.get('sender', ''),
date=email_data.get('date', datetime.now()),
predicted_category='unknown',
confidence=0.0,
priority_level='normal',
priority_reason='处理失败',
raw_email=email_data
)
def _apply_user_rules(self, result: ClassificationResult):
"""应用用户个性化规则"""
# 检查用户自定义规则
user_rules = self.user_profile.get_rules()
for rule in user_rules:
if self._rule_matches(rule, result):
# 应用规则覆盖
if rule.get('override_category'):
result.predicted_category = rule['override_category']
result.confidence = 1.0 # 用户规则置信度高
result.applied_rule = rule['name']
logger.debug(f"应用用户规则: {rule['name']}")
break
def _rule_matches(self, rule: Dict, result: ClassificationResult) -> bool:
"""检查规则是否匹配"""
conditions = rule.get('conditions', [])
for condition in conditions:
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
# 获取字段值
if field == 'sender':
field_value = result.sender
elif field == 'subject':
field_value = result.subject
elif field == 'category':
field_value = result.predicted_category
else:
continue
# 应用操作符
if operator == 'contains':
if value.lower() not in field_value.lower():
return False
elif operator == 'equals':
if field_value != value:
return False
elif operator == 'starts_with':
if not field_value.startswith(value):
return False
elif operator == 'ends_with':
if not field_value.endswith(value):
return False
return True
def _update_classification_stats(self, result: ClassificationResult):
"""更新分类统计"""
category = result.predicted_category
if category == 'work':
self.stats['classified_work'] += 1
elif category == 'personal':
self.stats['classified_personal'] += 1
elif category == 'spam':
self.stats['classified_spam'] += 1
if result.priority_level == 'urgent':
self.stats['classified_urgent'] += 1
def _execute_classification_action(self, result: ClassificationResult):
"""执行分类动作"""
try:
# 根据分类结果执行相应动作
if result.predicted_category == 'spam':
# 垃圾邮件处理
if self.config.AUTO_DELETE_SPAM:
self.email_client.delete_email(result.email_id)
logger.info(f"删除垃圾邮件: {result.subject[:50]}...")
else:
self.email_client.move_to_folder(
result.email_id,
self.config.SPAM_FOLDER
)
logger.info(f"移动到垃圾邮件: {result.subject[:50]}...")
elif result.predicted_category == 'work':
# 工作邮件处理
folder = self.config.WORK_FOLDER
if result.predicted_subcategory:
folder = f"{folder}/{result.predicted_subcategory}"
self.email_client.move_to_folder(result.email_id, folder)
# 标记重要邮件
if result.priority_level == 'urgent':
self.email_client.mark_as_important(result.email_id)
logger.info(f"标记为重要工作邮件: {result.subject[:50]}...")
else:
logger.info(f"移动到工作文件夹: {result.subject[:50]}...")
el
如果你觉得这个工具好用,欢迎关注我!