This commit is contained in:
Nathan Thomas 2023-10-06 13:54:27 -07:00
parent 95e906a196
commit 4448220085
6 changed files with 310 additions and 718 deletions

View File

@ -24,7 +24,7 @@ class PendingAlbum(Pending):
folder: str
async def resolve(self):
resp = self.client.get_metadata(id, "album")
resp = self.client.get_metadata({"id": self.id}, "album")
meta = AlbumMetadata.from_resp(self.client.source, resp)
tracklist = get_album_track_ids(self.client.source, resp)
album_folder = self._album_folder(self.folder, meta.album)

View File

@ -78,101 +78,6 @@ DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH}
# ---------------------------------------------- #
class CredentialPrompter(ABC):
def __init__(self, config: Config):
self.config = config
@abstractmethod
def has_creds(self) -> bool:
raise NotImplemented
@abstractmethod
def prompt(self):
"""Prompt for credentials in the appropriate way,
and save them to the configuration."""
raise NotImplemented
@abstractmethod
def save(self):
"""Save current config to file"""
raise NotImplemented
class QobuzPrompter(CredentialPrompter):
def has_creds(self) -> bool:
c = self.config.session.qobuz
return c.email_or_userid != "" and c.password_or_token != ""
def prompt(self):
secho("Enter Qobuz email:", fg="green")
email = input()
secho(
"Enter Qobuz password (will not show on screen):",
fg="green",
)
pwd = md5(getpass(prompt="").encode("utf-8")).hexdigest()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
c = self.config.session.qobuz
c.use_auth_token = False
c.email_or_userid = email
c.password_or_token = pwd
def save(self):
c = self.config.session.qobuz
cf = self.config.file.qobuz
cf.use_auth_token = False
cf.email_or_userid = c.email_or_userid
cf.password_or_token = c.password_or_token
self.config.file.set_modified()
class TidalPrompter(CredentialPrompter):
def prompt(self):
# TODO: needs to be moved from TidalClient to here
raise NotImplemented
class DeezerPrompter(CredentialPrompter):
def has_creds(self):
c = self.config.session.deezer
return c.arl != ""
def prompt(self):
secho(
"If you're not sure how to find the ARL cookie, see the instructions at ",
nl=False,
dim=True,
)
secho(
"https://github.com/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie",
underline=True,
fg="blue",
)
c = self.config.session.deezer
c.arl = input(style("ARL: ", fg="green"))
def save(self):
c = self.config.session.deezer
cf = self.config.file.deezer
cf.arl = c.arl
self.config.file.set_modified()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
PROMPTERS = {
"qobuz": QobuzPrompter,
"deezer": DeezerPrompter,
"tidal": TidalPrompter,
}
class RipCore(list):
def __init__(self, config: Config):
"""Create a RipCore object.

View File

@ -38,15 +38,9 @@ class CoverUrls:
large: Optional[str]
original: Optional[str]
def largest(self):
if self.original is not None:
return self.original
if self.large is not None:
return self.large
if self.small is not None:
return self.small
if self.thumbnail is not None:
return self.thumbnail
def largest(self) -> Optional[str]:
# Return first non-None item
return self.original or self.large or self.small or self.thumbnail
@dataclass(slots=True)
@ -109,7 +103,7 @@ class AlbumMetadata:
albumartist: str
year: str
genre: list[str]
covers: list[CoverUrls]
covers: CoverUrls
albumcomposer: Optional[str] = None
comment: Optional[str] = None
@ -127,7 +121,68 @@ class AlbumMetadata:
@classmethod
def from_qobuz(cls, resp) -> AlbumMetadata:
raise NotImplemented
album = resp.get("title", "Unknown Album")
tracktotal = resp.get("tracks_count", 1)
genre = resp.get("genres_list") or resp.get("genre") or []
genres = list(set(re.findall(r"([^\u2192\/]+)", "/".join(genre))))
date = resp.get("release_date_original") or resp.get("release_date")
year = date[:4]
copyright = resp.get("copyright")
if artists := resp.get("artists"):
albumartist = ", ".join(a["name"] for a in artists)
else:
albumartist = safe_get(resp, "artist", "name")
albumcomposer = safe_get(resp, "composer", "name")
label = resp.get("label")
description = resp.get("description")
disctotal = (
max(
track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}])
)
or 1
)
explicit = resp.get("parental_warning", False)
if isinstance(label, dict):
label = self.label.get("name")
# Non-embedded information
version = resp.get("version")
cover_urls = CoverUrls.from_qobuz(resp)
streamable = resp.get("streamable", False)
bit_depth = resp.get("maximum_bit_depth")
sampling_rate = resp.get("maximum_sampling_rate")
quality = get_quality_id(self.bit_depth, self.sampling_rate)
booklets = resp.get("goodies")
item_id = resp.get("id")
if sampling_rate is not None:
sampling_rate *= 1000
info = AlbumInfo(item_id, quality, explicit, sampling_rate, bit_depth, booklets)
return AlbumMetadata(
album,
albumartist,
year,
genre=genres,
covers=cover_urls,
albumcomposer,
comment,
compilation,
copyright(),
cover,
date,
description,
disctotal,
encoder,
grouping,
lyrics,
purchase_date,
tracktotal,
)
@classmethod
def from_deezer(cls, resp) -> AlbumMetadata:
@ -164,613 +219,3 @@ class AlbumInfo:
booklets = None
work: Optional[str] = None
class TrackMetadata1:
"""Contains all of the metadata needed to tag the file.
Tags contained:
* title
* artist
* album
* albumartist
* composer
* year
* comment
* description
* purchase_date
* grouping
* genre
* lyrics
* encoder
* copyright
* compilation
* cover
* tracknumber
* discnumber
* tracktotal
* disctotal
"""
albumartist: str
composer: Optional[str] = None
albumcomposer: Optional[str] = None
comment: Optional[str] = None
description: Optional[str] = None
purchase_date: Optional[str] = None
date: Optional[str] = None
grouping: Optional[str] = None
lyrics: Optional[str] = None
encoder: Optional[str] = None
compilation: Optional[str] = None
cover: Optional[str] = None
tracktotal: Optional[int] = None
tracknumber: Optional[int] = None
discnumber: Optional[int] = None
disctotal: Optional[int] = None
# not included in tags
explicit: bool = False
quality: Optional[int] = None
sampling_rate: Optional[int] = None
bit_depth: Optional[int] = None
booklets = None
cover_urls = Optional[OrderedDict]
work: Optional[str]
id: Optional[str]
# Internals
_artist: Optional[str] = None
_copyright: Optional[str] = None
_genres: Optional[Iterable] = None
_title: Optional[str]
def __init__(
self,
track: Optional[Union[TrackMetadata, dict]] = None,
album: Optional[Union[TrackMetadata, dict]] = None,
source="qobuz",
):
"""Create a TrackMetadata object.
:param track: track dict from API
:type track: Optional[dict]
:param album: album dict from API
:type album: Optional[dict]
"""
# embedded information
# TODO: add this to static attrs
self.__source = source
if isinstance(track, TrackMetadata):
self.update(track)
elif track is not None:
self.add_track_meta(track)
if isinstance(album, TrackMetadata):
self.update(album)
elif album is not None:
self.add_album_meta(album)
def update(self, meta: TrackMetadata):
"""Update the attributes from another TrackMetadata object.
:param meta:
:type meta: TrackMetadata
"""
assert isinstance(meta, TrackMetadata)
for k, v in meta.asdict().items():
if v is not None:
setattr(self, k, v)
def add_album_meta(self, resp: dict):
"""Parse the metadata from an resp dict returned by the API.
:param dict resp: from API
"""
if self.__source == "qobuz":
# Tags
self.album = resp.get("title", "Unknown Album")
self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list") or resp.get("genre") or []
self.date = resp.get("release_date_original") or resp.get("release_date")
self.copyright = resp.get("copyright")
if artists := resp.get("artists"):
self.albumartist = ", ".join(a["name"] for a in artists)
else:
self.albumartist = safe_get(resp, "artist", "name")
self.albumcomposer = safe_get(resp, "composer", "name")
self.label = resp.get("label")
self.description = resp.get("description")
self.disctotal = (
max(
track.get("media_number", 1)
for track in safe_get(resp, "tracks", "items", default=[{}])
)
or 1
)
self.explicit = resp.get("parental_warning", False)
if isinstance(self.label, dict):
self.label = self.label.get("name")
# Non-embedded information
self.version = resp.get("version")
self.cover_urls = get_cover_urls(resp, self.__source)
self.streamable = resp.get("streamable", False)
self.bit_depth = resp.get("maximum_bit_depth")
self.sampling_rate = resp.get("maximum_sampling_rate")
self.quality = get_quality_id(self.bit_depth, self.sampling_rate)
self.booklets = resp.get("goodies")
self.id = resp.get("id")
if self.sampling_rate is not None:
self.sampling_rate *= 1000
elif self.__source == "tidal":
self.album = resp.get("title", "Unknown Album")
self.tracktotal = resp.get("numberOfTracks", 1)
# genre not returned by API
self.date = resp.get("releaseDate")
self.copyright = resp.get("copyright")
if artists := resp.get("artists"):
self.albumartist = ", ".join(a["name"] for a in artists)
else:
self.albumartist = safe_get(resp, "artist", "name")
self.disctotal = resp.get("numberOfVolumes", 1)
self.isrc = resp.get("isrc")
# label not returned by API
# non-embedded
self.explicit = resp.get("explicit", False)
# 80, 160, 320, 640, 1280
self.cover_urls = get_cover_urls(resp, self.__source)
self.streamable = resp.get("allowStreaming", False)
self.id = resp.get("id")
if q := resp.get("audioQuality"): # for album entries in single tracks
self._get_tidal_quality(q)
elif self.__source == "deezer":
self.album = resp.get("title", "Unknown Album")
self.tracktotal = resp.get("track_total", 0) or resp.get("nb_tracks", 0)
self.disctotal = (
max(track.get("disk_number") for track in resp.get("tracks", [{}])) or 1
)
self.genre = safe_get(resp, "genres", "data")
self.date = resp.get("release_date")
self.albumartist = safe_get(resp, "artist", "name")
self.label = resp.get("label")
self.url = resp.get("link")
self.explicit = resp.get("parental_warning", False)
# not embedded
self.quality = 2
self.bit_depth = 16
self.sampling_rate = 44100
self.cover_urls = get_cover_urls(resp, self.__source)
self.streamable = True
self.id = resp.get("id")
elif self.__source == "soundcloud":
raise NotImplementedError
else:
raise InvalidSourceError(self.__source)
def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by an API.
:param track:
"""
if self.__source == "qobuz":
self.title = track["title"].strip()
self._mod_title(track.get("version"), track.get("work"))
self.composer = track.get("composer", {}).get("name")
self.tracknumber = track.get("track_number", 1)
self.discnumber = track.get("media_number", 1)
self.artist = safe_get(track, "performer", "name")
elif self.__source == "tidal":
self.title = track["title"].strip()
self._mod_title(track.get("version"), None)
self.tracknumber = track.get("trackNumber", 1)
self.discnumber = track.get("volumeNumber", 1)
self.artist = track.get("artist", {}).get("name")
self._get_tidal_quality(track["audioQuality"])
elif self.__source == "deezer":
self.title = track["title"].strip()
self._mod_title(track.get("version"), None)
self.tracknumber = track.get("track_position", 1)
self.discnumber = track.get("disk_number", 1)
self.artist = safe_get(track, "artist", "name")
elif self.__source == "soundcloud":
self.title = track["title"].strip()
self.genre = track["genre"]
self.artist = self.albumartist = track["user"]["username"]
self.year = track["created_at"][:4]
self.label = track["label_name"]
self.description = track["description"]
self.album = safe_get(track, "publisher_metadata", "album_title")
self.copyright = safe_get(track, "publisher_metadata", "p_line")
self.tracknumber = 0
self.tracktotal = 0
self.quality = 0
self.cover_urls = get_cover_urls(track, "soundcloud")
else:
raise ValueError(self.__source)
if track.get("album"):
self.add_album_meta(track["album"])
def _mod_title(self, version: Optional[str], work: Optional[str]):
"""Modify title using the version and work.
:param version:
:type version: str
:param work:
:type work: str
"""
if version is not None and version not in self.title:
self.title = f"{self.title} ({version})"
if work is not None and work not in self.title:
logger.debug("Work found: %s", work)
self.title = f"{work}: {self.title}"
def _get_tidal_quality(self, q: str):
self.quality = TIDAL_Q_MAP[q]
if self.quality >= 2:
self.bit_depth = 24 if self.get("quality") == 3 else 16
self.sampling_rate = 44100
@property
def title(self) -> Optional[str]:
if not hasattr(self, "_title"):
return None
# if self.explicit:
# return f"{self._title} (Explicit)"
return self._title
@title.setter
def title(self, new_title):
self._title = new_title
@property
def album(self) -> str:
"""Return the album of the track.
:rtype: str
"""
assert hasattr(self, "_album"), "Must set album before accessing"
album = self._album
if self.get("version") and self["version"] not in album:
album = f"{self._album} ({self.version})"
if self.get("work") and self["work"] not in album:
album = f"{self.work}: {album}"
return album
@album.setter
def album(self, val):
"""Set the value of the album.
:param val:
"""
self._album = val
@property
def artist(self) -> Optional[str]:
"""Return the value to set for the artist tag.
Defaults to `self.albumartist` if there is no track artist.
:rtype: str
"""
if self._artist is not None:
return self._artist
return None
@artist.setter
def artist(self, val: str):
"""Set the internal artist variable to val.
:param val:
:type val: str
"""
self._artist = val
@property
def genre(self) -> Optional[str]:
"""Format the genre list returned by an API.
It cleans up the Qobuz Response:
>>> meta.genre = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé']
>>> meta.genre
'Pop, Rock, Alternatif et Indé'
:rtype: str
"""
if not self.get("_genres"):
return None
if isinstance(self._genres, dict):
self._genres = self._genres["name"]
if isinstance(self._genres, list):
if self.__source == "qobuz":
genres: Iterable = re.findall(r"([^\u2192\/]+)", "/".join(self._genres))
genres = set(genres)
elif self.__source == "deezer":
genres = (g["name"] for g in self._genres)
else:
raise Exception
return ", ".join(genres)
elif isinstance(self._genres, str):
return self._genres
raise TypeError(f"Genre must be list or str, not {type(self._genres)}")
@genre.setter
def genre(self, val: Union[Iterable, dict]):
"""Set the internal `genre` field to the given list.
It is not formatted until it is requested with `meta.genre`.
:param val:
:type val: Union[str, list]
"""
self._genres = val
@property
def copyright(self) -> Optional[str]:
"""Format the copyright string to use unicode characters.
:rtype: str, None
"""
if hasattr(self, "_copyright"):
if self._copyright is None:
return None
copyright: str = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, self._copyright)
copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, copyright)
return copyright
logger.debug("Accessed copyright tag before setting, returning None")
return None
@copyright.setter
def copyright(self, val: Optional[str]):
"""Set the internal copyright variable to the given value.
Only formatted when requested.
:param val:
:type val: str
"""
self._copyright = val
@property
def year(self) -> Optional[str]:
"""Return the year published of the track.
:rtype: str
"""
if hasattr(self, "_year"):
return self._year
if hasattr(self, "date") and isinstance(self.date, str):
return self.date[:4]
return None
@year.setter
def year(self, val):
"""Set the internal year variable to val.
:param val:
"""
self._year = val
def get_formatter(self, max_quality: int) -> dict:
"""Return a dict that is used to apply values to file format strings.
:rtype: dict
"""
# the keys in the tuple are the possible keys for format strings
return {k: getattr(self, k) for k in TRACK_KEYS}
def get_album_formatter(self, max_quality: int) -> dict:
"""Return a dict that is used to apply values to file format strings.
:param max_quality:
:type max_quality: int
:rtype: dict
"""
formatter = {k: self.get(k) for k in ALBUM_KEYS}
formatter["container"] = "FLAC" if max_quality >= 2 else "MP3"
formatter["sampling_rate"] /= 1000
return formatter
def tags(self, container: str = "flac", exclude: Optional[set] = None) -> Generator:
"""Create a generator of key, value pairs for use with mutagen.
The *_KEY dicts are organized in the format:
>>> {attribute_name: key_to_use_for_metadata}
They are then converted to the format
>>> {key_to_use_for_metadata: value_of_attribute}
so that they can be used like this:
>>> audio = MP4(path)
>>> for k, v in meta.tags(container='MP4'):
... audio[k] = v
>>> audio.save()
:param container: the container format
:type container: str
:rtype: Generator
"""
if exclude is None:
exclude = set()
logger.debug("Excluded tags: %s", exclude)
container = container.lower()
if container in ("flac", "vorbis"):
return self.__gen_flac_tags(exclude)
if container in ("mp3", "id3"):
return self.__gen_mp3_tags(exclude)
if container in ("alac", "m4a", "mp4", "aac"):
return self.__gen_mp4_tags(exclude)
raise InvalidContainerError(f"Invalid container {container}")
def __gen_flac_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag FLAC files.
:rtype: Tuple[str, str]
"""
for k, v in FLAC_KEY.items():
logger.debug("attr: %s", k)
if k in exclude:
continue
tag = getattr(self, k)
if tag:
if k in {
"tracknumber",
"discnumber",
"tracktotal",
"disctotal",
}:
tag = f"{int(tag):02}"
logger.debug("Adding tag %s: %s", v, tag)
yield (v, str(tag))
def __gen_mp3_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag MP3 files.
:rtype: Tuple[str, str]
"""
for k, v in MP3_KEY.items():
if k in exclude:
continue
if k == "tracknumber":
text = f"{self.tracknumber}/{self.tracktotal}"
elif k == "discnumber":
text = f"{self.discnumber}/{self.get('disctotal', 1)}"
else:
text = getattr(self, k)
if text is not None and v is not None:
yield (v.__name__, v(encoding=3, text=text))
def __gen_mp4_tags(self, exclude: set) -> Generator:
"""Generate key, value pairs to tag ALAC or AAC files.
:rtype: Tuple[str, str]
"""
for k, v in MP4_KEY.items():
if k in exclude:
continue
if k == "tracknumber":
text = [(self.tracknumber, self.tracktotal)]
elif k == "discnumber":
text = [(self.discnumber, self.get("disctotal", 1))]
else:
text = getattr(self, k)
if v is not None and text is not None:
yield (v, text)
def asdict(self) -> dict:
"""Return a dict representation of self.
:rtype: dict
"""
ret = {}
for attr in dir(self):
if not attr.startswith("_") and not callable(getattr(self, attr)):
ret[attr] = getattr(self, attr)
return ret
def __setitem__(self, key, val):
"""Dict-like access for tags.
:param key:
:param val:
"""
setattr(self, key, val)
def __getitem__(self, key):
"""Dict-like access for tags.
:param key:
"""
return getattr(self, key)
def get(self, key, default=None):
"""Return the requested attribute of the object, with a default value.
:param key:
:param default:
"""
if hasattr(self, key):
res = self.__getitem__(key)
if res is not None:
return res
return default
return default
def set(self, key, val) -> str:
"""Set an attribute.
Equivalent to:
>>> meta[key] = val
:param key:
:param val:
:rtype: str
"""
return self.__setitem__(key, val)
def __hash__(self) -> int:
"""Get a hash of this.
Warning: slow.
:rtype: int
"""
return sum(hash(v) for v in self.asdict().values() if isinstance(v, Hashable))
def __repr__(self) -> str:
"""Return the string representation of the metadata object.
:rtype: str
"""
# TODO: make a more readable repr
return f"<TrackMetadata object {hex(hash(self))}>"

211
src/prompter.py Normal file
View File

@ -0,0 +1,211 @@
import hashlib
import time
from abc import ABC, abstractmethod
from getpass import getpass
from click import launch, secho, style
from .client import AuthenticationError, Client, MissingCredentials
from .config import Config
from .deezer_client import DeezerClient
from .qobuz_client import QobuzClient
from .tidal_client import TidalClient
class CredentialPrompter(ABC):
client: Client
def __init__(self, config: Config, client: Client):
self.config = config
self.client = self.type_check_client(client)
@abstractmethod
def has_creds(self) -> bool:
raise NotImplemented
@abstractmethod
def prompt(self):
"""Prompt for credentials in the appropriate way,
and save them to the configuration."""
raise NotImplemented
@abstractmethod
def save(self):
"""Save current config to file"""
raise NotImplemented
@abstractmethod
def type_check_client(self, client: Client):
raise NotImplemented
class QobuzPrompter(CredentialPrompter):
client: QobuzClient
def has_creds(self) -> bool:
c = self.config.session.qobuz
return c.email_or_userid != "" and c.password_or_token != ""
async def prompt(self):
if not self.has_creds():
self._prompt_creds_and_set_session_config()
while True:
try:
await self.client.login()
break
except AuthenticationError:
secho("Invalid credentials, try again.", fg="yellow")
self._prompt_creds_and_set_session_config()
except MissingCredentials:
self._prompt_creds_and_set_session_config()
def _prompt_creds_and_set_session_config(self):
secho("Enter Qobuz email:", fg="green")
email = input()
secho(
"Enter Qobuz password (will not show on screen):",
fg="green",
)
pwd = hashlib.md5(getpass(prompt="").encode("utf-8")).hexdigest()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
c = self.config.session.qobuz
c.use_auth_token = False
c.email_or_userid = email
c.password_or_token = pwd
def save(self):
c = self.config.session.qobuz
cf = self.config.file.qobuz
cf.use_auth_token = False
cf.email_or_userid = c.email_or_userid
cf.password_or_token = c.password_or_token
self.config.file.set_modified()
def type_check_client(self, client) -> QobuzClient:
assert isinstance(client, QobuzClient)
return client
class TidalPrompter(CredentialPrompter):
timeout_s: int = 600 # 5 mins to login
client: TidalClient
def has_creds(self) -> bool:
return len(self.config.session.tidal.access_token) > 0
async def prompt(self):
device_code = await self.client._get_device_code()
login_link = f"https://{device_code}"
secho(
f"Go to {login_link} to log into Tidal within 5 minutes.",
fg="blue",
)
launch(login_link)
start = time.time()
elapsed = 0.0
info = {}
while elapsed < self.timeout_s:
elapsed = time.time() - start
status, info = await self.client._get_auth_status(device_code)
if status == 2:
# pending
time.sleep(4)
continue
elif status == 0:
# successful
break
else:
raise Exception
c = self.config.session.tidal
c.user_id = info["user_id"] # type: ignore
c.country_code = info["country_code"] # type: ignore
c.access_token = info["access_token"] # type: ignore
c.refresh_token = info["refresh_token"] # type: ignore
c.token_expiry = info["token_expiry"] # type: ignore
self.client._update_authorization_from_config()
self.save()
def type_check_client(self, client) -> TidalClient:
assert isinstance(client, TidalClient)
return client
def save(self):
c = self.config.session.tidal
cf = self.config.file.tidal
cf.user_id = c.user_id
cf.country_code = c.country_code
cf.access_token = c.access_token
cf.refresh_token = c.refresh_token
cf.token_expiry = c.token_expiry
self.config.file.set_modified()
class DeezerPrompter(CredentialPrompter):
client: DeezerClient
def has_creds(self):
c = self.config.session.deezer
return c.arl != ""
async def prompt(self):
if not self.has_creds():
self._prompt_creds_and_set_session_config()
while True:
try:
await self.client.login()
break
except AuthenticationError:
secho("Invalid arl, try again.", fg="yellow")
self._prompt_creds_and_set_session_config()
self.save()
def _prompt_creds_and_set_session_config(self):
secho(
"If you're not sure how to find the ARL cookie, see the instructions at ",
nl=False,
dim=True,
)
secho(
"https://github.com/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie",
underline=True,
fg="blue",
)
c = self.config.session.deezer
c.arl = input(style("ARL: ", fg="green"))
def save(self):
c = self.config.session.deezer
cf = self.config.file.deezer
cf.arl = c.arl
self.config.file.set_modified()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
def type_check_client(self, client) -> DeezerClient:
assert isinstance(client, DeezerClient)
return client
PROMPTERS = {
"qobuz": (QobuzPrompter, QobuzClient),
"deezer": (DeezerPrompter, QobuzClient),
"tidal": (TidalPrompter, QobuzClient),
}
def get_prompter(client: Client, config: Config):
"""Return an instance of a prompter."""
p, c = PROMPTERS[client.source]
assert isinstance(client, c)
return p(config, client)

View File

@ -147,6 +147,20 @@ class TidalClient(Client):
c.token_expiry = resp_json["expires_in"] + time.time()
self._update_authorization_from_config()
async def _get_device_code(self):
"""Get the device code that will be used to log in on the browser."""
data = {
"client_id": CLIENT_ID,
"scope": "r_usr+w_usr+w_sub",
}
_resp = await self._api_post(f"{AUTH_URL}/device_authorization", data)
resp = await _resp.json()
if resp.get("status", 200) != 200:
raise Exception(f"Device authorization failed {resp}")
return resp["verificationUriComplete"]
async def _api_post(self, url, data, auth=None):
"""Post to the Tidal API.

View File

@ -1,3 +1,4 @@
import os
from dataclasses import dataclass
from .client import Client
@ -13,6 +14,22 @@ class Track(Media):
downloadable: Downloadable
config: Config
folder: str
download_path: str = ""
async def preprocess(self):
folder = self._get_folder(self.folder)
os.makedirs(folder, exist_ok=True)
# Run in background while track downloads?
# Don't download again if part of album
await self._download_cover()
async def download(self):
async with get_progress_bar(self.config, self.downloadable.size()) as bar:
self.downloadable.download(self.download_path, lambda x: bar.update(x))
async def postprocess(self):
await self.tag()
await self.convert()
@dataclass(slots=True)
@ -24,7 +41,7 @@ class PendingTrack(Pending):
folder: str
async def resolve(self) -> Track:
resp = await self.client.get_metadata(id, "track")
resp = await self.client.get_metadata({"id": self.id}, "track")
meta = TrackMetadata.from_resp(self.album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality
assert isinstance(quality, int)