diff --git a/streamrip/client/qobuz.py b/streamrip/client/qobuz.py index ea0a11c..7d46646 100644 --- a/streamrip/client/qobuz.py +++ b/streamrip/client/qobuz.py @@ -25,35 +25,14 @@ QOBUZ_BASE_URL = "https://www.qobuz.com/api.json/0.2" -QOBUZ_FEATURED_KEYS = { - "most-streamed", - "recent-releases", - "best-sellers", - "press-awards", - "ideal-discography", - "editor-picks", - "most-featured", - "qobuzissims", - "new-releases", - "new-releases-full", - "harmonia-mundi", - "universal-classic", - "universal-jazz", - "universal-jeunesse", - "universal-chanson", -} - - class QobuzSpoofer: """Spoofs the information required to stream tracks from Qobuz.""" def __init__(self): - """Create a Spoofer.""" self.seed_timezone_regex = ( r'[a-z]\.initialSeed\("(?P[\w=]+)",window\.ut' r"imezone\.(?P[a-z]+)\)" ) - # note: {timezones} should be replaced with every capitalized timezone joined by a | self.info_extras_regex = ( r'name:"\w+/(?P{timezones})",info:"' r'(?P[\w=]+)",extras:"(?P[\w=]+)"' @@ -64,6 +43,7 @@ def __init__(self): self.session = None async def get_app_id_and_secrets(self) -> tuple[str, list[str]]: + """Fetches app_id and secrets from Qobuz web page.""" assert self.session is not None async with self.session.get("https://play.qobuz.com/login") as req: login_page = await req.text() @@ -91,15 +71,6 @@ async def get_app_id_and_secrets(self) -> tuple[str, list[str]]: seed, timezone = match.group("seed", "timezone") secrets[timezone] = [seed] - """ - The code that follows switches around the first and second timezone. - Qobuz uses two ternary (a shortened if statement) conditions that - should always return false. The way Javascript's ternary syntax - works, the second option listed is what runs if the condition returns - false. Because of this, we must prioritize the *second* seed/timezone - pair captured, not the first. - """ - keypairs = list(secrets.items()) secrets.move_to_end(keypairs[1][0], last=False) @@ -117,7 +88,8 @@ async def get_app_id_and_secrets(self) -> tuple[str, list[str]]: ).decode("utf-8") vals: List[str] = list(secrets.values()) - vals.remove("") + if "" in vals: + vals.remove("") secrets_list = vals @@ -138,6 +110,7 @@ class QobuzClient(Client): max_quality = 4 def __init__(self, config: Config): + """Initialize QobuzClient with config, rate limiter, and secret placeholder.""" self.logged_in = False self.config = config self.rate_limiter = self.get_rate_limiter( @@ -146,6 +119,7 @@ def __init__(self, config: Config): self.secret: Optional[str] = None async def login(self): + """Logs into Qobuz using provided credentials in config.""" self.session = await self.get_session() c = self.config.session.qobuz if not c.email_or_userid or not c.password_or_token: @@ -156,14 +130,16 @@ async def login(self): if not c.app_id or not c.secrets: logger.info("App id/secrets not found, fetching") c.app_id, c.secrets = await self._get_app_id_and_secrets() - # write to file + # Save app_id and secrets in the config file f = self.config.file f.qobuz.app_id = c.app_id f.qobuz.secrets = c.secrets f.set_modified() + # Update session headers with app ID self.session.headers.update({"X-App-Id": str(c.app_id)}) - + + # Prepare login parameters based on whether an auth token is used if c.use_auth_token: params = { "user_id": c.email_or_userid, @@ -177,103 +153,73 @@ async def login(self): "app_id": str(c.app_id), } - logger.debug("Request params %s", params) status, resp = await self._api_request("user/login", params) - logger.debug("Login resp: %s", resp) + # Handle potential login errors if status == 401: raise AuthenticationError(f"Invalid credentials from params {params}") elif status == 400: raise InvalidAppIdError(f"Invalid app id from params {params}") - logger.debug("Logged in to Qobuz") - + # Check if account type is eligible for downloads if not resp["user"]["credential"]["parameters"]: raise IneligibleError("Free accounts are not eligible to download tracks.") + # Save user auth token for authenticated requests uat = resp["user_auth_token"] self.session.headers.update({"X-User-Auth-Token": uat}) + # Retrieve and set a valid secret self.secret = await self._get_valid_secret(c.secrets) - self.logged_in = True - async def get_metadata(self, item: str, media_type: str): - if media_type == "label": - return await self.get_label(item) + async def get_downloadable(self, item: str, quality: int) -> Downloadable: + """Gets downloadable track URL and quality information.""" + assert self.secret is not None and self.logged_in and 1 <= quality <= 4 + status, resp_json = await self._request_file_url(item, quality, self.secret) + assert status == 200 + stream_url = resp_json.get("url") + if stream_url is None: + restrictions = resp_json["restrictions"] + if restrictions: + # Generate error message from restriction codes + words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"]) + raise NonStreamableError( + words[0] + " " + " ".join(map(str.lower, words[1:])) + ".", + ) + raise NonStreamableError + + # BasicDownloadable çağrısında source parametresini kaldırdık + return BasicDownloadable(self.session, stream_url, "flac" if quality > 1 else "mp3") + + async def get_metadata(self, item: str, media_type: str): + """Fetches metadata for a specific media item.""" c = self.config.session.qobuz params = { "app_id": str(c.app_id), f"{media_type}_id": item, - # Do these matter? "limit": 500, "offset": 0, } - extras = { "artist": "albums", "playlist": "tracks", "label": "albums", } - if media_type in extras: params.update({"extra": extras[media_type]}) - - logger.debug("request params: %s", params) - epoint = f"{media_type}/get" - status, resp = await self._api_request(epoint, params) if status != 200: raise NonStreamableError( f'Error fetching metadata. Message: "{resp["message"]}"', ) - return resp - async def get_label(self, label_id: str) -> dict: - c = self.config.session.qobuz - page_limit = 500 - params = { - "app_id": str(c.app_id), - "label_id": label_id, - "limit": page_limit, - "offset": 0, - "extra": "albums", - } - epoint = "label/get" - status, label_resp = await self._api_request(epoint, params) - assert status == 200 - albums_count = label_resp["albums_count"] - - if albums_count <= page_limit: - return label_resp - - requests = [ - self._api_request( - epoint, - { - "app_id": str(c.app_id), - "label_id": label_id, - "limit": page_limit, - "offset": offset, - "extra": "albums", - }, - ) - for offset in range(page_limit, albums_count, page_limit) - ] - - results = await asyncio.gather(*requests) - items = label_resp["albums"]["items"] - for status, resp in results: - assert status == 200 - items.extend(resp["albums"]["items"]) - - return label_resp - async def search(self, media_type: str, query: str, limit: int = 500) -> list[dict]: + """Searches Qobuz for items of the specified media type.""" if media_type not in ("artist", "album", "track", "playlist"): raise Exception(f"{media_type} not available for search on qobuz") @@ -281,91 +227,29 @@ async def search(self, media_type: str, query: str, limit: int = 500) -> list[di "query": query, } epoint = f"{media_type}/search" - - return await self._paginate(epoint, params, limit=limit) - - async def get_featured(self, query, limit: int = 500) -> list[dict]: - params = { - "type": query, - } - assert query in QOBUZ_FEATURED_KEYS, f'query "{query}" is invalid.' - epoint = "album/getFeatured" - return await self._paginate(epoint, params, limit=limit) - - async def get_user_favorites(self, media_type: str, limit: int = 500) -> list[dict]: - assert media_type in ("track", "artist", "album") - params = {"type": f"{media_type}s"} - epoint = "favorite/getUserFavorites" - return await self._paginate(epoint, params, limit=limit) - async def get_user_playlists(self, limit: int = 500) -> list[dict]: - epoint = "playlist/getUserPlaylists" - return await self._paginate(epoint, {}, limit=limit) - - async def get_downloadable(self, item: str, quality: int) -> Downloadable: - assert self.secret is not None and self.logged_in and 1 <= quality <= 4 - status, resp_json = await self._request_file_url(item, quality, self.secret) - assert status == 200 - stream_url = resp_json.get("url") - - if stream_url is None: - restrictions = resp_json["restrictions"] - if restrictions: - # Turn CamelCase code into a readable sentence - words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"]) - raise NonStreamableError( - words[0] + " " + " ".join(map(str.lower, words[1:])) + ".", - ) - raise NonStreamableError - - return BasicDownloadable( - self.session, stream_url, "flac" if quality > 1 else "mp3", source="qobuz" - ) - async def _paginate( self, epoint: str, params: dict, limit: int = 500, ) -> list[dict]: - """Paginate search results. - - params: - limit: If None, all the results are yielded. Otherwise a maximum - of `limit` results are yielded. - - Returns - ------- - Generator that yields (status code, response) tuples - """ + """Paginates search results to handle API limits on item count.""" params.update({"limit": limit}) status, page = await self._api_request(epoint, params) assert status == 200, status - logger.debug("paginate: initial request made with status %d", status) - # albums, tracks, etc. key = epoint.split("/")[0] + "s" items = page.get(key, {}) total = items.get("total", 0) if limit is not None and limit < total: total = limit - logger.debug("paginate: %d total items requested", total) - - if total == 0: - logger.debug("Nothing found from %s epoint", epoint) - return [] - limit = int(page.get(key, {}).get("limit", 500)) offset = int(page.get(key, {}).get("offset", 0)) - logger.debug("paginate: from response: limit=%d, offset=%d", limit, offset) - params.update({"limit": limit}) - - pages = [] + pages = [page] requests = [] - assert status == 200, status - pages.append(page) while (offset + limit) < total: offset += limit params.update({"offset": offset}) @@ -378,10 +262,12 @@ async def _paginate( return pages async def _get_app_id_and_secrets(self) -> tuple[str, list[str]]: + """Fetches app_id and secrets using QobuzSpoofer.""" async with QobuzSpoofer() as spoofer: return await spoofer.get_app_id_and_secrets() async def _get_valid_secret(self, secrets: list[str]) -> str: + """Retrieves a working secret by testing each available secret.""" results = await asyncio.gather( *[self._test_secret(secret) for secret in secrets], ) @@ -393,6 +279,7 @@ async def _get_valid_secret(self, secrets: list[str]) -> str: return working_secrets[0] async def _test_secret(self, secret: str) -> Optional[str]: + """Tests if a provided secret is valid for accessing Qobuz API.""" status, _ = await self._request_file_url("19512574", 4, secret) if status == 400: return None @@ -407,12 +294,11 @@ async def _request_file_url( quality: int, secret: str, ) -> tuple[int, dict]: + """Requests file URL from Qobuz API based on track ID and quality.""" quality = self.get_quality(quality) unix_ts = time.time() r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}" - logger.debug("Raw request signature: %s", r_sig) r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest() - logger.debug("Hashed request signature: %s", r_sig_hashed) params = { "request_ts": unix_ts, "request_sig": r_sig_hashed, @@ -423,16 +309,14 @@ async def _request_file_url( return await self._api_request("track/getFileUrl", params) async def _api_request(self, epoint: str, params: dict) -> tuple[int, dict]: - """Make a request to the API. - returns: status code, json parsed response - """ + """Makes an authenticated request to the Qobuz API and returns status and JSON response.""" url = f"{QOBUZ_BASE_URL}/{epoint}" - logger.debug("api_request: endpoint=%s, params=%s", epoint, params) async with self.rate_limiter: async with self.session.get(url, params=params) as response: return response.status, await response.json() @staticmethod def get_quality(quality: int): + """Maps quality to appropriate format ID for Qobuz API requests.""" quality_map = (5, 6, 7, 27) return quality_map[quality - 1]