南宁市网站建设_网站建设公司_原型设计_seo优化
2026/1/6 15:36:33 网站建设 项目流程

视障人士语音拨号助手

一、实际应用场景与痛点

应用场景

视障用户张先生需要经常联系家人和朋友。传统的通讯录操作对视障人士极不友好:触摸屏无法提供有效反馈,输入号码容易出错,查找联系人需要别人帮助。一次紧急情况下,他急需拨打急救电话,但无法快速找到联系人,耽误了宝贵时间。他急需一款能够通过语音指令快速拨打电话的智能助手。

核心痛点

1. 屏幕触摸困难:触摸屏缺乏触觉反馈,无法准确点击

2. 联系人查找慢:滚动查找联系人效率低下

3. 号码输入易错:语音输入号码容易识别错误

4. 紧急情况无助:紧急时无法快速拨打关键联系人

5. 隐私泄露风险:让他人帮助拨号泄露个人隐私

6. 操作步骤复杂:多步操作对视障用户不友好

二、核心逻辑设计

1. 启动语音监听,等待唤醒词

2. 识别用户语音指令

3. 解析指令意图(拨号/搜索/紧急呼叫)

4. 搜索通讯录匹配联系人

5. 语音确认联系人信息

6. 自动拨打确认的联系人

7. 通话状态监控和提示

8. 通话记录和常用联系人管理

三、模块化代码实现

主程序文件:voice_dialer_assistant.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

视障人士语音拨号助手

通过语音指令搜索通讯录并自动拨号

版本:2.0.0

作者:无障碍智能助手

"""

import os

import sys

import json

import time

import sqlite3

import threading

import queue

import re

from datetime import datetime

from typing import Dict, List, Tuple, Optional, Any, Union

from dataclasses import dataclass, asdict, field

from enum import Enum

import warnings

warnings.filterwarnings('ignore')

# 语音识别

try:

import speech_recognition as sr

SPEECH_RECOGNITION_AVAILABLE = True

except ImportError:

SPEECH_RECOGNITION_AVAILABLE = False

print("警告: speech_recognition未安装,语音识别功能受限")

# 语音合成

try:

import pyttsx3

TTS_AVAILABLE = True

except ImportError:

TTS_AVAILABLE = False

print("警告: pyttsx3未安装,语音合成功能受限")

# 音频处理

try:

import pyaudio

import wave

import simpleaudio as sa

AUDIO_AVAILABLE = True

except ImportError:

AUDIO_AVAILABLE = False

# Android相关(如果运行在Android上)

try:

from android import Android

ANDROID_AVAILABLE = True

except ImportError:

ANDROID_AVAILABLE = False

# 系统调用(用于拨号)

import subprocess

import webbrowser

class CallType(Enum):

"""通话类型枚举"""

OUTGOING = "outgoing" # 拨出电话

INCOMING = "incoming" # 来电

MISSED = "missed" # 未接来电

REJECTED = "rejected" # 拒接

EMERGENCY = "emergency" # 紧急呼叫

class CommandType(Enum):

"""语音指令类型枚举"""

DIAL = "dial" # 拨打电话

SEARCH = "search" # 搜索联系人

EMERGENCY = "emergency" # 紧急呼叫

REDIAL = "redial" # 重拨

CANCEL = "cancel" # 取消操作

HELP = "help" # 帮助

LIST_CONTACTS = "list" # 列出联系人

ADD_CONTACT = "add" # 添加联系人

DELETE_CONTACT = "delete" # 删除联系人

UNKNOWN = "unknown" # 未知指令

@dataclass

class Contact:

"""联系人信息类"""

id: int

name: str

phone_numbers: List[str] # 可能有多个号码

groups: List[str] # 分组标签

notes: Optional[str] = None

frequency: int = 0 # 呼叫频率

last_called: Optional[float] = None

created_at: float = field(default_factory=time.time)

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'id': self.id,

'name': self.name,

'phone_numbers': self.phone_numbers,

'groups': self.groups,

'notes': self.notes,

'frequency': self.frequency,

'last_called': datetime.fromtimestamp(self.last_called).isoformat()

if self.last_called else None,

'created_at': datetime.fromtimestamp(self.created_at).isoformat()

}

@dataclass

class CallRecord:

"""通话记录类"""

id: int

contact_id: Optional[int]

contact_name: str

phone_number: str

call_type: CallType

timestamp: float

duration: float = 0.0 # 通话时长(秒)

successful: bool = True

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'id': self.id,

'contact_id': self.contact_id,

'contact_name': self.contact_name,

'phone_number': self.phone_number,

'call_type': self.call_type.value,

'timestamp': datetime.fromtimestamp(self.timestamp).isoformat(),

'duration': self.duration,

'successful': self.successful

}

@dataclass

class VoiceCommand:

"""语音指令类"""

raw_text: str

command_type: CommandType

parameters: Dict[str, Any]

confidence: float

timestamp: float = field(default_factory=time.time)

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'raw_text': self.raw_text,

'command_type': self.command_type.value,

'parameters': self.parameters,

'confidence': self.confidence,

'timestamp': datetime.fromtimestamp(self.timestamp).isoformat()

}

class ContactDatabase:

"""联系人数据库管理器"""

def __init__(self, db_path: str = "data/contacts.db"):

"""

初始化数据库

Args:

db_path: 数据库文件路径

"""

self.db_path = db_path

self.connection = None

self.setup_database()

def setup_database(self):

"""设置数据库"""

# 确保数据目录存在

os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

# 连接数据库

self.connection = sqlite3.connect(self.db_path)

self.connection.row_factory = sqlite3.Row

# 创建表

self._create_tables()

# 插入示例数据(如果数据库为空)

self._insert_sample_data()

def _create_tables(self):

"""创建数据表"""

cursor = self.connection.cursor()

# 联系人表

cursor.execute('''

CREATE TABLE IF NOT EXISTS contacts (

id INTEGER PRIMARY KEY AUTOINCREMENT,

name TEXT NOT NULL,

phone_numbers TEXT NOT NULL, -- JSON数组

groups TEXT NOT NULL, -- JSON数组

notes TEXT,

frequency INTEGER DEFAULT 0,

last_called REAL,

created_at REAL NOT NULL

)

''')

# 通话记录表

cursor.execute('''

CREATE TABLE IF NOT EXISTS call_records (

id INTEGER PRIMARY KEY AUTOINCREMENT,

contact_id INTEGER,

contact_name TEXT NOT NULL,

phone_number TEXT NOT NULL,

call_type TEXT NOT NULL,

timestamp REAL NOT NULL,

duration REAL DEFAULT 0,

successful BOOLEAN DEFAULT TRUE,

FOREIGN KEY (contact_id) REFERENCES contacts (id)

)

''')

# 语音指令记录表

cursor.execute('''

CREATE TABLE IF NOT EXISTS voice_commands (

id INTEGER PRIMARY KEY AUTOINCREMENT,

raw_text TEXT NOT NULL,

command_type TEXT NOT NULL,

parameters TEXT NOT NULL, -- JSON对象

confidence REAL NOT NULL,

timestamp REAL NOT NULL

)

''')

# 紧急联系人表

cursor.execute('''

CREATE TABLE IF NOT EXISTS emergency_contacts (

id INTEGER PRIMARY KEY AUTOINCREMENT,

contact_id INTEGER NOT NULL,

emergency_type TEXT NOT NULL, -- police/fire/ambulance/family/doctor

priority INTEGER DEFAULT 1,

FOREIGN KEY (contact_id) REFERENCES contacts (id)

)

''')

self.connection.commit()

def _insert_sample_data(self):

"""插入示例数据"""

cursor = self.connection.cursor()

# 检查是否有数据

cursor.execute("SELECT COUNT(*) FROM contacts")

count = cursor.fetchone()[0]

if count == 0:

# 添加示例联系人

sample_contacts = [

("张三", ["13800138000"], ["家人"], "父亲", 10),

("李四", ["13900139000"], ["朋友"], "好友", 5),

("王五", ["13600136000"], ["同事"], "同事", 3),

("急救中心", ["120"], ["紧急"], "医疗急救", 0),

("报警电话", ["110"], ["紧急"], "警察", 0),

("火警电话", ["119"], ["紧急"], "消防", 0)

]

for name, phones, groups, notes, freq in sample_contacts:

self.add_contact(

name=name,

phone_numbers=phones,

groups=groups,

notes=notes,

frequency=freq

)

# 添加紧急联系人

emergency_mapping = [

("急救中心", "ambulance", 1),

("报警电话", "police", 1),

("火警电话", "fire", 1),

("张三", "family", 2)

]

for contact_name, etype, priority in emergency_mapping:

cursor.execute("SELECT id FROM contacts WHERE name = ?", (contact_name,))

result = cursor.fetchone()

if result:

self.add_emergency_contact(result[0], etype, priority)

self.connection.commit()

print("已添加示例联系人数据")

def add_contact(self, name: str, phone_numbers: List[str],

groups: List[str] = None, notes: str = None,

frequency: int = 0) -> int:

"""

添加联系人

Args:

name: 联系人姓名

phone_numbers: 电话号码列表

groups: 分组列表

notes: 备注

frequency: 呼叫频率

Returns:

联系人ID

"""

if groups is None:

groups = []

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO contacts

(name, phone_numbers, groups, notes, frequency, created_at)

VALUES (?, ?, ?, ?, ?, ?)

''', (

name,

json.dumps(phone_numbers, ensure_ascii=False),

json.dumps(groups, ensure_ascii=False),

notes,

frequency,

time.time()

))

contact_id = cursor.lastrowid

self.connection.commit()

return contact_id

def get_contact(self, contact_id: int) -> Optional[Contact]:

"""获取联系人"""

cursor = self.connection.cursor()

cursor.execute("SELECT * FROM contacts WHERE id = ?", (contact_id,))

row = cursor.fetchone()

if row:

return self._row_to_contact(row)

return None

def search_contacts(self, keyword: str, limit: int = 10) -> List[Contact]:

"""

搜索联系人

Args:

keyword: 搜索关键词

limit: 返回结果数量限制

Returns:

匹配的联系人列表

"""

cursor = self.connection.cursor()

# 使用SQLite的全文搜索(如果支持)或LIKE搜索

if keyword:

query = '''

SELECT * FROM contacts

WHERE name LIKE ? OR phone_numbers LIKE ? OR groups LIKE ?

ORDER BY frequency DESC, last_called DESC

LIMIT ?

'''

like_keyword = f"%{keyword}%"

cursor.execute(query, (like_keyword, like_keyword, like_keyword, limit))

else:

# 如果没有关键词,返回最常联系的人

query = '''

SELECT * FROM contacts

ORDER BY frequency DESC, last_called DESC

LIMIT ?

'''

cursor.execute(query, (limit,))

rows = cursor.fetchall()

return [self._row_to_contact(row) for row in rows]

def _row_to_contact(self, row) -> Contact:

"""将数据库行转换为Contact对象"""

return Contact(

id=row['id'],

name=row['name'],

phone_numbers=json.loads(row['phone_numbers']),

groups=json.loads(row['groups']),

notes=row['notes'],

frequency=row['frequency'],

last_called=row['last_called'],

created_at=row['created_at']

)

def update_contact_frequency(self, contact_id: int):

"""更新联系人呼叫频率"""

cursor = self.connection.cursor()

cursor.execute('''

UPDATE contacts

SET frequency = frequency + 1, last_called = ?

WHERE id = ?

''', (time.time(), contact_id))

self.connection.commit()

def add_call_record(self, contact_id: Optional[int], contact_name: str,

phone_number: str, call_type: CallType,

duration: float = 0.0, successful: bool = True) -> int:

"""

添加通话记录

Returns:

记录ID

"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO call_records

(contact_id, contact_name, phone_number, call_type,

timestamp, duration, successful)

VALUES (?, ?, ?, ?, ?, ?, ?)

''', (

contact_id,

contact_name,

phone_number,

call_type.value,

time.time(),

duration,

successful

))

record_id = cursor.lastrowid

self.connection.commit()

return record_id

def get_recent_calls(self, limit: int = 20) -> List[CallRecord]:

"""获取最近通话记录"""

cursor = self.connection.cursor()

cursor.execute('''

SELECT * FROM call_records

ORDER BY timestamp DESC

LIMIT ?

''', (limit,))

rows = cursor.fetchall()

records = []

for row in rows:

records.append(CallRecord(

id=row['id'],

contact_id=row['contact_id'],

contact_name=row['contact_name'],

phone_number=row['phone_number'],

call_type=CallType(row['call_type']),

timestamp=row['timestamp'],

duration=row['duration'],

successful=bool(row['successful'])

))

return records

def add_emergency_contact(self, contact_id: int,

emergency_type: str, priority: int = 1):

"""添加紧急联系人"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT OR REPLACE INTO emergency_contacts

(contact_id, emergency_type, priority)

VALUES (?, ?, ?)

''', (contact_id, emergency_type, priority))

self.connection.commit()

def get_emergency_contacts(self, emergency_type: str = None) -> List[Contact]:

"""获取紧急联系人"""

cursor = self.connection.cursor()

if emergency_type:

query = '''

SELECT c.* FROM contacts c

JOIN emergency_contacts ec ON c.id = ec.contact_id

WHERE ec.emergency_type = ?

ORDER BY ec.priority ASC

'''

cursor.execute(query, (emergency_type,))

else:

query = '''

SELECT c.* FROM contacts c

JOIN emergency_contacts ec ON c.id = ec.contact_id

ORDER BY ec.emergency_type, ec.priority ASC

'''

cursor.execute(query)

rows = cursor.fetchall()

return [self._row_to_contact(row) for row in rows]

def log_voice_command(self, command: VoiceCommand):

"""记录语音指令"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO voice_commands

(raw_text, command_type, parameters, confidence, timestamp)

VALUES (?, ?, ?, ?, ?)

''', (

command.raw_text,

command.command_type.value,

json.dumps(command.parameters, ensure_ascii=False),

command.confidence,

command.timestamp

))

self.connection.commit()

def close(self):

"""关闭数据库连接"""

if self.connection:

self.connection.close()

class SpeechRecognizer:

"""语音识别器"""

def __init__(self, config: Dict):

"""

初始化语音识别器

Args:

config: 识别器配置

"""

self.config = config

self.recognizer = None

self.microphone = None

if SPEECH_RECOGNITION_AVAILABLE:

self._initialize_recognizer()

def _initialize_recognizer(self):

"""初始化语音识别组件"""

try:

self.recognizer = sr.Recognizer()

self.microphone = sr.Microphone()

# 调整环境噪声

with self.microphone as source:

print("正在调整环境噪声,请保持安静...")

self.recognizer.adjust_for_ambient_noise(source, duration=1)

print("语音识别器初始化成功")

except Exception as e:

print(f"语音识别器初始化失败: {e}")

self.recognizer = None

self.microphone = None

def listen_for_wake_word(self, wake_word: str = "小助手",

timeout: float = None, phrase_time_limit: float = 2) -> bool:

"""

监听唤醒词

Args:

wake_word: 唤醒词

timeout: 超时时间(秒)

phrase_time_limit: 短语时长限制

Returns:

是否检测到唤醒词

"""

if not self.recognizer or not self.microphone:

return False

try:

with self.microphone as source:

print(f"正在监听唤醒词: '{wake_word}'...")

audio = self.recognizer.listen(source, timeout=timeout,

phrase_time_limit=phrase_time_limit)

# 识别语音

try:

text = self.recognizer.recognize_google(audio, language='zh-CN')

print(f"识别到语音: {text}")

# 检查是否包含唤醒词

if wake_word.lower() in text.lower():

print(f"检测到唤醒词: {wake_word}")

return True

except sr.UnknownValueError:

print("无法识别语音")

except sr.RequestError as e:

print(f"语音识别服务错误: {e}")

except sr.WaitTimeoutError:

pass # 超时是正常的

return False

def listen_for_command(self, timeout: float = 5,

phrase_time_limit: float = 5) -> Optional[str]:

"""

监听语音指令

Args:

timeout: 超时时间

phrase_time_limit: 短语时长限制

Returns:

识别的文本,失败返回None

"""

if not self.recognizer or not self.microphone:

return None

try:

print("请说出指令...")

with self.microphone as source:

audio = self.recognizer.listen(source, timeout=timeout,

phrase_time_limit=phrase_time_limit)

# 识别语音

try:

text = self.recognizer.recognize_google(audio, language='zh-CN')

print(f"识别到指令: {text}")

return text

except sr.UnknownValueError:

print("无法识别语音指令")

return None

except sr.RequestError as e:

print(f"语音识别服务错误: {e}")

return None

except sr.WaitTimeoutError:

print("等待指令超时")

return None

def continuous_listen(self, callback, stop_event: threading.Event):

"""

持续监听语音

Args:

callback: 识别到语音时的回调函数

stop_event: 停止事件

"""

if not self.recognizer or not self.microphone:

return

print("开始持续语音监听...")

while not stop_event.is_set():

try:

text = self.listen_for_command(timeout=1)

if text and callback:

callback(text)

except Exception as e:

print(f"监听错误: {e}")

time.sleep(0.1)

class VoiceSynthesizer:

"""语音合成器"""

def __init__(self, config: Dict):

"""

初始化语音合成器

Args:

config: 合成器配置

"""

self.config = config

self.tts_engine = None

if TTS_AVAILABLE:

self._initialize_tts()

def _initialize_tts(self):

"""初始化TTS引擎"""

try:

self.tts_engine = pyttsx3.init()

# 设置语音属性

rate = self.config.get('speech_r

如果你觉得这个工具好用,欢迎关注我!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询