智能温室环境调控系统
实际应用场景描述
在现代农业种植中,温室大棚是重要的生产设施。然而,温室内环境受外界天气影响很大,特别是温度、湿度和光照。传统的温室管理依赖人工经验,需要农民根据天气预报和经验手动调节遮阳网、通风口等设备,效率低下且难以实现精确控制。
痛点分析
1. 劳动力密集:需要人工频繁检查调节
2. 响应延迟:人工调节响应慢,易错过最佳时机
3. 精度不足:凭经验调节,难以实现最优控制
4. 能耗浪费:无法精确控制,造成能源浪费
5. 作物应激:环境突变导致作物生长受影响
6. 数据缺失:缺乏系统的环境数据记录和分析
核心逻辑
1. 天气预测:获取未来24小时天气预报
2. 环境监测:实时监测温室内外环境参数
3. 智能决策:基于模糊控制和预测模型自动决策
4. 设备控制:自动控制遮阳网、通风口、风机等设备
5. 反馈优化:根据实际效果优化控制策略
6. 异常预警:提前预警极端天气风险
系统架构
greenhouse_control_system/
├── app.py # 主应用程序
├── config.py # 系统配置文件
├── requirements.txt # 依赖包列表
├── README.md # 系统说明文档
├── controllers/
│ ├── weather_predictor.py # 天气预报控制器
│ ├── environment_controller.py # 环境控制器
│ ├── fuzzy_controller.py # 模糊逻辑控制器
│ └── energy_optimizer.py # 能耗优化器
├── sensors/
│ ├── temperature_sensor.py # 温度传感器
│ ├── humidity_sensor.py # 湿度传感器
│ ├── light_sensor.py # 光照传感器
│ └── co2_sensor.py # CO2传感器
├── actuators/
│ ├── sunshade_control.py # 遮阳网控制
│ ├── ventilator_control.py # 通风口控制
│ ├── fan_control.py # 风机控制
│ └── heater_control.py # 加热器控制
├── models/
│ ├── crop_model.py # 作物生长模型
│ ├── weather_model.py # 天气预测模型
│ └── energy_model.py # 能耗模型
├── utils/
│ ├── data_processor.py # 数据处理工具
│ └── logger.py # 日志记录器
├── static/ # 静态资源
│ └── dashboard.html # 监控面板
└── data/ # 数据存储
├── weather_data/ # 天气数据
└── control_logs/ # 控制日志
核心代码实现
1. 配置文件 (config.py)
"""
智能温室环境调控系统配置文件
基于智能控制原理的环境参数设置
"""
import json
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Any
from enum import Enum
from datetime import time, datetime
import pytz
class CropType(Enum):
"""作物类型枚举"""
TOMATO = "tomato" # 番茄
CUCUMBER = "cucumber" # 黄瓜
PEPPER = "pepper" # 辣椒
LETTUCE = "lettuce" # 生菜
STRAWBERRY = "strawberry" # 草莓
FLOWER = "flower" # 花卉
HERB = "herb" # 香草
class WeatherCondition(Enum):
"""天气条件枚举"""
SUNNY = "sunny" # 晴天
CLOUDY = "cloudy" # 多云
PARTLY_CLOUDY = "partly_cloudy" # 少云
OVERCAST = "overcast" # 阴天
RAIN = "rain" # 雨天
SNOW = "snow" # 雪天
FOG = "fog" # 雾天
WINDY = "windy" # 大风
@dataclass
class CropRequirements:
"""作物生长需求参数"""
crop_type: CropType
optimal_temp_day: Tuple[float, float] # 日间最佳温度范围(°C)
optimal_temp_night: Tuple[float, float] # 夜间最佳温度范围
optimal_humidity: Tuple[float, float] # 最佳湿度范围(%)
optimal_light: Tuple[float, float] # 最佳光照范围(lux)
optimal_co2: Tuple[float, float] # 最佳CO2浓度(ppm)
# 生长阶段参数
growth_stages: Dict[str, Dict] # 不同生长阶段的需求
max_temp_tolerance: float = 40.0 # 最高耐受温度
min_temp_tolerance: float = 5.0 # 最低耐受温度
max_light_tolerance: float = 100000.0 # 最大光照耐受
def get_stage_requirements(self, stage: str) -> Dict:
"""获取特定生长阶段的需求"""
return self.growth_stages.get(stage, {
'temp_day': self.optimal_temp_day,
'temp_night': self.optimal_temp_night,
'humidity': self.optimal_humidity
})
@dataclass
class GreenhouseSpecs:
"""温室规格参数"""
# 温室尺寸
length: float = 50.0 # 长度(m)
width: float = 10.0 # 宽度(m)
height: float = 3.0 # 高度(m)
volume: float = 1500.0 # 体积(m³)
# 设备规格
sunshade_area: float = 500.0 # 遮阳网面积(m²)
sunshade_percent: Tuple[float, float] = (0.0, 100.0) # 遮阳率范围
vent_area: float = 20.0 # 通风口面积(m²)
vent_percent: Tuple[float, float] = (0.0, 100.0) # 开度范围
fan_capacity: float = 5000.0 # 风机容量(m³/h)
heater_power: float = 20.0 # 加热器功率(kW)
# 材料参数
thermal_conductivity: float = 2.5 # 热传导系数(W/m²·K)
solar_transmittance: float = 0.7 # 太阳光透射率
heat_capacity: float = 1200.0 # 热容量(J/kg·K)
def calculate_ventilation_rate(self, vent_open_percent: float,
wind_speed: float = 0.0) -> float:
"""计算通风率"""
effective_area = self.vent_area * (vent_open_percent / 100.0)
natural_vent = effective_area * wind_speed * 0.3 # 自然通风
forced_vent = self.fan_capacity * 0.7 # 强制通风比例
return natural_vent + forced_vent
@dataclass
class ControlParameters:
"""控制参数配置"""
# 温度控制参数
temp_control_range: Tuple[float, float] = (5.0, 40.0) # 可控温度范围
temp_hysteresis: float = 1.0 # 温度迟滞(°C)
heating_start_temp: float = 15.0 # 加热启动温度
cooling_start_temp: float = 28.0 # 降温启动温度
# 遮阳控制参数
light_control_range: Tuple[float, float] = (0, 100000) # 可控光照范围
sunshade_adjust_threshold: float = 5000.0 # 遮阳调整阈值(lux)
# 通风控制参数
humidity_control_range: Tuple[float, float] = (30.0, 90.0) # 湿度控制范围
co2_control_range: Tuple[float, float] = (300, 2000) # CO2控制范围
# 控制周期参数
sensor_read_interval: int = 60 # 传感器读取间隔(秒)
control_update_interval: int = 300 # 控制更新间隔(秒)
weather_update_interval: int = 1800 # 天气更新间隔(秒)
# 预测参数
forecast_horizon: int = 24 # 预测时域(小时)
control_horizon: int = 6 # 控制时域(小时)
# 安全参数
max_temp_alarm: float = 40.0 # 最高温度报警
min_temp_alarm: float = 0.0 # 最低温度报警
max_humidity_alarm: float = 95.0 # 最高湿度报警
min_humidity_alarm: float = 20.0 # 最低湿度报警
class GreenhouseConfig:
"""温室系统配置主类"""
def __init__(self, config_file: str = None):
"""
初始化配置
参数:
config_file: 配置文件路径
"""
# 作物需求数据库
self.crop_requirements = self._init_crop_requirements()
# 温室规格
self.greenhouse_specs = GreenhouseSpecs()
# 控制参数
self.control_params = ControlParameters()
# 当前作物设置
self.current_crop = CropType.TOMATO
self.current_crop_stage = "flowering" # 开花期
self.crop_density = 2.5 # 种植密度(株/m²)
# 地理位置参数
self.location_params = {
'latitude': 39.9042, # 纬度
'longitude': 116.4074, # 经度
'altitude': 50.0, # 海拔(m)
'timezone': 'Asia/Shanghai',
'sunrise_offset': 30, # 日出后延迟(分钟)
'sunset_offset': -30 # 日落前提前(分钟)
}
# 能源价格参数
self.energy_prices = {
'electricity': 0.8, # 电价(元/kWh)
'heating_fuel': 3.5, # 加热燃料(元/kg)
'co2_cost': 2.0 # CO2成本(元/kg)
}
# 控制模式
self.control_modes = {
'auto': '全自动',
'semi_auto': '半自动',
'manual': '手动',
'energy_save': '节能',
'production': '生产'
}
self.current_mode = 'auto'
# 加载自定义配置
if config_file:
self.load_config(config_file)
def _init_crop_requirements(self) -> Dict[CropType, CropRequirements]:
"""初始化作物需求数据库"""
requirements = {}
# 番茄
requirements[CropType.TOMATO] = CropRequirements(
crop_type=CropType.TOMATO,
optimal_temp_day=(20.0, 28.0),
optimal_temp_night=(15.0, 18.0),
optimal_humidity=(60.0, 80.0),
optimal_light=(20000, 60000),
optimal_co2=(800, 1200),
growth_stages={
'seedling': {
'temp_day': (20.0, 25.0),
'temp_night': (16.0, 18.0),
'humidity': (70.0, 85.0)
},
'vegetative': {
'temp_day': (22.0, 28.0),
'temp_night': (16.0, 20.0),
'humidity': (60.0, 75.0)
},
'flowering': {
'temp_day': (20.0, 28.0),
'temp_night': (15.0, 18.0),
'humidity': (60.0, 70.0)
},
'fruiting': {
'temp_day': (20.0, 28.0),
'temp_night': (15.0, 18.0),
'humidity': (60.0, 70.0)
}
}
)
# 黄瓜
requirements[CropType.CUCUMBER] = CropRequirements(
crop_type=CropType.CUCUMBER,
optimal_temp_day=(22.0, 30.0),
optimal_temp_night=(16.0, 20.0),
optimal_humidity=(70.0, 90.0),
optimal_light=(25000, 60000),
optimal_co2=(800, 1200),
growth_stages={
'seedling': {'temp_day': (22.0, 28.0), 'temp_night': (18.0, 22.0)},
'vegetative': {'temp_day': (24.0, 30.0), 'temp_night': (18.0, 22.0)},
'fruiting': {'temp_day': (22.0, 30.0), 'temp_night': (16.0, 20.0)}
}
)
# 生菜
requirements[CropType.LETTUCE] = CropRequirements(
crop_type=CropType.LETTUCE,
optimal_temp_day=(15.0, 22.0),
optimal_temp_night=(10.0, 15.0),
optimal_humidity=(60.0, 80.0),
optimal_light=(15000, 30000),
optimal_co2=(600, 1000),
growth_stages={
'seedling': {'temp_day': (15.0, 20.0), 'temp_night': (10.0, 15.0)},
'growing': {'temp_day': (15.0, 22.0), 'temp_night': (10.0, 15.0)}
}
)
# 草莓
requirements[CropType.STRAWBERRY] = CropRequirements(
crop_type=CropType.STRAWBERRY,
optimal_temp_day=(18.0, 25.0),
optimal_temp_night=(8.0, 12.0),
optimal_humidity=(60.0, 75.0),
optimal_light=(20000, 40000),
optimal_co2=(700, 1000),
growth_stages={
'vegetative': {'temp_day': (18.0, 25.0), 'temp_night': (10.0, 15.0)},
'flowering': {'temp_day': (18.0, 22.0), 'temp_night': (8.0, 12.0)},
'fruiting': {'temp_day': (18.0, 25.0), 'temp_night': (8.0, 12.0)}
}
)
return requirements
def load_config(self, config_file: str):
"""
从文件加载配置
参数:
config_file: 配置文件路径
"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config_data = json.load(f)
# 更新温室规格
if 'greenhouse_specs' in config_data:
specs = config_data['greenhouse_specs']
self.greenhouse_specs.length = specs.get('length', 50.0)
self.greenhouse_specs.width = specs.get('width', 10.0)
self.greenhouse_specs.height = specs.get('height', 3.0)
self.greenhouse_specs.volume = self.greenhouse_specs.length * \
self.greenhouse_specs.width * \
self.greenhouse_specs.height
# 更新控制参数
if 'control_params' in config_data:
params = config_data['control_params']
self.control_params.temp_hysteresis = params.get('temp_hysteresis', 1.0)
self.control_params.heating_start_temp = params.get('heating_start_temp', 15.0)
self.control_params.cooling_start_temp = params.get('cooling_start_temp', 28.0)
# 更新作物设置
if 'crop_settings' in config_data:
crop_settings = config_data['crop_settings']
crop_type = crop_settings.get('crop_type', 'tomato')
self.current_crop = CropType(crop_type)
self.current_crop_stage = crop_settings.get('crop_stage', 'vegetative')
self.crop_density = crop_settings.get('crop_density', 2.5)
# 更新地理位置
if 'location' in config_data:
location = config_data['location']
self.location_params['latitude'] = location.get('latitude', 39.9042)
self.location_params['longitude'] = location.get('longitude', 116.4074)
self.location_params['altitude'] = location.get('altitude', 50.0)
self.location_params['timezone'] = location.get('timezone', 'Asia/Shanghai')
print(f"配置已从 {config_file} 加载")
except FileNotFoundError:
print(f"配置文件 {config_file} 未找到,使用默认配置")
except json.JSONDecodeError as e:
print(f"配置文件解析错误: {e}")
def save_config(self, config_file: str):
"""
保存配置到文件
参数:
config_file: 配置文件路径
"""
config_data = {
'greenhouse_specs': {
'length': self.greenhouse_specs.length,
'width': self.greenhouse_specs.width,
'height': self.greenhouse_specs.height,
'volume': self.greenhouse_specs.volume
},
'control_params': {
'temp_hysteresis': self.control_params.temp_hysteresis,
'heating_start_temp': self.control_params.heating_start_temp,
'cooling_start_temp': self.control_params.cooling_start_temp,
'sensor_read_interval': self.control_params.sensor_read_interval
},
'crop_settings': {
'crop_type': self.current_crop.value,
'crop_stage': self.current_crop_stage,
'crop_density': self.crop_density
},
'location': self.location_params,
'energy_prices': self.energy_prices
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=2, ensure_ascii=False)
print(f"配置已保存到 {config_file}")
def validate_config(self) -> Tuple[bool, List[str]]:
"""
验证配置有效性
返回:
(是否有效, 错误信息列表)
"""
errors = []
# 验证温室参数
if self.greenhouse_specs.length <= 0 or self.greenhouse_specs.width <= 0:
errors.append("温室尺寸必须为正数")
if self.greenhouse_specs.volume <= 0:
errors.append("温室体积必须为正数")
# 验证控制参数
if self.control_params.temp_control_range[0] >= self.control_params.temp_control_range[1]:
errors.append("温度控制范围下限必须小于上限")
if self.control_params.heating_start_temp >= self.control_params.cooling_start_temp:
errors.append("加热启动温度必须小于降温启动温度")
# 验证作物需求
if self.current_crop not in self.crop_requirements:
errors.append(f"不支持的作物类型: {self.current_crop.value}")
else:
crop_req = self.get_crop_requirements()
if crop_req.optimal_temp_day[0] >= crop_req.optimal_temp_day[1]:
errors.append("作物日间温度范围无效")
# 验证地理位置
if not (-90 <= self.location_params['latitude'] <= 90):
errors.append("纬度必须在-90到90度之间")
if not (-180 <= self.location_params['longitude'] <= 180):
errors.append("经度必须在-180到180度之间")
return len(errors) == 0, errors
def get_crop_requirements(self) -> Optional[CropRequirements]:
"""
获取当前作物的生长需求
返回:
作物需求参数
"""
return self.crop_requirements.get(self.current_crop)
def get_current_stage_requirements(self) -> Dict:
"""
获取当前生长阶段的需求
返回:
当前阶段的需求参数
"""
crop_req = self.get_crop_requirements()
if not crop_req:
return {}
return crop_req.get_stage_requirements(self.current_crop_stage)
def get_day_night_temp_range(self) -> Tuple[Tuple[float, float], Tuple[float, float]]:
"""
获取昼夜温度范围
返回:
(日间范围, 夜间范围)
"""
crop_req = self.get_crop_requirements()
if not crop_req:
return ((15, 30), (10, 20))
stage_req = self.get_current_stage_requirements()
if 'temp_day' in stage_req and 'temp_night' in stage_req:
return (tuple(stage_req['temp_day']), tuple(stage_req['temp_night']))
return (crop_req.optimal_temp_day, crop_req.optimal_temp_night)
def calculate_sun_position(self, dt: datetime = None) -> Tuple[float, float]:
"""
计算太阳位置
参数:
dt: 日期时间,None则使用当前时间
返回:
(太阳高度角, 方位角) 单位: 度
"""
if dt is None:
dt = datetime.now(pytz.timezone(self.location_params['timezone']))
import math
# 简化计算
lat = math.radians(self.location_params['latitude'])
lon = math.radians(self.location_params['longitude'])
# 儒略日
n = dt.timetuple().tm_yday
B = (n - 1) * 360 / 365
# 赤纬角
declination = 23.45 * math.sin(math.radians(B))
# 时角
LST = dt.hour + dt.minute/60 + dt.second/3600
H = 15 * (LST - 12)
# 太阳高度角
sin_alt = (math.sin(lat) * math.sin(math.radians(declination)) +
math.cos(lat) * math.cos(math.radians(declination)) * math.cos(math.radians(H)))
altitude = math.degrees(math.asin(max(min(sin_alt, 1), -1)))
# 太阳方位角
if math.cos(math.radians(altitude)) == 0:
azimuth = 0
else:
sin_azi = math.cos(math.radians(declination)) * math.sin(math.radians(H)) / math.cos(math.radians(altitude))
cos_azi = (math.sin(math.radians(declination)) - math.sin(lat) * sin_alt) / (math.cos(lat) * math.cos(math.radians(altitude)))
azimuth = math.degrees(math.atan2(sin_azi, cos_azi))
if azimuth < 0:
azimuth += 360
return altitude, azimuth
def is_daytime(self, dt: datetime = None) -> bool:
"""
判断是否为白天
参数:
dt: 日期时间
返回:
是否为白天
"""
altitude, _ = self.calculate_sun_position(dt)
return altitude > 0
def calculate_sunshine_hours(self, date: datetime = None) -> float:
"""
计算日照时长
参数:
date: 日期
返回:
日照时长(小时)
"""
if date is None:
date = datetime.now()
# 简化计算,实际应基于地理位置和日期
month = date.month
if 3 <= month <= 5: # 春季
return 12.0
elif 6 <= month <= 8: # 夏季
return 14.0
elif 9 <= month <= 11: # 秋季
return 10.0
else: # 冬季
return 8.0
2. 天气预报控制器 (controllers/weather_predictor.py)
"""
天气预报控制器 - 获取和处理天气数据
基于天气API的智能预测
"""
import requests
import json
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import time
from datetime import datetime, timedelta
import pytz
import numpy as np
from scipy import stats
@dataclass
class WeatherForecast:
"""天气预报数据"""
timestamp: datetime
temperature: float # 温度(°C)
feels_like: float # 体感温度(°C)
humidity: float # 湿度(%)
pressure: float # 气压(hPa)
wind_speed: float # 风速(m/s)
wind_direction: float # 风向(度)
cloud_cover: float # 云量(%)
precipitation: float # 降水量(mm)
precipitation_prob: float # 降水概率(%)
uv_index: float # UV指数
visibility: float # 能见度(km)
weather_code: int # 天气代码
description: str # 天气描述
icon: str # 天气图标
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'timestamp': self.timestamp.isoformat(),
'temperature': self.temperature,
'feels_like': self.feels_like,
'humidity': self.humidity,
'pressure': self.pressure,
'wind_speed': self.wind_speed,
'wind_direction':
如果你觉得这个工具好用,欢迎关注我!