mirror of
https://github.com/zoriya/Kyoo.git
synced 2026-05-23 23:19:08 +00:00
106 lines
4.0 KiB
Python
106 lines
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
from .hls_utils import fetch_text, parse_media_playlist, probe_segment_timeline
|
|
|
|
|
|
def test_copied_audio_has_no_gaps_at_lazy_window_boundaries(
|
|
master_context: dict, test_config, byte_cache
|
|
) -> None:
|
|
"""For -c:a copy, each lazy window (every ~100 segments) can drop a
|
|
boundary-crossing packet. Verify there are no audible gaps."""
|
|
max_allowed_gap_seconds = 0.06
|
|
failures: list[str] = []
|
|
|
|
for audio in master_context["audios"]:
|
|
# Only test the "original" (copied) audio rendition.
|
|
if "original" not in audio.url:
|
|
continue
|
|
|
|
text = fetch_text(
|
|
audio.url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
headers=test_config.headers,
|
|
)
|
|
playlist = parse_media_playlist(text, audio.url, master_context["client_id"])
|
|
if len(playlist.segment_urls) < 102:
|
|
# Not enough segments to cross a boundary
|
|
continue
|
|
|
|
# Probe segments around each 100-segment boundary.
|
|
boundaries = [i for i in range(99, len(playlist.segment_urls), 100)]
|
|
for boundary in boundaries:
|
|
before = probe_segment_timeline(
|
|
segment_url=playlist.segment_urls[boundary],
|
|
map_url=playlist.map_url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
byte_cache=byte_cache,
|
|
headers=test_config.headers,
|
|
)
|
|
after = probe_segment_timeline(
|
|
segment_url=playlist.segment_urls[boundary + 1],
|
|
map_url=playlist.map_url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
byte_cache=byte_cache,
|
|
headers=test_config.headers,
|
|
)
|
|
gap = after.start - before.end
|
|
if gap > max_allowed_gap_seconds:
|
|
failures.append(
|
|
f"{audio.attrs.get('GROUP-ID', audio.url)}"
|
|
f" boundary={boundary}->{boundary + 1}"
|
|
f" gap={gap:.6f}s"
|
|
)
|
|
|
|
assert not failures, "copied audio gap at lazy window boundary: " + ", ".join(
|
|
failures
|
|
)
|
|
|
|
|
|
def test_copied_audio_has_no_large_gaps_anywhere(
|
|
master_context: dict, test_config, byte_cache
|
|
) -> None:
|
|
"""Check every consecutive segment pair in the copied audio for gaps."""
|
|
max_allowed_gap_seconds = 0.06
|
|
failures: list[str] = []
|
|
|
|
for audio in master_context["audios"]:
|
|
if "original" not in audio.url:
|
|
continue
|
|
|
|
text = fetch_text(
|
|
audio.url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
headers=test_config.headers,
|
|
)
|
|
playlist = parse_media_playlist(text, audio.url, master_context["client_id"])
|
|
if len(playlist.segment_urls) < 2:
|
|
continue
|
|
|
|
# Check up to 50 consecutive pairs (enough to cross a window boundary).
|
|
sample_count = min(len(playlist.segment_urls) - 1, 50)
|
|
for idx in range(sample_count):
|
|
before = probe_segment_timeline(
|
|
segment_url=playlist.segment_urls[idx],
|
|
map_url=playlist.map_url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
byte_cache=byte_cache,
|
|
headers=test_config.headers,
|
|
)
|
|
after = probe_segment_timeline(
|
|
segment_url=playlist.segment_urls[idx + 1],
|
|
map_url=playlist.map_url,
|
|
timeout_seconds=test_config.timeout_seconds,
|
|
byte_cache=byte_cache,
|
|
headers=test_config.headers,
|
|
)
|
|
gap = after.start - before.end
|
|
if gap > max_allowed_gap_seconds:
|
|
failures.append(
|
|
f"{audio.attrs.get('GROUP-ID', audio.url)}"
|
|
f" seg={idx}->{idx + 1} gap={gap:.6f}s"
|
|
)
|
|
# Fail fast on the first large gap
|
|
break
|
|
|
|
assert not failures, "copied audio gap detected: " + ", ".join(failures)
|