Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Download individuals albums and songs. #12

Merged
merged 5 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions amusing/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from importlib import metadata
from typing import Annotated
from typing import Annotated, Optional

import typer
from rich.console import Console
from rich.table import Table

from amusing.cli_operations import (
download_album_operation,
download_library_operation,
download_song_operation,
organize_library_operation,
Expand Down Expand Up @@ -36,19 +37,31 @@ def callback(
"""My app description"""


@app.command("album")
def download_album(
title: str = typer.Option(..., help="Title of the album"),
artist: Optional[str] = typer.Option(None, help="Artist of the album (optional)"),
):
"""Search and download the album and add it and any or all of its songs to the db.
Creates a new album if not already present.
This is the preferred way of adding new songs/albums to the music library.
"""
output = download_album_operation(title, APP_CONFIG["root_download_path"], artist)
print(output)


@app.command("song")
def download_song(
name: Annotated[str, typer.Argument(help="Name of the song.")],
artist: Annotated[str, typer.Argument(help="Aritst of the song.")],
album: Annotated[str, typer.Argument(help="Album the song belongs to.")],
title: str = typer.Option(..., help="Title of the song"),
artist: Optional[str] = typer.Option(None, help="Artist of the song (optional)"),
album: Optional[str] = typer.Option(None, help="Album of the song (optional)"),
force: Annotated[bool, typer.Option(help="Overwrite the song if present.")] = False,
):
"""Search and download the song and add it to the db.
"""Search and download an individual song and add it to the db.
Creates a new album if not already present.
"""
print(f"Given: {name} from {album} by {artist} and force is {force}")
output = download_song_operation(
album, name, artist, APP_CONFIG["root_download_path"], False
title, APP_CONFIG["root_download_path"], artist, album, force
)
print(output)

Expand Down
213 changes: 187 additions & 26 deletions amusing/cli_operations.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import hashlib
import os
import re
from shutil import copyfile

import amusing.core.save_to_db
import amusing.core.search
import typer
from sqlalchemy.orm import Session

from amusing.core.download import download
from amusing.core.metadata import search_album_metadata, search_songs_metadata
from amusing.core.parse_csv import process_csv
from amusing.core.parse_xml import parse_library_xml
from amusing.core.save_to_db import check_if_song_in_db
from amusing.core.search import search
from amusing.db.engine import get_new_db_session
from amusing.db.models import Album, Organizer, Song
from amusing.utils.funcs import construct_db_path, short_filename, short_filename_clean


def download_song_operation(
album_name: str,
song_name: str,
artist_name: str,
root_download_path: str,
artist_name: str = None,
album_name: str = None,
overwrite: bool = False,
) -> str:
"""Download a particular song and add it to the db.
Expand All @@ -30,45 +33,203 @@ def download_song_operation(
overwrite (bool): whether to overwrite the song if present in db and downloads.

"""
song = Song(
title=song_name,
artist=artist_name,
album=Album(title=album_name),
)
song_metadata_dict = search_songs_metadata(song_name, artist_name, album_name)
if song_metadata_dict:
song = Song(
title=song_metadata_dict["title"],
artist=song_metadata_dict["artist"],
album=Album(title=song_metadata_dict["album"]),
)
song.composer = song_metadata_dict["composer"]
song.disc = song_metadata_dict["disc"]
song.track = song_metadata_dict["track"]
else:
song = Song(
title=song_name,
artist=artist_name,
album=Album(title=album_name),
)
# fetch song from YT Music
song_fetched = search(song)
if not song_fetched:
return "Couldn't find song through YouTube Music Search."
album_name = song_fetched.album.title
song_name = song_fetched.title
artist_name = song_fetched.artist

try:
download(song_fetched, root_download_path)
download(song_fetched, root_download_path, overwrite)
except RuntimeError as e:
print(f"[!] Error: {e}")
return "Something went wrong while downloading a song. Please try again."
return "Something went wrong while downloading a song."
except FileNotFoundError as e:
print(f"[!] Error: {e}")
return "Is FFmpeg installed? It is required to generate the songs."

# insert into db
session = get_new_db_session(construct_db_path(root_download_path))
if not album_name:
album_name = song_name
album_in_db, error = create_new_album(album_name, album_dir, session)
if error:
return "Something went wrong in creating album. Please try again."

error = create_new_song(
song_name, artist_name, song_fetched.video_id, album_in_db, session, overwrite
albums = (
session.query(Album).filter(Album.title.ilike(f"%{song.album.title}%")).all()
)
if error:
return "Something went wrong in creating song. Please try again."
if albums:
typer.echo(
f"\n\n\nSimilar Albums are already present in the db. Do you want to download this song into any of these albums?"
)
choices = [album.title for album in albums] + ["Make a new album"]
typer.echo("Select an album or create a new one:")
for idx, choice in enumerate(choices, start=1):
typer.echo(f"{idx}. {choice}")
# Prompt user for a choice of album
selected_choice = typer.prompt("Enter the option of your choice", type=int)
if selected_choice == len(albums) + 1:
# Create and commit the new album
album = Album(title=album_name)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
else:
# Fetch the selected album from the database
album = albums[selected_choice - 1]
typer.echo(f"Selected album: {album.title}")

# check if the song is present in the selected album in db
song_present_in_db_query, error = check_if_song_in_db(
song.title, song.artist, album, session
)
if song_present_in_db_query and overwrite:
session.delete(song_present_in_db_query)
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()
else:
# Need to make a new album and a song in db
album = Album(title=album_name)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()

return "Added song!"


def _download_all_songs_for_given_album(
album: Album, album_metadata: dict, root_download_path: str, session: Session
):
"""A companion function to download_album_operation to do exactly as it says on the tin."""
for song_metadata_dict in album_metadata["track_list"]:
print("---")
choice = typer.prompt(
f"Processing song: {song_metadata_dict['title']}. Press any key to continue or 's' to skip this song."
)
if choice == "s":
continue
song = Song(
title=song_metadata_dict["title"],
artist=song_metadata_dict["artist"],
album=album,
)
song.composer = song_metadata_dict.get("composer")
song.disc = song_metadata_dict.get("disc_number")
song.track = song_metadata_dict.get("track_number")
song.genre = song_metadata_dict.get("genre")
# fetch song from YT Music
song_fetched = search(song)
if not song_fetched:
return "Couldn't find song through YouTube Music Search."
try:
download(song_fetched, root_download_path, True)
except RuntimeError as e:
print(f"[!] Error: {e}")
return "Something went wrong while downloading a song."
except FileNotFoundError as e:
print(f"[!] Error: {e}")
return "Is FFmpeg installed? It is required to generate the songs."
# check if the song is present in the given album in db
song_present_in_db_query, error = check_if_song_in_db(
song.title, song.artist, album, session
)
if song_present_in_db_query:
session.delete(song_present_in_db_query)
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()
return "Added all songs!"


def download_album_operation(
album_name: str,
root_download_path: str,
artist_name: str = None,
):
"""Download a particular album and all of its songs and add it to the db.

Parameters:
album_name (str): name of the album
artist_name (str): name of the artist
root_download_path (str): the path to download songs and put db into.
"""
album_metadata = search_album_metadata(album_name, artist_name)

# add new album to db or edit an existing album
session = get_new_db_session(construct_db_path(root_download_path))
albums = (
session.query(Album)
.filter(Album.title.ilike(f"%{album_metadata['title']}%"))
.all()
)
num_tracks = album_metadata.get("num_tracks")
if albums:
typer.echo(
f"\n\n\nSimilar Albums are already present in the db. Do you want to choose any of these albums to edit?"
)
choices = [album.title for album in albums] + ["Make a new album"]
typer.echo("Select an album or create a new one:")
for idx, choice in enumerate(choices, start=1):
typer.echo(f"{idx}. {choice}")
# Prompt user for a choice of album
selected_choice = typer.prompt("Enter the option of your choice", type=int)
if selected_choice == len(albums) + 1:
# Create and commit the new album
album = Album(
title=album_metadata["title"],
tracks=num_tracks,
artist=album_metadata["artist"],
release_date=album_metadata["release_date"],
artwork_url=album_metadata["artwork_url"],
)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
elif 1 <= selected_choice <= len(albums):
# Fetch the selected album from the database and edit it
album = albums[selected_choice - 1]
typer.echo(f"Selected album: {album.title}")
album.title = album_metadata["title"]
album.artist = album_metadata["artist"]
album.tracks = num_tracks
album.release_date = album_metadata["release_date"]
album.artwork_url = album_metadata["artwork_url"]
session.add(album)
session.commit()
else:
# create a new album in db
album = Album(
title=album_metadata["title"],
tracks=num_tracks,
artist=album_metadata["artist"],
release_date=album_metadata["release_date"],
artwork_url=album_metadata["artwork_url"],
)
session.add(album)
session.commit()
typer.echo(f"New album '{album.title}' created!")

return _download_all_songs_for_given_album(
album, album_metadata, root_download_path, session
)


def parse_library_operation(root_download_path: str, lib_path: str) -> str:
"""Parse the Library XML or CSV file.

Expand Down
6 changes: 3 additions & 3 deletions amusing/core/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def song_file(song: Song, album_dir: str) -> str:
return ""


def download(song: Song, root_download_path: str):
def download(song: Song, root_download_path: str, overwrite: bool = False):
"""Download a song from YouTube video and generate file with metadata."""

video_id = song.video_id
Expand All @@ -159,8 +159,8 @@ def download(song: Song, root_download_path: str):
song_filename = short_filename(songs_dir, song_name, artwork_hash, video_id)
song_file_path = os.path.join(songs_dir, song_filename)

# Skip download if the song is already present
if os.path.exists(song_file_path):
# Skip download if the song is already present and overwrite is False
if os.path.exists(song_file_path) and not overwrite:
return

# Escape glob characters
Expand Down
Loading