Tidal MQA get_file_url working

Also formatting
This commit is contained in:
nathom 2021-03-27 21:44:38 -07:00
parent bbc08e45e4
commit ead14afbbe
7 changed files with 129 additions and 66 deletions

View File

@ -15,12 +15,12 @@ requirements = read_file("requirements.txt").strip().split()
setup(
name=pkg_name,
version="0.2.2",
author='Nathan',
author_email='nathanthomas707@gmail.com',
keywords='lossless, hi-res, qobuz, tidal, deezer, audio, convert',
author="Nathan",
author_email="nathanthomas707@gmail.com",
keywords="lossless, hi-res, qobuz, tidal, deezer, audio, convert",
description="A stream downloader for Qobuz, Tidal, and Deezer.",
long_description=read_file("README.md"),
long_description_content_type='text/markdown',
long_description_content_type="text/markdown",
install_requires=requirements,
py_modules=["streamrip"],
entry_points={

View File

@ -1,6 +1,6 @@
import logging
from getpass import getpass
import os
from getpass import getpass
import click
@ -21,15 +21,13 @@ if not os.path.isdir(CACHE_DIR):
@click.group(invoke_without_command=True)
@click.option("-c", "--convert", metavar="CODEC")
@click.option("-u", "--urls", metavar="URLS")
@click.option("-t", "--text", metavar='PATH')
@click.option("-t", "--text", metavar="PATH")
@click.option("-nd", "--no-db", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--reset-config", is_flag=True)
@click.pass_context
def cli(ctx, **kwargs):
"""
"""
""""""
global config
global core
@ -53,10 +51,10 @@ def cli(ctx, **kwargs):
logger.debug(f"handling {kwargs['urls']}")
core.handle_urls(kwargs["urls"])
if kwargs['text'] is not None:
if os.path.isfile(kwargs['text']):
if kwargs["text"] is not None:
if os.path.isfile(kwargs["text"]):
logger.debug(f"Handling {kwargs['text']}")
core.handle_txt(kwargs['text'])
core.handle_txt(kwargs["text"])
else:
click.secho(f"Text file {kwargs['text']} does not exist.")
@ -176,24 +174,24 @@ def discover(ctx, **kwargs):
@cli.command()
@click.option("-o", "--open", is_flag=True, help='Open the config file')
@click.option("-q", "--qobuz", is_flag=True, help='Set Qobuz credentials')
@click.option("-t", "--tidal", is_flag=True, help='Set Tidal credentials')
@click.option("-o", "--open", is_flag=True, help="Open the config file")
@click.option("-q", "--qobuz", is_flag=True, help="Set Qobuz credentials")
@click.option("-t", "--tidal", is_flag=True, help="Set Tidal credentials")
@click.pass_context
def config(ctx, **kwargs):
"""Manage the streamrip configuration."""
if kwargs['open']:
if kwargs["open"]:
click.launch(CONFIG_PATH)
if kwargs['qobuz']:
config.file['qobuz']['email'] = input("Qobuz email: ")
config.file['qobuz']['password'] = getpass()
if kwargs["qobuz"]:
config.file["qobuz"]["email"] = input("Qobuz email: ")
config.file["qobuz"]["password"] = getpass()
config.save()
if kwargs['tidal']:
config.file['tidal']['email'] = input("Tidal email: ")
config.file['tidal']['password'] = getpass()
if kwargs["tidal"]:
config.file["tidal"]["email"] = input("Tidal email: ")
config.file["tidal"]["password"] = getpass()
config.save()

View File

@ -1,12 +1,14 @@
import datetime
import click
import base64
import hashlib
import logging
import os
import time
from abc import ABC, abstractmethod
from pprint import pformat
from typing import Generator, Sequence, Tuple, Union
import click
import requests
import tidalapi
from dogpile.cache import make_region
@ -35,11 +37,11 @@ region = make_region().configure(
arguments={"filename": os.path.join(CACHE_DIR, "clients.db")},
)
TIDAL_BASE = 'https://api.tidalhifi.com/v1'
TIDAL_AUTH_URL = 'https://auth.tidal.com/v1/oauth2'
TIDAL_BASE = "https://api.tidalhifi.com/v1"
TIDAL_AUTH_URL = "https://auth.tidal.com/v1/oauth2"
TIDAL_CLIENT_INFO = {
'id': 'aR7gUaTK1ihpXOEP',
'secret': 'eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE=',
"id": "aR7gUaTK1ihpXOEP",
"secret": "eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE=",
}
logger = logging.getLogger(__name__)
@ -130,7 +132,7 @@ class QobuzClient(ClientInterface):
:type pwd: str
:param kwargs: app_id: str, secrets: list, return_secrets: bool
"""
click.secho(f"Logging into {self.source}", fg='green')
click.secho(f"Logging into {self.source}", fg="green")
if self.logged_in:
logger.debug("Already logged in")
return
@ -430,7 +432,7 @@ class TidalClient(ClientInterface):
self.logged_in = False
def login(self, email: str, pwd: str):
click.secho(f"Logging into {self.source}", fg='green')
click.secho(f"Logging into {self.source}", fg="green")
if self.logged_in:
return
@ -544,62 +546,103 @@ class TidalMQAClient:
def get_device_code(self):
data = {
'client_id': TIDAL_CLIENT_INFO['id'],
'scope': 'r_usr+w_usr+w_sub',
"client_id": TIDAL_CLIENT_INFO["id"],
"scope": "r_usr+w_usr+w_sub",
}
r = requests.post(f"{TIDAL_AUTH_URL}/device_authorization", data)
r.raise_for_status()
resp = self._api_post(f"{TIDAL_AUTH_URL}/device_authorization", data)
resp = r.json()
self.device_code = resp['deviceCode']
self.user_code = resp['userCode']
self.verification_url = resp['verificationUri']
self.expiry = resp['expiresIn']
self.auth_interval = resp['interval']
if 'status' in resp and resp['status'] != 200:
raise Exception(f"Device authorization failed {resp}")
logger.debug(pformat(resp))
self.device_code = resp["deviceCode"]
self.user_code = resp["userCode"]
self.verification_url = resp["verificationUri"]
self.user_code_expiry = resp["expiresIn"]
self.auth_interval = resp["interval"]
def check_auth_status(self):
data = {
'client_id': TIDAL_CLIENT_INFO['id'],
'device_code': self.device_code,
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'scope': 'r_usr+w_usr+w_sub',
"client_id": TIDAL_CLIENT_INFO["id"],
"device_code": self.device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
}
logger.debug(data)
r = requests.post(f"{TIDAL_AUTH_URL}/token", data=data, auth=(TIDAL_CLIENT_INFO['id'], TIDAL_CLIENT_INFO['secret']), verify=False).json()
logger.debug(r)
resp = self._api_post(
f"{TIDAL_AUTH_URL}/token",
data,
(TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
)
logger.debug(resp)
if r.get("status"):
if r['status'] != 200:
if r['status'] == 400 and r['sub_status'] == 1002:
return 2
else:
return 1
if resp.get("status", 200) != 200:
if resp["status"] == 400 and resp["sub_status"] == 1002:
return 2
else:
return 1
self.user_id = r['user']['userId']
self.country_code = r['user']['countryCode']
self.access_token = r['access_token']
self.refresh_token = r['refresh_token']
self.expires_in = r['expires_in']
self.user_id = resp["user"]["userId"]
self.country_code = resp["user"]["countryCode"]
self.access_token = resp["access_token"]
self.refresh_token = resp["refresh_token"]
self.access_token_expiry = resp["expires_in"]
return 0
def verify_access_token(self, token):
headers = {
'authorization': f"Bearer {token}",
"authorization": f"Bearer {token}",
}
r = requests.get('https://api.tidal.com/v1/sessions', headers=headers).json()
r = requests.get("https://api.tidal.com/v1/sessions", headers=headers).json()
if r.status != 200:
raise Exception("Login failed")
return True
def _api_request(self, path, params):
headers = {
'authorization': f"Bearer {self.access_token}"
def refresh_access_token(self):
data = {
"client_id": TIDAL_CLIENT_INFO["id"],
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
"scope": "r_usr+w_usr+w_sub",
}
params['countryCode'] = self.country_code
resp = self._api_post(
f"{TIDAL_AUTH_URL}/token",
data,
(TIDAL_CLIENT_INFO["id"], TIDAL_CLIENT_INFO["secret"]),
)
if resp.get("status") != 200:
raise Exception("Refresh failed")
self.user_id = resp["user"]["userId"]
self.country_code = resp["user"]["countryCode"]
self.access_token = resp["access_token"]
self.access_token_expiry = resp["expires_in"]
def login_by_access_token(self, token, user_id=None):
headers = {"authorization": f"Bearer {token}"}
resp = requests.get("https://api.tidal.com/v1/sessions", headers=headers).json()
if resp.get("status") != 200:
raise Exception("Login failed")
if str(resp.get("userId")) != str(user_id):
raise Exception(f"User id mismatch {locals()}")
self.user_id = resp["userId"]
self.country_code = resp["countryCode"]
self.access_token = token
def _api_request(self, path, params):
headers = {"authorization": f"Bearer {self.access_token}"}
params["countryCode"] = self.country_code
r = requests.get(f"{TIDAL_BASE}/{path}", headers=headers, params=params).json()
return r
def _api_post(self, url, data, auth=None):
r = requests.post(url, data=data, auth=auth, verify=False).json()
return r
def get_file_url(self, track_id, quality: int = 7):
params = {
"audioquality": TIDAL_Q_IDS[quality],
@ -607,4 +650,8 @@ class TidalMQAClient:
"assetpresentation": "FULL",
}
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
manifest = json.loads(base64.b64decode(resp['manifest']).decode("utf-8"))
codec = manifest['codecs']
file_url = manifest["urls"][0]
enc_key = manifest.get("keyId", "")
return resp

View File

@ -4,6 +4,7 @@ import shutil
import subprocess
from tempfile import gettempdir
from typing import Optional
from mutagen.flac import FLAC as FLAC_META
from .exceptions import ConversionError

View File

@ -98,8 +98,13 @@ class MusicDL(list):
"""
for source, url_type, item_id in self.parse_urls(url):
if item_id in self.db:
logger.info(f"ID {item_id} already downloaded, use --no-db to override.")
click.secho(f"ID {item_id} already downloaded, use --no-db to override.", fg='magenta')
logger.info(
f"ID {item_id} already downloaded, use --no-db to override."
)
click.secho(
f"ID {item_id} already downloaded, use --no-db to override.",
fg="magenta",
)
continue
self.handle_item(source, url_type, item_id)

View File

@ -431,7 +431,7 @@ class Track:
sampling_rate=kwargs.get("sampling_rate"),
remove_source=kwargs.get("remove_source", True),
)
click.secho(f"Converting {self!s}", fg='blue')
click.secho(f"Converting {self!s}", fg="blue")
engine.convert()
def get(self, *keys, default=None):
@ -1092,7 +1092,7 @@ class Artist(Tracklist):
"""Send an API call to get album info based on id."""
self.meta = self.client.get(self.id, media_type="artist")
# TODO find better fix for this
self.name = self.meta['items'][0]['artist']['name']
self.name = self.meta["items"][0]["artist"]["name"]
self._load_albums()
def _load_albums(self):

View File

@ -7,6 +7,8 @@ from typing import Optional
import requests
from pathvalidate import sanitize_filename
from tqdm import tqdm
from Crypto.Cipher import AES
from Crypto.Util import Counter
from .constants import LOG_DIR, TIDAL_COVER_URL
from .exceptions import NonStreamable
@ -155,3 +157,13 @@ def init_log(
def capitalize(s: str) -> str:
return s[0].upper() + s[1:]
def decrypt_mqa_file(in_path, out_path, key, nonce):
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
with open(in_path, "rb") as enc_file:
dec_bytes = decryptor.decrypt(enc_file.read())
with open(out_path, "wb") as dec_file:
dec_file.write(dec_bytes)