我将为您创建一个完整的短视频自动字幕生成系统,包含方言识别和翻译功能。
项目结构
video_subtitle_system/
├── main.py # 主程序入口
├── config.py # 配置文件
├── audio_processor.py # 音频处理模块
├── speech_recognition.py # 语音识别模块
├── dialect_translator.py # 方言翻译模块
├── subtitle_generator.py # 字幕生成模块
├── utils.py # 工具函数
├── requirements.txt # 依赖包列表
├── README.md # 使用说明
└── examples/ # 示例文件
    └── sample_video.mp4
核心代码实现
1. config.py - 配置文件
# -*- coding: utf-8 -*-
"""
配置文件 - 存储系统参数和路径设置
"""
import os
# Base paths: everything is resolved relative to the directory holding this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
TEMP_DIR = os.path.join(BASE_DIR, "temp")

# Create the working directories at import time so later writes cannot fail
# on a missing folder.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

# Audio processing settings.
AUDIO_CONFIG = {
    "sample_rate": 16000,
    "channels": 1,
    "chunk_duration": 30,  # length of each split segment, in seconds
    "format": "wav",
}

# Speech recognition settings.
# NOTE(review): "language" is a locale-style tag; confirm that every consumer
# reduces it to the code its ASR backend expects (e.g. "zh").
ASR_CONFIG = {
    "model_path": "models/speech_recognition",
    "language": "zh-cn",
    "dialect": "mandarin",  # default dialect type
}

# Dialect translation settings.
TRANSLATION_CONFIG = {
    "supported_dialects": ["cantonese", "shanghainese", "sichuan", "chongqing"],
    "translation_model": "models/dialect_translation",
}

# Module-level alias for the supported dialect list.  dialect_translator.py
# imports this name directly from config, so it must exist here; it refers to
# the same list object as TRANSLATION_CONFIG["supported_dialects"].
SUPPORTED_DIALECTS = TRANSLATION_CONFIG["supported_dialects"]

# Subtitle rendering settings.
SUBTITLE_CONFIG = {
    "font_size": 24,
    "font_color": "white",
    "background_color": "black",
    "position": "bottom",
    "max_chars_per_line": 20,
}
2. audio_processor.py - 音频处理模块
# -*- coding: utf-8 -*-
"""
音频处理模块 - 负责从视频中提取和处理音频
使用 moviepy 和 pydub 进行音频处理
"""
import os
import tempfile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
import librosa
import numpy as np
from config import AUDIO_CONFIG, TEMP_DIR
class AudioProcessor:
    """Extract, clean up and split the audio track of a video file.

    All intermediate files are written below ``TEMP_DIR``.  Every public
    method is best-effort: failures are printed and signalled through the
    return value instead of being raised.
    """

    def __init__(self):
        """Read the target sample rate and channel count from AUDIO_CONFIG."""
        self.sample_rate = AUDIO_CONFIG["sample_rate"]
        self.channels = AUDIO_CONFIG["channels"]

    def extract_audio_from_video(self, video_path):
        """Write the audio track of a video to a 16-bit PCM WAV file.

        Args:
            video_path (str): Path of the video file.

        Returns:
            str | None: Path of the extracted WAV file inside ``TEMP_DIR``,
            or ``None`` when the video cannot be read or has no audio track.
        """
        video = None
        audio = None
        try:
            video = VideoFileClip(video_path)
            audio = video.audio
            if audio is None:
                # A silent video would otherwise fail with an opaque
                # AttributeError on the write call below.
                print(f"音频提取失败: 视频不包含音轨: {video_path}")
                return None

            audio_filename = f"extracted_audio_{os.path.basename(video_path)}.wav"
            audio_path = os.path.join(TEMP_DIR, audio_filename)

            audio.write_audiofile(
                audio_path,
                fps=self.sample_rate,
                nbytes=2,
                codec='pcm_s16le'
            )
            print(f"音频提取成功: {audio_path}")
            return audio_path
        except Exception as e:
            print(f"音频提取失败: {str(e)}")
            return None
        finally:
            # Release the clips on every path, including failures, so the
            # underlying reader resources are not leaked.
            if audio is not None:
                audio.close()
            if video is not None:
                video.close()

    def preprocess_audio(self, audio_path):
        """Convert to mono, resample, normalise volume and high-pass filter.

        Args:
            audio_path (str): Path of the raw audio file.

        Returns:
            str: Path of the processed WAV file, or the unchanged
            ``audio_path`` when processing fails.
        """
        try:
            audio = AudioSegment.from_file(audio_path)

            # Down-mix to mono.
            if audio.channels > 1:
                audio = audio.set_channels(1)

            audio = audio.set_frame_rate(self.sample_rate)
            audio = audio.normalize()
            # Simple noise reduction: cut everything below 80 Hz.
            audio = audio.high_pass_filter(80)

            # Derive the output name from the file stem only.  A plain
            # str.replace(".wav", ...) would also rewrite ".wav" inside
            # directory names and would overwrite non-.wav inputs in place.
            stem, _ = os.path.splitext(audio_path)
            processed_path = f"{stem}_processed.wav"
            audio.export(processed_path, format="wav")
            print(f"音频预处理完成: {processed_path}")
            return processed_path
        except Exception as e:
            print(f"音频预处理失败: {str(e)}")
            return audio_path

    def split_audio(self, audio_path, chunk_duration=None):
        """Split an audio file into consecutive fixed-length WAV chunks.

        Args:
            audio_path (str): Path of the audio file.
            chunk_duration (int | float | None): Chunk length in seconds;
                defaults to ``AUDIO_CONFIG["chunk_duration"]``.

        Returns:
            list[dict]: One dict per chunk with keys ``path``,
            ``start_time``, ``end_time`` and ``duration`` (all in seconds).
            The final chunk reports its real, possibly shorter, end and
            duration.  An empty list is returned on failure.
        """
        if chunk_duration is None:
            chunk_duration = AUDIO_CONFIG["chunk_duration"]
        try:
            chunk_ms = int(chunk_duration * 1000)
            if chunk_ms <= 0:
                raise ValueError(
                    f"chunk_duration must be positive, got {chunk_duration!r}"
                )

            audio = AudioSegment.from_file(audio_path)
            total_ms = len(audio)  # the segment's length is in milliseconds

            chunks = []
            # Step in milliseconds so a trailing partial second is still
            # exported and clips shorter than one second yield a chunk.
            for index, start_ms in enumerate(range(0, total_ms, chunk_ms)):
                end_ms = min(start_ms + chunk_ms, total_ms)
                chunk_path = os.path.join(TEMP_DIR, f"chunk_{index}.wav")
                audio[start_ms:end_ms].export(chunk_path, format="wav")
                chunks.append({
                    "path": chunk_path,
                    "start_time": index * chunk_duration,
                    "end_time": end_ms / 1000,
                    "duration": (end_ms - start_ms) / 1000
                })

            print(f"音频分割完成,共{len(chunks)}个片段")
            return chunks
        except Exception as e:
            print(f"音频分割失败: {str(e)}")
            return []
3. speech_recognition.py - 语音识别模块
# -*- coding: utf-8 -*-
"""
语音识别模块 - 使用Whisper模型进行语音转文字
支持多种方言识别
"""
import whisper
import torch
import os
from config import ASR_CONFIG, TEMP_DIR
class SpeechRecognizer:
    """Speech-to-text wrapper around a Whisper model.

    The model is loaded once in the constructor.  If loading fails,
    ``self.model`` stays ``None`` and the recognition methods return
    ``None`` instead of raising.
    """

    def __init__(self, model_size="base"):
        """Select a device and load the Whisper model.

        Args:
            model_size (str): Model size (tiny, base, small, medium, large).
        """
        self.model_size = model_size
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"使用设备: {self.device}")
        self._load_model()

    def _load_model(self):
        """Load the Whisper model onto ``self.device``; print on failure."""
        try:
            print(f"正在加载Whisper {self.model_size}模型...")
            self.model = whisper.load_model(self.model_size, device=self.device)
            print("模型加载完成")
        except Exception as e:
            print(f"模型加载失败: {str(e)}")

    @staticmethod
    def _normalize_language(language):
        """Reduce a locale tag such as ``zh-cn`` / ``zh_CN`` to ``zh``.

        Whisper only accepts its own language codes and names, so a
        region-qualified tag like the configured ``"zh-cn"`` is rejected
        as unsupported.  Values without a region part pass through
        lower-cased; empty values map to ``None``.
        """
        if not language:
            return None
        primary = language.replace("_", "-").split("-")[0].strip().lower()
        return primary or None

    def _language_probabilities(self, audio_path):
        """Return Whisper's language probabilities for the first 30 s.

        Best-effort helper: any failure is printed and an empty dict is
        returned, so callers fall back to a confidence of 0.
        """
        try:
            audio = whisper.pad_or_trim(whisper.load_audio(audio_path))
            mel = whisper.log_mel_spectrogram(
                audio, n_mels=self.model.dims.n_mels
            ).to(self.model.device)
            _, probabilities = self.model.detect_language(mel)
            return probabilities
        except Exception as e:
            print(f"语言置信度计算失败: {str(e)}")
            return {}

    def detect_language(self, audio_path):
        """Detect the spoken language and transcribe the audio.

        Args:
            audio_path (str): Path of the audio file.

        Returns:
            dict | None: ``language``, ``confidence``, ``text`` and
            ``segments``; ``None`` when the model is missing or
            detection fails.
        """
        if self.model is None:
            print("模型未加载")
            return None
        try:
            result = self.model.transcribe(audio_path, language=None, task="transcribe")
            language = result.get("language", "unknown")
            # transcribe() does not return per-language probabilities, so
            # obtain them through the model's dedicated detection call.
            probabilities = self._language_probabilities(audio_path)
            detected_info = {
                "language": language,
                "confidence": float(probabilities.get(language, 0)),
                "text": result.get("text", ""),
                "segments": result.get("segments", [])
            }
            print(f"检测到语言: {detected_info['language']}, 置信度: {detected_info['confidence']:.2f}")
            return detected_info
        except Exception as e:
            print(f"语言检测失败: {str(e)}")
            return None

    def recognize_speech(self, audio_path, dialect_hint=None):
        """Transcribe an audio file into text with per-segment timing.

        Args:
            audio_path (str): Path of the audio file.
            dialect_hint (str | None): Language to pass to Whisper; falls
                back to ``ASR_CONFIG["language"]`` when empty.

        Returns:
            dict | None: ``full_text``, ``segments`` (each with ``start``,
            ``end``, ``text``, ``confidence``) and ``language``; ``None``
            when the model is missing or recognition fails.
        """
        if self.model is None:
            print("模型未加载")
            return None
        try:
            requested = dialect_hint if dialect_hint else ASR_CONFIG["language"]
            options = {
                "language": self._normalize_language(requested),
                "task": "transcribe",
                "verbose": False
            }
            result = self.model.transcribe(audio_path, **options)

            recognition_result = {
                "full_text": result["text"].strip(),
                # "confidence" is Whisper's average log-probability for the
                # segment, not a value in [0, 1].
                "segments": [
                    {
                        "start": segment["start"],
                        "end": segment["end"],
                        "text": segment["text"].strip(),
                        "confidence": segment.get("avg_logprob", 0)
                    }
                    for segment in result["segments"]
                ],
                "language": result.get("language", "unknown")
            }
            print(f"语音识别完成,识别到 {len(recognition_result['segments'])} 个片段")
            return recognition_result
        except Exception as e:
            print(f"语音识别失败: {str(e)}")
            return None

    def batch_recognize(self, audio_chunks):
        """Transcribe several chunks and shift their times to the full clip.

        Args:
            audio_chunks (list[dict]): Chunk dicts with at least ``path``
                and ``start_time`` (seconds from the start of the source).

        Returns:
            list[dict]: One entry per successfully recognised chunk with
            ``chunk_index``, ``chunk_info`` and ``recognition_result``.
            Chunks whose recognition failed are omitted.
        """
        results = []
        for i, chunk in enumerate(audio_chunks):
            print(f"正在识别第 {i+1}/{len(audio_chunks)} 个片段...")
            result = self.recognize_speech(chunk["path"])
            if not result:
                continue
            # Segment times are relative to the chunk; offset them so they
            # are relative to the original audio.
            for seg in result["segments"]:
                seg["start"] += chunk["start_time"]
                seg["end"] += chunk["start_time"]
            results.append({
                "chunk_index": i,
                "chunk_info": chunk,
                "recognition_result": result
            })
        return results
4. dialect_translator.py - 方言翻译模块
# -*- coding: utf-8 -*-
"""
方言翻译模块 - 将方言翻译为标准普通话
使用预训练的翻译模型和规则库
"""
import re
import json
from config import TRANSLATION_CONFIG, SUPPORTED_DIALECTS
class DialectTranslator:
    """Rule-based rewriting of dialect wording into standard Mandarin.

    Translation is pure vocabulary substitution driven by the
    per-dialect tables built in ``_load_dialect_mappings``.
    """

    def __init__(self):
        """Build the dialect vocabulary tables."""
        # Dialect word -> Mandarin equivalent, grouped by dialect name.
        self.dialect_mappings = self._load_dialect_mappings()
        # Placeholder for a learned translation model; nothing in this
        # class loads or uses one.
        self.translation_model = None

    def _load_dialect_mappings(self):
        """Return the built-in dialect-to-Mandarin vocabulary tables."""
        return {
            "cantonese": {
                "咩": "什么",
                "嘅": "的",
                "係": "是",
                "唔": "不",
                "俾": "给",
                "佢": "他/她",
                "哋": "们",
                "咗": "了",
                "嚟": "来",
                "睇": "看",
                "食": "吃",
                "饮": "喝",
                "瞓": "睡",
                "倾": "聊",
                "返": "回",
                "做": "干/做"
            },
            "shanghainese": {
                "侬": "你",
                "阿拉": "我们",
                "伊": "他/她",
                "勿": "不",
                "覅": "不要",
                "交关": "非常",
                "适意": "舒服",
                "邪气": "很",
                "老": "很",
                "结棍": "厉害",
                "嗲": "好/棒"
            },
            "sichuan": {
                "啥子": "什么",
                "要得": "好的",
                "巴适": "舒服/好",
                "安逸": "舒服",
                "晓得": "知道",
                "不存在": "没关系",
                "摆龙门阵": "聊天",
                "雄起": "加油",
                "瓜娃子": "傻瓜",
                "幺妹": "年轻女子"
            },
            "chongqing": {
                "啷个": "怎么",
                "要得": "好的",
                "巴适": "很好",
                "安逸": "舒服",
                "晓得": "知道",
                "不存在": "没关系",
                "摆龙门阵": "聊天",
                "崽儿": "小伙子",
                "妹儿": "女孩",
                "火锅": "火锅"
            }
        }

    def detect_dialect_from_text(self, text, confidence_threshold=0.7):
        """Guess the dialect of a text from the vocabulary it contains.

        Args:
            text (str): Text to inspect.
            confidence_threshold (float): Minimum confidence required to
                report a dialect.

        Returns:
            dict: ``detected_dialect`` (``"unknown"`` when nothing
            qualifies), ``confidence`` and ``matched_words``.
        """
        unknown = {"detected_dialect": "unknown", "confidence": 0.0, "matched_words": []}
        if not text:
            return unknown

        # Simplified confidence: matches per whitespace-separated token,
        # scaled by 10 and capped at 1.0.  Text without spaces counts as a
        # single token, so any match there already reaches the cap.
        token_count = max(len(text.split()), 1)

        dialect_scores = {}
        for dialect, mapping in self.dialect_mappings.items():
            matched_words = [word for word in mapping if word in text]
            if not matched_words:
                continue
            score = len(matched_words)
            dialect_scores[dialect] = {
                "score": score,
                "matched_words": matched_words,
                "confidence": min(score / token_count * 10, 1.0)
            }

        if not dialect_scores:
            return unknown

        # Rank by confidence, then by raw match count.  Because confidence
        # saturates at 1.0, ties are common; without the second key the
        # first dialect in the table would win regardless of how many more
        # words another dialect matched.
        best_name, best = max(
            dialect_scores.items(),
            key=lambda item: (item[1]["confidence"], item[1]["score"])
        )
        if best["confidence"] < confidence_threshold:
            return unknown
        return {
            "detected_dialect": best_name,
            "confidence": best["confidence"],
            "matched_words": best["matched_words"]
        }

    def translate_text(self, text, source_dialect=None):
        """Replace dialect vocabulary in a text with Mandarin equivalents.

        Args:
            text (str): Text to translate.
            source_dialect (str | None): Dialect name; auto-detected when
                empty or ``"unknown"``.

        Returns:
            dict: ``original_text``, ``translated_text``, ``dialect``,
            ``changes`` (one entry per distinct dialect word found, with
            its first position in the original text) and ``change_count``.
            For an unknown or unsupported dialect the text is returned
            untranslated with no changes.
        """
        if not source_dialect or source_dialect == "unknown":
            source_dialect = self.detect_dialect_from_text(text)["detected_dialect"]

        mapping = self.dialect_mappings.get(source_dialect)
        if not text or not mapping:
            return {
                "original_text": text,
                "translated_text": text,
                "dialect": source_dialect,
                "changes": [],
                "change_count": 0
            }

        # Substitute in a single pass over the original text, longest
        # entries first, so output that was already translated is never
        # translated a second time and positions refer to the input text.
        pattern = re.compile(
            "|".join(re.escape(word) for word in sorted(mapping, key=len, reverse=True))
        )
        first_positions = {}

        def _substitute(match):
            word = match.group(0)
            first_positions.setdefault(word, match.start())
            return mapping[word]

        translated_text = pattern.sub(_substitute, text)

        # Report the changes in vocabulary-table order.
        changes = [
            {
                "original": word,
                "translation": mapping[word],
                "position": first_positions[word]
            }
            for word in mapping
            if word in first_positions
        ]
        return {
            "original_text": text,
            "translated_text": translated_text,
            "dialect": source_dialect,
            "changes": changes,
            "change_count": len(changes)
        }

    def batch_translate(self, recognition_results):
        """Translate every recognised segment of every chunk.

        Args:
            recognition_results (list[dict]): Entries with ``chunk_index``,
                ``chunk_info`` and ``recognition_result["segments"]``.

        Returns:
            list[dict]: One entry per input chunk with ``chunk_index``,
            ``chunk_info`` and ``translated_segments``.  The dialect is
            detected independently for each segment.
        """
        translation_results = []
        for result in recognition_results:
            translated_segments = []
            for segment in result["recognition_result"]["segments"]:
                original_text = segment["text"]
                translation_result = self.translate_text(original_text)
                translated_segments.append({
                    "start": segment["start"],
                    "end": segment["end"],
                    "original_text": original_text,
                    "translated_text": translation_result["translated_text"],
                    "dialect": translation_result["dialect"],
                    # Tolerate segments that carry no confidence value.
                    "confidence": segment.get("confidence", 0)
                })
            translation_results.append({
                "chunk_index": result["chunk_index"],
                "chunk_info": result["chunk_info"],
                "translated_segments": translated_segments
            })
        return translation_results
5. subtitle_generator.py - 字幕生成模块
# -*- coding: utf-8 -*-
"""
字幕生成模块 - 生成SRT格式字幕文件和带字幕的视频
"""
import os
from datetime import timedelta
import subprocess
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
from config import SUBTITLE_CONFIG, OUTPUT_DIR
class SubtitleGenerator:
def __init__(self):
    """Copy the subtitle styling options from SUBTITLE_CONFIG onto the instance."""
    # Each option becomes an attribute of the same name.
    for option in (
        "font_size",
        "font_color",
        "background_color",
        "position",
        "max_chars_per_line",
    ):
        setattr(self, option, SUBTITLE_CONFIG[option])
def seconds_to_srt_time(self, seconds):
    """Format a time offset as an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds (float): Offset from the start of the video, in seconds.
            Negative values are clamped to zero.

    Returns:
        str: SRT timestamp.  The hour field keeps counting past 23, so
        offsets of a day or more do not wrap around.
    """
    # Negative offsets have no SRT representation; clamp to the start.
    elapsed = timedelta(seconds=max(seconds, 0))
    # Work from whole milliseconds of the full duration (days included),
    # truncating any sub-millisecond remainder.
    total_ms = elapsed // timedelta(milliseconds=1)
    total_seconds, milliseconds = divmod(total_ms, 1000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt_content(self, translated_results):
"""
生成SRT字幕文件内容
Args:
translated_results (list): 翻译结果列表
Returns:
str: SRT格式的字幕内容
"""
srt_content = []
subtitle_index = 1
for result in translated_results:
for segment in result["translated_segments"]:
start_time = self.seconds_to_srt_time(segment["start"])
end_time = self.seconds_to_srt_time(segment["end"])
text = segment["translated_text"]
# 处理长文本换行
wrapped_text = self._wrap_text(text)
srt_entry = f"{subtitle_index}\n{start_time} --> {end_time}\n{wrapped_text}\n"
srt_content.append(srt_entry)
subtitle_index += 1
return "\n".join(srt_content)
def _wrap_text(self, text):
"""
处理文本换行
Args:
text (str): 原始文本
Returns:
str: 处理后的文本
"""
if len(text) <= self.max_chars_per_line:
return text
words = text.split()
lines = []
current_line = []
current_length = 0
for word in words:
if current_length + len(word) + 1 <= self.max_chars_per_line:
current_line.append(word)
current_length += len(word) + 1
else:
lines.append(" ".join(current_line))
current_line = [word]
current_length = len(word)
if current_line:
lines.append(" ".join(current_line))
return "\n".join(lines)
def save_srt_file(self, srt_content, filename="subtitles.srt"):
    """Write SRT text to a UTF-8 file inside OUTPUT_DIR.

    Args:
        srt_content (str): SRT text to store.
        filename (str): Name of the file to create.

    Returns:
        str: Path of the written file.
    """
    target = os.path.join(OUTPUT_DIR, filename)
    with open(target, mode='w', encoding='utf-8') as handle:
        handle.write(srt_content)
    print(f"SRT字幕文件已保存: {target}")
    return target
def create_subtitle_clips(self, translated_results, video_duration):
"""
创建字幕剪辑对象
Args:
translated_results (list): 翻译结果
video_duration (float): 视频总时长
Returns:
list: 字幕剪辑列表
"""
subtitle_clips = []
for result in translated_results:
for segment in result["translated_segments"]:
start_time = segment["start"]
end_time = segment["end"]
text = segment["translated_text"]
关注我,有更多实用程序等着你!