本系列写到第三篇,算是把 字幕音画同步 一条小路走成了能通车的土路。前两篇里,我们像修理工一样,拿着扳手到处拧螺丝:哪段音画差十几秒,就补哪段;哪段变速后变调刺耳,就换个算法重算。最终,一条 23 分钟的片子从肉眼可见的十几秒漂移,收敛到 200 ms 左右——对工程原型来说,算能交差。
但“能跑”和“好用”之间,还差一次彻底的梳理。这篇不打算再炫技,只想把整套做法摊开来,让你看清:
- 我们到底在解决什么问题?
- 为了搞定它,我们准备了哪几条“策略路线”?
- 真正落地的代码长什么样?为什么长成这样?
如果你已经看过前两篇,可以把本文当作“设计说明书 + 踩坑记录”。如果没看过,直接从这里开始也不影响——所有关键信息都会重新讲一遍。
问题的本质:一句话,时间对不上
给中文视频配英文音或其他语言例如俄语 德语,最常见的麻烦是“语速不同”。同一句台词,中文 3 秒,英文 4 秒。画面里的人闭嘴了,声音还在说——观众立刻出戏。
我们能做的只有两件事:
- 让声音快一点(收)。
- 让画面慢一点(放)。
两者都有副作用:
- 收过头,声音尖得刺耳。
- 放过头,动作慢得像回放。
于是,问题变成了:如何“收”“放”结合,把副作用降到最低。
四条策略路线
我们把可能的打法拆成四种“模式”,在代码里用四个分支实现。你可以按内容类型一键切换。
模式 | 核心思想 | 适用场景 | 备注 |
---|---|---|---|
压力共担:同时音频加速视频慢速 | 音画各让一步,失真均摊 | 普通对话、新闻 | 默认推荐 |
画面让步:仅视频慢速 | 死保音质,牺牲画面 | 音乐 MV、高品质旁白 | 最多慢放 10 倍 |
声音迁就:仅音频加速 | 死保画面,牺牲音质 | 舞蹈、动作片 | 不限加速倍数 |
原汁原味:不做任何变速 | 不变速,纯拼接 | 用户强制要求 | 末尾补静帧或静音 |
后面所有代码,都围绕“怎么在一条流水线里同时支持这四种玩法”展开。
从蓝图到现实:三次大改
V1:直接拼接——误差滚雪球
最早的做法很简单:
- 算完每段该多长,
- 用 FFmpeg 切出来,
- 一段段接在一起。
跑 5 分钟短片看不出问题;跑 23 分钟,误差滚到 13 秒——浮点误差、帧率取整、时间基差异,全都跑出来。
V2:理论模型——误差变小,但没根除
我们引入“动态时间偏移”:
- 每段起点不再依赖前一段的实际结果,
- 而是用一个公式算“理论起点”。
误差从 13 秒降到 3 秒,依然不够。
V3:物理现实优先——误差收敛到 200 ms
彻底放弃预测,直接“测出来”:
- 每生成一个视频片段,立即用 ffprobe 量真实时长,
- 音频完全按这份“实测蓝图”拼接。
这一步之后,23分钟视频第一次稳在了 200 ms 以内,2个小时视频误差可控在1s左右,尚可接受。
核心流程拆解
下面把 SpeedRate
类的主要步骤再过一遍。
入口 run()
:先分流
- 如果用户选了“原汁原味”,直接
_run_no_rate_change_mode()
,一个独立分支,跟后面复杂逻辑互不干扰。 - 否则,走完整流水线:准备数据 → 计算调整 → 处理音频 → 处理视频 → 重建音频 → 导出。
_prepare_data()
:打地基
- 读帧率,算“原始时长”,算“字幕间空白”。
- 这些数据后面每一步都会用,提前算好,避免重复劳动。
_calculate_adjustments()
:做决策
按四种模式算“理论目标时长”。这一步只算数,不动文件。
_execute_audio_speedup()
:动手改音频
- 用 pydub.speedup 按倍率处理。
- 处理完再“剪一刀”保证误差 < 10 ms。
_execute_video_processing()
:动手改视频
- 先把整段切成小片段,统一编码成中间格式,避免拼接花屏。
- 每切完一段立即量“真实时长”,写回字典,供后面音频对齐。
_recalculate_timeline_and_merge_audio()
:按实测结果拼音频
- 不再看原始字幕时长,只看“视频真实时长”。
- 视频长了,音频补静音;视频短了,音频剪掉尾巴。
_finalize_files()
:最终对齐
- 音视频总长对不上时,用补静音或定格最后一帧兜底。
代码骨架速览
下面这段伪代码概括了主流程,方便快速定位:
def run():
if 不变速:
纯净拼接()
return
准备数据()
计算理论时长()
音频变速()
视频变速并测真实时长()
按真实时长重建音频()
最终对齐导出()
真正的实现散落在十几个小函数里,每个函数只做一件事,名字就是动词:_cut
, _concat
, _export
……阅读时顺着调用链往下点即可。
踩过的坑
- 拼接花屏:不同视频片段如果帧率、色彩空间不一致(在启用FFmpeg硬件加速时很可能会出现),直接 concat 会花屏。我们用“中间格式”统一参数,再无损拼接。
- 音频重采样噪声:为了对齐,曾尝试把 所有配音片段统一重采样到 44.1 kHz 再归一化,结果底噪明显,折腾很久也无法彻底消除。最后放弃,宁可剪静音。
- PTS 上限:FFmpeg 的
setpts
超过 10 极容易失败而且视频慢的如同幻灯片,不具实用性,因此强加硬限制,宁可再剪音频。
怎么用
把 SpeedRate
当普通类用:
sr = SpeedRate(
queue_tts=字幕队列,
shoud_audiorate=True,
shoud_videorate=True,
novoice_mp4=无声视频路径,# ffmpeg -i 视频 -an 无声视频.mp4
uuid=随机串,
cache_folder=临时目录
)
sr.run()
参数说明:
queue_tts
:每条字幕的字典列表。
[
{'line': 33, 'start_time': 131170, 'end_time': 132250, 'startraw': '00:02:11,170', 'endraw': '00:02:12,250', 'time': '00:02:11,170 --> 00:02:12,250','filename':'配音片段文件地址'}
...
]
shoud_audiorate / shoud_videorate
:布尔开关,决定走哪条策略。- 其余路径类参数按实际给即可。
小结
这套方案最大的价值,不在算法多先进,而在“可落地”:
- 用四种策略覆盖绝大多数内容类型;
- 用“实测对齐”解决浮点误差;
- 用“中间格式”解决拼接稳定性;
- 用“短函数 + 明确命名”降低维护难度。
完整代码(含详细注释)
import os
import shutil
import time
from pathlib import Path
import json
from pydub import AudioSegment
from videotrans.configure import config
from videotrans.util import tools
class SpeedRate:
"""
通过音频加速和视频慢放来对齐翻译配音和原始视频时间轴。
主要实现原理
# 功能概述, 使用python3开发视频翻译功能:
1. 即A语言发音的视频,分离出无声画面视频文件和音频文件,使用语音识别对音频文件识别出原始字幕后,将该字幕翻译翻译为B语言的字幕,再将该B语言字幕配音为B语言配音,然后将B语言字幕和B语言配音同A分离出的无声视频,进行音画同步对齐和合并为新视频。
2. 当前正在做的这部分就是“配音、字幕、视频对齐”,B语言字幕是逐条配音的,每条字幕的配音生成一个mp3音频文件。
3. 因为语言不同,因此每条配音可能大于该条字幕的时间,例如该条字幕时长是3s,配音后的mp3时长如果小于等于3s,则不影响,但如果配音时长大于3s,则有问题,需要通过将音频片段自动加速到3s实现同步。也可以通过将该字幕的原始字幕所对应原始视频该片段截取下来,慢速播放延长该视频时长直到匹配配音时长,实现对齐。当然也可以同时 音频自动加速 和 视频慢速,从而避免音频加速太多或视频慢速太多。
# 具体音画同步原理说明
## 音频和视频同时启用时的策略
1. 如果配音时长 小于 当前片段的原始字幕时长,则无需音频加速和视频慢速
2. 如果配音时长 大于 当前片段的原始字幕时长,则判断音频时长缩短到和原始字幕时长一致时,需要的加速倍数是多少,
- 如果该倍数 小于等于 1.5,则照此对配音加速即可,无需视频慢速处理
- 如果该倍数 大于 1.5,则将 原始字幕时长 加上 和 下条字幕开始时间之间的静默时间(该静默可能时0,也可能小于或大于`self.MIN_CLIP_DURATION_MS`,如果最后一条字幕,可能到视频结尾还有静默区间),记为 total_a
* 如果该时长 total_a 大于 配音时长,则配音无需加速,自然播放完毕即可,视频也无需慢速,注意因此导致的时间轴变化和对视频裁切的影响
* 如果该时长 total_a 小于配音时长,则计算将配音时长缩短到 total_a 时,需要的加速倍数
- 如果该倍数 小于等于 1.5,则照此加速音频即可,无需视频慢速,注意因此导致的时间轴变化和对视频裁切的影响
- 如果该倍数 大于1.5,则按之前逻辑,音频加速和视频慢速各自负担一半
## 仅仅使用音频加速时
1. 如果配音时长 小于 当前片段的原始字幕时长,则无需音频加速
2. 如果配音时长 大于 当前片段的原始字幕时长,则计算将音频缩短到和原始字幕时长一致时,所需的加速倍数是多少,
- 如果该倍数 小于等于 1.5,则照此对配音加速即可
- 如果该倍数 大于 1.5,则将原始字幕时长加上 和 下条字幕开始时间之间的静默时间(可能时0,也可能小于或大于`self.MIN_CLIP_DURATION_MS`,如果最后一条字幕,可能到视频结尾还有静默区间),记为 total_b
* 如果该时长 total_b 大于配音时长,则将配音无需加速,自然播放完毕即可,total_b 在容下配音后如果还有剩余空间则使用静音填充。
* 如果该时长 total_b 仍小于配音时长,则无视倍数,强制将配音时长缩短到total_a
3. 注意开头和结尾以及字幕之间的静默区间,尤其是利用后可能还剩余的静默空间,最终合成后的音频长度,在存在视频时(self.novoice_mp4) 长度应等于视频长度,在不存在时,长度应等于从0到最后一条字幕的结束时间。
## 仅仅视频慢速时
1. 如果配音时长 小于 当前片段的原始字幕时长,则无需视频慢速,直接从本条字幕开始时间裁切到下条字幕开始时间,如果这是第一条字幕,则从0时间开始裁切
2. 如果配音时长 大于 当前片段的原始字幕时长,则判断原始字幕时长加上 和 下条字幕开始时间之间的静默时间(可能时0,也可能小于或大于`self.MIN_CLIP_DURATION_MS`,如果最后一条字幕,可能到视频结尾还有静默区间),记为 total_c
* 如果该时长 total_c 大于配音时长,则无需视频慢速,自然播放完毕即可,此时应裁切 total_c 时长的视频片段,即裁切到到下条字幕开始时间,而且无需慢速处理,同样如果这是第一条字幕,则从0时间开始裁切
* 如果该时长 total_c 仍小于配音时长,强制将视频片段(时长为total_a) 慢速延长到和配音等长,此处注意下,如果PTS倍数大于10,可能失败,因此PTS最大为10,如果到10了,仍短于配音时长,则设PTS=10,并将配音时长强制缩短到和慢速后的视频一样长。
3. 裁切需注意第一条字幕前的区域(开始时间可能大于0)和最后一条字幕后的区域(结束时间可能不到视频末尾)
4. 无需慢速处理的片段,直接裁切本条字幕开始时间到下条字幕开始时间,无需单独区分静默,因为均无慢速。
5. 需要慢速处理的片段,则需要注意其后的静默空间问题,避免导致丢失视频片段
## 没有 `音频加速`也没有`视频慢速`时
- 第一步按字幕拼接音频
1. 如果第一条字幕不是从0开始的,则前面填充静音。
2. 如果本条字幕开始时间到下条字幕开始时间,这个时长 大于 等于本条配音时长,则直接拼接该配音文件,若差值大于0,即还有富裕空间则后面填充静音。
3. 如果本条字幕开始时间到下条字幕开始时间,这个时长小于 本条配音时长,则直接拼接,无需其他处理
4. 如果是最后一条字幕,则直接将该配音片段拼接上即可,无需判断后边是否还有空间。
- 第二步查看是否存在视频文件
1. elf.novoice_mp4 is not None, 并且该文件存在,则为存在视频,此时比较合并后的音频时长和视频时长
- 如果音频时长 小于 视频时长,则音频末尾填充静音直到长度一致
- 如果音频时长 大于 视频时长,则视频最后定格延长,直到和音频时长一致
2. 如果不存在视频文件,则无需其他处理
===============================================================================================
"""
MIN_CLIP_DURATION_MS = 50
def __init__(self,
*,
queue_tts=None,
shoud_videorate=False,
shoud_audiorate=False,
uuid=None,
novoice_mp4=None,
raw_total_time=0,
noextname=None,
target_audio=None,
cache_folder=None
):
self.noextname=noextname
self.raw_total_time=raw_total_time
self.queue_tts = queue_tts
self.shoud_videorate = shoud_videorate
self.shoud_audiorate = shoud_audiorate
self.uuid = uuid
self.novoice_mp4_original = novoice_mp4
self.novoice_mp4 = novoice_mp4
self.cache_folder = cache_folder if cache_folder else Path(f'{config.TEMP_DIR}/{str(uuid if uuid else time.time())}').as_posix()
Path(self.cache_folder).mkdir(parents=True, exist_ok=True)
self.target_audio_original = target_audio
self.target_audio = Path(f'{self.cache_folder}/final_audio{Path(target_audio).suffix}').as_posix()
self.max_audio_speed_rate = 100
self.max_video_pts_rate = 10
self.source_video_fps = 30
config.logger.info(f"SpeedRate 初始化。音频加速: {self.shoud_audiorate}, 视频慢速: {self.shoud_videorate}")
def run(self):
# =========================================================================================
# 如果既不加速音频也不慢放视频
if not self.shoud_audiorate and not self.shoud_videorate:
config.logger.info("检测到未启用音视频变速,进入纯净拼接模式。")
self._run_no_rate_change_mode()
return self.queue_tts
# 否则,执行加减速同步流程
self._prepare_data()
self._calculate_adjustments()
self._execute_audio_speedup()
clip_meta_list_with_real_durations = self._execute_video_processing()
merged_audio = self._recalculate_timeline_and_merge_audio(clip_meta_list_with_real_durations)
if merged_audio:
self._finalize_files(merged_audio)
return self.queue_tts
def _run_no_rate_change_mode(self):
"""
模式四:“纯净拼接”的完整实现。
1. 准备数据。
2. `last_end_time` 精确测量并填充字幕间的静音。
3. 循环中,拼接配音,然后根据“可用空间”和“配音时长”的关系,决定如何填充后续静音。
4. 所有片段拼接完后,调用通用的 `_finalize_files` 方法来处理与视频的最终对齐。
"""
process_text = "[纯净模式] 正在拼接音频..." if config.defaulelang == 'zh' else "[Pure Mode] Merging audio..."
tools.set_process(text=process_text, uuid=self.uuid)
config.logger.info("================== [纯净模式] 开始处理 ==================")
# 确保基础数据已准备
self._prepare_data()
merged_audio = AudioSegment.empty()
last_end_time = 0
# 第一步:按字幕拼接音频
for i, it in enumerate(self.queue_tts):
# 1. 填充字幕前的静音
silence_duration = it['start_time_source'] - last_end_time
if silence_duration > 0:
merged_audio += AudioSegment.silent(duration=silence_duration)
config.logger.info(f"字幕[{it['line']}]前,填充静音 {silence_duration}ms")
# 加载配音片段
segment = None
if tools.vail_file(it['filename']):
try:
segment = AudioSegment.from_file(it['filename'])
except Exception as e:
config.logger.error(f"字幕[{it['line']}] 加载音频文件 {it['filename']} 失败: {e},将忽略此片段。")
else:
config.logger.warning(f"字幕[{it['line']}] 配音文件不存在: {it['filename']},将忽略此片段。")
if not segment:
last_end_time = it['end_time_source'] # 即使音频不存在,也要推进时间轴
continue
# 更新字幕的新时间戳
it['start_time'] = len(merged_audio)
it['end_time'] = it['start_time'] + len(segment)
it['startraw'], it['endraw'] = tools.ms_to_time_string(ms=it['start_time']), tools.ms_to_time_string(ms=it['end_time'])
merged_audio += segment
config.logger.info(f"字幕[{it['line']}] 已拼接,配音时长: {len(segment)}ms, 新时间区间: {it['start_time']}-{it['end_time']}")
# 2. & 3. 填充配音后的静音(如果适用)
if i < len(self.queue_tts) - 1:
next_start_time = self.queue_tts[i+1]['start_time_source']
available_space = next_start_time - it['start_time_source']
if available_space >= len(segment):
remaining_silence = available_space - len(segment)
if remaining_silence > 0:
merged_audio += AudioSegment.silent(duration=remaining_silence)
config.logger.info(f"字幕[{it['line']}]后,填充剩余静音 {remaining_silence}ms")
last_end_time = next_start_time
else:
# 配音时长 > 可用空间,直接连接下一个,时间轴自然被推后
last_end_time = it['start_time_source'] + len(segment)
else:
# 4. 最后一条字幕,后面不再填充静音
last_end_time = it['end_time']
# 第二步:检查视频文件并对齐
self._finalize_files(merged_audio)
config.logger.info("================== [纯净模式] 处理完成 ==================")
def _prepare_data(self):
"""
此阶段为所有后续计算提供基础数据。关键是计算出 `source_duration` (原始时长)
和 `silent_gap` (与下一条字幕的静默间隙),这是所有策略判断的依据。
同时,`final_video_duration_real` 字段也被初始化。
:return:
"""
tools.set_process(text="[1/5] 准备数据..." if config.defaulelang == 'zh' else "[1/5] Preparing data...", uuid=self.uuid)
config.logger.info("================== [阶段 1/5] 准备数据 ==================")
if self.novoice_mp4_original and tools.vail_file(self.novoice_mp4_original):
try: self.source_video_fps = tools.get_video_info(self.novoice_mp4_original, video_fps=True) or 30
except Exception as e: config.logger.warning(f"无法探测源视频帧率,将使用默认值30。错误: {e}"); self.source_video_fps = 30
config.logger.info(f"源视频帧率被设定为: {self.source_video_fps}")
for it in self.queue_tts:
it['start_time_source'] = it['start_time']
it['end_time_source'] = it['end_time']
it['source_duration'] = it['end_time_source'] - it['start_time_source']
it['dubb_time'] = self._get_audio_time_ms(it['filename'], line=it['line'])
it['final_audio_duration_theoretical'] = it['dubb_time']
it['final_video_duration_theoretical'] = it['source_duration']
# 用于存储探测到的物理时长
it['final_video_duration_real'] = it['source_duration']
for i, it in enumerate(self.queue_tts):
if i < len(self.queue_tts) - 1:
it['silent_gap'] = self.queue_tts[i + 1]['start_time_source'] - it['end_time_source']
else:
it['silent_gap'] = self.raw_total_time - it['end_time_source']
it['silent_gap'] = max(0, it['silent_gap'])
def _calculate_adjustments(self):
"""
- `if self.shoud_audiorate and self.shoud_videorate:` 音频加速和视频慢速同时启用。
- `elif self.shoud_audiorate:` 仅仅音频加速。
- `elif self.shoud_videorate:` 仅仅视频慢速。
里面的嵌套 `if` 则实现了更精细的策略,如“优先利用间隙”、“温和调整优先”等。
最终,它会为每个需要调整的片段计算出一个“理论目标时长”。
:return:
"""
tools.set_process(text="[2/5] 计算调整方案..." if config.defaulelang == 'zh' else "[2/5] Calculating adjustments...", uuid=self.uuid)
config.logger.info("================== [阶段 2/5] 计算调整方案 ==================")
for i, it in enumerate(self.queue_tts):
config.logger.info(f"--- 开始分析字幕[{it['line']}] ---")
dubb_duration = it['dubb_time']
source_duration = it['source_duration']
if source_duration <= 0:
it['final_video_duration_theoretical'] = 0
it['final_audio_duration_theoretical'] = 0
config.logger.warning(f"字幕[{it['line']}] 原始时长为0,跳过处理。")
continue
silent_gap = it['silent_gap']
block_source_duration = source_duration + silent_gap
config.logger.debug(f"字幕[{it['line']}]:原始数据:配音时长={dubb_duration}ms, 字幕时长={source_duration}ms, 静默间隙={silent_gap}ms, 片段块总长={block_source_duration}ms")
# 如果音频可以被原始时段容纳,则无需处理
if dubb_duration <= source_duration:
config.logger.info(f"字幕[{it['line']}]:配音({dubb_duration}ms) <= 字幕({source_duration}ms),无需调整。")
it['final_video_duration_theoretical'] = source_duration
it['final_audio_duration_theoretical'] = dubb_duration
continue
target_duration = dubb_duration
if self.shoud_audiorate and self.shoud_videorate:
config.logger.debug(f"字幕[{it['line']}]:进入[音视频结合]决策模式。")
speed_to_fit_source = dubb_duration / source_duration
if speed_to_fit_source <= 1.5:
config.logger.info(f"字幕[{it['line']}]:[决策] 仅需音频加速(倍率{speed_to_fit_source:.2f} <= 1.5),视频不慢放。")
target_duration = source_duration
elif block_source_duration >= dubb_duration:
config.logger.info(f"字幕[{it['line']}]:[决策] 利用静默间隙即可容纳配音,音视频均不变速。")
target_duration = dubb_duration
else:
speed_to_fit_block = dubb_duration / block_source_duration
if speed_to_fit_block <= 1.5:
config.logger.info(f"字幕[{it['line']}]:[决策] 音频加速填满片段块即可(倍率{speed_to_fit_block:.2f} <= 1.5)。")
target_duration = block_source_duration
else:
config.logger.info(f"字幕[{it['line']}]:[决策] 倍率({speed_to_fit_block:.2f}) > 1.5,音视频共同承担调整。")
over_time = dubb_duration - block_source_duration
video_extension = over_time / 2
target_duration = int(block_source_duration + video_extension)
elif self.shoud_audiorate:
config.logger.debug(f"字幕[{it['line']}]:进入[仅音频加速]决策模式。")
speed_to_fit_source = dubb_duration / source_duration
if speed_to_fit_source <= 1.5:
target_duration = source_duration
elif block_source_duration >= dubb_duration:
target_duration = dubb_duration
else:
target_duration = block_source_duration
elif self.shoud_videorate:
config.logger.debug(f"字幕[{it['line']}]:进入[仅视频慢速]决策模式。")
if block_source_duration >= dubb_duration:
target_duration = dubb_duration
else:
target_duration = dubb_duration
if self.shoud_videorate:
pts_ratio = target_duration / source_duration
if pts_ratio > self.max_video_pts_rate:
config.logger.warning(f"字幕[{it['line']}]:计算出的PTS({pts_ratio:.2f})超过最大值({self.max_video_pts_rate}),已强制修正。")
target_duration = int(source_duration * self.max_video_pts_rate)
it['final_video_duration_theoretical'] = target_duration
it['final_audio_duration_theoretical'] = target_duration
config.logger.info(f"字幕[{it['line']}]:[最终方案] 理论目标音视频时长统一为: {target_duration}ms")
def _execute_audio_speedup(self):
"""
1. 遍历所有字幕,检查 `dubb_time` 是否大于 `final_audio_duration`。
2. 对于需要处理的音频,计算出精确的加速倍率。
3. 使用 `pydub.speedup` 执行变速。
4. **精度微调**: 变速后,使用切片操作 (`[:target_duration_ms]`) 对音频进行微调,确保其最终时长与目标值的误差在10ms以内。
5. 用处理后的真实时长更新 `it['dubb_time']`。
:return:
"""
tools.set_process(text="[3/5] 处理音频..." if config.defaulelang == 'zh' else "[3/5] Processing audio...", uuid=self.uuid)
config.logger.info("================== [阶段 3/5] 执行音频加速 ==================")
for it in self.queue_tts:
target_duration_ms = int(it['final_audio_duration_theoretical'])
if it['dubb_time'] > target_duration_ms and tools.vail_file(it['filename']):
try:
current_duration_ms = it['dubb_time']
if target_duration_ms <= 0 or current_duration_ms - target_duration_ms < 10:
continue
speedup_ratio = current_duration_ms / target_duration_ms
if speedup_ratio < 1.01: continue
if speedup_ratio > self.max_audio_speed_rate:
config.logger.warning(f"字幕[{it['line']}]:计算出的音频加速倍率({speedup_ratio:.2f})超过限制({self.max_audio_speed_rate}),已强制应用最大值。")
speedup_ratio = self.max_audio_speed_rate
config.logger.info(f"字幕[{it['line']}]:[执行] 音频加速,倍率={speedup_ratio:.2f} (从 {current_duration_ms}ms -> {target_duration_ms}ms)")
audio = AudioSegment.from_file(it['filename'])
fast_audio = audio.speedup(playback_speed=speedup_ratio)
if len(fast_audio) > target_duration_ms: fast_audio = fast_audio[:target_duration_ms]
fast_audio.export(it['filename'], format=Path(it['filename']).suffix[1:])
it['dubb_time'] = self._get_audio_time_ms(it['filename'], line=it['line'])
except Exception as e:
config.logger.error(f"字幕[{it['line']}]:音频加速失败 {it['filename']}: {e}")
def _execute_video_processing(self):
"""
视频处理阶段
它的主要任务不再仅仅是处理视频,而是“测量物理现实”。
1. `_create_clip_meta`:创建一个包含所有裁切任务的“蓝图”。
2. 循环遍历蓝图,调用 `_cut_to_intermediate` 生成每个视频片段。
3. **关键一步**:片段生成后,`real_duration_ms = tools.get_video_duration(task['out'])`
这行代码就是“物理探测仪”,它测量出片段的真实时长。
4. 将真实时长存回任务元数据中,供后续的音频重建阶段使用。
:return:
"""
tools.set_process(text="[4/5] 处理视频并探测真实时长..." if config.defaulelang == 'zh' else "[4/5] Processing video & probing real durations...", uuid=self.uuid)
config.logger.info("================== [阶段 4/5] 执行视频处理并探测真实时长 ==================")
if not self.shoud_videorate or not self.novoice_mp4_original:
return None
clip_meta_list = self._create_clip_meta()
for task in clip_meta_list:
if config.exit_soft: return None
pts_param = str(task['pts']) if task.get('pts', 1.0) > 1.01 else None
self._cut_to_intermediate(ss=task['ss'], to=task['to'], source=self.novoice_mp4_original, pts=pts_param, out=task['out'])
real_duration_ms = 0
if Path(task['out']).exists() and Path(task['out']).stat().st_size > 0:
real_duration_ms = tools.get_video_duration(task['out'])
task['real_duration_ms'] = real_duration_ms
if task['type'] == 'sub':
sub_item = self.queue_tts[task['index']]
sub_item['final_video_duration_real'] = real_duration_ms
config.logger.info(f"字幕[{task['line']}] 视频片段处理完成。理论时长: {sub_item['final_video_duration_theoretical']}ms, 物理探测时长: {real_duration_ms}ms")
else:
config.logger.info(f"间隙片段 {Path(task['out']).name} 处理完成。物理探测时长: {real_duration_ms}ms")
self._concat_and_finalize(clip_meta_list)
return clip_meta_list
def _create_clip_meta(self):
"""
- 遍历字幕,将每个“字幕”和其前后的“有效间隙”都创建为一个独立的裁切任务。
- 计算每个字幕片段最终的PTS值:`final_video_duration / source_duration`。
:return:
"""
clip_meta_list = []
if not self.queue_tts: return []
if self.queue_tts[0]['start_time_source'] > self.MIN_CLIP_DURATION_MS:
clip_path = Path(f'{self.cache_folder}/00000_first_gap.mp4').as_posix()
clip_meta_list.append({"type": "gap", "out": clip_path, "ss": 0, "to": self.queue_tts[0]['start_time_source'], "pts": 1.0})
for i, it in enumerate(self.queue_tts):
if i > 0:
gap_start = self.queue_tts[i-1]['end_time_source']
gap_end = it['start_time_source']
if gap_end - gap_start >= self.MIN_CLIP_DURATION_MS:
clip_path = Path(f'{self.cache_folder}/{i:05d}_gap.mp4').as_posix()
clip_meta_list.append({"type": "gap", "out": clip_path, "ss": gap_start, "to": gap_end, "pts": 1.0})
if it['source_duration'] > 0:
clip_path = Path(f"{self.cache_folder}/{i:05d}_sub.mp4").as_posix()
pts_val = it['final_video_duration_theoretical'] / it['source_duration'] if it['source_duration'] > 0 else 1.0
clip_meta_list.append({"type": "sub", "index": i, "out": clip_path, "ss": it['start_time_source'], "to": it['end_time_source'], "pts": pts_val, "line": it['line']})
last_item = self.queue_tts[-1]
final_gap_start = last_item['end_time_source']
if self.raw_total_time - final_gap_start >= self.MIN_CLIP_DURATION_MS:
clip_path = Path(f'{self.cache_folder}/zzzz_final_gap.mp4').as_posix()
clip_meta_list.append({"type": "gap", "out": clip_path, "ss": final_gap_start, "to": self.raw_total_time, "pts": 1.0})
meta_path = Path(f'{self.cache_folder}/clip_meta.json').as_posix()
with open(meta_path, 'w', encoding='utf-8') as f: json.dump(clip_meta_list, f, ensure_ascii=False, indent=2)
return clip_meta_list
def _cut_to_intermediate(self, ss, to, source, pts, out):
"""将视频片段裁切为标准化的中间格式"""
config.logger.info(f"正在生成中间片段: {Path(out).name}, 原始范围: {ss}-{to}, PTS={pts or '1.0'}")
cmd = ['-y', '-ss', tools.ms_to_time_string(ms=ss,sepflag='.'), '-to', tools.ms_to_time_string(ms=to,sepflag='.'), '-i', source,
'-an', '-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '10',
'-pix_fmt', 'yuv420p', '-r', str(self.source_video_fps)]
if pts: cmd.extend(['-vf', f'setpts={pts}*PTS,fps={self.source_video_fps}'])
cmd.append(out)
tools.runffmpeg(cmd, force_cpu=True)
if not Path(out).exists() or Path(out).stat().st_size == 0:
config.logger.warning(f"中间片段 {Path(out).name} 生成失败,尝试无PTS参数重试。")
if pts: cmd.pop(-2); cmd.pop(-2)
tools.runffmpeg(cmd, force_cpu=True)
def _concat_and_finalize(self, clip_meta_list):
"""无损拼接中间片段,然后进行一次性的最终编码"""
valid_clips = [task['out'] for task in clip_meta_list if Path(task['out']).exists() and Path(task['out']).stat().st_size > 0]
if not valid_clips:
config.logger.error("没有任何有效的视频中间片段生成,视频处理失败!")
self.novoice_mp4 = self.novoice_mp4_original
return
concat_txt_path = Path(f'{self.cache_folder}/concat_list.txt').as_posix()
tools.create_concat_txt(valid_clips, concat_txt=concat_txt_path)
intermediate_merged_path = Path(f'{self.cache_folder}/intermediate_merged.mp4').as_posix()
concat_cmd = ['-y', '-f', 'concat', '-safe', '0', '-i', concat_txt_path, '-c', 'copy', intermediate_merged_path]
tools.runffmpeg(concat_cmd, force_cpu=True)
if not Path(intermediate_merged_path).exists():
config.logger.error("拼接后的中间视频文件未能生成,视频处理失败!")
return
final_video_path = Path(f'{self.cache_folder}/merged_{self.noextname}.mp4').as_posix()
video_codec = config.settings['video_codec']
finalize_cmd = ['-y', '-i', intermediate_merged_path, '-c:v', f'libx{video_codec}', '-crf',
str(config.settings.get("crf", 23)), '-preset', config.settings.get('preset', 'fast'), '-an', final_video_path]
tools.runffmpeg(finalize_cmd)
if Path(final_video_path).exists():
shutil.copy2(final_video_path, self.novoice_mp4)
config.logger.info(f"最终无声视频已成功生成并复制到: {self.novoice_mp4}")
else:
config.logger.error("最终视频编码失败,保留原始无声视频。")
self.novoice_mp4 = self.novoice_mp4_original
if Path(intermediate_merged_path).exists(): os.remove(intermediate_merged_path)
for clip_path in valid_clips:
if Path(clip_path).exists(): os.remove(clip_path)
if Path(concat_txt_path).exists(): os.remove(concat_txt_path)
def _recalculate_timeline_and_merge_audio(self, clip_meta_list):
"""
音频重建阶段
这个方法的设计体现了智能切换。
- **模式一/二的实现**:如果`clip_meta_list`存在(意味着视频被处理过),
它就调用 `_recalculate_timeline_based_on_physical_reality`。
这个函数严格按照视频片段的物理现实来拼接音频,是最终解决时间漂移的关键。
- **模式三的实现**:如果视频未被处理,它会回退到
`_recalculate_timeline_with_theoretical_offset`。
:param clip_meta_list:
:return:
"""
process_text = "[5/5] 基于物理现实重建音频..." if config.defaulelang == 'zh' else "[5/5] Reconstructing audio based on physical reality..."
tools.set_process(text=process_text, uuid=self.uuid)
config.logger.info("================== [阶段 5/5] 基于物理现实重建音频 ==================")
if not self.shoud_videorate or not clip_meta_list:
config.logger.warning("未处理视频或无视频片段信息,回退到理论时间轴模型构建音频。")
return self._recalculate_timeline_with_theoretical_offset()
merged_audio = AudioSegment.empty()
current_timeline_ms = 0
for task in clip_meta_list:
task_real_duration = int(task.get('real_duration_ms', 0))
if task_real_duration <= 0:
continue
if task['type'] == 'gap':
merged_audio += AudioSegment.silent(duration=task_real_duration)
config.logger.info(f"音频流中添加物理间隙:时长 {task_real_duration}ms")
current_timeline_ms += task_real_duration
elif task['type'] == 'sub':
it = self.queue_tts[task['index']]
it['start_time'] = current_timeline_ms
final_duration = task_real_duration
if tools.vail_file(it['filename']):
try:
segment = AudioSegment.from_file(it['filename'])
if len(segment) < final_duration:
segment += AudioSegment.silent(duration=final_duration - len(segment))
elif len(segment) > final_duration:
segment = segment[:final_duration]
except Exception as e:
config.logger.error(f"字幕[{it['line']}] 加载音频失败: {e},使用等长静音替代。")
segment = AudioSegment.silent(duration=final_duration)
else:
config.logger.warning(f"字幕[{it['line']}] 配音文件不存在,使用等长静音替代。")
segment = AudioSegment.silent(duration=final_duration)
merged_audio += segment
current_timeline_ms += final_duration
it['end_time'] = current_timeline_ms
it['startraw'], it['endraw'] = tools.ms_to_time_string(ms=it['start_time']), tools.ms_to_time_string(ms=it['end_time'])
config.logger.info(
f"字幕[{it['line']}] 音频重建:新区间 {it['start_time']}-{it['end_time']} (物理时长 {final_duration}ms)"
)
return merged_audio
def _recalculate_timeline_with_theoretical_offset(self):
"""
备用方法:当不处理视频时,使用基于理论 time_offset 的模型。
"""
merged_audio = AudioSegment.empty()
time_offset = 0
for i, it in enumerate(self.queue_tts):
it['start_time'] = it['start_time_source'] + time_offset
current_audio_length = len(merged_audio)
silence_needed = max(0, it['start_time'] - current_audio_length)
if silence_needed > 0:
merged_audio += AudioSegment.silent(duration=silence_needed)
final_duration = it['final_video_duration_theoretical']
if final_duration <= 0 : continue
if tools.vail_file(it['filename']):
try:
segment = AudioSegment.from_file(it['filename'])
if len(segment) < final_duration: segment += AudioSegment.silent(duration=final_duration - len(segment))
elif len(segment) > final_duration: segment = segment[:final_duration]
except Exception as e:
segment = AudioSegment.silent(duration=final_duration)
else:
segment = AudioSegment.silent(duration=final_duration)
merged_audio += segment
it['end_time'] = it['start_time'] + final_duration
it['startraw'], it['endraw'] = tools.ms_to_time_string(ms=it['start_time']), tools.ms_to_time_string(ms=it['end_time'])
time_offset += (it['final_video_duration_theoretical'] - it['source_duration'])
new_total_duration = self.raw_total_time + time_offset
final_gap = new_total_duration - len(merged_audio)
if final_gap > 0:
merged_audio += AudioSegment.silent(duration=final_gap)
return merged_audio
def _finalize_files(self, merged_audio):
"""
负责导出最终的音频,并执行最后的音视频对齐检查。
无论是复杂的物理对齐模式,还是简单的纯净拼接模式,最终都会调用这个方法,
确保了所有模式下的输出都有统一的质量保证(比如时长对齐)。
:param merged_audio:
:return:
"""
final_step_text = "[最终步骤] 导出并对齐..." if config.defaulelang == 'zh' else '[Final Step] Exporting and finalizing...'
tools.set_process(text=final_step_text, uuid=self.uuid)
config.logger.info("================== [最终步骤] 导出、对齐并交付 ==================")
try:
self._export_audio(merged_audio, self.target_audio)
if self.novoice_mp4 and tools.vail_file(self.novoice_mp4):
config.logger.info("开始最终音视频时长对齐检查...")
video_duration_ms = tools.get_video_duration(self.novoice_mp4)
audio_duration_ms = self._get_audio_time_ms(self.target_audio)
config.logger.info(f"最终检查: 视频物理总长 = {video_duration_ms}ms, 音频物理总长 = {audio_duration_ms}ms")
duration_diff = video_duration_ms - audio_duration_ms
config.logger.info(f"时长差异 (视频 - 音频) = {duration_diff}ms")
TOLERANCE_MS = 150 # 最终对齐的容忍度
if duration_diff > TOLERANCE_MS:
config.logger.warning(f"视频比音频长 {duration_diff}ms,将在音频末尾补齐等长静音。")
final_audio_segment = AudioSegment.from_file(self.target_audio)
final_audio_segment += AudioSegment.silent(duration=duration_diff)
self._export_audio(final_audio_segment, self.target_audio)
config.logger.info("音频补齐静音操作完成。")
elif duration_diff < -TOLERANCE_MS:
freeze_duration_sec = abs(duration_diff) / 1000.0
config.logger.warning(f"音频比视频长 {abs(duration_diff)}ms,将定格视频最后一帧 {freeze_duration_sec:.3f} 秒以对齐。")
final_video_path = Path(f'{self.cache_folder}/final_video_with_freeze.mp4').as_posix()
cmd = ['-y', '-i', self.novoice_mp4,
'-vf', f'tpad=stop_mode=clone:stop_duration={freeze_duration_sec}',
'-c:v', f'libx{config.settings["video_codec"]}',
'-crf', str(config.settings.get("crf", 23)),
'-preset', config.settings.get('preset', 'fast'),
'-an', final_video_path]
if tools.runffmpeg(cmd, force_cpu=True) and Path(final_video_path).exists():
shutil.copy2(final_video_path, self.novoice_mp4)
config.logger.info("视频定格延长操作成功。")
else:
config.logger.error("视频定格延长操作失败!音视频可能存在时长不一致。")
else:
config.logger.info("音视频时长差异在容忍范围内,无需额外对齐处理。")
if Path(self.target_audio).exists():
shutil.copy2(self.target_audio, self.target_audio_original)
config.logger.info(f"最终音频文件已成功交付到: {self.target_audio_original}")
else:
config.logger.error(f"最终音频文件 {self.target_audio} 未能生成,交付失败!")
except Exception as e:
config.logger.exception(f"导出或对齐最终音视频时发生致命错误: {e}")
raise RuntimeError(f"导出或对齐最终音视频时发生致命错误: {e}")
config.logger.info("所有处理完成,音视频已成功生成。")
def _get_audio_time_ms(self, file_path, line=None):
if not tools.vail_file(file_path):
if line is not None: config.logger.warning(f"字幕[{line}]:配音文件 {file_path} 不存在。")
return 0
try: return int(tools.get_audio_time(file_path) * 1000)
except Exception:
try: return len(AudioSegment.from_file(file_path))
except Exception as e:
config.logger.error(f"字幕[{line or 'N/A'}]:获取音频文件 {file_path} 时长失败: {e}")
return 0
def _export_audio(self, audio_segment, destination_path):
wavfile = Path(f'{self.cache_folder}/temp_{time.time_ns()}.wav').as_posix()
try:
audio_segment.export(wavfile, format="wav")
ext = Path(destination_path).suffix.lower()
cmd = ["-y", "-i", wavfile]
if ext == '.wav':
cmd.extend(["-c:a", "pcm_s16le", "-ar", "44100", "-ac", "2"])
elif ext == '.m4a':
cmd.extend(["-c:a", "aac", "-b:a", "128k", "-ar", "44100", "-ac", "2"])
else:
cmd.extend(["-b:a", "128k", "-ar", "44100", "-ac", "2"])
cmd.append(destination_path)
tools.runffmpeg(cmd, force_cpu=True)
finally:
if Path(wavfile).exists(): os.remove(wavfile)