In the previous article, we explored the basic approach to automatically synchronizing dubbed audio with video and built an initial framework. Its core idea was decoupling: splitting the pipeline into four independent stages of preparation, decision, execution, and merging. That architecture freed us from fragile single-loop logic and took the first step from "works" to "reliable".

But once we put this model to work on more complex, real-world jobs, we discovered that the true challenges were only beginning. Real-world media processing is riddled with small, unpredictable uncertainties, and a theoretically perfect model often crumbles in front of them.

This article continues that journey. It focuses on how we handled these devilish details, and on how our automation evolved, step by step, from an idealized model into an engineering reality that can keep moving forward under fire.

ffmpeg's Millisecond-Level "Lies"

Our earlier strategy of "absorbing" tiny gaps merged gaps of a few dozen milliseconds into the preceding video segment to avoid frame skips. In theory, this should have kept the timeline perfectly continuous.

Reality quickly hit back. We found that even when we explicitly ask ffmpeg to create a 2540 ms segment, the resulting file's actual duration might be 2543 ms, or 2538 ms. These small deviations stem from the inherent complexity of video encoding: frame rate, keyframe placement, and other factors all influence the exact duration of the output.

A few milliseconds of error on a single segment looks harmless. But in a long video with hundreds of segments, those tiny errors keep piling up. By the second half of the video, the accumulated drift can reach seconds or even tens of seconds, easily enough for audio and video to drift apart again.
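
You can watch this drift happen with a few lines of Python. The sketch below is my own illustration rather than project code: it asks ffprobe for each clip's real duration and tracks the cumulative error; the clips list and its filenames are hypothetical.

python
import json
import subprocess

def real_duration_ms(path: str) -> int:
    """Ask ffprobe for a media file's actual duration, in milliseconds."""
    out = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "json", path],
        capture_output=True, text=True, check=True,
    ).stdout
    return int(float(json.loads(out)["format"]["duration"]) * 1000)

# Hypothetical list of (path, requested duration in ms) pairs:
clips = [("00000_sub.mp4", 2540), ("00001_sub.mp4", 1870)]

drift = 0
for path, requested_ms in clips:
    drift += real_duration_ms(path) - requested_ms
    print(f"{path}: cumulative drift = {drift}ms")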

Our original "ideal model", which accumulated each segment's estimated duration into a single variable current_timeline_ms, collapsed completely in the face of this reality.

From "Predicting the Future" to "Accepting Reality"

After careful deliberation, I decided to give up predicting the future and to build the timeline entirely from facts that have already happened.

We therefore rebuilt the audio-merging stage (_recalculate_timeline_and_merge_audio) around a new set of logic that stays much closer to reality.

The core of the new logic:

  1. A factual baseline: At any moment, len(merged_audio), the total duration of the audio stitched together so far, is the only "fact" we trust. It tells us where the timeline really is.

  2. Dynamic calibration: When we are about to append the next subtitle segment it, we no longer assume it should start at the estimated point it['start_time']. Instead, we first compute the difference:

    • offset = it['start_time'] - len(merged_audio)

    This offset is the gap between "expectation" and "reality".

  3. Responding intelligently:

    • If offset > 0: "reality" is running behind "expectation" (the earlier segments turned out shorter than estimated). The audio must not start early, so we "wait" by inserting offset milliseconds of silence until the timeline reaches the right position.
    • If offset < 0: "reality" is running ahead of "expectation" (the earlier segments turned out longer than estimated). We cannot crudely cut audio that already exists; we must accept the fact and push the current subtitle's start time back by abs(offset) milliseconds to keep up with reality.

To propagate the effect of such a push, we introduced a crucial variable: add_extend_time. Whenever a segment is forced back, the shift is added to add_extend_time, and the start_time and end_time of every subsequent subtitle are offset by this accumulated amount, as the sketch below shows.
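
Stripped of the surrounding bookkeeping, the calibration loop boils down to the following sketch. It is a minimal illustration, not the final method: the items list is hypothetical (each cue carries start_time, end_time, and an already-decoded pydub segment), and the real implementation below layers video clips, fallbacks, and logging on top of this core.

python
from pydub import AudioSegment

# Hypothetical cues: the first segment is 2543ms where 2540ms was planned,
# so the next cue will see a negative offset and be pushed back.
items = [
    {'start_time': 0,    'end_time': 2540, 'segment': AudioSegment.silent(2543)},
    {'start_time': 2540, 'end_time': 4000, 'segment': AudioSegment.silent(1460)},
]

merged_audio = AudioSegment.empty()
add_extend_time = 0  # cumulative push applied to every later cue

for it in items:
    it['start_time'] += add_extend_time
    it['end_time'] += add_extend_time

    # The only trusted "fact": how far the stitched audio actually reaches.
    offset = it['start_time'] - len(merged_audio)
    if offset > 0:
        # Reality is behind the plan: wait with silence.
        merged_audio += AudioSegment.silent(duration=offset)
    elif offset < 0:
        # Reality is ahead: push this cue (and every later one) back.
        it['start_time'] += -offset
        add_extend_time += -offset

    merged_audio += it['segment']
    it['end_time'] = it['start_time'] + len(it['segment'])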

This mechanism turned our timeline construction from a rigid plan into a dynamic, self-calibrating system. It no longer fears ffmpeg's millisecond-level "lies", because it can always reposition the next segment based on what has actually been stitched so far, keeping every step on solid ground.

The "Last Mile" of Audio Speed-Up: atempo and pydub Working Together

We ran into a similar precision problem when speeding up audio. pydub's speedup method is convenient, but in some cases the quality loss is significant, so we decided to use ffmpeg's atempo filter instead.

atempo sounds much better, but its output duration likewise deviates slightly from the theoretical value. To close this "last mile" of precision, we designed a two-stage speed-up strategy, encapsulated in the new _audio_speedup method; a distilled sketch follows the list below.

  1. Coarse pass (ffmpeg atempo): First, apply the main speed change with the atempo filter. If we need a 1.8x speed-up, we use atempo=1.8. This does 99% of the work and preserves audio quality.
  2. Fine pass (pydub trimming): As soon as atempo finishes, read the actual duration with pydub. Suppose we expected a 3000 ms clip and atempo actually produced 3008 ms. Those 8 milliseconds are pydub's job: a simple slice, audio[:-8], trims the excess precisely and yields a clip that is exactly 3000 ms, no more and no less.
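
In isolation, the two stages look roughly like this minimal sketch (assuming ffmpeg is on the PATH and the output format can be inferred from the file extension; the real _audio_speedup method below adds temp-file handling and guards):

python
import subprocess
from pydub import AudioSegment

def speed_to_exact(src: str, dst: str, atempo: float, target_ms: int) -> int:
    # Coarse pass: let ffmpeg's atempo filter do the actual time-stretching.
    subprocess.run(
        ["ffmpeg", "-y", "-i", src, "-filter:a", f"atempo={atempo}", dst],
        check=True, capture_output=True,
    )
    # Fine pass: measure what ffmpeg really produced and trim a tiny overshoot.
    audio = AudioSegment.from_file(dst)
    diff = len(audio) - target_ms
    if 0 < diff < 50:  # same 50ms guard as the real method
        audio = audio[:-diff]
        audio.export(dst, format=dst.rsplit(".", 1)[-1])
    return len(audio)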

The Final Evolution

After this series of iterations and refactorings, the SpeedRate class matured into a much more robust form. It learned to stop trusting the plan blindly and to keep adjusting to reality, using the more capable tool for each core task and more flexible means to compensate for that tool's small imperfections.

Below is the final implementation. It may not be "elegant"; the code is full of defensive checks and dynamic-adjustment logic. But it is exactly these seemingly "tedious" parts that form the armor that lets it run reliably in a messy, ever-changing real world.

python
import os
import shutil
import time
from pathlib import Path
import concurrent.futures

from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError

from videotrans.configure import config
from videotrans.util import tools

class SpeedRate:
    """
    Aligns the translated dub with the original video timeline by speeding up
    audio and slowing down video. A robust version hardened by several rounds
    of real-world iteration; its core job is handling uncertainty.
    """

    MIN_CLIP_DURATION_MS = 50  # minimum meaningful clip duration (ms)

    def __init__(self,
                 *,
                 queue_tts=None,
                 shoud_videorate=False,
                 shoud_audiorate=False,
                 uuid=None,
                 novoice_mp4=None,
                 raw_total_time=0,
                 noextname=None,
                 target_audio=None,
                 cache_folder=None
                 ):
        self.queue_tts = queue_tts
        self.shoud_videorate = shoud_videorate
        self.shoud_audiorate = shoud_audiorate
        self.uuid = uuid
        self.novoice_mp4_original = novoice_mp4
        self.novoice_mp4 = novoice_mp4
        self.raw_total_time = raw_total_time
        self.noextname = noextname
        self.target_audio = target_audio
        self.cache_folder = cache_folder if cache_folder else Path(f'{config.TEMP_DIR}/{str(uuid if uuid else time.time())}').as_posix()
        Path(self.cache_folder).mkdir(parents=True, exist_ok=True)
        
        self.max_audio_speed_rate = max(1.0, float(config.settings.get('audio_rate', 5.0)))
        self.max_video_pts_rate = max(1.0, float(config.settings.get('video_rate', 10.0)))
        
        config.logger.info(f"SpeedRate initialized for '{self.noextname}'. AudioRate: {self.shoud_audiorate}, VideoRate: {self.shoud_videorate}")
        config.logger.info(f"Config limits: MaxAudioSpeed={self.max_audio_speed_rate}, MaxVideoPTS={self.max_video_pts_rate}, MinClipDuration={self.MIN_CLIP_DURATION_MS}ms")

    def run(self):
        """主执行函数"""
        self._prepare_data()
        self._calculate_adjustments()
        self._execute_audio_speedup()
        self._execute_video_processing()
        merged_audio = self._recalculate_timeline_and_merge_audio()
        if merged_audio:
            self._finalize_audio(merged_audio)
        return self.queue_tts

    def _prepare_data(self):
        """第一步:准备和初始化数据。"""
        tools.set_process(text="Preparing data...", uuid=self.uuid)

        # Phase 1: initialize per-item fields
        for it in self.queue_tts:
            it['start_time_source'] = it['start_time']
            it['end_time_source'] = it['end_time']
            it['source_duration'] = it['end_time_source'] - it['start_time_source']
            it['dubb_time'] = int(tools.get_audio_time(it['filename']) * 1000) if tools.vail_file(it['filename']) else 0
            it['target_audio_duration'] = it['dubb_time']
            it['target_video_duration'] = it['source_duration']
            it['video_pts'] = 1.0
        
        # Phase 2: compute the gap after each item
        for i, it in enumerate(self.queue_tts):
            if i < len(self.queue_tts) - 1:
                next_item = self.queue_tts[i + 1]
                it['silent_gap'] = next_item['start_time_source'] - it['end_time_source']
            else:
                it['silent_gap'] = self.raw_total_time - it['end_time_source']
            it['silent_gap'] = max(0, it['silent_gap'])

    def _audio_speedup(self, audio_file, atempo, target_duration_ms):
        """使用ffmpeg atempo粗调 + pydub微调,实现精准音频加速"""
        ext = Path(audio_file).suffix[1:]
        input_file = f"{audio_file}.tmp.{ext}"
        shutil.copy2(audio_file, input_file)
        try:
            tools.runffmpeg(["-y", "-i", input_file, "-filter:a", f"atempo={atempo}", audio_file])
            audio = AudioSegment.from_file(audio_file, format=ext)
            real_time = len(audio)
            diff = real_time - target_duration_ms
            
            # Within a tiny (<50ms) overshoot, force-trim with pydub for exact alignment
            if 0 < diff < 50:
                fast_audio = audio[:-diff]
                fast_audio.export(audio_file, format=ext)
                return len(fast_audio)
            return real_time
        finally:
            if Path(input_file).exists():
                os.remove(input_file)

    def _calculate_adjustments(self):
        """第二步:计算调整方案。"""
        tools.set_process(text="Calculating adjustments...", uuid=self.uuid)
        for i, it in enumerate(self.queue_tts):
            
            if it['dubb_time'] > it['source_duration'] and tools.vail_file(it['filename']):
                try:
                    _, _ = tools.remove_silence_from_file(it['filename'], silence_threshold=-50.0, chunk_size=10, is_start=True)
                    it['dubb_time'] = int(tools.get_audio_time(it['filename']) * 1000)
                except Exception as e:
                    config.logger.warning(f"Could not remove silence from {it['filename']}: {e}")

            effective_source_duration = it['source_duration']
            if it.get('silent_gap', 0) < self.MIN_CLIP_DURATION_MS:
                effective_source_duration += it['silent_gap']

            if it['dubb_time'] <= effective_source_duration or effective_source_duration <= 0:
                continue

            dub_duration = it['dubb_time']
            source_duration = effective_source_duration
            silent_gap = it['silent_gap']
            over_time = dub_duration - source_duration

            if self.shoud_audiorate and not self.shoud_videorate:
                required_speed = dub_duration / source_duration
                if required_speed <= 1.5:
                    it['target_audio_duration'] = source_duration
                else:
                    available_time = source_duration + (silent_gap if silent_gap >= self.MIN_CLIP_DURATION_MS else 0)
                    duration_at_1_5x = int(dub_duration / 1.5)
                    it['target_audio_duration'] = duration_at_1_5x if duration_at_1_5x <= available_time else available_time
            
            elif not self.shoud_audiorate and self.shoud_videorate:
                required_pts = dub_duration / source_duration
                if required_pts <= 1.5:
                    it['target_video_duration'] = dub_duration
                else:
                    available_time = source_duration + (silent_gap if silent_gap >= self.MIN_CLIP_DURATION_MS else 0)
                    duration_at_1_5x = source_duration * 1.5
                    it['target_video_duration'] = duration_at_1_5x if duration_at_1_5x <= available_time else available_time
            
            elif self.shoud_audiorate and self.shoud_videorate:
                if over_time <= 1000:
                    it['target_audio_duration'] = source_duration
                else:
                    adjustment_share = over_time // 2
                    it['target_audio_duration'] = dub_duration - adjustment_share
                    it['target_video_duration'] = source_duration + adjustment_share

            if self.shoud_audiorate and it['target_audio_duration'] < dub_duration:
                speed_ratio = dub_duration / it['target_audio_duration']
                if speed_ratio > self.max_audio_speed_rate:
                    it['target_audio_duration'] = dub_duration / self.max_audio_speed_rate
            
            if self.shoud_videorate and it['target_video_duration'] > source_duration:
                pts_ratio = it['target_video_duration'] / source_duration
                if pts_ratio > self.max_video_pts_rate:
                    it['target_video_duration'] = source_duration * self.max_video_pts_rate
                it['video_pts'] = max(1.0, it['target_video_duration'] / source_duration)
    
    def _process_single_audio(self, item):
        """处理单个音频文件的加速任务"""
        input_file_path = item['filename']
        target_duration_ms = int(item['target_duration_ms'])
        
        try:
            current_duration_ms = int(tools.get_audio_time(input_file_path) * 1000)
            if target_duration_ms <= 0 or current_duration_ms <= target_duration_ms:
                return input_file_path, current_duration_ms, ""

            speedup_ratio = current_duration_ms / target_duration_ms
            after_duration = self._audio_speedup(input_file_path, speedup_ratio, target_duration_ms)
            item['ref']['dubb_time'] = after_duration
            return input_file_path, after_duration, ""
        except Exception as e:
            config.logger.error(f"Error processing audio {input_file_path}: {e}")
            return input_file_path, None, str(e)

    def _execute_audio_speedup(self):
        """第三步:执行音频加速。"""
        if not self.shoud_audiorate: return
        tasks = [
            {"filename": it['filename'], "target_duration_ms": it['target_audio_duration'], "ref": it}
            for it in self.queue_tts if it.get('dubb_time', 0) > it.get('target_audio_duration', 0) and tools.vail_file(it['filename'])
        ]
        if not tasks: return

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self._process_single_audio, task) for task in tasks]
            for i, future in enumerate(concurrent.futures.as_completed(futures)):
                if config.exit_soft: executor.shutdown(wait=False, cancel_futures=True); return
                future.result()
                tools.set_process(text=f"Audio processing: {i + 1}/{len(tasks)}", uuid=self.uuid)

    def _execute_video_processing(self):
        """第四步:执行视频裁切(采用微小间隙吸收策略)。"""
        if not self.shoud_videorate or not self.novoice_mp4_original:
            return
            
        video_tasks = []
        processed_video_clips = []
        last_end_time = 0

        i = 0
        while i < len(self.queue_tts):
            it = self.queue_tts[i]
            gap_before = it['start_time_source'] - last_end_time
            if gap_before > self.MIN_CLIP_DURATION_MS:
                clip_path = Path(f'{self.cache_folder}/{i:05d}_gap.mp4').as_posix()
                video_tasks.append({"ss": tools.ms_to_time_string(ms=last_end_time), "to": tools.ms_to_time_string(ms=it['start_time_source']), "source": self.novoice_mp4_original, "pts": 1.0, "out": clip_path})
                processed_video_clips.append(clip_path)

            start_ss = it['start_time_source']
            end_to = it['end_time_source']
            
            if i + 1 < len(self.queue_tts):
                next_it = self.queue_tts[i+1]
                gap_after = next_it['start_time_source'] - it['end_time_source']
                if 0 < gap_after < self.MIN_CLIP_DURATION_MS:
                    end_to = next_it['start_time_source']
            
            current_clip_source_duration = end_to - start_ss
            if current_clip_source_duration > self.MIN_CLIP_DURATION_MS:
                clip_path = Path(f"{self.cache_folder}/{i:05d}_sub.mp4").as_posix()
                pts_val = it.get('video_pts', 1.0)
                if pts_val > 1.01:
                    new_target_duration = it.get('target_video_duration', current_clip_source_duration)
                    pts_val = max(1.0, new_target_duration / current_clip_source_duration)
                video_tasks.append({"ss": tools.ms_to_time_string(ms=start_ss), "to": tools.ms_to_time_string(ms=end_to), "source": self.novoice_mp4_original, "pts": pts_val, "out": clip_path})
                processed_video_clips.append(clip_path)
            last_end_time = end_to
            i += 1
        
        if (final_gap := self.raw_total_time - last_end_time) > self.MIN_CLIP_DURATION_MS:
            clip_path = Path(f'{self.cache_folder}/zzzz_final_gap.mp4').as_posix()
            video_tasks.append({"ss": tools.ms_to_time_string(ms=last_end_time), "to": "", "source": self.novoice_mp4_original, "pts": 1.0, "out": clip_path})
            processed_video_clips.append(clip_path)

        for j, task in enumerate(video_tasks):
            if config.exit_soft: return
            tools.set_process(text=f"Video processing: {j + 1}/{len(video_tasks)}", uuid=self.uuid)
            the_pts = task['pts'] if task.get('pts', 1.0) > 1.01 else ""
            tools.cut_from_video(ss=task['ss'], to=task['to'], source=task['source'], pts=the_pts, out=task['out'])
            
            output_path = Path(task['out'])
            if not output_path.exists() or output_path.stat().st_size == 0:
                config.logger.warning(f"Segment {task['out']} failed (PTS={task.get('pts', 1.0)}). Fallback.")
                tools.cut_from_video(ss=task['ss'], to=task['to'], source=task['source'], pts="", out=task['out'])
                if not output_path.exists() or output_path.stat().st_size == 0:
                    config.logger.error(f"FATAL: Fallback for {task['out']} also failed. MISSING.")

        valid_clips = [clip for clip in processed_video_clips if Path(clip).exists() and Path(clip).stat().st_size > 0]
        if not valid_clips:
            self.novoice_mp4 = self.novoice_mp4_original
            return

        concat_txt_path = Path(f'{self.cache_folder}/concat_list.txt').as_posix()
        tools.create_concat_txt(valid_clips, concat_txt=concat_txt_path)
        
        merged_video_path = Path(f'{self.cache_folder}/merged_{self.noextname}.mp4').as_posix()
        tools.set_process(text="Merging video clips...", uuid=self.uuid)
        tools.concat_multi_mp4(out=merged_video_path, concat_txt=concat_txt_path)
        self.novoice_mp4 = merged_video_path

    def _recalculate_timeline_and_merge_audio(self):
        """第五步:基于“承认现实”原则,重新计算时间线并合并音频。"""
        merged_audio = AudioSegment.empty()
        video_was_processed = self.shoud_videorate and self.novoice_mp4_original and Path(self.novoice_mp4).name.startswith("merged_")

        if video_was_processed:
            config.logger.info("Building audio timeline based on processed video clips.")
            add_extend_time = 0
            for clip_filename in sorted(os.listdir(self.cache_folder)):
                if not (clip_filename.endswith(".mp4") and ("_sub" in clip_filename or "_gap" in clip_filename)): continue
                
                clip_path = Path(f'{self.cache_folder}/{clip_filename}').as_posix()
                try:
                    if not (Path(clip_path).exists() and Path(clip_path).stat().st_size > 0): continue
                    clip_duration = tools.get_video_duration(clip_path)
                except Exception as e:
                    config.logger.warning(f"Corrupt clip {clip_path} (error: {e}). Skipping.")
                    continue

                if "_sub" in clip_filename:
                    index = int(clip_filename.split('_')[0])
                    it = self.queue_tts[index]
                    
                    it['start_time'] += add_extend_time
                    it['end_time'] += add_extend_time
                    start_end_duration = it['end_time'] - it['start_time']
                    
                    segment = AudioSegment.from_file(it['filename']) if tools.vail_file(it['filename']) else AudioSegment.silent(duration=clip_duration)
                    if len(segment) > clip_duration: segment = segment[:clip_duration]
                    elif len(segment) < clip_duration: segment += AudioSegment.silent(duration=clip_duration - len(segment))
                    
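                    # Expectation vs. reality: a positive offset means the timeline
                    # lags the plan (pad with silence); a negative one means it is ahead
                    # (push this cue back instead of cutting audio that already exists).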
                    offset = it['start_time'] - len(merged_audio)
                    if offset > 0:
                        merged_audio += AudioSegment.silent(duration=offset)
                    elif offset < 0:
                        abs_offset = abs(offset)
                        it['start_time'] += abs_offset
                        add_extend_time += abs_offset
                    
                    merged_audio += segment
                    it['end_time'] = it['start_time'] + clip_duration
                    if clip_duration > start_end_duration:
                        add_extend_time += clip_duration - start_end_duration
                    
                    it['startraw'], it['endraw'] = tools.ms_to_time_string(ms=it['start_time']), tools.ms_to_time_string(ms=it['end_time']) 
                else: # gap
                    merged_audio += AudioSegment.silent(duration=clip_duration)
        else:
            config.logger.info("Building audio timeline based on original timings (video not processed).")
            add_extend_time = 0
            for i, it in enumerate(self.queue_tts):
                it['start_time'] += add_extend_time
                it['end_time'] += add_extend_time
                start_end_duration = it['end_time'] - it['start_time']
                
                dubb_time = int(tools.get_audio_time(it['filename']) * 1000) if tools.vail_file(it['filename']) else it['source_duration']
                segment = AudioSegment.from_file(it['filename']) if tools.vail_file(it['filename']) else AudioSegment.silent(duration=dubb_time)
                if len(segment) > dubb_time: segment = segment[:dubb_time]
                elif len(segment) < dubb_time: segment += AudioSegment.silent(duration=dubb_time - len(segment))

                offset = it['start_time'] - len(merged_audio)
                if offset > 0:
                    merged_audio += AudioSegment.silent(duration=offset)
                elif offset < 0:
                    abs_offset = abs(offset)
                    it['start_time'] += abs_offset
                    add_extend_time += abs_offset

                merged_audio += segment
                clip_time = len(segment)
                it['end_time'] = it['start_time'] + clip_time
                if clip_time > start_end_duration:
                    add_extend_time += clip_time - start_end_duration
                it['startraw'], it['endraw'] = tools.ms_to_time_string(ms=it['start_time']), tools.ms_to_time_string(ms=it['end_time'])
        return merged_audio

    def _export_audio(self, audio_segment, destination_path):
        """将Pydub音频段导出到指定路径,处理不同格式。"""
        wavfile = Path(f'{self.cache_folder}/temp_{time.time_ns()}.wav').as_posix()
        try:
            audio_segment.export(wavfile, format="wav")
            ext = Path(destination_path).suffix.lower()
            if ext == '.wav': shutil.copy2(wavfile, destination_path)
            elif ext == '.m4a': tools.wav2m4a(wavfile, destination_path)
            else: tools.runffmpeg(["-y", "-i", wavfile, "-ar", "48000", "-b:a", "192k", destination_path])
        finally:
            if Path(wavfile).exists(): os.remove(wavfile)
    
    def _finalize_audio(self, merged_audio):
        """第六步:导出并对齐最终音视频时长。"""
        try:
            self._export_audio(merged_audio, self.target_audio)
            video_was_processed = self.shoud_videorate and self.novoice_mp4_original and Path(self.novoice_mp4).name.startswith("merged_")
            if not video_was_processed: return
            if not (tools.vail_file(self.novoice_mp4) and tools.vail_file(self.target_audio)): return

            video_duration_ms = tools.get_video_duration(self.novoice_mp4)
            audio_duration_ms = int(tools.get_audio_time(self.target_audio) * 1000)
            
            padding_needed = video_duration_ms - audio_duration_ms
            if padding_needed > 10:
                final_audio_segment = AudioSegment.from_file(self.target_audio) + AudioSegment.silent(duration=padding_needed)
                self._export_audio(final_audio_segment, self.target_audio)
        except Exception as e:
            config.logger.error(f"Failed to export or finalize audio: {e}")
            raise RuntimeError(f"Failed to finalize audio: {e}")
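
# --- Hypothetical usage sketch (not part of the original project) -----------
# The field names match what _prepare_data() and run() expect; every concrete
# value below is invented purely for illustration.
#
# rate = SpeedRate(
#     queue_tts=[{"start_time": 0, "end_time": 2540, "filename": "0000.mp3"}],
#     shoud_audiorate=True,       # allow audio speed-up
#     shoud_videorate=True,       # allow video slow-down
#     uuid="demo",
#     novoice_mp4="novoice.mp4",
#     raw_total_time=60_000,
#     noextname="demo",
#     target_audio="demo.m4a",
# )
# queue = rate.run()  # queue_tts comes back with recalculated start/end times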

From a simple idea to an automated system that withstands the real world's many uncertainties, the road was paved with relentless polishing of details and repeated upheavals of core assumptions. The final solution may not be the most elegant in theory, but after countless failures and debugging sessions it has proven pragmatic, reliable, and effective.

This is the charm of engineering: it is not just writing code, but finding and building the most fitting solution amid constraints and uncertainty.