-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtranscriber.py
145 lines (103 loc) · 4.53 KB
/
transcriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import concurrent.futures
import os
import time
from pathlib import Path
import openai
from moviepy.editor import VideoFileClip
from tqdm import tqdm
from pydub import AudioSegment
AUDIO_PATH = Path('./audio')
VIDEO_PATH = Path('./videos')
TRANSCRIPTS_PATH = Path('./transcripts')
WHISPER_COST_PER_SECOND = 0.0001
CHUNK_LENGTH_MS = 5 * 60 * 1000 # 5 minutes.
def get_untranscribed_video_files():
    """Return the .mp4 filenames in VIDEO_PATH that have no transcript yet.

    A video counts as transcribed when a file with the same stem and a
    .txt suffix exists in TRANSCRIPTS_PATH.

    Returns:
        List of .mp4 filenames (directory-listing order preserved).
    """
    # Build the transcript-name set once so each membership test is O(1)
    # instead of scanning the transcript list per video.
    transcript_names = {
        file for file in os.listdir(TRANSCRIPTS_PATH) if file.endswith('.txt')
    }
    return [
        video_file
        for video_file in os.listdir(VIDEO_PATH)
        if video_file.endswith('.mp4')
        and video_file.replace('.mp4', '.txt') not in transcript_names
    ]
def list_mp4s_and_total_duration(mp4_files) -> bool:
    """Print each video's duration and the estimated Whisper cost, then
    ask the user whether to continue.

    Args:
        mp4_files: Non-empty list of .mp4 filenames inside VIDEO_PATH.

    Returns:
        True if the user answers 'y' (case-insensitive), False otherwise.
    """
    widest = max(len(name) for name in mp4_files)
    total_seconds = 0
    print("Video files found: ")
    for name in mp4_files:
        # Open each clip just long enough to read its duration.
        with VideoFileClip(str(VIDEO_PATH / name)) as clip:
            seconds = clip.duration
        total_seconds += seconds
        dashes = '-' * (1 + widest - len(name))
        print(f"* {name} {dashes} {seconds:.2f} seconds")
    print(f"\nCumulative Runtime: {total_seconds:.2f} seconds")
    print(f"Estimated transcription cost: ${total_seconds * WHISPER_COST_PER_SECOND:.2f}")
    # Print without newline.
    print("Do you want to continue? [y/N] ", end='')
    return input().lower() == 'y'
def extract_audio(video_file):
    """Extract a video's audio track to .mp3 and split it into chunks.

    Args:
        video_file: Name of an .mp4 file inside VIDEO_PATH.

    Returns:
        List of chunk .mp3 filenames written into AUDIO_PATH, in order.
    """
    audio_filename = video_file.replace('.mp4', '.mp3')
    # Use a context manager so the underlying ffmpeg reader is released;
    # the original never closed the clip (list_mp4s_and_total_duration
    # already uses `with` for the same object).
    with VideoFileClip(str(VIDEO_PATH / video_file)) as video:
        video.audio.write_audiofile(str(AUDIO_PATH / audio_filename), logger=None)  # Disable logging
    audio = AudioSegment.from_mp3(str(AUDIO_PATH / audio_filename))
    # Splitting the audio into fixed 5-minute pieces (CHUNK_LENGTH_MS);
    # presumably to keep each upload under the Whisper API size limit — TODO confirm.
    chunks = [audio[i:i + CHUNK_LENGTH_MS] for i in range(0, len(audio), CHUNK_LENGTH_MS)]
    chunk_filenames = []
    for idx, chunk in enumerate(chunks):
        chunk_filename = f"{audio_filename.replace('.mp3', '')}_{idx}.mp3"
        chunk.export(AUDIO_PATH / chunk_filename, format="mp3")
        chunk_filenames.append(chunk_filename)
    return chunk_filenames
def transcribe_audio(client, audio_filename):
    """Transcribe one audio chunk with the Whisper API and return its text.

    NOTE(review): this definition is shadowed by the retrying
    `transcribe_audio` defined later in this module and is effectively
    dead code — consider deleting it.

    Args:
        client: An openai.OpenAI client.
        audio_filename: Name of an .mp3 chunk inside AUDIO_PATH.

    Returns:
        The transcript text for the chunk.
    """
    # `with` closes the file handle even if the request raises; the
    # original opened the file and never closed it.
    with open(AUDIO_PATH / audio_filename, "rb") as audio_file:
        response = client.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1"
        )
    return response.text
def combine_transcripts(transcripts, original_filename):
    """Join chunk transcripts with newlines and write them to a .txt file
    in TRANSCRIPTS_PATH named after the original audio file.

    Args:
        transcripts: Ordered list of transcript strings, one per chunk.
        original_filename: The source .mp3 filename (path components are
            stripped; the suffix becomes .txt).
    """
    out_name = Path(original_filename).name.replace('.mp3', '.txt')
    out_path = TRANSCRIPTS_PATH / out_name
    with open(out_path, 'w') as out_file:
        out_file.write("\n".join(transcripts))
def transcribe_audio(client, audio_filename):
    """Transcribe one audio chunk with the Whisper API, retrying on rate limits.

    Retries up to 5 times with exponential backoff starting at 20 seconds.
    Any error other than a rate limit propagates immediately.

    Args:
        client: An openai.OpenAI client.
        audio_filename: Name of an .mp3 chunk inside AUDIO_PATH.

    Returns:
        The transcript text for the chunk.

    Raises:
        RuntimeError: If every attempt hit the rate limit.
    """
    max_retries = 5
    retry_delay = 20  # Initial delay in seconds.
    for attempt in range(max_retries):
        try:
            # `with` closes the handle even when the request raises; the
            # original leaked one open file descriptor per attempt.
            with open(AUDIO_PATH / audio_filename, "rb") as audio_file:
                response = client.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1"
                )
            return response.text
        except openai.RateLimitError:
            print(f"Rate limit exceeded. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff
        # Note: the original `except Exception as e: raise e` clause was a
        # no-op (other exceptions propagate on their own) and was removed.
    raise RuntimeError(f"Failed to transcribe after {max_retries} attempts.")
def main():
    """Transcribe every downloaded video that lacks a transcript.

    Lists the candidate videos with an estimated cost, asks for
    confirmation, then for each video extracts and chunks the audio,
    transcribes the chunks concurrently, and writes one combined
    transcript per video.
    """
    video_files = get_untranscribed_video_files()
    if not video_files:
        print("No video files found to transcribe. Either all videos have been transcribed or no videos have been downloaded.")
        return
    if not list_mp4s_and_total_duration(video_files):
        return
    client = openai.OpenAI()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Giving tqdm a total lets it render overall progress and an ETA;
        # the original bar had no total and could only count completions.
        with tqdm(total=len(video_files), desc="Transcription", unit="files") as transcribe_progress:
            for video_file in video_files:
                chunk_filenames = extract_audio(video_file)
                # Submit chunks in order and collect results in the same
                # order so the combined transcript stays sequential.
                futures = [executor.submit(transcribe_audio, client, chunk_filename)
                           for chunk_filename in chunk_filenames]
                transcripts = [future.result() for future in futures]
                combine_transcripts(transcripts, video_file.replace('.mp4', '.mp3'))
                transcribe_progress.update(1)
# Script entry point: run the transcription pipeline, then announce exit.
if __name__ == '__main__':
    main()
    print("Exiting.")