六盘水市网站建设_网站建设公司_Python_seo优化
2026/1/2 14:56:07 网站建设 项目流程

我将为您创建一个完整的短视频自动字幕生成系统,包含方言识别和翻译功能。

项目结构

video_subtitle_system/

├── main.py # 主程序入口

├── config.py # 配置文件

├── audio_processor.py # 音频处理模块

├── speech_recognition.py # 语音识别模块

├── dialect_translator.py # 方言翻译模块

├── subtitle_generator.py # 字幕生成模块

├── utils.py # 工具函数

├── requirements.txt # 依赖包列表

├── README.md # 使用说明

└── examples/ # 示例文件

    └── sample_video.mp4

核心代码实现

1. config.py - 配置文件

# -*- coding: utf-8 -*-

"""

配置文件 - 存储系统参数和路径设置

"""

import os

# Base paths, resolved relative to the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
TEMP_DIR = os.path.join(BASE_DIR, "temp")

# Create the working directories at import time so later writes do not fail
# on a missing folder.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

# Audio processing settings.
AUDIO_CONFIG = {
    "sample_rate": 16000,
    "channels": 1,
    "chunk_duration": 30,  # chunk length in seconds
    "format": "wav",
}

# Speech recognition settings.
ASR_CONFIG = {
    "model_path": "models/speech_recognition",
    # Whisper expects a bare language code or name ("zh", "chinese"); the
    # region-tagged "zh-cn" is rejected as an unsupported language.
    "language": "zh",
    "dialect": "mandarin",  # default dialect
}

# Dialect translation settings.
TRANSLATION_CONFIG = {
    "supported_dialects": ["cantonese", "shanghainese", "sichuan", "chongqing"],
    "translation_model": "models/dialect_translation",
}

# Module-level alias: dialect_translator.py imports this name directly.
SUPPORTED_DIALECTS = TRANSLATION_CONFIG["supported_dialects"]

# Subtitle rendering settings.
SUBTITLE_CONFIG = {
    "font_size": 24,
    "font_color": "white",
    "background_color": "black",
    "position": "bottom",
    "max_chars_per_line": 20,
}

2. audio_processor.py - 音频处理模块

# -*- coding: utf-8 -*-

"""

音频处理模块 - 负责从视频中提取和处理音频

使用 moviepy 和 pydub 进行音频处理

"""

import os

import tempfile

from moviepy.editor import VideoFileClip

from pydub import AudioSegment

import librosa

import numpy as np

from config import AUDIO_CONFIG, TEMP_DIR

class AudioProcessor:
    """Extract, clean up and chunk the audio track of a video file.

    The target sample rate and channel count come from ``AUDIO_CONFIG``;
    every intermediate file is written into ``TEMP_DIR``.
    """

    def __init__(self):
        """Read the target sample rate and channel count from ``AUDIO_CONFIG``."""
        self.sample_rate = AUDIO_CONFIG["sample_rate"]
        self.channels = AUDIO_CONFIG["channels"]

    def extract_audio_from_video(self, video_path):
        """Extract the audio track of a video into a 16-bit PCM WAV file.

        Args:
            video_path (str): Path of the video file.

        Returns:
            str | None: Path of the extracted WAV file inside ``TEMP_DIR``,
            or ``None`` when the video has no audio track or extraction
            fails (the failure is printed, not raised).
        """
        video = None
        audio = None
        try:
            video = VideoFileClip(video_path)
            audio = video.audio
            if audio is None:
                # Report the real cause instead of an opaque AttributeError.
                print("音频提取失败: 视频文件不包含音轨")
                return None

            audio_filename = f"extracted_audio_{os.path.basename(video_path)}.wav"
            audio_path = os.path.join(TEMP_DIR, audio_filename)

            audio.write_audiofile(
                audio_path,
                fps=self.sample_rate,
                nbytes=2,
                codec='pcm_s16le',
            )

            print(f"音频提取成功: {audio_path}")
            return audio_path
        except Exception as e:
            print(f"音频提取失败: {str(e)}")
            return None
        finally:
            # Close the clips on every path so a failed extraction does not
            # leave them open.
            for clip in (audio, video):
                if clip is None:
                    continue
                try:
                    clip.close()
                except Exception as e:
                    print(f"资源释放失败: {str(e)}")

    def preprocess_audio(self, audio_path):
        """Convert, normalize and high-pass filter an audio file.

        The audio is converted to the configured channel count and sample
        rate, volume-normalized, and passed through an 80 Hz high-pass
        filter (a simple way to drop low-frequency noise; no other noise
        reduction is applied).

        Args:
            audio_path (str): Path of the original audio file.

        Returns:
            str: Path of the processed WAV file, or the original
            ``audio_path`` when processing fails.
        """
        try:
            audio = AudioSegment.from_file(audio_path)

            if audio.channels != self.channels:
                audio = audio.set_channels(self.channels)

            audio = audio.set_frame_rate(self.sample_rate)
            audio = audio.normalize()
            audio = audio.high_pass_filter(80)

            # Build the output name from the extension-less path so that a
            # non-".wav" input is never overwritten by its own output.
            root, _ = os.path.splitext(audio_path)
            processed_path = f"{root}_processed.wav"
            audio.export(processed_path, format="wav")

            print(f"音频预处理完成: {processed_path}")
            return processed_path
        except Exception as e:
            print(f"音频预处理失败: {str(e)}")
            return audio_path

    @staticmethod
    def _chunk_spans_ms(total_ms, chunk_ms):
        """Return ``(start_ms, end_ms)`` pairs that cover ``total_ms`` completely.

        The final span is shortened to end at ``total_ms`` so trailing audio
        is never dropped.

        Raises:
            ValueError: If ``chunk_ms`` is not positive.
        """
        if chunk_ms <= 0:
            raise ValueError("chunk duration must be positive")
        return [
            (start, min(start + chunk_ms, total_ms))
            for start in range(0, total_ms, chunk_ms)
        ]

    def split_audio(self, audio_path, chunk_duration=None):
        """Split a long audio file into consecutive chunks.

        Args:
            audio_path (str): Path of the audio file.
            chunk_duration (int | float | None): Chunk length in seconds;
                defaults to ``AUDIO_CONFIG["chunk_duration"]``.

        Returns:
            list[dict]: One dict per chunk with the keys ``path``,
            ``start_time``, ``end_time`` and ``duration`` (all times in
            seconds, reflecting the real extent of the chunk). An empty
            list is returned when splitting fails.
        """
        if chunk_duration is None:
            chunk_duration = AUDIO_CONFIG["chunk_duration"]

        try:
            audio = AudioSegment.from_file(audio_path)
            chunk_length_ms = int(round(chunk_duration * 1000))

            chunks = []
            # len(audio) is the duration in milliseconds; working in ms keeps
            # the partial chunk at the end of the file.
            spans = self._chunk_spans_ms(len(audio), chunk_length_ms)
            for index, (start_ms, end_ms) in enumerate(spans):
                chunk_path = os.path.join(TEMP_DIR, f"chunk_{index}.wav")
                audio[start_ms:end_ms].export(chunk_path, format="wav")

                chunks.append({
                    "path": chunk_path,
                    "start_time": start_ms / 1000,
                    "end_time": end_ms / 1000,
                    "duration": (end_ms - start_ms) / 1000,
                })

            print(f"音频分割完成,共{len(chunks)}个片段")
            return chunks
        except Exception as e:
            print(f"音频分割失败: {str(e)}")
            return []

3. speech_recognition.py - 语音识别模块

# -*- coding: utf-8 -*-

"""

语音识别模块 - 使用Whisper模型进行语音转文字

支持多种方言识别

"""

import whisper

import torch

import os

from config import ASR_CONFIG, TEMP_DIR

class SpeechRecognizer:
    """Speech-to-text wrapper around a Whisper model.

    Every public method reports failures by printing a message and
    returning ``None`` (or skipping the chunk) instead of raising.
    """

    def __init__(self, model_size="base"):
        """Pick a device and load the Whisper model.

        Args:
            model_size (str): Model size (tiny, base, small, medium, large).
        """
        self.model_size = model_size
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"使用设备: {self.device}")
        self._load_model()

    def _load_model(self):
        """Load the Whisper model; on failure ``self.model`` stays ``None``."""
        try:
            print(f"正在加载Whisper {self.model_size}模型...")
            self.model = whisper.load_model(self.model_size, device=self.device)
            print("模型加载完成")
        except Exception as e:
            print(f"模型加载失败: {str(e)}")

    @staticmethod
    def _normalize_language(language):
        """Reduce a language tag to the bare form passed to Whisper.

        Region-tagged values such as ``"zh-cn"`` or ``"ZH_CN"`` become
        ``"zh"``; plain codes and names are only lower-cased. Empty input
        yields ``None``, which lets Whisper auto-detect the language.
        """
        if not language:
            return None
        tag = str(language).strip().lower().replace("_", "-")
        return tag.split("-", 1)[0] or None

    def _language_confidence(self, audio_path, language):
        """Return Whisper's probability for ``language`` on the audio start.

        ``transcribe()`` does not return language probabilities, so they are
        obtained through ``model.detect_language`` on the first 30-second
        window. Any failure yields ``0`` so detection stays best-effort.
        """
        try:
            audio = whisper.pad_or_trim(whisper.load_audio(audio_path))
            mel = whisper.log_mel_spectrogram(
                audio, n_mels=self.model.dims.n_mels
            ).to(self.model.device)
            _, language_probs = self.model.detect_language(mel)
            return float(language_probs.get(language, 0))
        except Exception as e:
            print(f"语言检测失败: {str(e)}")
            return 0

    def detect_language(self, audio_path):
        """Detect the spoken language of an audio file.

        Args:
            audio_path (str): Path of the audio file.

        Returns:
            dict | None: ``language``, ``confidence``, ``text`` and
            ``segments``; ``None`` when the model is not loaded or
            transcription fails.
        """
        if self.model is None:
            print("模型未加载")
            return None

        try:
            # language=None lets Whisper pick the language itself.
            result = self.model.transcribe(audio_path, language=None, task="transcribe")
            language = result.get("language", "unknown")

            detected_info = {
                "language": language,
                "confidence": self._language_confidence(audio_path, language),
                "text": result.get("text", ""),
                "segments": result.get("segments", []),
            }

            print(f"检测到语言: {detected_info['language']}, 置信度: {detected_info['confidence']:.2f}")
            return detected_info
        except Exception as e:
            print(f"语言检测失败: {str(e)}")
            return None

    def recognize_speech(self, audio_path, dialect_hint=None):
        """Transcribe an audio file.

        Args:
            audio_path (str): Path of the audio file.
            dialect_hint (str | None): Language passed to Whisper; falls
                back to ``ASR_CONFIG["language"]`` when empty.

        Returns:
            dict | None: ``full_text``, ``segments`` (each with ``start``,
            ``end``, ``text``, ``confidence``) and ``language``; ``None``
            when the model is not loaded or recognition fails.
        """
        if self.model is None:
            print("模型未加载")
            return None

        try:
            requested = dialect_hint if dialect_hint else ASR_CONFIG["language"]
            options = {
                # Region-tagged codes ("zh-cn") are rejected by Whisper, so
                # they are reduced to the bare code first.
                "language": self._normalize_language(requested),
                "task": "transcribe",
                "verbose": False,
            }

            result = self.model.transcribe(audio_path, **options)

            recognition_result = {
                "full_text": result["text"].strip(),
                "segments": [
                    {
                        "start": segment["start"],
                        "end": segment["end"],
                        "text": segment["text"].strip(),
                        # This is Whisper's avg_logprob (a log-probability),
                        # not a 0..1 confidence value.
                        "confidence": segment.get("avg_logprob", 0),
                    }
                    for segment in result["segments"]
                ],
                "language": result.get("language", "unknown"),
            }

            print(f"语音识别完成,识别到 {len(recognition_result['segments'])} 个片段")
            return recognition_result
        except Exception as e:
            print(f"语音识别失败: {str(e)}")
            return None

    def batch_recognize(self, audio_chunks):
        """Transcribe several chunks and shift their times onto one timeline.

        Args:
            audio_chunks (list[dict]): Chunks with at least the keys
                ``path`` and ``start_time`` (seconds).

        Returns:
            list[dict]: One entry per successfully recognized chunk with
            ``chunk_index``, ``chunk_info`` and ``recognition_result``.
            Chunks whose recognition fails are left out.
        """
        results = []
        total = len(audio_chunks)

        for i, chunk in enumerate(audio_chunks):
            print(f"正在识别第 {i+1}/{total} 个片段...")
            result = self.recognize_speech(chunk["path"])
            if not result:
                continue

            # Segment times are relative to the chunk; add the chunk offset
            # to place them on the timeline of the whole recording.
            offset = chunk["start_time"]
            for seg in result["segments"]:
                seg["start"] += offset
                seg["end"] += offset

            results.append({
                "chunk_index": i,
                "chunk_info": chunk,
                "recognition_result": result,
            })

        return results

4. dialect_translator.py - 方言翻译模块

# -*- coding: utf-8 -*-

"""

方言翻译模块 - 将方言翻译为标准普通话

使用预训练的翻译模型和规则库

"""

import re

import json

from config import TRANSLATION_CONFIG, SUPPORTED_DIALECTS

class DialectTranslator:
    """Rule-based translator from Chinese dialect vocabulary to Mandarin.

    Translation is a dictionary lookup over per-dialect word tables; no
    machine-translation model is involved (``translation_model`` is only a
    placeholder).
    """

    def __init__(self):
        # Per-dialect vocabulary tables: dialect word -> Mandarin word.
        self.dialect_mappings = self._load_dialect_mappings()
        # Placeholder for a real translation model; unused by the rule-based code.
        self.translation_model = None

    def _load_dialect_mappings(self):
        """Return the built-in dialect vocabulary tables."""
        mappings = {
            "cantonese": {
                "咩": "什么",
                "嘅": "的",
                "係": "是",
                "唔": "不",
                "俾": "给",
                "佢": "他/她",
                "哋": "们",
                "咗": "了",
                "嚟": "来",
                "睇": "看",
                "食": "吃",
                "饮": "喝",
                "瞓": "睡",
                "倾": "聊",
                "返": "回",
                "做": "干/做"
            },
            "shanghainese": {
                "侬": "你",
                "阿拉": "我们",
                "伊": "他/她",
                "勿": "不",
                "覅": "不要",
                "交关": "非常",
                "适意": "舒服",
                "邪气": "很",
                "老": "很",
                "结棍": "厉害",
                "嗲": "好/棒"
            },
            "sichuan": {
                "啥子": "什么",
                "要得": "好的",
                "巴适": "舒服/好",
                "安逸": "舒服",
                "晓得": "知道",
                "不存在": "没关系",
                "摆龙门阵": "聊天",
                "雄起": "加油",
                "瓜娃子": "傻瓜",
                "幺妹": "年轻女子"
            },
            "chongqing": {
                "啷个": "怎么",
                "要得": "好的",
                "巴适": "很好",
                "安逸": "舒服",
                "晓得": "知道",
                "不存在": "没关系",
                "摆龙门阵": "聊天",
                "崽儿": "小伙子",
                "妹儿": "女孩",
                "火锅": "火锅"
            }
        }
        return mappings

    def detect_dialect_from_text(self, text, confidence_threshold=0.7):
        """Guess the dialect of a text from the vocabulary it contains.

        The confidence is a rough heuristic: the number of distinct dialect
        words found, relative to the number of non-whitespace characters,
        scaled by 10 and capped at 1.0. Characters are counted instead of
        whitespace-separated tokens because Chinese text is usually written
        without spaces.

        Args:
            text (str): Text to inspect.
            confidence_threshold (float): Minimum confidence for a match.

        Returns:
            dict: ``detected_dialect`` (``"unknown"`` when nothing reaches
            the threshold), ``confidence`` and ``matched_words``.
        """
        char_count = sum(1 for ch in text if not ch.isspace()) if text else 0

        best = None  # (confidence, score, dialect, matched_words)
        if char_count:
            for dialect, mapping in self.dialect_mappings.items():
                matched_words = [word for word in mapping if word and word in text]
                if not matched_words:
                    continue
                score = len(matched_words)
                confidence = min(score / char_count * 10, 1.0)
                # Confidence saturates at 1.0, so the raw score breaks ties;
                # on a full tie the dialect listed first wins.
                if best is None or (confidence, score) > (best[0], best[1]):
                    best = (confidence, score, dialect, matched_words)

        if best is not None and best[0] >= confidence_threshold:
            return {
                "detected_dialect": best[2],
                "confidence": best[0],
                "matched_words": best[3],
            }

        return {"detected_dialect": "unknown", "confidence": 0.0, "matched_words": []}

    def translate_text(self, text, source_dialect=None):
        """Replace dialect vocabulary in ``text`` with Mandarin equivalents.

        All replacements happen in a single pass over the original text,
        with longer dialect words taking precedence, so the output of one
        replacement is never translated again.

        Args:
            text (str): Text to translate.
            source_dialect (str | None): Dialect of ``text``; detected
                automatically when empty or ``"unknown"``.

        Returns:
            dict: ``original_text``, ``translated_text``, ``dialect``,
            ``changes`` and ``change_count``. ``changes`` holds one entry
            per distinct dialect word found, with ``position`` being the
            index of its first occurrence in the original text.
        """
        if not source_dialect or source_dialect == "unknown":
            source_dialect = self.detect_dialect_from_text(text)["detected_dialect"]

        mapping = {}
        if source_dialect != "unknown":
            mapping = self.dialect_mappings.get(source_dialect) or {}
        # An empty key would match at every position of the pattern below.
        vocabulary = {word: target for word, target in mapping.items() if word}

        if not text or not vocabulary:
            return {
                "original_text": text,
                "translated_text": text,
                "dialect": source_dialect,
                "changes": [],
                "change_count": 0,
            }

        # Longest alternatives first so a short word cannot split a longer one.
        ordered_words = sorted(vocabulary, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(word) for word in ordered_words))

        first_positions = {}

        def _substitute(match):
            word = match.group(0)
            first_positions.setdefault(word, match.start())
            return vocabulary[word]

        translated_text = pattern.sub(_substitute, text)

        changes = [
            {
                "original": word,
                "translation": vocabulary[word],
                "position": first_positions[word],
            }
            for word in vocabulary
            if word in first_positions
        ]

        return {
            "original_text": text,
            "translated_text": translated_text,
            "dialect": source_dialect,
            "changes": changes,
            "change_count": len(changes),
        }

    def batch_translate(self, recognition_results):
        """Translate every segment of a list of recognition results.

        The dialect is detected separately for each segment.

        Args:
            recognition_results (list[dict]): Entries with ``chunk_index``,
                ``chunk_info`` and ``recognition_result["segments"]``.

        Returns:
            list[dict]: One entry per input with ``chunk_index``,
            ``chunk_info`` and ``translated_segments``.
        """
        translation_results = []

        for result in recognition_results:
            translated_segments = []
            for segment in result["recognition_result"]["segments"]:
                original_text = segment["text"]
                translation_result = self.translate_text(original_text)
                translated_segments.append({
                    "start": segment["start"],
                    "end": segment["end"],
                    "original_text": original_text,
                    "translated_text": translation_result["translated_text"],
                    "dialect": translation_result["dialect"],
                    "confidence": segment.get("confidence"),
                })

            translation_results.append({
                "chunk_index": result["chunk_index"],
                "chunk_info": result["chunk_info"],
                "translated_segments": translated_segments,
            })

        return translation_results

5. subtitle_generator.py - 字幕生成模块

# -*- coding: utf-8 -*-

"""

字幕生成模块 - 生成SRT格式字幕文件和带字幕的视频

"""

import os

from datetime import timedelta

import subprocess

from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip

from config import SUBTITLE_CONFIG, OUTPUT_DIR

class SubtitleGenerator:

def __init__(self):
    """Copy the subtitle styling options from ``SUBTITLE_CONFIG`` onto the instance."""
    option_names = (
        "font_size",
        "font_color",
        "background_color",
        "position",
        "max_chars_per_line",
    )
    # Each option is stored under an attribute with the same name as its key.
    for option in option_names:
        setattr(self, option, SUBTITLE_CONFIG[option])

def seconds_to_srt_time(self, seconds):
    """Format a time offset as an SRT timestamp (``HH:MM:SS,mmm``).

    Negative input is clamped to zero. The hour field keeps counting past
    24 hours instead of wrapping around.

    Args:
        seconds (float): Offset in seconds.

    Returns:
        str: The SRT timestamp.
    """
    offset = timedelta(seconds=max(float(seconds), 0.0))
    # Dividing by one millisecond yields the whole offset, days included,
    # as an integer number of milliseconds.
    total_ms = offset // timedelta(milliseconds=1)

    hours, remainder_ms = divmod(total_ms, 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    whole_seconds, milliseconds = divmod(remainder_ms, 1000)

    return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"

def generate_srt_content(self, translated_results):

"""

生成SRT字幕文件内容

Args:

translated_results (list): 翻译结果列表

Returns:

str: SRT格式的字幕内容

"""

srt_content = []

subtitle_index = 1

for result in translated_results:

for segment in result["translated_segments"]:

start_time = self.seconds_to_srt_time(segment["start"])

end_time = self.seconds_to_srt_time(segment["end"])

text = segment["translated_text"]

# 处理长文本换行

wrapped_text = self._wrap_text(text)

srt_entry = f"{subtitle_index}\n{start_time} --> {end_time}\n{wrapped_text}\n"

srt_content.append(srt_entry)

subtitle_index += 1

return "\n".join(srt_content)

def _wrap_text(self, text):

"""

处理文本换行

Args:

text (str): 原始文本

Returns:

str: 处理后的文本

"""

if len(text) <= self.max_chars_per_line:

return text

words = text.split()

lines = []

current_line = []

current_length = 0

for word in words:

if current_length + len(word) + 1 <= self.max_chars_per_line:

current_line.append(word)

current_length += len(word) + 1

else:

lines.append(" ".join(current_line))

current_line = [word]

current_length = len(word)

if current_line:

lines.append(" ".join(current_line))

return "\n".join(lines)

def save_srt_file(self, srt_content, filename="subtitles.srt"):
    """Write SRT content to a UTF-8 file inside ``OUTPUT_DIR``.

    Args:
        srt_content (str): SRT text to write.
        filename (str): Name of the file to create in ``OUTPUT_DIR``.

    Returns:
        str: Path of the written file.
    """
    target_path = os.path.join(OUTPUT_DIR, filename)

    with open(target_path, mode='w', encoding='utf-8') as srt_file:
        srt_file.write(srt_content)

    print(f"SRT字幕文件已保存: {target_path}")
    return target_path

def create_subtitle_clips(self, translated_results, video_duration):

"""

创建字幕剪辑对象

Args:

translated_results (list): 翻译结果

video_duration (float): 视频总时长

Returns:

list: 字幕剪辑列表

"""

subtitle_clips = []

for result in translated_results:

for segment in result["translated_segments"]:

start_time = segment["start"]

end_time = segment["end"]

text = segment["translated_text"]

关注我,有更多实用程序等着你!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询