1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import requests import re import json import os import subprocess from faster_whisper import WhisperModel
class Bili:
    """Fetch the audio track of a Bilibili video page and transcribe it to text.

    Intended call order: :meth:`start` -> :meth:`download_audio` ->
    :meth:`convert_to_wav` -> :meth:`transcribe_audio`.  Each step prints its
    progress to stdout and returns ``None`` when it cannot produce a result,
    so callers can chain the steps with simple truthiness checks.
    """

    # Default output location; resolved against the current working directory.
    DEFAULT_OUTPUT_DIR = "../../../codeInputOutput/output/biliVideo2Text"

    # (connect, read) timeouts in seconds so a stalled server cannot hang the run.
    REQUEST_TIMEOUT = (10, 60)

    # The video page embeds its stream description as JSON assigned to
    # ``window.__playinfo__`` inside a <script> tag.
    _PLAYINFO_RE = re.compile(r"window\.__playinfo__=(.*?)</script>", re.DOTALL)

    def __init__(self, url, output_dir=None):
        """Remember the video URL and make sure the output directory exists.

        Args:
            url: Address of the Bilibili video page.
            output_dir: Directory that receives every generated file.  When
                omitted, ``DEFAULT_OUTPUT_DIR`` is used.
        """
        self.url = url
        # No explicit "Host": requests derives it from each URL, which keeps the
        # same header set valid for the page request and for the media request.
        # No explicit "Accept-Encoding": requests advertises only the encodings
        # it is able to decode in the current environment.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36",
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        if output_dir is None:
            output_dir = self.DEFAULT_OUTPUT_DIR
        self.output_dir = os.path.abspath(output_dir)
        os.makedirs(self.output_dir, exist_ok=True)

    @classmethod
    def _extract_audio_streams(cls, html):
        """Return the DASH audio stream list embedded in *html*, or ``None``."""
        match = cls._PLAYINFO_RE.search(html)
        if match is None:
            print("未找到 playinfo 数据")
            return None
        try:
            playinfo = json.loads(match.group(1))
        except json.JSONDecodeError as e:
            print(f"playinfo 数据解析失败: {e}")
            return None
        if not isinstance(playinfo, dict):
            print("未找到音频流")
            return None
        # ``or {}`` also covers keys that are present but null.
        dash = (playinfo.get("data") or {}).get("dash") or {}
        audio_streams = dash.get("audio") or []
        if not audio_streams:
            print("未找到音频流")
            return None
        return audio_streams

    def start(self):
        """Download the video page and list its audio streams.

        Returns:
            The list of audio stream dicts found in the page's playinfo JSON,
            or ``None`` when the page cannot be fetched or holds no audio.
        """
        try:
            res = requests.get(self.url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"请求视频页面失败: {e}")
            return None
        res.encoding = "utf-8"
        audio_streams = self._extract_audio_streams(res.text)
        if audio_streams is None:
            return None
        print("找到以下音频流:")
        for i, stream in enumerate(audio_streams):
            print(f"{i+1}. ID: {stream.get('id')}, Bandwidth: {stream.get('bandwidth')}, URL: {stream.get('baseUrl')}")
        return audio_streams

    def download_audio(self, audio_stream, output_filename="audio.m4s"):
        """Stream one audio entry to ``output_dir/output_filename``.

        Args:
            audio_stream: One element of the list returned by :meth:`start`.
            output_filename: File name to create inside ``self.output_dir``.

        Returns:
            The path of the written file, or ``None`` on any failure.
        """
        output_path = os.path.join(self.output_dir, output_filename)
        # NOTE(review): "base_url" is accepted as an alternative spelling of
        # the same field - confirm against a current playinfo payload.
        audio_url = audio_stream.get("baseUrl") or audio_stream.get("base_url")
        if not audio_url:
            print("音频流缺少下载地址")
            return None
        print(f"正在下载音频流: {audio_url}")
        # NOTE(review): the media host presumably checks the Referer header;
        # sending the page URL mirrors what a browser would do - confirm.
        download_headers = dict(self.headers)
        download_headers["Referer"] = self.url
        try:
            # The context manager releases the connection on every exit path.
            with requests.get(
                audio_url,
                headers=download_headers,
                stream=True,
                timeout=self.REQUEST_TIMEOUT,
            ) as response:
                if response.status_code != 200:
                    print(f"下载失败，状态码: {response.status_code}")
                    return None
                with open(output_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
        except (requests.RequestException, OSError) as e:
            print(f"下载失败: {e}")
            return None
        print(f"音频下载完成: {output_path}")
        return output_path

    def convert_to_wav(self, input_file, output_wav="audio.wav", sample_rate=44100):
        """Convert *input_file* to 16-bit PCM WAV by running ``ffmpeg``.

        Args:
            input_file: Path of the media file to convert.
            output_wav: File name to create inside ``self.output_dir``.
            sample_rate: Output sample rate in Hz.

        Returns:
            The path of the WAV file, or ``None`` when ffmpeg is missing or
            exits with a non-zero status.
        """
        output_path = os.path.join(self.output_dir, output_wav)
        # "-y" (overwrite) is a global option, so it is placed before the
        # input instead of trailing the output file.
        command = [
            "ffmpeg",
            "-y",
            "-i", input_file,
            "-acodec", "pcm_s16le",
            "-ar", str(sample_rate),
            output_path,
        ]
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"转换失败: {e}")
            return None
        except FileNotFoundError:
            print("未找到 ffmpeg，请确保已安装 ffmpeg 并添加到系统路径")
            return None
        print(f"转换完成: {output_path}")
        return output_path

    def transcribe_audio(self, audio_file_path, output_txt="transcription.txt", model_size="medium", language="zh",
                         device="cuda", compute_type="float16"):
        """Transcribe an audio file and write timestamped lines to a text file.

        Args:
            audio_file_path: Path of the audio file to transcribe.
            output_txt: File name to create inside ``self.output_dir``.
            model_size: Model identifier handed to ``WhisperModel``.
            language: Language code handed to ``model.transcribe``.
            device: Device handed to ``WhisperModel``; the default keeps the
                original GPU behaviour.
            compute_type: Compute type handed to ``WhisperModel``.

        Returns:
            The path of the transcript file, or ``None`` when the audio file
            does not exist.
        """
        output_txt_path = os.path.join(self.output_dir, output_txt)
        if not os.path.exists(audio_file_path):
            print(f"错误：文件 {audio_file_path} 不存在！")
            return None

        # Keep the original wording for the default device.
        device_label = "GPU" if device == "cuda" else device
        print(f"加载模型 '{model_size}' 到{device_label}...")
        model = WhisperModel(model_size, device=device, compute_type=compute_type)

        print("开始转录音频...")
        segments, info = model.transcribe(audio_file_path, language=language)
        with open(output_txt_path, "w", encoding="utf-8") as f:
            print(f"检测到的语言：{info.language}，概率：{info.language_probability:.2f}")
            for segment in segments:
                line = f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}"
                print(line)
                f.write(line + "\n")
        print(f"转录完成，结果已保存到 {output_txt_path}")
        return output_txt_path
def _run_pipeline(video_url):
    """Run fetch -> download -> convert -> transcribe for one video page.

    Stops quietly at the first step that yields a falsy result; the step
    itself has already printed the reason.
    """
    client = Bili(video_url)

    streams = client.start()
    if not streams:
        return

    # The stream with the largest bandwidth is treated as the best quality.
    top_stream = max(streams, key=lambda entry: entry["bandwidth"])
    print(f"选择最高质量音频流: ID {top_stream['id']}, Bandwidth: {top_stream['bandwidth']}")

    downloaded_path = client.download_audio(top_stream)
    if not downloaded_path:
        return

    wav_path = client.convert_to_wav(downloaded_path)
    if not wav_path:
        return

    transcript_path = client.transcribe_audio(wav_path, language="zh")
    if not transcript_path:
        return

    print(f"\n最终文本文件路径: {os.path.abspath(transcript_path)}")


if __name__ == '__main__':
    _run_pipeline("https://www.bilibili.com/video/BV1u2oiYqE1a/?spm_id_from=333.1007.tianma.2-1-4.click")
|