秦皇岛市网站建设_网站建设公司_UX设计_seo优化
2026/1/10 12:01:43 网站建设 项目流程

智能坐姿矫正系统设计与实现

一、实际应用场景与痛点分析

应用场景

随着现代人长时间使用电脑、手机,不良坐姿导致的颈椎病、腰椎病日益普遍。本系统面向办公室人员、学生、远程工作者等长期坐姿人群,通过智能识别不良坐姿并提供实时矫正提醒。

主要痛点

1. 无意识习惯 - 用户常在不自觉中形成不良坐姿

2. 缺乏即时反馈 - 传统方法无法提供实时提醒

3. 个体差异大 - 标准矫正方法不适用于所有人

4. 难以坚持 - 矫正设备复杂,用户依从性差

5. 隐私问题 - 摄像头监控令人不适

6. 干扰工作 - 矫正提醒过于频繁影响工作

二、核心逻辑与智能控制原理

系统架构

感知层 → 处理层 → 决策层 → 执行层

↓ ↓ ↓ ↓

IMU传感器 → 姿态估计 → 模糊推理 → 震动提醒

压力传感器 → 特征提取 → 专家系统 → 声音提示

摄像头 → 深度学习 → 自适应控制 → 界面显示

核心智能控制原理

1. 模糊控制 - 处理"轻微前倾"、"严重低头"等模糊概念

2. 专家系统 - 基于人体工程学和康复医学规则库

3. 自适应控制 - 根据用户习惯调整灵敏度

4. 状态机控制 - 管理坐姿状态转移

5. PID控制 - 平滑提醒强度变化

三、代码实现

主程序:smart_posture_corrector.py

#!/usr/bin/env python3

"""

智能坐姿矫正系统

基于智能控制原理的实时坐姿监测与矫正系统

"""

import json

import datetime

import time

import threading

import queue

import numpy as np

from typing import Dict, List, Tuple, Optional, Any

from dataclasses import dataclass, asdict, field

from enum import Enum

import matplotlib.pyplot as plt

from matplotlib.animation import FuncAnimation

from collections import deque

import warnings

from abc import ABC, abstractmethod

import random

import logging

from dataclasses_json import dataclass_json

import os

from scipy import signal

from scipy.spatial.transform import Rotation

import pickle

# 配置日志

logging.basicConfig(

level=logging.INFO,

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

handlers=[

logging.FileHandler('posture.log', encoding='utf-8'),

logging.StreamHandler()

]

)

logger = logging.getLogger(__name__)

class PostureType(Enum):

"""坐姿类型枚举"""

CORRECT = "正确坐姿"

LEAN_FORWARD = "身体前倾"

LEAN_BACKWARD = "身体后仰"

LEAN_LEFT = "身体左倾"

LEAN_RIGHT = "身体右倾"

HEAD_DOWN = "低头"

HEAD_UP = "仰头"

TORSO_TWIST = "躯干扭转"

SHOULDER_ASYMMETRY = "肩膀不对称"

CROSS_LEGS = "翘二郎腿"

class SensorType(Enum):

"""传感器类型枚举"""

IMU = "惯性测量单元" # 加速度计+陀螺仪+磁力计

PRESSURE = "压力传感器"

CAMERA = "摄像头"

ULTRASONIC = "超声波"

INFRARED = "红外"

class AlertLevel(Enum):

"""提醒级别枚举"""

NONE = 0 # 无需提醒

MILD = 1 # 轻度提醒

MODERATE = 2 # 中度提醒

SEVERE = 3 # 重度提醒

CRITICAL = 4 # 严重提醒

@dataclass_json

@dataclass

class PostureData:

"""姿态数据"""

timestamp: datetime.datetime

pitch: float # 俯仰角(前倾/后仰) 单位:度

roll: float # 横滚角(左右倾斜) 单位:度

yaw: float # 偏航角(扭转) 单位:度

acceleration: Tuple[float, float, float] # 三轴加速度

angular_velocity: Tuple[float, float, float] # 三轴角速度

pressure_distribution: Optional[List[float]] = None # 压力分布

sensor_type: SensorType = SensorType.IMU

@dataclass_json

@dataclass

class PostureState:

"""坐姿状态"""

posture_type: PostureType

severity: float # 严重程度 0-1

confidence: float # 置信度 0-1

duration: float # 持续时间(秒)

start_time: datetime.datetime

alert_level: AlertLevel = AlertLevel.NONE

@dataclass_json

@dataclass

class UserProfile:

"""用户基本信息"""

user_id: str

name: str

age: int

height: float # 厘米

weight: float # 公斤

occupation: str

health_conditions: List[str] # 健康状况

posture_habits: List[str] # 坐姿习惯

work_hours: Tuple[float, float] # 工作时间段

sensitivity: float = 0.5 # 灵敏度 0-1

training_data: List[PostureData] = field(default_factory=list)

class PostureFeatureExtractor:

"""

姿态特征提取器

从原始传感器数据提取特征

"""

def __init__(self, window_size: int = 10):

self.window_size = window_size

self.data_buffer = deque(maxlen=window_size)

def add_data(self, data: PostureData):

"""添加数据到缓冲区"""

self.data_buffer.append(data)

def extract_features(self) -> Dict[str, Any]:

"""从缓冲区提取特征"""

if len(self.data_buffer) < 2:

return {}

# 获取最新数据

latest_data = self.data_buffer[-1]

# 角度特征

angle_features = {

'pitch': latest_data.pitch,

'roll': latest_data.roll,

'yaw': latest_data.yaw,

'pitch_abs': abs(latest_data.pitch),

'roll_abs': abs(latest_data.roll),

'yaw_abs': abs(latest_data.yaw)

}

# 加速度特征

accel = np.array(latest_data.acceleration)

accel_features = {

'accel_magnitude': np.linalg.norm(accel),

'accel_x': accel[0],

'accel_y': accel[1],

'accel_z': accel[2],

'accel_variance': np.var([d.acceleration for d in self.data_buffer], axis=0).tolist()

}

# 角速度特征

gyro = np.array(latest_data.angular_velocity)

gyro_features = {

'gyro_magnitude': np.linalg.norm(gyro),

'gyro_x': gyro[0],

'gyro_y': gyro[1],

'gyro_z': gyro[2]

}

# 动态特征(需要历史数据)

if len(self.data_buffer) >= 5:

pitch_values = [d.pitch for d in self.data_buffer]

roll_values = [d.roll for d in self.data_buffer]

dynamic_features = {

'pitch_velocity': (pitch_values[-1] - pitch_values[-2]) * 10, # 度/秒

'roll_velocity': (roll_values[-1] - roll_values[-2]) * 10,

'pitch_std': np.std(pitch_values),

'roll_std': np.std(roll_values),

'pitch_change': max(pitch_values) - min(pitch_values),

'roll_change': max(roll_values) - min(roll_values)

}

else:

dynamic_features = {

'pitch_velocity': 0,

'roll_velocity': 0,

'pitch_std': 0,

'roll_std': 0,

'pitch_change': 0,

'roll_change': 0

}

# 综合特征

all_features = {

**angle_features,

**accel_features,

**gyro_features,

**dynamic_features,

'timestamp': latest_data.timestamp,

'data_count': len(self.data_buffer)

}

return all_features

def calculate_stability_score(self) -> float:

"""计算坐姿稳定性评分(0-100)"""

if len(self.data_buffer) < 5:

return 100.0

# 获取角度数据

pitch_values = [d.pitch for d in self.data_buffer]

roll_values = [d.roll for d in self.data_buffer]

# 计算角度变化的标准差

pitch_std = np.std(pitch_values)

roll_std = np.std(roll_values)

# 计算稳定性分数

# 标准差越小,稳定性越高

max_std = 5.0 # 最大可接受标准差

pitch_score = max(0, 100 - (pitch_std / max_std * 50))

roll_score = max(0, 100 - (roll_std / max_std * 50))

stability_score = (pitch_score + roll_score) / 2

return stability_score

class FuzzyPostureClassifier:

"""

模糊姿态分类器

处理模糊姿态边界

"""

def __init__(self):

# 模糊集合定义

self.angle_sets = {

'correct': {'center': 0, 'range': 5}, # 正确范围:±5度

'mild': {'center': 10, 'range': 10}, # 轻度:5-15度

'moderate': {'center': 20, 'range': 10}, # 中度:15-25度

'severe': {'center': 30, 'range': 10} # 严重:25-35度

}

# 持续时间模糊集合

self.duration_sets = {

'short': {'center': 5, 'range': 5}, # 短暂:0-10秒

'medium': {'center': 20, 'range': 10}, # 中等:10-30秒

'long': {'center': 60, 'range': 30} # 长期:30-90秒

}

def fuzzy_angle_classification(self, angle: float) -> Dict[str, float]:

"""模糊角度分类"""

memberships = {}

for level, params in self.angle_sets.items():

center = params['center']

width = params['range']

# 计算隶属度(三角隶属函数)

if abs(angle) <= center - width/2 or abs(angle) >= center + width/2:

membership = 0

elif abs(angle) <= center:

membership = (abs(angle) - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - abs(angle)) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

def fuzzy_duration_classification(self, duration: float) -> Dict[str, float]:

"""模糊持续时间分类"""

memberships = {}

for level, params in self.duration_sets.items():

center = params['center']

width = params['range']

if duration <= center - width/2 or duration >= center + width/2:

membership = 0

elif duration <= center:

membership = (duration - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - duration) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

def infer_posture_type(self, features: Dict[str, Any]) -> List[Tuple[PostureType, float]]:

"""模糊推理坐姿类型"""

pitch = features.get('pitch', 0)

roll = features.get('roll', 0)

yaw = features.get('yaw', 0)

# 计算各个角度的模糊分类

pitch_membership = self.fuzzy_angle_classification(pitch)

roll_membership = self.fuzzy_angle_classification(roll)

yaw_membership = self.fuzzy_angle_classification(yaw)

# 定义模糊规则

rules = [

# 前倾/后仰

{

'condition': lambda: pitch_membership.get('moderate', 0) > 0.5 and pitch > 0,

'type': PostureType.LEAN_FORWARD,

'confidence': pitch_membership.get('moderate', 0) * 0.8

},

{

'condition': lambda: pitch_membership.get('moderate', 0) > 0.5 and pitch < 0,

'type': PostureType.LEAN_BACKWARD,

'confidence': pitch_membership.get('moderate', 0) * 0.8

},

# 左右倾斜

{

'condition': lambda: roll_membership.get('moderate', 0) > 0.5 and roll > 0,

'type': PostureType.LEAN_LEFT,

'confidence': roll_membership.get('moderate', 0) * 0.8

},

{

'condition': lambda: roll_membership.get('moderate', 0) > 0.5 and roll < 0,

'type': PostureType.LEAN_RIGHT,

'confidence': roll_membership.get('moderate', 0) * 0.8

},

# 低头/仰头

{

'condition': lambda: pitch_membership.get('severe', 0) > 0.7 and pitch > 15,

'type': PostureType.HEAD_DOWN,

'confidence': pitch_membership.get('severe', 0) * 0.9

},

{

'condition': lambda: pitch_membership.get('severe', 0) > 0.7 and pitch < -15,

'type': PostureType.HEAD_UP,

'confidence': pitch_membership.get('severe', 0) * 0.9

},

# 躯干扭转

{

'condition': lambda: yaw_membership.get('moderate', 0) > 0.6,

'type': PostureType.TORSO_TWIST,

'confidence': yaw_membership.get('moderate', 0) * 0.7

}

]

# 应用模糊规则

results = []

for rule in rules:

if rule['condition']():

results.append((rule['type'], rule['confidence']))

# 如果没有检测到不良坐姿,则认为是正确坐姿

if not results and pitch_membership.get('correct', 0) > 0.7 and roll_membership.get('correct', 0) > 0.7:

results.append((PostureType.CORRECT, 0.9))

return results

class PostureExpertSystem:

"""

坐姿专家系统

基于人体工程学和康复医学规则库

"""

def __init__(self):

self.rules = self._initialize_rules()

self.alerts_history = deque(maxlen=100)

def _initialize_rules(self) -> List[Dict]:

"""初始化专家规则库"""

return [

{

"name": "严重前倾",

"condition": lambda data: data.get('pitch', 0) > 25,

"action": "立即挺直背部,收下巴,肩胛骨后缩",

"alert_level": AlertLevel.CRITICAL,

"message": "⚠️ 严重前倾!立即矫正,避免颈椎压力过大",

"priority": 10

},

{

"name": "长时间低头",

"condition": lambda data: data.get('pitch', 0) > 15 and data.get('duration', 0) > 30,

"action": "抬头,调整屏幕高度,使视线平视",

"alert_level": AlertLevel.SEVERE,

"message": "📱 长时间低头,调整视线高度",

"priority": 8

},

{

"name": "身体倾斜",

"condition": lambda data: abs(data.get('roll', 0)) > 15,

"action": "调整坐姿,保持身体左右对称",

"alert_level": AlertLevel.MODERATE,

"message": "⚖️ 身体倾斜,保持左右平衡",

"priority": 6

},

{

"name": "轻微前倾",

"condition": lambda data: 10 < data.get('pitch', 0) <= 20,

"action": "轻微挺直,放松肩膀",

"alert_level": AlertLevel.MILD,

"message": "💺 轻微前倾,注意坐姿",

"priority": 4

},

{

"name": "坐姿稳定",

"condition": lambda data: abs(data.get('pitch', 0)) <= 5 and abs(data.get('roll', 0)) <= 5,

"action": "保持当前良好坐姿",

"alert_level": AlertLevel.NONE,

"message": "✅ 坐姿良好,继续保持!",

"priority": 0

},

{

"name": "频繁晃动",

"condition": lambda data: data.get('stability_score', 100) < 60,

"action": "放松身体,减少不必要的晃动",

"alert_level": AlertLevel.MILD,

"message": "🔄 坐姿不稳定,放松身体",

"priority": 3

},

{

"name": "后仰过度",

"condition": lambda data: data.get('pitch', 0) < -20,

"action": "调整椅子角度,保持适当后倾",

"alert_level": AlertLevel.MODERATE,

"message": "🪑 后仰过度,调整椅子角度",

"priority": 5

},

{

"name": "肩部不对称",

"condition": lambda data: data.get('shoulder_asymmetry', 0) > 0.3,

"action": "放松肩膀,保持双肩水平",

"alert_level": AlertLevel.MILD,

"message": "🤷 肩部不对称,放松肩膀",

"priority": 4

}

]

def evaluate_posture(self, features: Dict, current_state: PostureState) -> Dict:

"""评估坐姿并返回建议"""

evaluation = {

"needs_alert": False,

"alert_level": AlertLevel.NONE,

"recommendations": [],

"messages": [],

"timestamp": datetime.datetime.now()

}

# 准备评估数据

eval_data = {

**features,

"duration": current_state.duration if current_state else 0,

"current_type": current_state.posture_type if current_state else None

}

# 应用专家规则

triggered_rules = []

for rule in self.rules:

try:

if rule["condition"](eval_data):

triggered_rules.append(rule)

except Exception as e:

logger.warning(f"规则执行失败: {rule['name']}, 错误: {e}")

# 按优先级排序

triggered_rules.sort(key=lambda x: x["priority"], reverse=True)

# 处理触发的规则

for rule in triggered_rules[:3]: # 最多返回3条建议

if rule["alert_level"] != AlertLevel.NONE:

evaluation["needs_alert"] = True

if rule["alert_level"].value > evaluation["alert_level"].value:

evaluation["alert_level"] = rule["alert_level"]

evaluation["recommendations"].append(rule["action"])

evaluation["messages"].append(rule["message"])

# 记录提醒历史

if evaluation["needs_alert"]:

self.alerts_history.append({

"time": datetime.datetime.now(),

"level": evaluation["alert_level"],

"message": evaluation["messages"][0] if evaluation["messages"] else "",

"features": features

})

return evaluation

class AdaptiveAlertController:

"""

自适应提醒控制器

基于PID控制和状态机

"""

def __init__(self, user_sensitivity: float = 0.5):

self.user_sensitivity = user_sensitivity

self.last_alert_time = None

self.alert_count = 0

self.alert_history = deque(maxlen=20)

# PID控制器参数

self.kp = 0.8 # 比例增益

self.ki = 0.1 # 积分增益

self.kd = 0.2 # 微分增益

self.error_integral = 0

self.last_error = 0

# 状态机

self.state = "NORMAL"

self.state_transitions = {

"NORMAL": ["MILD_ALERT", "IGNORE"],

"MILD_ALERT": ["NORMAL", "MODERATE_ALERT"],

"MODERATE_ALERT": ["NORMAL", "SEVERE_ALERT"],

"SEVERE_ALERT": ["NORMAL", "CRITICAL_ALERT"],

"CRITICAL_ALERT": ["NORMAL"],

"IGNORE": ["NORMAL"]

}

def calculate_alert_intensity(self, severity: float, duration: float) -> float:

"""

计算提醒强度(0-1)

基于PID控制算法

"""

# 期望的坐姿质量(理想状态为0,表示无不良坐姿)

desired_quality = 0

# 当前误差 = 严重程度

current_error = severity

# 计算PID控制输出

self.error_integral += current_error

error_derivative = current_error - self.last_error

# PID输出

pid_output = (self.kp * current_error +

self.ki * self.error_integral +

self.kd * error_derivative)

# 考虑持续时间的影响

duration_factor = min(1.0, duration / 60) # 持续时间超过60秒达到最大影响

# 综合强度计算

base_intensity = pid_output * (1.0 + duration_factor)

# 考虑用户灵敏度

intensity = base_intensity * (1.5 - self.user_sensitivity) # 灵敏度低则强度高

# 限制在0-1范围内

intensity = max(0, min(1, intensity))

self.last_error = current_error

return intensity

def update_state(self, alert_level: AlertLevel, response_received: bool = False):

"""更新状态机状态"""

current_state = self.state

# 定义状态转移规则

if alert_level == AlertLevel.NONE:

new_state = "NORMAL"

elif alert_level == AlertLevel.MILD:

if current_state == "NORMAL":

new_state = "MILD_ALERT"

elif current_state == "MILD_ALERT" and not response_received:

new_state = "MODERATE_ALERT"

else:

new_state = current_state

elif alert_level == AlertLevel.MODERATE:

if current_state in ["NORMAL", "MILD_ALERT"]:

new_state = "MODERATE_ALERT"

elif current_state == "MODERATE_ALERT" and not response_received:

new_state = "SEVERE_ALERT"

else:

new_state = current_state

elif alert_level == AlertLevel.SEVERE:

new_state = "SEVERE_ALERT"

elif alert_level == AlertLevel.CRITICAL:

new_state = "CRITICAL_ALERT"

else:

new_state = current_state

# 如果用户有响应,恢复到较低状态

if response_received and new_state != "NORMAL":

if "CRITICAL" in new_state:

new_state = "SEVERE_ALERT"

elif "SEVERE" in new_state:

new_state = "MODERATE_ALERT"

elif "MODERATE" in new_state:

new_state = "MILD_ALERT"

elif "MILD" in new_state:

new_state = "NORMAL"

# 应用状态转移

if new_state in self.state_transitions.get(current_state, []):

self.state = new_state

logger.info(f"状态转移: {current_state} -> {new_state}")

return self.state

def get_alert_pattern(self, intensity: float, alert_level: AlertLevel) -> Dict:

"""根据强度生成提醒模式"""

if alert_level == AlertLevel.NONE or intensity < 0.1:

return {"type": "none", "pattern": []}

# 基本震动模式

base_patterns

如果你觉得这个工具好用,欢迎关注我!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询