用Python和Keras复现论文:LSTM-AutoEncoder检测教室CO2异常(附完整代码)

张开发
2026/4/15 6:16:38 15 分钟阅读

分享文章

用Python和Keras复现论文:LSTM-AutoEncoder检测教室CO2异常(附完整代码)
用Python和Keras实现LSTM-AutoEncoder的教室CO2异常检测实战当教室里的CO2浓度超过1000ppm时学生的注意力会显著下降——这个发现促使新西兰的研究团队开发了SKOMOBO监测设备。但如何从海量传感器数据中识别异常值本文将带你用Python和Keras完整复现一篇IEEE论文中的LSTM-AutoEncoder模型实现99.5%准确率的CO2异常检测系统。1. 环境准备与数据加载首先需要配置合适的开发环境。建议使用Python 3.8版本并创建独立的虚拟环境conda create -n co2_detection python3.8 conda activate co2_detection pip install tensorflow2.8 keras pandas matplotlib numpy scikit-learn论文中使用的是新西兰学校2018年的CO2监测数据我们可以从开放数据平台获取类似的室内空气质量数据集作为替代import pandas as pd # 加载模拟数据集 data pd.read_csv(classroom_co2.csv, parse_dates[timestamp]) print(f数据集时间范围{data[timestamp].min()} 至 {data[timestamp].max()}) print(f共 {len(data)} 条记录CO2均值{data[co2].mean():.1f}ppm) # 可视化原始数据 import matplotlib.pyplot as plt plt.figure(figsize(12, 6)) plt.plot(data[timestamp], data[co2], label原始CO2读数) plt.axhline(y1000, colorr, linestyle--, label异常阈值) plt.xlabel(时间); plt.ylabel(CO2浓度(ppm)) plt.title(教室CO2浓度时间序列); plt.legend() plt.show()典型的数据预处理步骤包括缺失值处理用前后值插补或标记为0重复值删除相同时间戳的记录只保留一条数据标准化使用Z-score归一化序列分割按固定时间窗口生成训练样本from sklearn.preprocessing import StandardScaler # 数据清洗 data data.drop_duplicates(subset[timestamp]) data[co2] data[co2].fillna(0) # 数据标准化 scaler StandardScaler() data[co2_scaled] scaler.fit_transform(data[[co2]]) # 创建时间窗口序列 def create_sequences(values, window_size10): sequences [] for i in range(len(values) - window_size): sequences.append(values[i:iwindow_size]) return np.array(sequences) X create_sequences(data[co2_scaled].values) print(f生成序列形状{X.shape}) # (样本数, 时间步长, 特征数)2. LSTM-AutoEncoder模型架构论文提出的混合模型结合了LSTM处理时间序列的优势和AutoEncoder的特征压缩能力。以下是使用Keras的实现from tensorflow.keras.models import Model from tensorflow.keras.layers import Input, LSTM, RepeatVector, TimeDistributed, Dense # 模型参数 timesteps 10 # 时间窗口大小 features 1 # 单变量CO2数据 latent_dim 16 # 潜在空间维度 # 编码器 inputs Input(shape(timesteps, features)) encoded LSTM(32, activationtanh, return_sequencesTrue)(inputs) encoded LSTM(latent_dim, activationtanh)(encoded) # 解码器 decoded RepeatVector(timesteps)(encoded) decoded LSTM(latent_dim, activationtanh, return_sequencesTrue)(decoded) decoded LSTM(32, activationtanh, return_sequencesTrue)(decoded) outputs TimeDistributed(Dense(features))(decoded) # 完整模型 model Model(inputs, outputs) model.compile(optimizeradam, lossmae) model.summary()模型训练的关键技巧使用MAE平均绝对误差作为损失函数添加Dropout层防止过拟合论文中使用0.2的dropout率仅使用正常数据训练CO2 1000ppm# 分离正常数据训练集 normal_data data[data[co2] 1000][co2_scaled].values X_train create_sequences(normal_data) # 训练参数 history model.fit( X_train, X_train, epochs50, batch_size64, validation_split0.1, callbacks[ tf.keras.callbacks.EarlyStopping(patience5, monitorval_loss) ] ) # 训练过程可视化 plt.plot(history.history[loss], label训练损失) plt.plot(history.history[val_loss], label验证损失) plt.xlabel(Epoch); plt.ylabel(MAE损失) plt.title(模型训练过程); plt.legend() plt.show()3. 异常检测与阈值确定模型训练完成后我们需要确定异常判定的阈值# 计算训练集的重构误差 train_pred model.predict(X_train) train_mae np.mean(np.abs(train_pred - X_train), axis1) # 确定阈值取最大值或统计百分位 threshold np.max(train_mae) # 或者使用threshold np.percentile(train_mae, 99) print(f异常检测阈值{threshold:.4f}) # 可视化误差分布 plt.hist(train_mae, bins50) plt.axvline(threshold, colorr, linestyle--) plt.xlabel(重构误差); plt.ylabel(频数) plt.title(训练集重构误差分布); plt.show()在实际应用中我们可以用以下函数检测新数据中的异常def detect_anomalies(model, data, threshold): sequences create_sequences(data) pred model.predict(sequences) mae np.mean(np.abs(pred - sequences), axis1) anomalies mae threshold return anomalies, mae # 在全数据集上测试 full_sequences create_sequences(data[co2_scaled].values) anomalies, error detect_anomalies(model, data[co2_scaled].values, threshold) # 可视化检测结果 plt.figure(figsize(12, 6)) plt.plot(data[timestamp][:len(error)], error, label重构误差) plt.plot(data[timestamp][:len(error)][anomalies], error[anomalies], ro, label检测到的异常) plt.axhline(threshold, colorr, linestyle--, label阈值) plt.xlabel(时间); plt.ylabel(重构误差) plt.title(CO2异常检测结果); plt.legend() plt.show()4. 模型优化与性能提升根据论文中的实验我们可以从以下几个维度优化模型性能4.1 超参数调优比较不同参数组合的效果参数测试值最佳值准确率影响LSTM单元数16, 32, 64321.2%潜在空间维度8, 16, 32160.8%时间窗口大小5, 10, 15102.1%批量大小32, 64, 128640.5%# 使用Keras Tuner进行自动调参 import keras_tuner as kt def build_model(hp): model tf.keras.Sequential() model.add(LSTM(unitshp.Int(units, min_value16, max_value64, step16), return_sequencesTrue, input_shape(timesteps, features))) model.add(LSTM(hp.Int(latent_dim, 8, 32, 8))) model.add(RepeatVector(timesteps)) model.add(LSTM(hp.Int(latent_dim, 8, 32, 8), return_sequencesTrue)) model.add(TimeDistributed(Dense(features))) model.compile(optimizeradam, lossmae) return model tuner kt.RandomSearch( build_model, objectiveval_loss, max_trials10, executions_per_trial2, directorytuning, project_nameco2_anomaly ) tuner.search(X_train, X_train, epochs30, validation_split0.1) best_model tuner.get_best_models(num_models1)[0]4.2 模型架构改进论文对比了三种不同架构单层LSTM编码器和解码器各1层LSTM双层LSTM每部分2层LSTM堆叠变分自编码器加入概率编码# 变分LSTM-AutoEncoder实现示例 from tensorflow.keras import backend as K # 编码器 inputs Input(shape(timesteps, features)) x LSTM(32, return_sequencesTrue)(inputs) x LSTM(16)(x) # 变分层 z_mean Dense(latent_dim)(x) z_log_var Dense(latent_dim)(x) def sampling(args): z_mean, z_log_var args epsilon K.random_normal(shape(K.shape(z_mean)[0], latent_dim)) return z_mean K.exp(0.5 * z_log_var) * epsilon z Lambda(sampling)([z_mean, z_log_var]) # 解码器 decoder_input Input(shape(latent_dim,)) x RepeatVector(timesteps)(decoder_input) x LSTM(16, return_sequencesTrue)(x) x LSTM(32, return_sequencesTrue)(x) outputs TimeDistributed(Dense(features))(x) # 完整模型 encoder Model(inputs, z) decoder Model(decoder_input, outputs) outputs decoder(encoder(inputs)) vae Model(inputs, outputs) # 添加KL散度损失 reconstruction_loss tf.keras.losses.mse(inputs, outputs) kl_loss -0.5 * K.sum(1 z_log_var - K.square(z_mean) - K.exp(z_log_var), axis-1) vae_loss K.mean(reconstruction_loss kl_loss) vae.add_loss(vae_loss) vae.compile(optimizeradam)4.3 多变量扩展实际应用中可以结合温度、湿度等多维数据提升检测效果# 多变量LSTM-AutoEncoder multi_input Input(shape(timesteps, 3)) # CO2,温度,湿度 encoded LSTM(32, return_sequencesTrue)(multi_input) encoded LSTM(16)(encoded) decoded RepeatVector(timesteps)(encoded) decoded LSTM(16, return_sequencesTrue)(decoded) decoded LSTM(32, return_sequencesTrue)(decoded) multi_output TimeDistributed(Dense(3))(decoded) multi_model Model(multi_input, multi_output) multi_model.compile(optimizeradam, lossmae)5. 部署与应用实践将训练好的模型部署为实时监测系统import pickle from flask import Flask, request, jsonify # 保存模型和scaler model.save(co2_anomaly_detector.h5) with open(scaler.pkl, wb) as f: pickle.dump(scaler, f) app Flask(__name__) app.route(/detect, methods[POST]) def detect(): data request.json[sensor_data] # 接收10分钟窗口数据 scaled scaler.transform([[data]]) sequence create_sequences(scaled) pred model.predict(sequence) mae np.mean(np.abs(pred - sequence)) return jsonify({ is_anomaly: bool(mae threshold), confidence: float(mae/threshold) }) if __name__ __main__: app.run(host0.0.0.0, port5000)在实际教室部署时还需要考虑数据采集频率论文中使用1分钟间隔边缘计算在树莓派等设备上运行轻量级模型报警机制当检测到异常时触发通风系统模型更新定期用新数据重新训练模型# 边缘设备上的轻量级模型 lite_model tf.lite.TFLiteConverter.from_keras_model(model).convert() with open(model.tflite, wb) as f: f.write(lite_model) # 在树莓派上加载 interpreter tf.lite.Interpreter(model_pathmodel.tflite) interpreter.allocate_tensors()

更多文章