Tidal login working

This commit is contained in:
Nathan Thomas 2023-12-20 22:21:22 -08:00
parent 1c2fb8db18
commit cf770892f1
3 changed files with 229 additions and 35 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import base64
import functools
import hashlib
import itertools
@ -16,7 +17,8 @@ from typing import Any, Callable, Optional
import aiofiles
import aiohttp
import m3u8
from Cryptodome.Cipher import Blowfish
from Cryptodome.Cipher import AES, Blowfish
from Cryptodome.Util import Counter
from .. import converter
from ..exceptions import NonStreamable
@ -26,7 +28,8 @@ logger = logging.getLogger("streamrip")
def generate_temp_path(url: str):
return os.path.join(
tempfile.gettempdir(), f"__streamrip_{hash(url)}_{time.time()}.download",
tempfile.gettempdir(),
f"__streamrip_{hash(url)}_{time.time()}.download",
)
@ -183,18 +186,20 @@ class TidalDownloadable(Downloadable):
error messages.
"""
def __init__(self, session: aiohttp.ClientSession, info: dict):
def __init__(self, session: aiohttp.ClientSession, url: str, enc_key, codec):
self.session = session
url = info.get("url")
self.url = url
assert enc_key is None
if self.url is None:
if restrictions := info["restrictions"]:
# Turn CamelCase code into a readable sentence
words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"])
raise NonStreamable(
words[0] + " " + " ".join(map(str.lower, words[1:])),
)
raise NonStreamable(f"Tidal download: dl_info = {info}")
raise Exception
# if restrictions := info["restrictions"]:
# # Turn CamelCase code into a readable sentence
# words = re.findall(r"([A-Z][a-z]+)", restrictions[0]["code"])
# raise NonStreamable(
# words[0] + " " + " ".join(map(str.lower, words[1:])),
# )
#
# raise NonStreamable(f"Tidal download: dl_info = {info}")
assert isinstance(url, str)
self.downloadable = BasicDownloadable(session, url, "m4a")
@ -202,6 +207,45 @@ class TidalDownloadable(Downloadable):
async def _download(self, path: str, callback):
await self.downloadable._download(path, callback)
@staticmethod
async def _decrypt_mqa_file(in_path, out_path, encryption_key):
"""Decrypt an MQA file.
:param in_path:
:param out_path:
:param encryption_key:
"""
# Do not change this
master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754="
# Decode the base64 strings to ascii strings
master_key = base64.b64decode(master_key)
security_token = base64.b64decode(encryption_key)
# Get the IV from the first 16 bytes of the securityToken
iv = security_token[:16]
encrypted_st = security_token[16:]
# Initialize decryptor
decryptor = AES.new(master_key, AES.MODE_CBC, iv)
# Decrypt the security token
decrypted_st = decryptor.decrypt(encrypted_st)
# Get the audio stream decryption key and nonce from the decrypted security token
key = decrypted_st[:16]
nonce = decrypted_st[16:24]
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
async with aiofiles.open(in_path, "rb") as enc_file, aiofiles.open(
out_path, "wb"
) as dec_file:
dec_bytes = decryptor.decrypt(await enc_file.read())
await dec_file.write(dec_bytes)
class SoundcloudDownloadable(Downloadable):
def __init__(self, session, info: dict):
@ -285,7 +329,8 @@ async def concat_audio_files(paths: list[str], out: str, ext: str, max_files_ope
tempdir = tempfile.gettempdir()
outpaths = [
os.path.join(
tempdir, f"__streamrip_ffmpeg_{hash(paths[i*max_files_open])}.{ext}",
tempdir,
f"__streamrip_ffmpeg_{hash(paths[i*max_files_open])}.{ext}",
)
for i in range(num_batches)
]

View File

@ -1,8 +1,16 @@
import base64
import json
import logging
import re
import time
import aiohttp
from ..config import Config
from .client import Client
from .downloadable import TidalDownloadable
logger = logging.getLogger("streamrip")
BASE = "https://api.tidalhifi.com/v1"
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
@ -11,6 +19,16 @@ CLIENT_ID = base64.b64decode("elU0WEhWVmtjMnREUG80dA==").decode("iso-8859-1")
CLIENT_SECRET = base64.b64decode(
"VkpLaERGcUpQcXZzUFZOQlY2dWtYVEptd2x2YnR0UDd3bE1scmM3MnNlND0=",
).decode("iso-8859-1")
STREAM_URL_REGEX = re.compile(
r"#EXT-X-STREAM-INF:BANDWIDTH=\d+,AVERAGE-BANDWIDTH=\d+,CODECS=\"(?!jpeg)[^\"]+\",RESOLUTION=\d+x\d+\n(.+)"
)
QUALITY_MAP = {
0: "LOW", # AAC
1: "HIGH", # AAC
2: "LOSSLESS", # CD Quality
3: "HI_RES", # MQA
}
class TidalClient(Client):
@ -43,6 +61,114 @@ class TidalClient(Client):
self.logged_in = True
async def get_metadata(self, item_id: str, media_type: str) -> dict:
"""Send a request to the api for information.
:param item_id:
:type item_id: str
:param media_type: track, album, playlist, or video.
:type media_type: str
:rtype: dict
"""
url = f"{media_type}s/{item_id}"
item = await self._api_request(url)
if media_type in ("playlist", "album"):
# TODO: move into new method
resp = await self._api_request(f"{url}/items")
tracks_left = item["numberOfTracks"]
if tracks_left > 100:
offset = 0
while tracks_left > 0:
offset += 100
tracks_left -= 100
items_resp = await self._api_request(
f"{url}/items", {"offset": offset}
)
resp["items"].extend(items_resp["items"])
item["tracks"] = [item["item"] for item in resp["items"]]
elif media_type == "artist":
logger.debug("filtering eps")
album_resp = await self._api_request(f"{url}/albums")
ep_resp = await self._api_request(
f"{url}/albums", params={"filter": "EPSANDSINGLES"}
)
item["albums"] = album_resp["items"]
item["albums"].extend(ep_resp["items"])
logger.debug(item)
return item
async def search(self, query: str, media_type: str, limit: int = 100) -> dict:
"""Search for a query.
:param query:
:type query: str
:param media_type: track, album, playlist, or video.
:type media_type: str
:param limit: max is 100
:type limit: int
:rtype: dict
"""
params = {
"query": query,
"limit": limit,
}
assert media_type in ("album", "track", "playlist", "video")
return await self._api_request(f"search/{media_type}s", params=params)
async def get_downloadable(self, track_id, quality: int = 3):
params = {
"audioquality": QUALITY_MAP[quality],
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = await self._api_request(
f"tracks/{track_id}/playbackinfopostpaywall", params
)
try:
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
except KeyError:
raise Exception(resp["userMessage"])
logger.debug(manifest)
return TidalDownloadable(
self.session,
url=manifest["urls"][0],
enc_key=manifest.get("keyId"),
codec=manifest["codecs"],
)
async def get_video_file_url(self, video_id: str) -> str:
"""Get the HLS video stream url.
The stream is downloaded using ffmpeg for now.
:param video_id:
:type video_id: str
:rtype: str
"""
params = {
"videoquality": "HIGH",
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = await self._api_request(
f"videos/{video_id}/playbackinfopostpaywall", params=params
)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
async with self.session.get(manifest["urls"][0]) as resp:
available_urls = await resp.json()
available_urls.encoding = "utf-8"
# Highest resolution is last
*_, last_match = STREAM_URL_REGEX.finditer(available_urls.text)
return last_match.group(1)
# ---------- Login Utilities ---------------
async def _login_by_access_token(self, token: str, user_id: str):
"""Login using the access token.
@ -53,7 +179,8 @@ class TidalClient(Client):
"""
headers = {"authorization": f"Bearer {token}"} # temporary
async with self.session.get(
"https://api.tidal.com/v1/sessions", headers=headers,
"https://api.tidal.com/v1/sessions",
headers=headers,
) as _resp:
resp = await _resp.json()
@ -74,8 +201,7 @@ class TidalClient(Client):
"client_id": CLIENT_ID,
"scope": "r_usr+w_usr+w_sub",
}
_resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
resp = await _resp.json()
resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
if resp.get("status", 200) != 200:
raise Exception(f"Device authorization failed {resp}")
@ -99,14 +225,14 @@ class TidalClient(Client):
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
}
_resp = await self._api_post(
logger.debug("Checking with %s", data)
resp = await self._api_post(
f"{AUTH_URL}/token",
data,
(CLIENT_ID, CLIENT_SECRET),
aiohttp.BasicAuth(login=CLIENT_ID, password=CLIENT_SECRET),
)
resp = await _resp.json()
if resp.get("status", 200) != 200:
if "status" in resp and resp["status"] != 200:
if resp["status"] == 400 and resp["sub_status"] == 1002:
return 2, {}
else:
@ -135,38 +261,59 @@ class TidalClient(Client):
resp = await self._api_post(
f"{AUTH_URL}/token",
data,
(CLIENT_ID, CLIENT_SECRET),
aiohttp.BasicAuth(login=CLIENT_ID, password=CLIENT_SECRET),
)
resp_json = await resp.json()
if resp_json.get("status", 200) != 200:
if resp.get("status", 200) != 200:
raise Exception("Refresh failed")
c = self.config
c.access_token = resp_json["access_token"]
c.token_expiry = resp_json["expires_in"] + time.time()
c.access_token = resp["access_token"]
c.token_expiry = resp["expires_in"] + time.time()
self._update_authorization_from_config()
async def _get_device_code(self):
async def _get_device_code(self) -> tuple[str, str]:
"""Get the device code that will be used to log in on the browser."""
if not hasattr(self, "session"):
self.session = await self.get_session()
data = {
"client_id": CLIENT_ID,
"scope": "r_usr+w_usr+w_sub",
}
_resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
resp = await _resp.json()
resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
if resp.get("status", 200) != 200:
raise Exception(f"Device authorization failed {resp}")
return resp["verificationUriComplete"]
return resp["deviceCode"], resp["verificationUriComplete"]
async def _api_post(self, url, data, auth=None):
"""Post to the Tidal API.
# ---------- API Request Utilities ---------------
async def _api_post(self, url, data, auth: aiohttp.BasicAuth | None = None) -> dict:
"""Post to the Tidal API. Status not checked!
:param url:
:param data:
:param auth:
"""
async with self.session.post(url, data=data, auth=auth, verify=False) as resp:
return resp
async with self.session.post(url, data=data, auth=auth) as resp:
return await resp.json()
async def _api_request(self, path: str, params=None) -> dict:
"""Handle Tidal API requests.
:param path:
:type path: str
:param params:
:rtype: dict
"""
if params is None:
params = {}
params["countryCode"] = self.config.country_code
params["limit"] = 100
async with self.session.get(f"{BASE}/{path}", params=params) as resp:
resp.raise_for_status()
return await resp.json()

View File

@ -97,8 +97,8 @@ class TidalPrompter(CredentialPrompter):
return len(self.config.session.tidal.access_token) > 0
async def prompt_and_login(self):
device_code = await self.client._get_device_code()
login_link = f"https://{device_code}"
device_code, uri = await self.client._get_device_code()
login_link = f"https://{uri}"
console.print(
f"Go to [blue underline]{login_link}[/blue underline] to log into Tidal within 5 minutes.",
@ -111,6 +111,7 @@ class TidalPrompter(CredentialPrompter):
while elapsed < self.timeout_s:
elapsed = time.time() - start
status, info = await self.client._get_auth_status(device_code)
print(status, info)
if status == 2:
# pending
time.sleep(4)
@ -129,6 +130,7 @@ class TidalPrompter(CredentialPrompter):
c.token_expiry = info["token_expiry"] # type: ignore
self.client._update_authorization_from_config()
self.client.logged_in = True
self.save()
def type_check_client(self, client) -> TidalClient: