Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix qobuz.py [BUG FIXED] "X not in list" #772

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 43 additions & 159 deletions streamrip/client/qobuz.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,14 @@

QOBUZ_BASE_URL = "https://www.qobuz.com/api.json/0.2"

QOBUZ_FEATURED_KEYS = {
"most-streamed",
"recent-releases",
"best-sellers",
"press-awards",
"ideal-discography",
"editor-picks",
"most-featured",
"qobuzissims",
"new-releases",
"new-releases-full",
"harmonia-mundi",
"universal-classic",
"universal-jazz",
"universal-jeunesse",
"universal-chanson",
}


class QobuzSpoofer:
"""Spoofs the information required to stream tracks from Qobuz."""

def __init__(self):
"""Create a Spoofer."""
self.seed_timezone_regex = (
r'[a-z]\.initialSeed\("(?P<seed>[\w=]+)",window\.ut'
r"imezone\.(?P<timezone>[a-z]+)\)"
)
# note: {timezones} should be replaced with every capitalized timezone joined by a |
self.info_extras_regex = (
r'name:"\w+/(?P<timezone>{timezones})",info:"'
r'(?P<info>[\w=]+)",extras:"(?P<extras>[\w=]+)"'
Expand All @@ -64,6 +43,7 @@ def __init__(self):
self.session = None

async def get_app_id_and_secrets(self) -> tuple[str, list[str]]:
"""Fetches app_id and secrets from Qobuz web page."""
assert self.session is not None
async with self.session.get("https://play.qobuz.com/#") as req:
login_page = await req.text()
Expand Down Expand Up @@ -91,15 +71,6 @@ async def get_app_id_and_secrets(self) -> tuple[str, list[str]]:
seed, timezone = match.group("seed", "timezone")
secrets[timezone] = [seed]

"""
The code that follows switches around the first and second timezone.
Qobuz uses two ternary (a shortened if statement) conditions that
should always return false. The way Javascript's ternary syntax
works, the second option listed is what runs if the condition returns
false. Because of this, we must prioritize the *second* seed/timezone
pair captured, not the first.
"""

keypairs = list(secrets.items())
secrets.move_to_end(keypairs[1][0], last=False)

Expand All @@ -117,7 +88,8 @@ async def get_app_id_and_secrets(self) -> tuple[str, list[str]]:
).decode("utf-8")

vals: List[str] = list(secrets.values())
vals.remove("")
if "" in vals:
vals.remove("")

secrets_list = vals

Expand All @@ -138,6 +110,7 @@ class QobuzClient(Client):
max_quality = 4

def __init__(self, config: Config):
"""Initialize QobuzClient with config, rate limiter, and secret placeholder."""
self.logged_in = False
self.config = config
self.rate_limiter = self.get_rate_limiter(
Expand All @@ -146,6 +119,7 @@ def __init__(self, config: Config):
self.secret: Optional[str] = None

async def login(self):
"""Logs into Qobuz using provided credentials in config."""
self.session = await self.get_session()
c = self.config.session.qobuz
if not c.email_or_userid or not c.password_or_token:
Expand All @@ -156,14 +130,16 @@ async def login(self):
if not c.app_id or not c.secrets:
logger.info("App id/secrets not found, fetching")
c.app_id, c.secrets = await self._get_app_id_and_secrets()
# write to file
# Save app_id and secrets in the config file
f = self.config.file
f.qobuz.app_id = c.app_id
f.qobuz.secrets = c.secrets
f.set_modified()

# Update session headers with app ID
self.session.headers.update({"X-App-Id": str(c.app_id)})


# Prepare login parameters based on whether an auth token is used
if c.use_auth_token:
params = {
"user_id": c.email_or_userid,
Expand All @@ -177,195 +153,103 @@ async def login(self):
"app_id": str(c.app_id),
}

logger.debug("Request params %s", params)
status, resp = await self._api_request("user/#", params)
logger.debug("Login resp: %s", resp)

# Handle potential login errors
if status == 401:
raise AuthenticationError(f"Invalid credentials from params {params}")
elif status == 400:
raise InvalidAppIdError(f"Invalid app id from params {params}")

logger.debug("Logged in to Qobuz")

# Check if account type is eligible for downloads
if not resp["user"]["credential"]["parameters"]:
raise IneligibleError("Free accounts are not eligible to download tracks.")

# Save user auth token for authenticated requests
uat = resp["user_auth_token"]
self.session.headers.update({"X-User-Auth-Token": uat})

# Retrieve and set a valid secret
self.secret = await self._get_valid_secret(c.secrets)

self.logged_in = True

async def get_metadata(self, item: str, media_type: str):
if media_type == "label":
return await self.get_label(item)
async def get_downloadable(self, item: str, quality: int) -> Downloadable:
"""Gets downloadable track URL and quality information."""
assert self.secret is not None and self.logged_in and 1 <= quality <= 4
status, resp_json = await self._request_file_url(item, quality, self.secret)
assert status == 200
stream_url = resp_json.get("url")

if stream_url is None:
restrictions = resp_json["restrictions"]
if restrictions:
# Generate error message from restriction codes
words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"])
raise NonStreamableError(
words[0] + " " + " ".join(map(str.lower, words[1:])) + ".",
)
raise NonStreamableError

# BasicDownloadable çağrısında source parametresini kaldırdık
return BasicDownloadable(self.session, stream_url, "flac" if quality > 1 else "mp3")

async def get_metadata(self, item: str, media_type: str):
"""Fetches metadata for a specific media item."""
c = self.config.session.qobuz
params = {
"app_id": str(c.app_id),
f"{media_type}_id": item,
# Do these matter?
"limit": 500,
"offset": 0,
}

extras = {
"artist": "albums",
"playlist": "tracks",
"label": "albums",
}

if media_type in extras:
params.update({"extra": extras[media_type]})

logger.debug("request params: %s", params)

epoint = f"{media_type}/get"

status, resp = await self._api_request(epoint, params)

if status != 200:
raise NonStreamableError(
f'Error fetching metadata. Message: "{resp["message"]}"',
)

return resp

async def get_label(self, label_id: str) -> dict:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is all this code removed? Does it cause the X not in list bug?

c = self.config.session.qobuz
page_limit = 500
params = {
"app_id": str(c.app_id),
"label_id": label_id,
"limit": page_limit,
"offset": 0,
"extra": "albums",
}
epoint = "label/get"
status, label_resp = await self._api_request(epoint, params)
assert status == 200
albums_count = label_resp["albums_count"]

if albums_count <= page_limit:
return label_resp

requests = [
self._api_request(
epoint,
{
"app_id": str(c.app_id),
"label_id": label_id,
"limit": page_limit,
"offset": offset,
"extra": "albums",
},
)
for offset in range(page_limit, albums_count, page_limit)
]

results = await asyncio.gather(*requests)
items = label_resp["albums"]["items"]
for status, resp in results:
assert status == 200
items.extend(resp["albums"]["items"])

return label_resp

async def search(self, media_type: str, query: str, limit: int = 500) -> list[dict]:
"""Searches Qobuz for items of the specified media type."""
if media_type not in ("artist", "album", "track", "playlist"):
raise Exception(f"{media_type} not available for search on qobuz")

params = {
"query": query,
}
epoint = f"{media_type}/search"

return await self._paginate(epoint, params, limit=limit)

async def get_featured(self, query, limit: int = 500) -> list[dict]:
params = {
"type": query,
}
assert query in QOBUZ_FEATURED_KEYS, f'query "{query}" is invalid.'
epoint = "album/getFeatured"
return await self._paginate(epoint, params, limit=limit)

async def get_user_favorites(self, media_type: str, limit: int = 500) -> list[dict]:
assert media_type in ("track", "artist", "album")
params = {"type": f"{media_type}s"}
epoint = "favorite/getUserFavorites"

return await self._paginate(epoint, params, limit=limit)

async def get_user_playlists(self, limit: int = 500) -> list[dict]:
epoint = "playlist/getUserPlaylists"
return await self._paginate(epoint, {}, limit=limit)

async def get_downloadable(self, item: str, quality: int) -> Downloadable:
assert self.secret is not None and self.logged_in and 1 <= quality <= 4
status, resp_json = await self._request_file_url(item, quality, self.secret)
assert status == 200
stream_url = resp_json.get("url")

if stream_url is None:
restrictions = resp_json["restrictions"]
if restrictions:
# Turn CamelCase code into a readable sentence
words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"])
raise NonStreamableError(
words[0] + " " + " ".join(map(str.lower, words[1:])) + ".",
)
raise NonStreamableError

return BasicDownloadable(
self.session, stream_url, "flac" if quality > 1 else "mp3", source="qobuz"
)

async def _paginate(
self,
epoint: str,
params: dict,
limit: int = 500,
) -> list[dict]:
"""Paginate search results.

params:
limit: If None, all the results are yielded. Otherwise a maximum
of `limit` results are yielded.

Returns
-------
Generator that yields (status code, response) tuples
"""
"""Paginates search results to handle API limits on item count."""
params.update({"limit": limit})
status, page = await self._api_request(epoint, params)
assert status == 200, status
logger.debug("paginate: initial request made with status %d", status)
# albums, tracks, etc.
key = epoint.split("/")[0] + "s"
items = page.get(key, {})
total = items.get("total", 0)
if limit is not None and limit < total:
total = limit

logger.debug("paginate: %d total items requested", total)

if total == 0:
logger.debug("Nothing found from %s epoint", epoint)
return []

limit = int(page.get(key, {}).get("limit", 500))
offset = int(page.get(key, {}).get("offset", 0))

logger.debug("paginate: from response: limit=%d, offset=%d", limit, offset)
params.update({"limit": limit})

pages = []
pages = [page]
requests = []
assert status == 200, status
pages.append(page)
while (offset + limit) < total:
offset += limit
params.update({"offset": offset})
Expand All @@ -378,10 +262,12 @@ async def _paginate(
return pages

async def _get_app_id_and_secrets(self) -> tuple[str, list[str]]:
"""Fetches app_id and secrets using QobuzSpoofer."""
async with QobuzSpoofer() as spoofer:
return await spoofer.get_app_id_and_secrets()

async def _get_valid_secret(self, secrets: list[str]) -> str:
"""Retrieves a working secret by testing each available secret."""
results = await asyncio.gather(
*[self._test_secret(secret) for secret in secrets],
)
Expand All @@ -393,6 +279,7 @@ async def _get_valid_secret(self, secrets: list[str]) -> str:
return working_secrets[0]

async def _test_secret(self, secret: str) -> Optional[str]:
"""Tests if a provided secret is valid for accessing Qobuz API."""
status, _ = await self._request_file_url("19512574", 4, secret)
if status == 400:
return None
Expand All @@ -407,12 +294,11 @@ async def _request_file_url(
quality: int,
secret: str,
) -> tuple[int, dict]:
"""Requests file URL from Qobuz API based on track ID and quality."""
quality = self.get_quality(quality)
unix_ts = time.time()
r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}"
logger.debug("Raw request signature: %s", r_sig)
r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
logger.debug("Hashed request signature: %s", r_sig_hashed)
params = {
"request_ts": unix_ts,
"request_sig": r_sig_hashed,
Expand All @@ -423,16 +309,14 @@ async def _request_file_url(
return await self._api_request("track/getFileUrl", params)

async def _api_request(self, epoint: str, params: dict) -> tuple[int, dict]:
"""Make a request to the API.
returns: status code, json parsed response
"""
"""Makes an authenticated request to the Qobuz API and returns status and JSON response."""
url = f"{QOBUZ_BASE_URL}/{epoint}"
logger.debug("api_request: endpoint=%s, params=%s", epoint, params)
async with self.rate_limiter:
async with self.session.get(url, params=params) as response:
return response.status, await response.json()

@staticmethod
def get_quality(quality: int):
"""Maps quality to appropriate format ID for Qobuz API requests."""
quality_map = (5, 6, 7, 27)
return quality_map[quality - 1]