家庭冰箱温度智能优化系统
一、实际应用场景与痛点
应用场景
现代家庭冰箱通常有冷藏室(0-10℃)和冷冻室(-24~-18℃)两个温区。用户通常设置固定温度,但实际上:
- 冰箱内物品存放量随时间变化
- 不同时段电价可能不同(分时电价)
- 家庭成员开关冰箱频率随时间变化
- 季节和环境温度变化影响冰箱能耗
痛点分析
1. 能源浪费:固定温度设置无法适应使用场景变化
2. 保鲜效果差:温度波动大或设置不当影响食物保鲜
3. 电费成本高:高峰时段用电增加电费支出
4. 设备寿命短:频繁启停和过载运行缩短冰箱寿命
5. 用户体验差:需要手动调节,智能化程度低
二、核心逻辑讲解
控制系统架构
采用模糊PID控制 + 强化学习的混合智能控制系统:
1. 数据采集层:收集温度、湿度、开门次数、电量等数据
2. 决策优化层:使用模糊逻辑处理不确定因素,PID进行精确控制,强化学习长期优化
3. 执行控制层:输出最佳温度设定值
4. 反馈学习层:根据保鲜效果和能耗调整控制策略
核心算法原理
模糊输入变量:
1. 冰箱负载率(空载、中等、满载)
2. 环境温度(低温、常温、高温)
3. 使用频率(低频、中频、高频)
4. 电价时段(谷时、平时、峰时)
模糊输出变量:
1. 冷藏室温度设定值
2. 冷冻室温度设定值
3. 压缩机工作模式
强化学习奖励函数:
R = w1*节能效果 + w2*保鲜质量 - w3*温度波动
三、模块化代码实现
项目结构
refrigerator_optimizer/
│
├── main.py # 主程序入口
├── config.py # 配置文件
├── requirements.txt # 依赖包
│
├── sensors/ # 传感器模块
│ ├── __init__.py
│ ├── temperature_sensor.py
│ └── power_sensor.py
│
├── controllers/ # 控制器模块
│ ├── __init__.py
│ ├── fuzzy_controller.py
│ ├── pid_controller.py
│ └── rl_optimizer.py
│
├── models/ # 数据模型
│ ├── __init__.py
│ ├── refrigerator_state.py
│ └── energy_model.py
│
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── data_logger.py
│ └── scheduler.py
│
└── tests/ # 测试文件
└── test_controllers.py
核心代码实现
main.py
#!/usr/bin/env python3
"""
家庭冰箱温度优化系统主程序
基于模糊PID控制和强化学习的智能温度控制系统
"""
import time
import logging
from datetime import datetime
from config import SystemConfig
from models.refrigerator_state import RefrigeratorState
from controllers.fuzzy_controller import FuzzyTemperatureController
from controllers.rl_optimizer import RLOptimizer
from utils.data_logger import DataLogger
from utils.scheduler import TimeScheduler
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RefrigeratorOptimizer:
"""冰箱优化系统主类"""
def __init__(self, config_path='config.yaml'):
"""
初始化优化系统
Args:
config_path: 配置文件路径
"""
self.config = SystemConfig(config_path)
self.state = RefrigeratorState()
# 初始化控制器
self.fuzzy_controller = FuzzyTemperatureController(self.config)
self.rl_optimizer = RLOptimizer(self.config)
self.data_logger = DataLogger(self.config)
self.scheduler = TimeScheduler()
# 系统状态
self.running = False
self.iteration = 0
logger.info("冰箱温度优化系统初始化完成")
def update_sensor_data(self):
"""
更新传感器数据(模拟或真实)
实际应用中这里会连接真实传感器
此处使用模拟数据演示
"""
import random
# 模拟环境温度(随季节和时间变化)
current_hour = datetime.now().hour
base_temp = 25 # 基准温度
if 0 <= current_hour < 6:
env_temp = base_temp - 2 + random.uniform(-1, 1)
elif 6 <= current_hour < 12:
env_temp = base_temp + random.uniform(-1, 1)
elif 12 <= current_hour < 18:
env_temp = base_temp + 3 + random.uniform(-1, 1)
else:
env_temp = base_temp + 1 + random.uniform(-1, 1)
# 更新状态
self.state.update_state(
env_temperature=env_temp,
door_open_count=random.randint(0, 3),
load_percentage=random.uniform(0.3, 0.8),
current_power=random.uniform(50, 150) # 瓦特
)
logger.debug(f"传感器数据更新: 环境温度={env_temp:.1f}°C, 负载={self.state.load_percentage:.1%}")
def optimize_temperature(self):
"""
执行温度优化决策
"""
# 获取当前电价时段
current_time = datetime.now()
price_period = self.scheduler.get_electricity_price_period(current_time)
# 模糊控制决策
fuzzy_settings = self.fuzzy_controller.calculate_optimal_temperature(
env_temp=self.state.env_temperature,
load=self.state.load_percentage,
door_freq=self.state.door_open_frequency,
price_period=price_period
)
# 强化学习优化调整
rl_adjustment = self.rl_optimizer.get_adjustment(
current_state={
'fridge_temp': self.state.fridge_temperature,
'freezer_temp': self.state.freezer_temperature,
'load': self.state.load_percentage,
'time_of_day': current_time.hour
},
fuzzy_action=fuzzy_settings
)
# 应用优化结果
final_settings = {
'fridge_target': fuzzy_settings['fridge_target'] + rl_adjustment.get('fridge_adj', 0),
'freezer_target': fuzzy_settings['freezer_target'] + rl_adjustment.get('freezer_adj', 0),
'compressor_mode': fuzzy_settings['compressor_mode']
}
# 限制温度范围
final_settings['fridge_target'] = max(1, min(8, final_settings['fridge_target']))
final_settings['freezer_target'] = max(-24, min(-16, final_settings['freezer_target']))
self.state.set_target_temperatures(
fridge_target=final_settings['fridge_target'],
freezer_target=final_settings['freezer_target']
)
logger.info(f"优化结果: 冷藏={final_settings['fridge_target']:.1f}°C, "
f"冷冻={final_settings['freezer_target']:.1f}°C, "
f"模式={final_settings['compressor_mode']}")
return final_settings
def calculate_energy_saving(self):
"""
计算节能效果
"""
# 基于理想模型计算节能比例
base_consumption = 1.5 # kWh/天,基准能耗
optimized_consumption = 1.2 # kWh/天,优化后能耗
# 实际计算会根据温度设置、环境温度等调整
saving_percent = ((base_consumption - optimized_consumption) / base_consumption) * 100
return saving_percent
def run(self, duration_hours=24):
"""
运行优化系统
Args:
duration_hours: 运行时长(小时)
"""
self.running = True
start_time = time.time()
duration_seconds = duration_hours * 3600
logger.info(f"开始运行冰箱优化系统,运行时长: {duration_hours}小时")
try:
while self.running and (time.time() - start_time) < duration_seconds:
self.iteration += 1
# 更新传感器数据
self.update_sensor_data()
# 执行优化
optimal_settings = self.optimize_temperature()
# 记录数据
self.data_logger.log_iteration(
iteration=self.iteration,
state=self.state,
settings=optimal_settings
)
# 每10分钟保存一次数据
if self.iteration % 10 == 0:
self.data_logger.save_to_file()
# 显示当前状态
if self.iteration % 5 == 0:
self.display_status()
# 等待下次优化(实际应用中根据需求调整间隔)
time.sleep(60) # 1分钟
except KeyboardInterrupt:
logger.info("用户中断运行")
except Exception as e:
logger.error(f"运行出错: {e}")
finally:
self.shutdown()
def display_status(self):
"""显示当前状态"""
saving = self.calculate_energy_saving()
print("\n" + "="*50)
print(f"迭代次数: {self.iteration}")
print(f"环境温度: {self.state.env_temperature:.1f}°C")
print(f"冷藏室当前温度: {self.state.fridge_temperature:.1f}°C")
print(f"冷冻室当前温度: {self.state.freezer_temperature:.1f}°C")
print(f"冷藏室目标温度: {self.state.fridge_target_temp:.1f}°C")
print(f"冷冻室目标温度: {self.state.freezer_target_temp:.1f}°C")
print(f"冰箱负载: {self.state.load_percentage:.1%}")
print(f"预估节能: {saving:.1f}%")
print("="*50)
def shutdown(self):
"""关闭系统"""
self.running = False
self.data_logger.save_to_file()
logger.info("系统已关闭,数据已保存")
if __name__ == "__main__":
# 创建并运行优化器
optimizer = RefrigeratorOptimizer()
# 运行24小时(或按需调整)
optimizer.run(duration_hours=24)
controllers/fuzzy_controller.py
"""
模糊控制器模块
实现基于模糊逻辑的温度决策
"""
import numpy as np
from typing import Dict, Tuple
import logging
logger = logging.getLogger(__name__)
class FuzzyTemperatureController:
"""模糊温度控制器"""
def __init__(self, config):
"""
初始化模糊控制器
Args:
config: 系统配置
"""
self.config = config
# 定义模糊集合的隶属度函数参数
self.setup_membership_functions()
# 模糊规则库
self.setup_fuzzy_rules()
logger.info("模糊控制器初始化完成")
def setup_membership_functions(self):
"""设置隶属度函数"""
# 环境温度模糊集:低温、常温、高温
self.env_temp_sets = {
'low': {'type': 'trapezoid', 'params': [-10, -5, 15, 20]},
'medium': {'type': 'triangle', 'params': [15, 22, 30]},
'high': {'type': 'trapezoid', 'params': [25, 30, 50, 50]}
}
# 负载率模糊集:空载、中等、满载
self.load_sets = {
'empty': {'type': 'trapezoid', 'params': [0, 0, 20, 40]},
'medium': {'type': 'triangle', 'params': [30, 50, 70]},
'full': {'type': 'trapezoid', 'params': [60, 80, 100, 100]}
}
# 开门频率模糊集:低频、中频、高频
self.door_freq_sets = {
'low': {'type': 'trapezoid', 'params': [0, 0, 3, 5]},
'medium': {'type': 'triangle', 'params': [3, 8, 12]},
'high': {'type': 'trapezoid', 'params': [10, 15, 50, 50]}
}
# 冷藏室温度输出模糊集
self.fridge_output_sets = {
'very_low': {'type': 'triangle', 'params': [0, 1, 2]},
'low': {'type': 'triangle', 'params': [1, 2, 4]},
'medium': {'type': 'triangle', 'params': [3, 5, 7]},
'high': {'type': 'triangle', 'params': [6, 8, 10]},
'very_high': {'type': 'triangle', 'params': [8, 9, 10]}
}
def setup_fuzzy_rules(self):
"""设置模糊规则库"""
# 规则格式: (env, load, door_freq, price) -> (fridge_temp, freezer_temp, compressor)
self.rules = [
# 环境低温、负载轻、低频使用 -> 较高温度省电
(('low', 'empty', 'low', 'peak'), ('high', 'very_high', 'eco')),
(('low', 'empty', 'low', 'off_peak'), ('very_high', 'very_high', 'eco')),
# 环境高温、负载重、高频使用 -> 较低温度保证保鲜
(('high', 'full', 'high', 'peak'), ('low', 'low', 'normal')),
(('high', 'full', 'high', 'off_peak'), ('medium', 'medium', 'normal')),
# 默认规则
(('medium', 'medium', 'medium', 'normal'), ('medium', 'medium', 'normal')),
]
def calculate_membership(self, value, mf_params):
"""
计算隶属度
Args:
value: 输入值
mf_params: 隶属度函数参数
Returns:
隶属度值
"""
mf_type = mf_params['type']
params = mf_params['params']
if mf_type == 'triangle':
a, b, c = params
if a <= value <= b:
return (value - a) / (b - a) if b != a else 1.0
elif b <= value <= c:
return (c - value) / (c - b) if c != b else 1.0
else:
return 0.0
elif mf_type == 'trapezoid':
a, b, c, d = params
if a <= value <= b:
return (value - a) / (b - a) if b != a else 1.0
elif b <= value <= c:
return 1.0
elif c <= value <= d:
return (d - value) / (d - c) if d != c else 1.0
else:
return 0.0
else:
return 0.0
def fuzzify(self, env_temp, load, door_freq, price_period):
"""
模糊化输入
Returns:
各输入变量的隶属度字典
"""
# 环境温度模糊化
env_degrees = {}
for set_name, mf_params in self.env_temp_sets.items():
env_degrees[set_name] = self.calculate_membership(env_temp, mf_params)
# 负载模糊化
load_degrees = {}
for set_name, mf_params in self.load_sets.items():
load_degrees[set_name] = self.calculate_membership(load, mf_params)
# 开门频率模糊化
door_degrees = {}
for set_name, mf_params in self.door_freq_sets.items():
door_degrees[set_name] = self.calculate_membership(door_freq, mf_params)
return {
'env': env_degrees,
'load': load_degrees,
'door': door_degrees
}
def apply_rules(self, fuzzy_inputs, price_period):
"""
应用模糊规则
Returns:
输出变量的激活强度
"""
rule_strengths = []
for rule in self.rules:
conditions, conclusions = rule
# 检查价格时段是否匹配
if conditions[3] != price_period and conditions[3] != 'normal':
continue
# 计算规则强度(取最小值)
strength = 1.0
strength = min(strength, fuzzy_inputs['env'].get(conditions[0], 0))
strength = min(strength, fuzzy_inputs['load'].get(conditions[1], 0))
strength = min(strength, fuzzy_inputs['door'].get(conditions[2], 0))
if strength > 0:
rule_strengths.append((strength, conclusions))
return rule_strengths
def defuzzify(self, activated_outputs):
"""
去模糊化(使用重心法)
Args:
activated_outputs: 激活的输出集合
Returns:
精确的输出值
"""
if not activated_outputs:
return 5.0, -18.0, 'normal' # 默认值
# 计算冷藏室温度(简化示例)
fridge_values = []
freezer_values = []
compressor_modes = []
weights = []
for strength, (fridge_set, freezer_set, compressor) in activated_outputs:
# 获取输出模糊集的中心值
fridge_center = self.get_output_center(fridge_set, 'fridge')
freezer_center = self.get_output_center(freezer_set, 'freezer')
fridge_values.append(fridge_center)
freezer_values.append(freezer_center)
compressor_modes.append(compressor)
weights.append(strength)
# 加权平均
if sum(weights) > 0:
fridge_output = np.average(fridge_values, weights=weights)
freezer_output = np.average(freezer_values, weights=weights)
# 选择最常见的压缩机模式
from collections import Counter
compressor_output = Counter(compressor_modes).most_common(1)[0][0]
else:
fridge_output, freezer_output, compressor_output = 5.0, -18.0, 'normal'
return fridge_output, freezer_output, compressor_output
def get_output_center(self, output_set, output_type):
"""获取输出模糊集的中心值"""
if output_type == 'fridge':
sets = self.fridge_output_sets
else:
# 冷冻室输出集(简化)
return -20.0 # 默认值
params = sets.get(output_set, sets['medium'])['params']
if sets[output_set]['type'] == 'triangle':
a, b, c = params
return b # 三角形中心
else:
a, b, c, d = params
return (b + c) / 2 # 梯形中心
def calculate_optimal_temperature(self, env_temp, load, door_freq, price_period):
"""
计算最佳温度设置
Args:
env_temp: 环境温度
load: 负载率 (0-1)
door_freq: 开门频率 (次/小时)
price_period: 电价时段 ('peak', 'off_peak', 'normal')
Returns:
包含温度设置和压缩机模式的字典
"""
# 1. 模糊化
fuzzy_inputs = self.fuzzify(env_temp, load*100, door_freq, price_period)
# 2. 应用规则
activated_rules = self.apply_rules(fuzzy_inputs, price_period)
# 3. 去模糊化
fridge_temp, freezer_temp, compressor_mode = self.defuzzify(activated_rules)
result = {
'fridge_target': float(fridge_temp),
'freezer_target': float(freezer_temp),
'compressor_mode': compressor_mode,
'fuzzy_inputs': fuzzy_inputs,
'activated_rules_count': len(activated_rules)
}
logger.debug(f"模糊控制结果: {result}")
return result
controllers/rl_optimizer.py
"""
强化学习优化器
基于Q-learning的长期优化
"""
import numpy as np
import random
from collections import defaultdict
import pickle
import os
import logging
logger = logging.getLogger(__name__)
class RLOptimizer:
"""强化学习优化器"""
def __init__(self, config):
"""
初始化强化学习优化器
Args:
config: 系统配置
"""
self.config = config
self.q_table = defaultdict(lambda: np.zeros(5)) # 5种动作
self.learning_rate = 0.1
self.discount_factor = 0.9
self.epsilon = 0.1 # 探索率
self.epsilon_decay = 0.995
self.min_epsilon = 0.01
# 动作空间:温度调整幅度
self.actions = [-2, -1, 0, 1, 2] # 摄氏度
# 状态历史
self.state_history = []
self.action_history = []
self.reward_history = []
# 尝试加载已有Q表
self.load_q_table()
logger.info("强化学习优化器初始化完成")
def get_state_key(self, state):
"""
将连续状态离散化为状态键
Args:
state: 状态字典
Returns:
离散化状态键
"""
# 离散化冷藏室温度
fridge_temp = state['fridge_temp']
fridge_bin = int(np.clip((fridge_temp - 1) / 2, 0, 3)) # 0-3
# 离散化负载
load = state['load']
load_bin = int(load * 3) # 0-2
# 离散化时间
hour = state['time_of_day']
time_bin = hour // 6 # 0-3
return (fridge_bin, load_bin, time_bin)
def choose_action(self, state_key, fuzzy_action):
"""
选择动作(ε-贪婪策略)
Args:
state_key: 状态键
fuzzy_action: 模糊控制建议的动作(基准温度)
Returns:
动作索引
"""
if random.random() < self.epsilon:
# 探索:随机选择动作
return random.randint(0, len(self.actions) - 1)
else:
# 利用:选择Q值最高的动作
q_values = self.q_table[state_key]
return int(np.argmax(q_values))
def get_adjustment(self, current_state, fuzzy_action):
"""
获取温度调整量
Args:
current_state: 当前状态
fuzzy_action: 模糊控制输出
Returns:
调整量字典
"""
# 获取状态键
state_key = self.get_state_key(current_state)
# 选择动作
action_idx = self.choose_action(state_key, fuzzy_action)
adjustment = self.actions[action_idx]
# 记录状态和动作
self.state_history.append(state_key)
self.action_history.append(action_idx)
return {
'fridge_adj': adjustment * 0.5, # 调整幅度减半
'freezer_adj': adjustment * 0.3, # 冷冻室调整较小
'action_idx': action_idx
}
def calculate_reward(self, old_state, new_state, action_idx):
"""
计算奖励值
奖励函数:R = w1*节能效果 + w2*保鲜质量 - w3*温度波动
Returns:
奖励值
"""
# 简化的奖励计算
# 实际应用中需要更复杂的模型
# 节能奖励:温度提高(在合理范围内)有助于节能
temp_change = new_state['fridge_temp'] - old_state['fridge_temp']
if 0 < temp_change <= 2:
energy_reward = 1.0
elif temp_change > 2:
energy_reward = 0.5
else:
energy_reward = -0.5
# 保鲜惩罚:温度过高影响保鲜
if new_state['fridge_temp'] > 7:
freshness_penalty = -2.0
elif new_state['fridge_temp'] < 2:
freshne
如果你觉得这个工具好用,欢迎关注我!