fix: Don't add episode to DB if they failed downloading (#209, #210)
janw authored Dec 3, 2024
1 parent 8f4ae3c commit b61530a
Showing 5 changed files with 112 additions and 51 deletions.
.github/workflows/bump-version.yaml (4 additions, 1 deletion)
@@ -1,6 +1,9 @@
 name: Bump version

-on: workflow_dispatch
+on:
+  push:
+    branches:
+      - main

 jobs:
   bump-version:
hack/rich-codex.sh (1 addition, 0 deletions)
@@ -8,5 +8,6 @@ export DELETED_FILES="deleted.txt"
 export NO_CONFIRM="true"
 export SKIP_GIT_CHECKS="true"
 export CLEAN_IMG_PATHS='./assets/*.svg'
+export CI=1

 exec poetry run rich-codex
podcast_archiver/processor.py (48 additions, 44 deletions)
@@ -6,17 +6,17 @@
 from typing import TYPE_CHECKING

 from podcast_archiver import constants
+from podcast_archiver.config import Settings
 from podcast_archiver.download import DownloadJob
 from podcast_archiver.enums import DownloadResult, QueueCompletionType
 from podcast_archiver.logging import logger, rprint
 from podcast_archiver.models import Episode, Feed, FeedInfo
-from podcast_archiver.types import EpisodeResult, EpisodeResultsList
+from podcast_archiver.types import EpisodeResult, EpisodeResultsList, FutureEpisodeResult
 from podcast_archiver.utils import FilenameFormatter, handle_feed_request

 if TYPE_CHECKING:
     from pathlib import Path

-    from podcast_archiver.config import Settings
     from podcast_archiver.database import BaseDatabase


@@ -38,27 +38,29 @@ class FeedProcessor:

     known_feeds: dict[str, FeedInfo]

-    def __init__(self, settings: Settings) -> None:
-        self.settings = settings
-        self.filename_formatter = FilenameFormatter(settings)
-        self.database = settings.get_database()
+    def __init__(self, settings: Settings | None = None) -> None:
+        self.settings = settings or Settings()
+        self.filename_formatter = FilenameFormatter(self.settings)
+        self.database = self.settings.get_database()
         self.pool_executor = ThreadPoolExecutor(max_workers=self.settings.concurrency)
         self.stop_event = Event()
         self.known_feeds = {}

     def process(self, url: str) -> ProcessingResult:
-        result = ProcessingResult()
-        with handle_feed_request(url):
-            result.feed = Feed(url=url, known_info=self.known_feeds.get(url))
+        if not (feed := self.load_feed(url, known_feeds=self.known_feeds)):
+            return ProcessingResult()

-        if result.feed:
-            rprint(f"\n[bold bright_magenta]Downloading archive for: {result.feed}[/]\n")
-            episode_results, completion_msg = self._process_episodes(feed=result.feed)
-            self._handle_results(episode_results, result=result)
+        result, tombstone = self.process_feed(feed=feed)

-            rprint(f"\n[bar.finished]✔ {completion_msg} for: {result.feed}[/]")
+        rprint(f"\n[bar.finished]✔ {tombstone} for: {feed}[/]")
         return result

+    def load_feed(self, url: str, known_feeds: dict[str, FeedInfo]) -> Feed | None:
+        with handle_feed_request(url):
+            feed = Feed(url=url, known_info=known_feeds.get(url))
+            known_feeds[feed.url] = feed.info
+            return feed
+
     def _preflight_check(self, episode: Episode, target: Path) -> DownloadResult | None:
         if self.database.exists(episode):
             logger.debug("Pre-flight check on episode '%s': already in database.", episode)
@@ -70,58 +72,60 @@ def _preflight_check(self, episode: Episode, target: Path) -> DownloadResult | None:

         return None

-    def _process_episodes(self, feed: Feed) -> tuple[EpisodeResultsList, QueueCompletionType]:
+    def process_feed(self, feed: Feed) -> tuple[ProcessingResult, QueueCompletionType]:
+        rprint(f"\n[bold bright_magenta]Downloading archive for: {feed}[/]\n")
+        tombstone = QueueCompletionType.COMPLETED
         results: EpisodeResultsList = []
         for idx, episode in enumerate(feed.episode_iter(self.settings.maximum_episode_count), 1):
-            if completion := self._process_episode(episode, feed.info, results):
-                return results, completion
-
+            if (enqueued := self._enqueue_episode(episode, feed.info)) is None:
+                tombstone = QueueCompletionType.FOUND_EXISTING
+                break
+            results.append(enqueued)
             if (max_count := self.settings.maximum_episode_count) and idx == max_count:
                 logger.debug("Reached requested maximum episode count of %s", max_count)
-                return results, QueueCompletionType.MAX_EPISODES
+                tombstone = QueueCompletionType.MAX_EPISODES
+                break

-        return results, QueueCompletionType.COMPLETED
+        success, failures = self._handle_results(results)
+        return ProcessingResult(feed=feed, success=success, failures=failures), tombstone

-    def _process_episode(
-        self, episode: Episode, feed_info: FeedInfo, results: EpisodeResultsList
-    ) -> QueueCompletionType | None:
+    def _enqueue_episode(self, episode: Episode, feed_info: FeedInfo) -> FutureEpisodeResult | None:
         target = self.filename_formatter.format(episode=episode, feed_info=feed_info)
         if result := self._preflight_check(episode, target):
             rprint(f"[bar.finished]✔ {result}: {episode}[/]")
-            results.append(EpisodeResult(episode, result))
             if self.settings.update_archive:
                 logger.debug("Up to date with %r", episode)
-                return QueueCompletionType.FOUND_EXISTING
-            return None
+                return None
+            return EpisodeResult(episode, result)

         logger.debug("Queueing download for %r", episode)
-        results.append(
-            self.pool_executor.submit(
-                DownloadJob(
-                    episode,
-                    target=target,
-                    max_download_bytes=constants.DEBUG_PARTIAL_SIZE if self.settings.debug_partial else None,
-                    write_info_json=self.settings.write_info_json,
-                    stop_event=self.stop_event,
-                )
-            )
-        )
-        return None
+        return self.pool_executor.submit(
+            DownloadJob(
+                episode,
+                target=target,
+                max_download_bytes=constants.DEBUG_PARTIAL_SIZE if self.settings.debug_partial else None,
+                write_info_json=self.settings.write_info_json,
+                stop_event=self.stop_event,
+            )
+        )

-    def _handle_results(self, episode_results: EpisodeResultsList, *, result: ProcessingResult) -> None:
-        if not result.feed:
-            return
+    def _handle_results(self, episode_results: EpisodeResultsList) -> tuple[int, int]:
+        failures = success = 0
         for episode_result in episode_results:
             if not isinstance(episode_result, EpisodeResult):
                 try:
                     episode_result = episode_result.result()
-                except Exception:
-                    result.failures += 1
+                    logger.debug("Got future result %s", episode_result)
+                except Exception as exc:
+                    logger.debug("Got exception from future %s", episode_result, exc_info=exc)
                     continue

+            if episode_result.result not in (DownloadResult.COMPLETED_SUCCESSFULLY, DownloadResult.ALREADY_EXISTS):
+                failures += 1
+                continue
+
             self.database.add(episode_result.episode)
-            result.success += 1
-            self.known_feeds[result.feed.url] = result.feed.info
+            success += 1
+        return success, failures

     def shutdown(self) -> None:
         if not self.stop_event.is_set():
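
Net effect in processor.py: process() now splits into load_feed() and process_feed(), and _handle_results() returns a (success, failures) tuple instead of mutating a shared ProcessingResult, writing to the database only for COMPLETED_SUCCESSFULLY or ALREADY_EXISTS outcomes. A minimal driver for the reworked API, as a sketch (the feed URL is made up; all other names come from the diff above):

    from podcast_archiver.processor import FeedProcessor

    # Settings is optional now; FeedProcessor() falls back to Settings().
    proc = FeedProcessor()
    try:
        # process() returns an empty ProcessingResult when the feed cannot be
        # loaded; otherwise success/failures count per-episode outcomes, and
        # failed episodes are no longer added to the database.
        result = proc.process("https://example.org/feed.xml")  # hypothetical URL
        print(f"{result.success} downloaded, {result.failures} failed")
    finally:
        proc.shutdown()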
podcast_archiver/types.py (2 additions, 1 deletion)
@@ -17,4 +17,5 @@ class ProgressCallback(Protocol):
     def __call__(self, total: int | None = None, completed: int | None = None) -> None: ...


-EpisodeResultsList: TypeAlias = list[Future[EpisodeResult] | EpisodeResult]
+FutureEpisodeResult: TypeAlias = Future[EpisodeResult] | EpisodeResult
+EpisodeResultsList: TypeAlias = list[FutureEpisodeResult]
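
The new alias names the union that _handle_results() iterates over: each entry is either an already-resolved EpisodeResult or a Future that will produce one. A small resolver illustrates the shape (hypothetical helper, mirroring the isinstance check in processor.py above):

    from podcast_archiver.types import EpisodeResult, EpisodeResultsList


    def resolve_all(items: EpisodeResultsList) -> list[EpisodeResult]:
        # Anything that is not already an EpisodeResult is a Future[EpisodeResult];
        # .result() blocks until the job finishes and re-raises any job exception.
        return [item if isinstance(item, EpisodeResult) else item.result() for item in items]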
tests/test_processor.py (57 additions, 5 deletions)
@@ -2,19 +2,23 @@

 from pathlib import Path
 from typing import TYPE_CHECKING
+from unittest import mock
 from unittest.mock import patch

 import pytest

-from podcast_archiver.config import Settings
 from podcast_archiver.enums import DownloadResult
 from podcast_archiver.models import FeedPage
 from podcast_archiver.processor import FeedProcessor, ProcessingResult
+from podcast_archiver.types import EpisodeResult

 if TYPE_CHECKING:
     from pydantic_core import Url
     from responses import RequestsMock

+    from podcast_archiver.models import Episode
+    from podcast_archiver.types import EpisodeResultsList


 @pytest.mark.parametrize(
     "file_exists,database_exists,expected_result",
@@ -31,11 +35,10 @@ def test_preflight_check(
     database_exists: bool,
     expected_result: DownloadResult | None,
 ) -> None:
-    settings = Settings()
     feed = FeedPage.model_validate(feedobj_lautsprecher)
     episode = feed.episodes[0]
     target = Path("file.mp3")
-    proc = FeedProcessor(settings)
+    proc = FeedProcessor()
     if file_exists:
         target.touch()
     with patch.object(proc.database, "exists", return_value=database_exists):
@@ -45,10 +48,59 @@


 def test_retrieve_failure(responses: RequestsMock) -> None:
-    settings = Settings()
-    proc = FeedProcessor(settings)
+    proc = FeedProcessor()

     result = proc.process("https://broken.url.invalid")

     assert result == ProcessingResult()
     assert result.feed is None
+
+
+def test_download_success(tmp_path_cd: Path, feed_lautsprecher: str) -> None:
+    proc = FeedProcessor()
+
+    result = proc.process(feed_lautsprecher)
+
+    assert result != ProcessingResult()
+    assert result.success == 5
+    assert result.feed
+    assert result.feed.url == feed_lautsprecher
+
+
+def test_handle_results_mixed(episode: Episode) -> None:
+    proc = FeedProcessor()
+    episodes: EpisodeResultsList = [
+        EpisodeResult(episode=episode, result=DownloadResult.COMPLETED_SUCCESSFULLY),
+        EpisodeResult(episode=episode, result=DownloadResult.FAILED),
+    ]
+
+    with mock.patch.object(proc.database, "add", return_value=None) as mock_add:
+        success, failures = proc._handle_results(episodes)
+
+    assert success == 1
+    assert failures == 1
+    assert mock_add.call_count == 1
+
+
+def test_handle_results_failure(episode: Episode) -> None:
+    proc = FeedProcessor()
+    episodes: EpisodeResultsList = [EpisodeResult(episode=episode, result=DownloadResult.ABORTED)]
+
+    with mock.patch.object(proc.database, "add", return_value=None) as mock_add:
+        success, failures = proc._handle_results(episodes)
+
+    assert success == 0
+    assert failures == 1
+    mock_add.assert_not_called()
+
+
+def test_handle_results_failed_future(episode: Episode) -> None:
+    proc = FeedProcessor()
+    episodes: EpisodeResultsList = [EpisodeResult(episode=episode, result=DownloadResult.ABORTED)]
+
+    with mock.patch.object(proc.database, "add", return_value=None) as mock_add:
+        success, failures = proc._handle_results(episodes)
+
+    assert success == 0
+    assert failures == 1
+    mock_add.assert_not_called()
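
The new tests feed _handle_results() only plain EpisodeResult values; per the processor.py diff, a future that raises is logged and skipped, counting as neither success nor failure. A hypothetical test for that path, extending the module above (sketch; the test name is invented, Future.set_exception is standard concurrent.futures API):

    def test_handle_results_raising_future(episode: Episode) -> None:
        from concurrent.futures import Future

        proc = FeedProcessor()
        fut: Future[EpisodeResult] = Future()
        fut.set_exception(RuntimeError("simulated download crash"))
        episodes: EpisodeResultsList = [fut]

        with mock.patch.object(proc.database, "add", return_value=None) as mock_add:
            success, failures = proc._handle_results(episodes)

        # Assumes the behavior shown in the diff: the exception is logged,
        # the entry is skipped, and nothing reaches the database.
        assert success == 0
        assert failures == 0
        mock_add.assert_not_called()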
