Skip to content

Commit

Permalink
Merge pull request #12 from yashprakash13/develop
Browse files Browse the repository at this point in the history
Download individuals albums and songs, with metadata from MB.
  • Loading branch information
yashprakash13 authored Dec 28, 2024
2 parents 3a6980c + f2fdf6e commit 438912e
Show file tree
Hide file tree
Showing 11 changed files with 703 additions and 188 deletions.
27 changes: 20 additions & 7 deletions amusing/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from importlib import metadata
from typing import Annotated
from typing import Annotated, Optional

import typer
from rich.console import Console
from rich.table import Table

from amusing.cli_operations import (
download_album_operation,
download_library_operation,
download_song_operation,
organize_library_operation,
Expand Down Expand Up @@ -36,19 +37,31 @@ def callback(
"""My app description"""


@app.command("album")
def download_album(
title: str = typer.Option(..., help="Title of the album"),
artist: Optional[str] = typer.Option(None, help="Artist of the album (optional)"),
):
"""Search and download the album and add it and any or all of its songs to the db.
Creates a new album if not already present.
This is the preferred way of adding new songs/albums to the music library.
"""
output = download_album_operation(title, APP_CONFIG["root_download_path"], artist)
print(output)


@app.command("song")
def download_song(
name: Annotated[str, typer.Argument(help="Name of the song.")],
artist: Annotated[str, typer.Argument(help="Aritst of the song.")],
album: Annotated[str, typer.Argument(help="Album the song belongs to.")],
title: str = typer.Option(..., help="Title of the song"),
artist: Optional[str] = typer.Option(None, help="Artist of the song (optional)"),
album: Optional[str] = typer.Option(None, help="Album of the song (optional)"),
force: Annotated[bool, typer.Option(help="Overwrite the song if present.")] = False,
):
"""Search and download the song and add it to the db.
"""Search and download an individual song and add it to the db.
Creates a new album if not already present.
"""
print(f"Given: {name} from {album} by {artist} and force is {force}")
output = download_song_operation(
album, name, artist, APP_CONFIG["root_download_path"], False
title, APP_CONFIG["root_download_path"], artist, album, force
)
print(output)

Expand Down
213 changes: 187 additions & 26 deletions amusing/cli_operations.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import hashlib
import os
import re
from shutil import copyfile

import amusing.core.save_to_db
import amusing.core.search
import typer
from sqlalchemy.orm import Session

from amusing.core.download import download
from amusing.core.metadata import search_album_metadata, search_songs_metadata
from amusing.core.parse_csv import process_csv
from amusing.core.parse_xml import parse_library_xml
from amusing.core.save_to_db import check_if_song_in_db
from amusing.core.search import search
from amusing.db.engine import get_new_db_session
from amusing.db.models import Album, Organizer, Song
from amusing.utils.funcs import construct_db_path, short_filename, short_filename_clean


def download_song_operation(
album_name: str,
song_name: str,
artist_name: str,
root_download_path: str,
artist_name: str = None,
album_name: str = None,
overwrite: bool = False,
) -> str:
"""Download a particular song and add it to the db.
Expand All @@ -30,45 +33,203 @@ def download_song_operation(
overwrite (bool): whether to overwrite the song if present in db and downloads.
"""
song = Song(
title=song_name,
artist=artist_name,
album=Album(title=album_name),
)
song_metadata_dict = search_songs_metadata(song_name, artist_name, album_name)
if song_metadata_dict:
song = Song(
title=song_metadata_dict["title"],
artist=song_metadata_dict["artist"],
album=Album(title=song_metadata_dict["album"]),
)
song.composer = song_metadata_dict["composer"]
song.disc = song_metadata_dict["disc"]
song.track = song_metadata_dict["track"]
else:
song = Song(
title=song_name,
artist=artist_name,
album=Album(title=album_name),
)
# fetch song from YT Music
song_fetched = search(song)
if not song_fetched:
return "Couldn't find song through YouTube Music Search."
album_name = song_fetched.album.title
song_name = song_fetched.title
artist_name = song_fetched.artist

try:
download(song_fetched, root_download_path)
download(song_fetched, root_download_path, overwrite)
except RuntimeError as e:
print(f"[!] Error: {e}")
return "Something went wrong while downloading a song. Please try again."
return "Something went wrong while downloading a song."
except FileNotFoundError as e:
print(f"[!] Error: {e}")
return "Is FFmpeg installed? It is required to generate the songs."

# insert into db
session = get_new_db_session(construct_db_path(root_download_path))
if not album_name:
album_name = song_name
album_in_db, error = create_new_album(album_name, album_dir, session)
if error:
return "Something went wrong in creating album. Please try again."

error = create_new_song(
song_name, artist_name, song_fetched.video_id, album_in_db, session, overwrite
albums = (
session.query(Album).filter(Album.title.ilike(f"%{song.album.title}%")).all()
)
if error:
return "Something went wrong in creating song. Please try again."
if albums:
typer.echo(
f"\n\n\nSimilar Albums are already present in the db. Do you want to download this song into any of these albums?"
)
choices = [album.title for album in albums] + ["Make a new album"]
typer.echo("Select an album or create a new one:")
for idx, choice in enumerate(choices, start=1):
typer.echo(f"{idx}. {choice}")
# Prompt user for a choice of album
selected_choice = typer.prompt("Enter the option of your choice", type=int)
if selected_choice == len(albums) + 1:
# Create and commit the new album
album = Album(title=album_name)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
else:
# Fetch the selected album from the database
album = albums[selected_choice - 1]
typer.echo(f"Selected album: {album.title}")

# check if the song is present in the selected album in db
song_present_in_db_query, error = check_if_song_in_db(
song.title, song.artist, album, session
)
if song_present_in_db_query and overwrite:
session.delete(song_present_in_db_query)
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()
else:
# Need to make a new album and a song in db
album = Album(title=album_name)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()

return "Added song!"


def _download_all_songs_for_given_album(
album: Album, album_metadata: dict, root_download_path: str, session: Session
):
"""A companion function to download_album_operation to do exactly as it says on the tin."""
for song_metadata_dict in album_metadata["track_list"]:
print("---")
choice = typer.prompt(
f"Processing song: {song_metadata_dict['title']}. Press any key to continue or 's' to skip this song."
)
if choice == "s":
continue
song = Song(
title=song_metadata_dict["title"],
artist=song_metadata_dict["artist"],
album=album,
)
song.composer = song_metadata_dict.get("composer")
song.disc = song_metadata_dict.get("disc_number")
song.track = song_metadata_dict.get("track_number")
song.genre = song_metadata_dict.get("genre")
# fetch song from YT Music
song_fetched = search(song)
if not song_fetched:
return "Couldn't find song through YouTube Music Search."
try:
download(song_fetched, root_download_path, True)
except RuntimeError as e:
print(f"[!] Error: {e}")
return "Something went wrong while downloading a song."
except FileNotFoundError as e:
print(f"[!] Error: {e}")
return "Is FFmpeg installed? It is required to generate the songs."
# check if the song is present in the given album in db
song_present_in_db_query, error = check_if_song_in_db(
song.title, song.artist, album, session
)
if song_present_in_db_query:
session.delete(song_present_in_db_query)
song.album = album
song.video_id = song_fetched.video_id
session.add(song)
session.commit()
return "Added all songs!"


def download_album_operation(
album_name: str,
root_download_path: str,
artist_name: str = None,
):
"""Download a particular album and all of its songs and add it to the db.
Parameters:
album_name (str): name of the album
artist_name (str): name of the artist
root_download_path (str): the path to download songs and put db into.
"""
album_metadata = search_album_metadata(album_name, artist_name)

# add new album to db or edit an existing album
session = get_new_db_session(construct_db_path(root_download_path))
albums = (
session.query(Album)
.filter(Album.title.ilike(f"%{album_metadata['title']}%"))
.all()
)
num_tracks = album_metadata.get("num_tracks")
if albums:
typer.echo(
f"\n\n\nSimilar Albums are already present in the db. Do you want to choose any of these albums to edit?"
)
choices = [album.title for album in albums] + ["Make a new album"]
typer.echo("Select an album or create a new one:")
for idx, choice in enumerate(choices, start=1):
typer.echo(f"{idx}. {choice}")
# Prompt user for a choice of album
selected_choice = typer.prompt("Enter the option of your choice", type=int)
if selected_choice == len(albums) + 1:
# Create and commit the new album
album = Album(
title=album_metadata["title"],
tracks=num_tracks,
artist=album_metadata["artist"],
release_date=album_metadata["release_date"],
artwork_url=album_metadata["artwork_url"],
)
session.add(album)
session.commit()
typer.echo(f"New album '{album_name}' created!")
elif 1 <= selected_choice <= len(albums):
# Fetch the selected album from the database and edit it
album = albums[selected_choice - 1]
typer.echo(f"Selected album: {album.title}")
album.title = album_metadata["title"]
album.artist = album_metadata["artist"]
album.tracks = num_tracks
album.release_date = album_metadata["release_date"]
album.artwork_url = album_metadata["artwork_url"]
session.add(album)
session.commit()
else:
# create a new album in db
album = Album(
title=album_metadata["title"],
tracks=num_tracks,
artist=album_metadata["artist"],
release_date=album_metadata["release_date"],
artwork_url=album_metadata["artwork_url"],
)
session.add(album)
session.commit()
typer.echo(f"New album '{album.title}' created!")

return _download_all_songs_for_given_album(
album, album_metadata, root_download_path, session
)


def parse_library_operation(root_download_path: str, lib_path: str) -> str:
"""Parse the Library XML or CSV file.
Expand Down
6 changes: 3 additions & 3 deletions amusing/core/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def song_file(song: Song, album_dir: str) -> str:
return ""


def download(song: Song, root_download_path: str):
def download(song: Song, root_download_path: str, overwrite: bool = False):
"""Download a song from YouTube video and generate file with metadata."""

video_id = song.video_id
Expand All @@ -159,8 +159,8 @@ def download(song: Song, root_download_path: str):
song_filename = short_filename(songs_dir, song_name, artwork_hash, video_id)
song_file_path = os.path.join(songs_dir, song_filename)

# Skip download if the song is already present
if os.path.exists(song_file_path):
# Skip download if the song is already present and overwrite is False
if os.path.exists(song_file_path) and not overwrite:
return

# Escape glob characters
Expand Down
Loading

0 comments on commit 438912e

Please # to comment.