-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTimingOffset.py
340 lines (303 loc) · 14.1 KB
/
TimingOffset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!/usr/bin/env python3
# TimingOffset
# Copyright (c) Akatsumekusa and contributors
# ---------------------------------------------------------------------
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ---------------------------------------------------------------------
import argparse
import os
from pathlib import Path
import platform
import math
import numpy as np
import re
from sklearn.preprocessing import StandardScaler
import sys
import tempfile
import typing
if platform.system() == "Windows":
os.system("")
fps_num = 24000
fps_den = 1001
def print_frame(frame: int) -> str:
return f"frame {frame} ({frame // (fps_num / fps_den * 60):02.0f}:{(frame % (fps_num / fps_den * 60)) // (fps_num / fps_den):02.0f})"
def print_offset(offset: int) -> str:
if offset < 240:
return f"{str(offset - 240)}f"
else:
return f"+{str(offset - 240)}f"
def get_keyframes_keyframe_format(f: typing.TextIO) -> list[int]:
lines = []
while True:
try:
line = f.readline()
except UnicodeDecodeError:
continue
if not line:
break
if line.startswith("#") or line.startswith("fps"):
continue
line = line.rstrip()
if line:
try:
line = int(line)
except ValueError:
continue
lines.append(line)
return lines
def get_keyframes_lwi(f: typing.TextIO) -> np.ndarray[bool]:
lines = []
while True:
try:
line = f.readline()
except UnicodeDecodeError:
continue
if not line:
break
if line.startswith("Key="):
lines.append(bool(int(line[4])))
else:
continue
return np.array(lines, dtype=bool)
def get_keyframes_video(clip: Path) -> np.ndarray[bool]:
from vapoursynth import core
global fps_num
global fps_den
with tempfile.TemporaryDirectory(prefix="TimingOffset") as cache:
cachefile = Path(cache).joinpath("TimingOffset.lwi")
clip = core.lsmas.LWLibavSource(clip.as_posix(), cache=True, cachefile=cachefile.as_posix())
print("\r\033[1A\033[K", end="", file=sys.stderr)
if clip.fps.numerator != 0 and clip.fps.denominator != 0:
fps_num = clip.fps.numerator
fps_den = clip.fps.denominator
with cachefile.open("r", encoding="utf-8") as f:
return get_keyframes_lwi(f)
def guess_filetype(path: Path) -> str:
if path.stat().st_size > 16777216:
return "binary"
f = path.open("r", encoding="utf-8")
try:
line = f.readline()
if line.startswith("# keyframe format") or line.startswith("fps"):
f.close()
return "keyframe_format"
elif line.startswith("<LSMASHWorksIndexVersion"):
f.close()
return "lwi"
else:
f.close()
return "binary"
except UnicodeDecodeError:
f.close()
return "binary"
def get_keyframes(clip: Path) -> typing.Union[list[int], np.ndarray[bool]]:
if clip.stat().st_size > 16777216:
return get_keyframes_video(clip)
f = clip.open("r", encoding="utf-8")
try:
line = f.readline()
if line.startswith("# keyframe format") or line.startswith("fps"):
result = get_keyframes_keyframe_format(f)
f.close()
return result
elif line.startswith("<LSMASHWorksIndexVersion"):
result = get_keyframes_lwi(f)
f.close()
return result
else:
f.close()
return get_keyframes_video(clip)
except UnicodeDecodeError:
f.close()
return get_keyframes_video(clip)
def inflate_keyframes_array(left: typing.Union[list[int], np.ndarray[bool]], right: typing.Union[list[int], np.ndarray[bool]]) -> tuple[np.ndarray[bool], np.ndarray[bool]]:
if (left_type := isinstance(left, np.ndarray)) and (right_type := isinstance(right, np.ndarray)):
return left, right
elif left_type and (not right_type):
new_right = np.zeros_like(left)
for n in right:
new_right[n] = True
return left, new_right
elif (not left_type) and right_type:
new_left = np.zeros_like(right)
for n in left:
new_left[n] = True
return new_left, right
else:
new_left = np.zeros(max(left[-1], right[-1]), dtype=bool)
new_right = np.zeros_like(new_left)
for n in left:
new_left[n] = True
for n in right:
new_right[n] = True
return new_left, new_right
this_is_likely_due_to = True
# Caller must assure that range was valid for both left and right array.
def guess_offset_range(left: np.ndarray[bool], right: np.ndarray[bool], range_: tuple[int]) -> typing.Optional[str]:
global this_is_likely_due_to
results = np.empty((481,), dtype=int)
for iter in range(-240, 0):
rl = max(range_[0] + iter, 0)
rr = range_[1] + iter
ll = range_[1] - (rr - rl)
lr = range_[1]
results[iter + 240] = np.count_nonzero(np.logical_and(left[ll:lr], right[rl:rr]))
for iter in range(0, 241):
rl = range_[0] + iter
rr = min(range_[1] + iter, right.shape[0])
ll = range_[0]
lr = range_[0] + (rr - rl)
results[iter + 240] = np.count_nonzero(np.logical_and(left[ll:lr], right[rl:rr]))
clf = StandardScaler(copy=True)
results = clf.fit_transform(results.reshape((-1, 1))).reshape((-1))
results_significant = np.nonzero(results > 5)[0]
if results_significant.shape[0] == 0:
message = f"\033[31mCould not find a significant relevance between left and right clips between \033[1;33m{print_frame(range_[0])}\033[0;31m and \033[1;33m{print_frame(range_[1])}\033[0;31m.\033[0m\n"
if (results_significant := np.nonzero(results > 4)[0]).shape[0] != 0:
message += "Timing offset with high unit variance are:\n"
for index in results_significant:
message += f"\033[31m* \033[1;33m{print_offset(index).rjust(4)} \033[0mwith unit variance {results[index]:.3f}\033[31m.\033[0m\n"
return message
elif results_significant.shape[0] == 1:
if results_significant[0] == 240:
return None
else:
return f"\033[34mPossible \033[1;34m{print_offset(results_significant[0])}\033[0;34m offset between \033[1;34m{print_frame(range_[0])}\033[0;34m and \033[1;34m{print_frame(range_[1])}\033[0;34m \033[0mwith unit variance {results[results_significant[0]]:.3f}\033[34m.\033[0m\n"
else:
message = f"\033[34mMultiple possible offsets detected between \033[1;34m{print_frame(range_[0])}\033[0;34m and \033[1;34m{print_frame(range_[1])}\033[0;34m:\033[0m\n"
for index in results_significant:
message += f"\033[34m* \033[1;34m{print_offset(index).rjust(4)} \033[0mwith unit variance {results[index]:.3f}\033[34m.\033[0m\n"
if this_is_likely_due_to:
message += "This is likely due to changes in timing in the middle of the segment, for example, with earlier parts of the segment following one offset and later parts following another, or it might just be a coincident, especially in the case where one offset has very high unit variance while all other offsets have low unit variances.\n"
this_is_likely_due_to = False
else:
message += "This may be due to changes in timing in the middle of the segment, or otherwise a coincident.\n"
return message
def guess_offset(left: np.ndarray[bool], right: np.ndarray[bool]) -> typing.Optional[str]:
a_message = None
if (length := left.shape[0]) != right.shape[0]:
length = min(left.shape[0], right.shape[0])
if abs(left.shape[0] - right.shape[0]) > 72:
a_message = "\033[33mLeft and right clips' length differs by more than 72 frames.\033[0m\n"
a_message += f"Left clip has {str(left.shape[0])} frames.\n"
a_message += f"Right clip has {str(right.shape[0])} frames.\n"
b_message = None
if length < 481:
b_message = "\033[31mComparations on clips whose lengths are under 481 frames are not supported.\033[0m\n"
b_message += str(left.shape)
else:
section_count = math.floor(length / 5754)
section_length = math.floor(length / section_count)
for i in range(section_count):
if i < section_count - 1:
return_message = guess_offset_range(left, right, (i * section_length, (i+1) * section_length))
else:
return_message = guess_offset_range(left, right, (i * section_length, length))
if return_message:
if b_message is None:
b_message = return_message
else:
b_message += return_message
if a_message and (not b_message):
return a_message + "No timing differences were detected. Left and right clips are aligned.\n"
elif (not a_message) and (not b_message):
return None
elif (not a_message) and b_message:
return b_message
else:
return a_message + b_message
parser = argparse.ArgumentParser(prog="TimingOffset", description="Detect whether Web and BD sources align based on video keyframe")
parser.add_argument("left", type=Path, help="The clip to compare against. Supports video file, lwi file, keyframe format file, or directory containing such files (smart)")
parser.add_argument("right", type=Path, help="The clip to compare. Supports video file, lwi file, keyframe format file, or directory containing such files (smart)")
args = parser.parse_args()
left = args.left
right = args.right
file_match = re.compile(r"(?<![a-z0-9A-DF-OQ-Z])([0-9]{2,3}(?:\.[0-9])?)[^a-uw-z0-9A-Z\.]")
messaged = False
def convert_path_to_list_of_files(path: Path):
if path.is_file():
return [path]
elif path.is_dir():
file_dict = {}
for file in path.iterdir():
if file.is_file() and (match := file_match.search(file.name)):
file_key = float(match.group(1))
if file_key in file_dict:
file_dict[file_key].append(file)
else:
file_dict[file_key] = [file]
file_list = []
for key in sorted(file_dict):
if len(file_dict[key]) == 1:
file_list.append(file_dict[key][0])
else:
binary = None
keyframe_format = None
lwi = None
for file in file_dict[key]:
if guess_filetype(file) == "binary":
if binary is None:
binary = file
else:
if file.stat().st_size > binary.stat().st_size:
binary = file
elif guess_filetype(file) == "lwi":
lwi = file
elif guess_filetype(file) == "keyframe_format":
keyframe_format = file
else:
raise ValueError
if keyframe_format is not None:
file_list.append(keyframe_format)
elif lwi is not None:
file_list.append(lwi)
elif binary is not None:
file_list.append(binary)
else:
raise ValueError
return file_list
else:
raise ValueError(f"Path \"{path.as_posix()}\" is neither a file nor a directory.")
left = convert_path_to_list_of_files(left)
right = convert_path_to_list_of_files(right)
if len(left) != len(right):
raise ValueError(f"Number of files in the left is different than number of files in the right.\n\tFiles in the left: {str([file.name for file in left])}\n\tFiles in the right: {str([file.name for file in right])}")
if len(left) == 0:
raise ValueError(f"No file is recognised in either provided directories.")
for i in range(len(left)):
if (match := file_match.search(right[i].name)) or (match := file_match.search(left[i].name)):
print(f"\033[1;37mComparing Episode {float(match.group(1)):02g}...\033[0m", end="\n")
left_ = get_keyframes(left[i])
right_ = get_keyframes(right[i])
left_, right_ = inflate_keyframes_array(left_, right_)
message = guess_offset(left_, right_)
if message:
messaged = True
if match:
print(f"\r\033[1A\033[K\033[1;37mOffsets in Episode {float(match.group(1)):02g} between left reference \033[0m\"{left[i].name}\"\033[1;37m and right target \033[0m\"{right[i].name}\"\033[1;37m:\033[0m", end="\n")
print(message, end="")
else:
print(message, end="")
else:
if match:
print("\r\033[1A\033[K", end="")
if not messaged:
print("No timing differences were detected. Left and right clips are aligned.", end="\n")