streamrip/streamrip/client/tidal.py

318 lines
10 KiB
Python

import base64
import json
import logging
import re
import time
import aiohttp
from ..config import Config
from .client import Client
from .downloadable import TidalDownloadable
logger = logging.getLogger("streamrip")
BASE = "https://api.tidalhifi.com/v1"
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
CLIENT_ID = base64.b64decode("elU0WEhWVmtjMnREUG80dA==").decode("iso-8859-1")
CLIENT_SECRET = base64.b64decode(
"VkpLaERGcUpQcXZzUFZOQlY2dWtYVEptd2x2YnR0UDd3bE1scmM3MnNlND0=",
).decode("iso-8859-1")
AUTH = aiohttp.BasicAuth(login=CLIENT_ID, password=CLIENT_SECRET)
STREAM_URL_REGEX = re.compile(
r"#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS=\"(?!jpeg)[^\"]+\",RESOLUTION=\d+x\d+\n(.+)"
)
QUALITY_MAP = {
0: "LOW", # AAC
1: "HIGH", # AAC
2: "LOSSLESS", # CD Quality
3: "HI_RES", # MQA
}
class TidalClient(Client):
"""TidalClient."""
source = "tidal"
max_quality = 3
def __init__(self, config: Config):
self.logged_in = False
self.global_config = config
self.config = config.session.tidal
self.rate_limiter = self.get_rate_limiter(
config.session.downloads.requests_per_minute,
)
async def login(self):
self.session = await self.get_session()
c = self.config
if not c.access_token:
raise Exception("Access token not found in config.")
self.token_expiry = float(c.token_expiry)
self.refresh_token = c.refresh_token
if self.token_expiry - time.time() < 86400: # 1 day
await self._refresh_access_token()
else:
await self._login_by_access_token(c.access_token, c.user_id)
self.logged_in = True
async def get_metadata(self, item_id: str, media_type: str) -> dict:
"""Send a request to the api for information.
:param item_id:
:type item_id: str
:param media_type: track, album, playlist, or video.
:type media_type: str
:rtype: dict
"""
url = f"{media_type}s/{item_id}"
item = await self._api_request(url)
if media_type in ("playlist", "album"):
# TODO: move into new method
resp = await self._api_request(f"{url}/items")
tracks_left = item["numberOfTracks"]
if tracks_left > 100:
offset = 0
while tracks_left > 0:
offset += 100
tracks_left -= 100
items_resp = await self._api_request(
f"{url}/items", {"offset": offset}
)
resp["items"].extend(items_resp["items"])
item["tracks"] = [item["item"] for item in resp["items"]]
elif media_type == "artist":
logger.debug("filtering eps")
album_resp = await self._api_request(f"{url}/albums")
ep_resp = await self._api_request(
f"{url}/albums", params={"filter": "EPSANDSINGLES"}
)
item["albums"] = album_resp["items"]
item["albums"].extend(ep_resp["items"])
logger.debug(item)
return item
async def search(self, query: str, media_type: str, limit: int = 100) -> dict:
"""Search for a query.
:param query:
:type query: str
:param media_type: track, album, playlist, or video.
:type media_type: str
:param limit: max is 100
:type limit: int
:rtype: dict
"""
params = {
"query": query,
"limit": limit,
}
assert media_type in ("album", "track", "playlist", "video")
return await self._api_request(f"search/{media_type}s", params=params)
async def get_downloadable(self, track_id: str, quality: int):
params = {
"audioquality": QUALITY_MAP[quality],
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = await self._api_request(
f"tracks/{track_id}/playbackinfopostpaywall", params
)
logger.debug(resp)
try:
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
except KeyError:
raise Exception(resp["userMessage"])
logger.debug(manifest)
enc_key = manifest.get("keyId")
if manifest.get("encryptionType") == "NONE":
enc_key = None
return TidalDownloadable(
self.session,
url=manifest["urls"][0],
codec=manifest["codecs"],
encryption_key=enc_key,
restrictions=manifest.get("restrictions"),
)
async def get_video_file_url(self, video_id: str) -> str:
"""Get the HLS video stream url.
The stream is downloaded using ffmpeg for now.
:param video_id:
:type video_id: str
:rtype: str
"""
params = {
"videoquality": "HIGH",
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = await self._api_request(
f"videos/{video_id}/playbackinfopostpaywall", params=params
)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
async with self.session.get(manifest["urls"][0]) as resp:
available_urls = await resp.json()
available_urls.encoding = "utf-8"
# Highest resolution is last
*_, last_match = STREAM_URL_REGEX.finditer(available_urls.text)
return last_match.group(1)
# ---------- Login Utilities ---------------
async def _login_by_access_token(self, token: str, user_id: str):
"""Login using the access token.
Used after the initial authorization.
:param token: access token
:param user_id: To verify that the user is correct
"""
headers = {"authorization": f"Bearer {token}"} # temporary
async with self.session.get(
"https://api.tidal.com/v1/sessions",
headers=headers,
) as _resp:
resp = await _resp.json()
if resp.get("status", 200) != 200:
raise Exception(f"Login failed {resp}")
if str(resp.get("userId")) != str(user_id):
raise Exception(f"User id mismatch {resp['userId']} v {user_id}")
c = self.config
c.user_id = resp["userId"]
c.country_code = resp["countryCode"]
c.access_token = token
self._update_authorization_from_config()
async def _get_login_link(self) -> str:
data = {
"client_id": CLIENT_ID,
"scope": "r_usr+w_usr+w_sub",
}
resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
if resp.get("status", 200) != 200:
raise Exception(f"Device authorization failed {resp}")
device_code = resp["deviceCode"]
return f"https://{device_code}"
def _update_authorization_from_config(self):
self.session.headers.update(
{"authorization": f"Bearer {self.config.access_token}"},
)
async def _get_auth_status(self, device_code) -> tuple[int, dict[str, int | str]]:
"""Check if the user has logged in inside the browser.
returns (status, authentication info)
"""
data = {
"client_id": CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
}
logger.debug("Checking with %s", data)
resp = await self._api_post(f"{AUTH_URL}/token", data, AUTH)
if "status" in resp and resp["status"] != 200:
if resp["status"] == 400 and resp["sub_status"] == 1002:
return 2, {}
else:
return 1, {}
ret = {}
ret["user_id"] = resp["user"]["userId"]
ret["country_code"] = resp["user"]["countryCode"]
ret["access_token"] = resp["access_token"]
ret["refresh_token"] = resp["refresh_token"]
ret["token_expiry"] = resp["expires_in"] + time.time()
return 0, ret
async def _refresh_access_token(self):
"""Refresh the access token given a refresh token.
The access token expires in a week, so it must be refreshed.
Requires a refresh token.
"""
data = {
"client_id": CLIENT_ID,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
"scope": "r_usr+w_usr+w_sub",
}
resp = await self._api_post(f"{AUTH_URL}/token", data, AUTH)
if resp.get("status", 200) != 200:
raise Exception("Refresh failed")
c = self.config
c.access_token = resp["access_token"]
c.token_expiry = resp["expires_in"] + time.time()
self._update_authorization_from_config()
async def _get_device_code(self) -> tuple[str, str]:
"""Get the device code that will be used to log in on the browser."""
if not hasattr(self, "session"):
self.session = await self.get_session()
data = {
"client_id": CLIENT_ID,
"scope": "r_usr+w_usr+w_sub",
}
resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
if resp.get("status", 200) != 200:
raise Exception(f"Device authorization failed {resp}")
return resp["deviceCode"], resp["verificationUriComplete"]
# ---------- API Request Utilities ---------------
async def _api_post(self, url, data, auth: aiohttp.BasicAuth | None = None) -> dict:
"""Post to the Tidal API. Status not checked!
:param url:
:param data:
:param auth:
"""
async with self.session.post(url, data=data, auth=auth) as resp:
return await resp.json()
async def _api_request(self, path: str, params=None) -> dict:
"""Handle Tidal API requests.
:param path:
:type path: str
:param params:
:rtype: dict
"""
if params is None:
params = {}
params["countryCode"] = self.config.country_code
params["limit"] = 100
async with self.session.get(f"{BASE}/{path}", params=params) as resp:
resp.raise_for_status()
return await resp.json()