This commit is contained in:
Nathan Thomas 2023-10-29 14:42:10 -07:00
parent 7cbd77edc5
commit 837e934476
31 changed files with 990 additions and 172 deletions

22
poetry.lock generated
View File

@ -503,7 +503,7 @@ rapidfuzz = ">=2.2.0,<3.0.0"
name = "click"
version = "8.1.7"
description = "Composable command line interface toolkit"
category = "dev"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@ -514,6 +514,24 @@ files = [
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "click-help-colors"
version = "0.9.2"
description = "Colorization of help messages in Click"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "click-help-colors-0.9.2.tar.gz", hash = "sha256:756245e542d29226bb3bc056bfa58886f212ba2b82f4e8cf5fc884176ac96d72"},
{file = "click_help_colors-0.9.2-py3-none-any.whl", hash = "sha256:82ef028cb0a332a154fa42fd7cca2c728a019b32bcb5a26bb32367551014a16f"},
]
[package.dependencies]
click = ">=7.0,<9"
[package.extras]
dev = ["pytest"]
[[package]]
name = "colorama"
version = "0.4.6"
@ -2052,4 +2070,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
[metadata]
lock-version = "2.0"
python-versions = ">=3.8 <4.0"
content-hash = "727f56fee96d755d32de2f42404b98d8425382aa5d7eecd67e3c17dea1ffe5e4"
content-hash = "9d19c6f43db6871829f90cf3a5169172211b5f6df3818993bcaa0a850b42f835"

View File

@ -18,10 +18,10 @@ packages = [
]
[tool.poetry.scripts]
rip = "streamrip.cli:main"
rip = "streamrip.cli2:rip"
[tool.poetry.dependencies]
python = ">=3.8 <4.0"
python = ">=3.10 <4.0"
mutagen = "^1.45.1"
tqdm = "^4.61.1"
tomlkit = "^0.7.2"
@ -42,6 +42,7 @@ aiolimiter = "^1.1.0"
pytest-mock = "^3.11.1"
pytest-asyncio = "^0.21.1"
rich = "^13.6.0"
click-help-colors = "^0.9.2"
[tool.poetry.urls]
"Bug Reports" = "https://github.com/nathom/streamrip/issues"

View File

@ -1 +1 @@
__all__ = ["config"]

View File

@ -1,6 +1,7 @@
import asyncio
from dataclasses import dataclass
from .artwork import download_artwork
from .client import Client
from .config import Config
from .media import Media, Pending
@ -21,13 +22,16 @@ class PendingAlbum(Pending):
id: str
client: Client
config: Config
folder: str
async def resolve(self):
resp = self.client.get_metadata({"id": self.id}, "album")
meta = AlbumMetadata.from_resp(self.client.source, resp)
resp = await self.client.get_metadata(self.id, "album")
meta = AlbumMetadata.from_resp(resp, self.client.source)
tracklist = get_album_track_ids(self.client.source, resp)
album_folder = self._album_folder(self.folder, meta.album)
folder = self.config.session.downloads.folder
album_folder = self._album_folder(folder, meta.album)
embed_cover, _ = await download_artwork(
self.client.session, album_folder, meta.covers, self.config.session.artwork
)
pending_tracks = [
PendingTrack(
id=id,
@ -35,15 +39,16 @@ class PendingAlbum(Pending):
client=self.client,
config=self.config,
folder=album_folder,
cover_path=embed_cover,
)
for id in tracklist
]
tracks: list[Track] = await asyncio.gather(
*(track.resolve() for track in pending_tracks)
)
return Album(meta, tracks, self.config)
return Album(meta, tracks, self.config, album_folder)
def _album_folder(self, parent: str, album_name: str) -> str:
# find name of album folder
# create album folder if it doesnt exist
pass
raise NotImplementedError

View File

@ -22,6 +22,7 @@ async def download_artwork(
covers (Covers): The set of available covers.
Returns:
(path to embed cover, path to hires cover)
The path of the cover to embed, or None if there either is no artwork available or
if artwork embedding is turned off.
"""

182
streamrip/cli2.py Normal file
View File

@ -0,0 +1,182 @@
import asyncio
import logging
import os
import shutil
import subprocess
from functools import wraps
import click
from click import secho
from click_help_colors import HelpColorsGroup
from rich.logging import RichHandler
from rich.traceback import install
from .config import Config, set_user_defaults
from .main import Main
from .user_paths import BLANK_CONFIG_PATH, CONFIG_PATH
logging.basicConfig(
level="DEBUG", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
)
logger = logging.getLogger("streamrip")
def echo_i(msg, **kwargs):
secho(msg, fg="green", **kwargs)
def echo_w(msg, **kwargs):
secho(msg, fg="yellow", **kwargs)
def echo_e(msg, **kwargs):
secho(msg, fg="yellow", **kwargs)
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
return asyncio.run(f(*args, **kwargs))
return wrapper
@click.group(
cls=HelpColorsGroup,
help_headers_color="yellow",
help_options_color="green",
)
@click.version_option(version="2.0")
@click.option(
"-c", "--config-path", default=CONFIG_PATH, help="Path to the configuration file"
)
@click.option(
"-v", "--verbose", help="Enable verbose output (debug mode)", is_flag=True
)
@click.pass_context
def rip(ctx, config_path, verbose):
"""
Streamrip: the all in one music downloader.
"""
if verbose:
install(suppress=[click], show_locals=True, locals_hide_sunder=False)
logger.setLevel(logging.DEBUG)
logger.debug("Showing all debug logs")
else:
install(suppress=[click, asyncio], max_frames=1)
logger.setLevel(logging.WARNING)
ctx.ensure_object(dict)
if not os.path.isfile(config_path):
echo_i(f"No file found at {config_path}, creating default config.")
shutil.copy(BLANK_CONFIG_PATH, config_path)
set_user_defaults(config_path)
ctx.obj["config_path"] = config_path
ctx.obj["verbose"] = verbose
@rip.command()
@click.argument("urls", nargs=-1, required=True)
@click.pass_context
@coro
async def url(ctx, urls):
"""Download content from URLs.
Example usage:
rip url TODO: find url
"""
config_path = ctx.obj["config_path"]
with Config(config_path) as cfg:
main = Main(cfg)
for u in urls:
await main.add(u)
await main.resolve()
await main.rip()
@rip.command()
@click.argument("path", required=True)
@click.pass_context
@coro
async def file(ctx, path):
"""Download content from URLs in a file seperated by newlines.
Example usage:
rip file urls.txt
"""
config_path = ctx.obj["config_path"]
with Config(config_path) as cfg:
main = Main(cfg)
with open(path) as f:
for u in f:
await main.add(u)
await main.resolve()
await main.rip()
@rip.group()
def config():
"""Manage configuration files."""
pass
@config.command("open")
@click.option("-v", "--vim", help="Open in (Neo)Vim", is_flag=True)
@click.pass_context
def config_open(ctx, vim):
"""Open the config file in a text editor."""
config_path = ctx.obj["config_path"]
echo_i(f"Opening file at {config_path}")
if vim:
if shutil.which("nvim") is not None:
subprocess.run(["nvim", config_path])
else:
subprocess.run(["vim", config_path])
else:
click.launch(config_path)
@config.command("reset")
@click.option("-y", "--yes", help="Don't ask for confirmation.", is_flag=True)
@click.pass_context
def config_reset(ctx, yes):
"""Reset the config file."""
config_path = ctx.obj["config_path"]
if not yes:
echo_w(
f"Are you sure you want to reset the config file at {config_path}? [y/n] ",
nl=False,
)
result = input()
if result.strip() != "y":
echo_i("Reset aborted.")
return
shutil.copy(BLANK_CONFIG_PATH, config_path)
set_user_defaults(config_path)
echo_i(f"Reset the config file at {config_path}!")
@rip.command()
@click.argument("query", required=True)
@click.argument("source", required=True)
@coro
async def search(query, source):
"""
Search for content using a specific source.
"""
echo_i(f'Searching for "{query}" in source: {source}')
@rip.command()
@click.argument("url", required=True)
def lastfm(url):
pass
if __name__ == "__main__":
rip()

View File

@ -21,13 +21,14 @@ class Client(ABC):
source: str
max_quality: int
session: aiohttp.ClientSession
logged_in: bool
@abstractmethod
async def login(self):
raise NotImplementedError
@abstractmethod
async def get_metadata(self, item: dict[str, Union[str, int, float]], media_type):
async def get_metadata(self, item: str, media_type):
raise NotImplementedError
@abstractmethod
@ -35,7 +36,7 @@ class Client(ABC):
raise NotImplementedError
@abstractmethod
async def get_downloadable(self, item_id: str, quality: int) -> Downloadable:
async def get_downloadable(self, item: dict, quality: int) -> Downloadable:
raise NotImplementedError
@staticmethod
@ -58,5 +59,7 @@ class Client(ABC):
def __del__(self):
# make sure http session is closed by end of program
if hasattr(self, "session"):
asyncio.run(self.session.close())
# if hasattr(self, "session"):
# loop = asyncio.get_event_loop()
# loop.run_until_complete(self.session.close())
pass

View File

@ -8,6 +8,13 @@ from dataclasses import dataclass, fields
from tomlkit.api import dumps, parse
from tomlkit.toml_document import TOMLDocument
from .user_paths import (
DEFAULT_DOWNLOADS_DB_PATH,
DEFAULT_DOWNLOADS_FOLDER,
DEFAULT_FAILED_DOWNLOADS_DB_PATH,
DEFAULT_YOUTUBE_VIDEO_DOWNLOADS_FOLDER,
)
logger = logging.getLogger("streamrip")
CURRENT_CONFIG_VERSION = "2.0"
@ -237,8 +244,10 @@ class ConfigData:
def from_toml(cls, toml_str: str):
# TODO: handle the mistake where Windows people forget to escape backslash
toml = parse(toml_str)
if toml["misc"]["version"] != CURRENT_CONFIG_VERSION: # type: ignore
raise Exception("Need to update config")
if (v := toml["misc"]["version"]) != CURRENT_CONFIG_VERSION: # type: ignore
raise Exception(
f"Need to update config from {v} to {CURRENT_CONFIG_VERSION}"
)
downloads = DownloadsConfig(**toml["downloads"]) # type: ignore
qobuz = QobuzConfig(**toml["qobuz"]) # type: ignore
@ -310,7 +319,7 @@ def update_toml_section_from_config(toml_section, config):
class Config:
def __init__(self, path: str):
def __init__(self, path: str, /):
self._path = path
with open(path) as toml_file:
@ -329,3 +338,24 @@ class Config:
@classmethod
def defaults(cls):
return cls(DEFAULT_CONFIG_PATH)
def __enter__(self):
return self
def __exit__(self, *_):
self.save_file()
def set_user_defaults(path: str, /):
"""Update the TOML file at the path with user-specific default values.
MUST copy updated blank config to `path` before calling this!
"""
with open(path) as f:
toml = parse(f.read())
toml["downloads"]["folder"] = DEFAULT_DOWNLOADS_FOLDER # type: ignore
toml["database"]["downloads_path"] = DEFAULT_DOWNLOADS_DB_PATH # type: ignore
toml["database"]["failed_downloads_path"] = DEFAULT_FAILED_DOWNLOADS_DB_PATH # type: ignore
toml["youtube"]["video_downloads_folder"] = DEFAULT_YOUTUBE_VIDEO_DOWNLOADS_FOLDER # type: ignore
with open(path, "w") as f:
f.write(dumps(toml))

View File

@ -11,6 +11,7 @@ concurrency = true
# The maximum number of tracks to download at once
# If you have very fast internet, you will benefit from a higher value,
# A value that is too high for your bandwidth may cause slowdowns
# Set to -1 for no limit
max_connections = 3
# Max number of API requests to handle per minute
# Set to -1 for no limit

View File

@ -394,7 +394,6 @@ class RipCore(list):
https://www.qobuz.com/us-en/type/name/id
https://open.qobuz.com/type/id
https://play.qobuz.com/type/id
https://www.deezer.com/us/type/id
https://tidal.com/browse/type/id

View File

@ -65,9 +65,7 @@ class BasicDownloadable(Downloadable):
self.extension = extension
async def _download(self, path: str, callback: Callable[[int], None]):
async with self.session.get(
self.url, allow_redirects=True, stream=True
) as response:
async with self.session.get(self.url, allow_redirects=True) as response:
response.raise_for_status()
async with aiofiles.open(path, "wb") as file:
async for chunk in response.content.iter_chunked(self.chunk_size):

86
streamrip/main.py Normal file
View File

@ -0,0 +1,86 @@
import asyncio
import logging
from click import secho
from .client import Client
from .config import Config
from .media import Media, Pending
from .prompter import get_prompter
from .qobuz_client import QobuzClient
from .thread_pool import AsyncThreadPool
from .universal_url import parse_url
logger = logging.getLogger("streamrip")
class Main:
"""Provides all of the functionality called into by the CLI.
* Logs in to Clients and prompts for credentials
* Handles output logging
* Handles downloading Media
User input (urls) -> Main --> Download files & Output messages to terminal
"""
def __init__(self, config: Config):
# Pipeline:
# input URL -> (URL) -> (Pending) -> (Media) -> (Downloadable) -> downloaded audio file
self.pending: list[Pending] = []
self.media: list[Media] = []
self.config = config
self.clients: dict[str, Client] = {
"qobuz": QobuzClient(config),
# "tidal": TidalClient(config),
# "deezer": DeezerClient(config),
# "soundcloud": SoundcloudClient(config),
# "deezloader": DeezloaderClient(config),
}
async def add(self, url: str):
parsed = parse_url(url)
if parsed is None:
secho(f"Unable to parse url {url}", fg="red")
raise Exception
client = await self.get_logged_in_client(parsed.source)
self.pending.append(await parsed.into_pending(client, self.config))
async def get_logged_in_client(self, source: str):
client = self.clients[source]
if not client.logged_in:
prompter = get_prompter(client, self.config)
if not prompter.has_creds():
# Get credentials from user and log into client
await prompter.prompt_and_login()
prompter.save()
else:
# Log into client using credentials from config
await client.login()
assert client.logged_in
return client
async def resolve(self):
logger.info(f"Resolving {len(self.pending)} items")
assert len(self.pending) != 0
coros = [p.resolve() for p in self.pending]
new_media: list[Media] = await asyncio.gather(*coros)
self.media.extend(new_media)
self.pending.clear()
assert len(self.pending) == 0
async def rip(self):
c = self.config.session.downloads
if c.concurrency:
max_connections = c.max_connections if c.max_connections > 0 else 9999
else:
max_connections = 1
async with AsyncThreadPool(max_connections) as pool:
await pool.gather([item.rip() for item in self.media])
for client in self.clients.values():
await client.session.close()

View File

@ -1,7 +1,7 @@
"""Manages the information that will be embeded in the audio file."""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
@ -114,11 +114,13 @@ class TrackMetadata:
composer: Optional[str]
@classmethod
def from_qobuz(cls, album: AlbumMetadata, resp) -> TrackMetadata:
def from_qobuz(cls, album: AlbumMetadata, resp: dict) -> TrackMetadata:
with open("tests/qobuz_track_resp.json", "w") as f:
json.dump(resp, f)
title = typed(resp["title"].strip(), str)
version = resp.get("version")
work = resp.get("work")
version = typed(resp.get("version"), str | None)
work = typed(resp.get("work"), str | None)
if version is not None and version not in title:
title = f"{title} ({version})"
if work is not None and work not in title:
@ -128,9 +130,20 @@ class TrackMetadata:
tracknumber = typed(resp.get("track_number", 1), int)
discnumber = typed(resp.get("media_number", 1), int)
artist = typed(safe_get(resp, "performer", "name"), str)
track_id = typed(resp["id"], str)
track_id = str(resp["id"])
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
# Is the info included?
explicit = False
info = TrackInfo(id=track_id, quality=album.info.quality)
info = TrackInfo(
id=track_id,
quality=album.info.quality,
bit_depth=bit_depth,
explicit=explicit,
sampling_rate=sampling_rate,
work=work,
)
return cls(
info=info,
title=title,
@ -187,7 +200,6 @@ class TrackInfo:
quality: int
bit_depth: Optional[int] = None
booklets = None
explicit: bool = False
sampling_rate: Optional[int] = None
work: Optional[str] = None
@ -218,28 +230,30 @@ class AlbumMetadata:
def format_folder_path(self, formatter: str) -> str:
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "id", and "albumcomposer"
info = {
# "id", and "albumcomposer",
none_str = "Unknown"
info: dict[str, str | int] = {
"albumartist": self.albumartist,
"albumcomposer": self.albumcomposer or "None",
"bit_depth": self.info.bit_depth,
"albumcomposer": self.albumcomposer or none_str,
"bit_depth": self.info.bit_depth or none_str,
"id": self.info.id,
"sampling_rate": self.info.sampling_rate,
"sampling_rate": self.info.sampling_rate or none_str,
"title": self.album,
"year": self.year,
"container": self.info.container,
}
return formatter.format(**info)
@classmethod
def from_qobuz(cls, resp) -> AlbumMetadata:
def from_qobuz(cls, resp: dict) -> AlbumMetadata:
album = resp.get("title", "Unknown Album")
tracktotal = resp.get("tracks_count", 1)
genre = resp.get("genres_list") or resp.get("genre") or []
genres = list(set(re.findall(r"([^\u2192\/]+)", "/".join(genre))))
date = resp.get("release_date_original") or resp.get("release_date")
year = date[:4]
year = date[:4] if date is not None else "Unknown"
_copyright = resp.get("copyright")
_copyright = resp.get("copyright", "")
_copyright = re.sub(r"(?i)\(P\)", PHON_COPYRIGHT, _copyright)
_copyright = re.sub(r"(?i)\(C\)", COPYRIGHT, _copyright)
@ -253,7 +267,7 @@ class AlbumMetadata:
if isinstance(_label, dict):
_label = _label["name"]
label = typed(_label, str | None)
description = typed(resp.get("description"), str | None)
description = typed(resp.get("description") or None, str | None)
disctotal = typed(
max(
track.get("media_number", 1)
@ -270,16 +284,26 @@ class AlbumMetadata:
streamable = typed(resp.get("streamable", False), bool)
assert streamable
bit_depth = typed(resp.get("maximum_bit_depth"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | None)
sampling_rate = typed(resp.get("maximum_sampling_rate"), int | float | None)
quality = get_quality_id(bit_depth, sampling_rate)
booklets = resp.get("goodies")
item_id = resp.get("id")
# Make sure it is non-empty list
booklets = typed(resp.get("goodies", None) or None, list | None)
item_id = str(resp.get("qobuz_id"))
if sampling_rate is not None:
sampling_rate *= 1000
if sampling_rate and bit_depth:
container = "FLAC"
else:
container = "MP3"
info = AlbumInfo(
item_id, quality, label, explicit, sampling_rate, bit_depth, booklets
id=item_id,
quality=quality,
container=container,
label=label,
explicit=explicit,
sampling_rate=sampling_rate,
bit_depth=bit_depth,
booklets=booklets,
)
return AlbumMetadata(
info,
@ -315,7 +339,7 @@ class AlbumMetadata:
raise NotImplementedError
@classmethod
def from_resp(cls, resp, source) -> AlbumMetadata:
def from_resp(cls, resp: dict, source: str) -> AlbumMetadata:
if source == "qobuz":
return cls.from_qobuz(resp)
if source == "tidal":
@ -331,12 +355,12 @@ class AlbumMetadata:
class AlbumInfo:
id: str
quality: int
container: str
label: Optional[str] = None
explicit: bool = False
sampling_rate: Optional[int] = None
bit_depth: Optional[int] = None
booklets = None
work: Optional[str] = None
booklets: list[dict] | None = None
def safe_get(d: dict, *keys, default=None) -> dict | str | int | list | None:

View File

@ -25,7 +25,7 @@ class CredentialPrompter(ABC):
raise NotImplemented
@abstractmethod
def prompt(self):
async def prompt_and_login(self):
"""Prompt for credentials in the appropriate way,
and save them to the configuration."""
raise NotImplemented
@ -47,7 +47,7 @@ class QobuzPrompter(CredentialPrompter):
c = self.config.session.qobuz
return c.email_or_userid != "" and c.password_or_token != ""
async def prompt(self):
async def prompt_and_login(self):
if not self.has_creds():
self._prompt_creds_and_set_session_config()
@ -61,13 +61,12 @@ class QobuzPrompter(CredentialPrompter):
except MissingCredentials:
self._prompt_creds_and_set_session_config()
secho("Successfully logged in to Qobuz", fg="green")
def _prompt_creds_and_set_session_config(self):
secho("Enter Qobuz email:", fg="green")
secho("Enter Qobuz email: ", fg="green", nl=False)
email = input()
secho(
"Enter Qobuz password (will not show on screen):",
fg="green",
)
secho("Enter Qobuz password (will not show on screen): ", fg="green", nl=False)
pwd = hashlib.md5(getpass(prompt="").encode("utf-8")).hexdigest()
secho(
f'Credentials saved to config file at "{self.config._path}"',
@ -98,7 +97,7 @@ class TidalPrompter(CredentialPrompter):
def has_creds(self) -> bool:
return len(self.config.session.tidal.access_token) > 0
async def prompt(self):
async def prompt_and_login(self):
device_code = await self.client._get_device_code()
login_link = f"https://{device_code}"
@ -156,7 +155,7 @@ class DeezerPrompter(CredentialPrompter):
c = self.config.session.deezer
return c.arl != ""
async def prompt(self):
async def prompt_and_login(self):
if not self.has_creds():
self._prompt_creds_and_set_session_config()
while True:
@ -205,7 +204,7 @@ PROMPTERS = {
}
def get_prompter(client: Client, config: Config):
def get_prompter(client: Client, config: Config) -> CredentialPrompter:
"""Return an instance of a prompter."""
p, c = PROMPTERS[client.source]
assert isinstance(client, c)

View File

@ -65,10 +65,11 @@ class QobuzClient(Client):
logger.info("App id/secrets not found, fetching")
c.app_id, c.secrets = await self._get_app_id_and_secrets()
# write to file
self.config.file.qobuz.app_id = c.app_id
self.config.file.qobuz.secrets = c.secrets
self.config.file.set_modified()
logger.debug(f"Found {c.app_id = } {c.secrets = }")
f = self.config.file
f.qobuz.app_id = c.app_id
f.qobuz.secrets = c.secrets
f.set_modified()
logger.info(f"Found {c.app_id = } {c.secrets = }")
self.session.headers.update({"X-App-Id": c.app_id})
self.secret = await self._get_valid_secret(c.secrets)
@ -165,9 +166,9 @@ class QobuzClient(Client):
assert status == 200
yield resp
async def get_downloadable(self, item_id: str, quality: int) -> Downloadable:
async def get_downloadable(self, item: dict, quality: int) -> Downloadable:
assert self.secret is not None and self.logged_in and 1 <= quality <= 4
item_id = item["id"]
status, resp_json = await self._request_file_url(item_id, quality, self.secret)
assert status == 200
stream_url = resp_json.get("url")

View File

@ -32,40 +32,8 @@ class SoundcloudClient(Client):
self.config.client_id = c.app_version = app_version
self.global_config.file.set_modified()
async def _announce(self):
resp = await self._api_request("announcements")
return resp.status == 200
async def _refresh_tokens(self) -> tuple[str, str]:
"""Return a valid client_id, app_version pair."""
STOCK_URL = "https://soundcloud.com/"
async with self.session.get(STOCK_URL) as resp:
page_text = await resp.text(encoding="utf-8")
*_, client_id_url_match = re.finditer(
r"<script\s+crossorigin\s+src=\"([^\"]+)\"", page_text
)
if client_id_url_match is None:
raise Exception("Could not find client ID in %s" % STOCK_URL)
client_id_url = client_id_url_match.group(1)
app_version_match = re.search(
r'<script>window\.__sc_version="(\d+)"</script>', page_text
)
if app_version_match is None:
raise Exception("Could not find app version in %s" % client_id_url_match)
app_version = app_version_match.group(1)
async with self.session.get(client_id_url) as resp:
page_text2 = await resp.text(encoding="utf-8")
client_id_match = re.search(r'client_id:\s*"(\w+)"', page_text2)
assert client_id_match is not None
client_id = client_id_match.group(1)
return client_id, app_version
async def get_metadata(self, item_id: str, media_type: str) -> dict:
raise NotImplementedError
async def get_downloadable(self, item: dict, _) -> SoundcloudDownloadable:
if not item["streamable"] or item["policy"] == "BLOCK":
@ -128,3 +96,38 @@ class SoundcloudClient(Client):
async def _resolve_url(self, url: str) -> dict:
resp = await self._api_request(f"resolve?url={url}")
return await resp.json()
async def _announce(self):
resp = await self._api_request("announcements")
return resp.status == 200
async def _refresh_tokens(self) -> tuple[str, str]:
"""Return a valid client_id, app_version pair."""
STOCK_URL = "https://soundcloud.com/"
async with self.session.get(STOCK_URL) as resp:
page_text = await resp.text(encoding="utf-8")
*_, client_id_url_match = re.finditer(
r"<script\s+crossorigin\s+src=\"([^\"]+)\"", page_text
)
if client_id_url_match is None:
raise Exception("Could not find client ID in %s" % STOCK_URL)
client_id_url = client_id_url_match.group(1)
app_version_match = re.search(
r'<script>window\.__sc_version="(\d+)"</script>', page_text
)
if app_version_match is None:
raise Exception("Could not find app version in %s" % client_id_url_match)
app_version = app_version_match.group(1)
async with self.session.get(client_id_url) as resp:
page_text2 = await resp.text(encoding="utf-8")
client_id_match = re.search(r'client_id:\s*"(\w+)"', page_text2)
assert client_id_match is not None
client_id = client_id_match.group(1)
return client_id, app_version

View File

@ -1,15 +1,17 @@
import logging
import os
from dataclasses import dataclass
from enum import Enum
from typing import Generator
import aiofiles
import mutagen.id3 as id3
from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from mutagen.id3 import APIC # type: ignore
from mutagen.id3 import ID3
from mutagen.mp4 import MP4, MP4Cover
from .metadata import Covers, TrackMetadata
from .metadata import TrackMetadata
logger = logging.getLogger("streamrip")
FLAC_MAX_BLOCKSIZE = 16777215 # 16.7 MB
@ -29,7 +31,6 @@ MP4_KEYS = (
"\xa9too",
"cprt",
"cpil",
"covr",
"trkn",
"disk",
None,
@ -38,24 +39,23 @@ MP4_KEYS = (
)
MP3_KEYS = (
id3.TIT2,
id3.TPE1,
id3.TALB,
id3.TPE2,
id3.TCOM,
id3.TYER,
id3.COMM,
id3.TT1,
id3.TT1,
id3.GP1,
id3.TCON,
id3.USLT,
id3.TEN,
id3.TCOP,
id3.TCMP,
None,
id3.TRCK,
id3.TPOS,
id3.TIT2, # type: ignore
id3.TPE1, # type: ignore
id3.TALB, # type: ignore
id3.TPE2, # type: ignore
id3.TCOM, # type: ignore
id3.TYER, # type: ignore
id3.COMM, # type: ignore
id3.TT1, # type: ignore
id3.TT1, # type: ignore
id3.GP1, # type: ignore
id3.TCON, # type: ignore
id3.USLT, # type: ignore
id3.TEN, # type: ignore
id3.TCOP, # type: ignore
id3.TCMP, # type: ignore
id3.TRCK, # type: ignore
id3.TPOS, # type: ignore
None,
None,
None,
@ -77,7 +77,6 @@ METADATA_TYPES = (
"encoder",
"copyright",
"compilation",
"cover",
"tracknumber",
"discnumber",
"tracktotal",
@ -102,14 +101,11 @@ class Container(Enum):
elif self == Container.AAC:
return MP4(path)
elif self == Container.MP3:
try:
return ID3(path)
except ID3NoHeaderError:
return ID3()
return ID3(path)
# unreachable
return {}
def get_tag_pairs(self, meta) -> Generator:
def get_tag_pairs(self, meta) -> list[tuple]:
if self == Container.FLAC:
return self._tag_flac(meta)
elif self == Container.MP3:
@ -117,9 +113,10 @@ class Container(Enum):
elif self == Container.AAC:
return self._tag_aac(meta)
# unreachable
yield
return []
def _tag_flac(self, meta):
def _tag_flac(self, meta) -> list[tuple]:
out = []
for k, v in FLAC_KEY.items():
tag = self._attr_from_meta(meta, k)
if tag:
@ -131,9 +128,11 @@ class Container(Enum):
}:
tag = f"{int(tag):02}"
yield (v, str(tag))
out.append((v, str(tag)))
return out
def _tag_mp3(self, meta):
out = []
for k, v in MP3_KEY.items():
if k == "tracknumber":
text = f"{meta.tracknumber}/{meta.tracktotal}"
@ -143,9 +142,11 @@ class Container(Enum):
text = self._attr_from_meta(meta, k)
if text is not None and v is not None:
yield (v.__name__, v(encoding=3, text=text))
out.append((v.__name__, v(encoding=3, text=text)))
return out
def _tag_aac(self, meta):
out = []
for k, v in MP4_KEY.items():
if k == "tracknumber":
text = [(meta.tracknumber, meta.tracktotal)]
@ -155,7 +156,8 @@ class Container(Enum):
text = self._attr_from_meta(meta, k)
if v is not None and text is not None:
yield (v, text)
out.append((v, text))
return out
def _attr_from_meta(self, meta: TrackMetadata, attr: str) -> str:
# TODO: verify this works
@ -172,7 +174,7 @@ class Container(Enum):
else:
return str(getattr(meta.album, attr))
def tag_audio(self, audio, tags):
def tag_audio(self, audio, tags: list[tuple]):
for k, v in tags:
audio[k] = v
@ -209,7 +211,7 @@ class Container(Enum):
async def tag_file(path: str, meta: TrackMetadata, cover_path: str | None):
ext = path.split(".")[-1].upper()
ext = path.split(".")[-1].lower()
if ext == "flac":
container = Container.FLAC
elif ext == "m4a":
@ -221,6 +223,7 @@ async def tag_file(path: str, meta: TrackMetadata, cover_path: str | None):
audio = container.get_mutagen_class(path)
tags = container.get_tag_pairs(meta)
logger.debug("Tagging with %s", tags)
container.tag_audio(audio, tags)
if cover_path is not None:
await container.embed_cover(audio, cover_path)

21
streamrip/thread_pool.py Normal file
View File

@ -0,0 +1,21 @@
import asyncio
class AsyncThreadPool:
"""Allows a maximum of `max_workers` coroutines to be running at once."""
def __init__(self, max_workers: int):
self.s = asyncio.Semaphore(max_workers)
async def gather(self, coros: list):
async def _wrapper(coro):
async with self.s:
await coro
return await asyncio.gather(*(_wrapper(c) for c in coros))
async def __aenter__(self):
return self
async def __aexit__(self, *_):
pass

View File

@ -45,6 +45,9 @@ class Track(Media):
if self.config.session.conversion.enabled:
await self._convert()
# if self.cover_path is not None:
# os.remove(self.cover_path)
async def _tag(self):
await tag_file(self.download_path, self.meta, self.cover_path)
@ -90,14 +93,14 @@ class PendingTrack(Pending):
client: Client
config: Config
folder: str
cover_path: str
cover_path: str | None
async def resolve(self) -> Track:
resp = await self.client.get_metadata({"id": self.id}, "track")
resp = await self.client.get_metadata(self.id, "track")
meta = TrackMetadata.from_resp(self.album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality
assert isinstance(quality, int)
downloadable = await self.client.get_downloadable(self.id, quality)
downloadable = await self.client.get_downloadable({"id": self.id}, quality)
return Track(meta, downloadable, self.config, self.folder, self.cover_path)
@ -114,18 +117,20 @@ class PendingSingle(Pending):
config: Config
async def resolve(self) -> Track:
resp = await self.client.get_metadata({"id": self.id}, "track")
resp = await self.client.get_metadata(self.id, "track")
album = AlbumMetadata.from_resp(resp["album"], self.client.source)
meta = TrackMetadata.from_resp(album, self.client.source, resp)
quality = getattr(self.config.session, self.client.source).quality
assert isinstance(quality, int)
folder = self._format_folder(album)
folder = os.path.join(
self.config.session.downloads.folder, self._format_folder(album)
)
os.makedirs(folder, exist_ok=True)
embedded_cover_path, downloadable = await asyncio.gather(
self._download_cover(album.covers, folder),
self.client.get_downloadable(self.id, quality),
self.client.get_downloadable({"id": self.id}, quality),
)
return Track(meta, downloadable, self.config, folder, embedded_cover_path)

180
streamrip/universal_url.py Normal file
View File

@ -0,0 +1,180 @@
from __future__ import annotations
import re
from abc import ABC, abstractmethod
from click import secho
from .album import PendingAlbum
from .client import Client
from .config import Config
from .media import Pending
from .track import PendingSingle
from .validation_regexps import (
DEEZER_DYNAMIC_LINK_REGEX,
LASTFM_URL_REGEX,
QOBUZ_INTERPRETER_URL_REGEX,
SOUNDCLOUD_URL_REGEX,
URL_REGEX,
YOUTUBE_URL_REGEX,
)
class URL(ABC):
match: re.Match
source: str
def __init__(self, match: re.Match, source: str):
self.match = match
self.source = source
@abstractmethod
def from_str(cls, url: str) -> URL | None:
raise NotImplementedError
@abstractmethod
async def into_pending(self, client: Client, config: Config) -> Pending:
raise NotImplementedError
class GenericURL(URL):
@classmethod
def from_str(cls, url: str) -> URL | None:
generic_url = URL_REGEX.match(url)
if generic_url is None:
return None
source = generic_url.group(1)
return cls(generic_url, source)
async def into_pending(self, client: Client, config: Config) -> Pending:
source, media_type, item_id = self.match.groups()
assert client.source == source
if media_type == "track":
return PendingSingle(item_id, client, config)
elif media_type == "album":
return PendingAlbum(item_id, client, config)
else:
raise NotImplementedError
class QobuzInterpreterURL(URL):
interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'")
@classmethod
def from_str(cls, url: str) -> URL | None:
qobuz_interpreter_url = QOBUZ_INTERPRETER_URL_REGEX.match(url)
if qobuz_interpreter_url is None:
return None
return cls(qobuz_interpreter_url, "qobuz")
async def into_pending(self, client: Client, config: Config) -> Pending:
url = self.match.group(0)
artist_id = await self.extract_interpreter_url(url, client)
raise NotImplementedError
# return PendingArtist()
@staticmethod
async def extract_interpreter_url(url: str, client: Client) -> str:
"""Extract artist ID from a Qobuz interpreter url.
:param url: Urls of the form "https://www.qobuz.com/us-en/interpreter/{artist}/download-streaming-albums"
:type url: str
:rtype: str
"""
async with client.session.get(url) as resp:
match = QobuzInterpreterURL.interpreter_artist_regex.search(
await resp.text()
)
if match:
return match.group(1)
raise Exception(
"Unable to extract artist id from interpreter url. Use a "
"url that contains an artist id."
)
class DeezerDynamicURL(URL):
pass
class SoundCloudURL(URL):
pass
class LastFmURL(URL):
pass
def parse_url(url: str) -> URL | None:
url = url.strip()
parsed_urls: list[URL | None] = [
GenericURL.from_str(url),
QobuzInterpreterURL.from_str(url),
# TODO: the rest of the url types
]
return next((u for u in parsed_urls if u is not None), None)
# TODO: recycle this class
class UniversalURL:
"""
>>> u = UniversalURL('https://sampleurl.com')
>>> pending = await u.into_pending_item()
"""
source: str
media_type: str | None
match: re.Match | None
def __init__(self, url: str):
url = url.strip()
qobuz_interpreter_url = QOBUZ_INTERPRETER_URL_REGEX.match(url)
if qobuz_interpreter_url is not None:
self.source = "qobuz"
self.media_type = "artist"
self.url_type = "interpreter"
self.match = qobuz_interpreter_url
return
deezer_dynamic_url = DEEZER_DYNAMIC_LINK_REGEX.match(url)
if deezer_dynamic_url is not None:
self.match = deezer_dynamic_url
self.source = "deezer"
self.media_type = None
self.url_type = "deezer_dynamic"
return
soundcloud_url = SOUNDCLOUD_URL_REGEX.match(url)
if soundcloud_url is not None:
self.match = soundcloud_url
self.source = "soundcloud"
self.media_type = None
self.url_type = "soundcloud"
return
generic_url = URL_REGEX.match(url)
if generic_url is not None:
self.match = generic_url
self.source = self.match.group(1)
self.media_type = self.match.group(2)
self.url_type = "generic"
async def into_pending_item(self, client: Client, config: Config) -> Pending | None:
if self.url_type == "generic":
assert self.match is not None
item_id = self.match.group(3)
assert isinstance(item_id, str)
assert client.source == self.source
if self.media_type == "track":
return PendingSingle(item_id, client, config)
elif self.media_type == "album":
return PendingAlbum(item_id, client, config)
else:
raise NotImplementedError
else:
raise NotImplementedError

View File

@ -9,8 +9,17 @@ HOME = Path.home()
LOG_DIR = CACHE_DIR = CONFIG_DIR = APP_DIR
DEFAULT_CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml")
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.toml")
DB_PATH = os.path.join(LOG_DIR, "downloads.db")
FAILED_DB_PATH = os.path.join(LOG_DIR, "failed_downloads.db")
DOWNLOADS_DIR = os.path.join(HOME, "StreamripDownloads")
# file shipped with script
BLANK_CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.toml")
DEFAULT_DOWNLOADS_FOLDER = os.path.join(HOME, "StreamripDownloads")
DEFAULT_DOWNLOADS_DB_PATH = os.path.join(LOG_DIR, "downloads.db")
DEFAULT_FAILED_DOWNLOADS_DB_PATH = os.path.join(LOG_DIR, "failed_downloads.db")
DEFAULT_YOUTUBE_VIDEO_DOWNLOADS_FOLDER = os.path.join(
HOME, "StreamripDownloads", "YouTubeVideos"
)

24
tests/fixtures/clients.py vendored Normal file
View File

@ -0,0 +1,24 @@
import hashlib
import os
import pytest
from util import arun
from streamrip.config import Config
from streamrip.qobuz_client import QobuzClient
@pytest.fixture(scope="session")
def qobuz_client():
config = Config.defaults()
config.session.qobuz.email_or_userid = os.environ["QOBUZ_EMAIL"]
config.session.qobuz.password_or_token = hashlib.md5(
os.environ["QOBUZ_PASSWORD"].encode("utf-8")
).hexdigest()
if "QOBUZ_APP_ID" in os.environ and "QOBUZ_SECRETS" in os.environ:
config.session.qobuz.app_id = os.environ["QOBUZ_APP_ID"]
config.session.qobuz.secrets = os.environ["QOBUZ_SECRETS"].split(",")
client = QobuzClient(config)
arun(client.login())
return client

16
tests/fixtures/config.py vendored Normal file
View File

@ -0,0 +1,16 @@
import hashlib
import os
import pytest
from streamrip.config import Config
@pytest.fixture
def config():
c = Config.defaults()
c.session.qobuz.email_or_userid = os.environ["QOBUZ_EMAIL"]
c.session.qobuz.password_or_token = hashlib.md5(
os.environ["QOBUZ_PASSWORD"].encode("utf-8")
).hexdigest()
return c

17
tests/fixtures/util.py vendored Normal file
View File

@ -0,0 +1,17 @@
import asyncio
loop = asyncio.new_event_loop()
def arun(coro):
return loop.run_until_complete(coro)
def afor(async_gen):
async def _afor(async_gen):
l = []
async for item in async_gen:
l.append(item)
return l
return arun(_afor(async_gen))

103
tests/qobuz_album_resp.json Normal file
View File

@ -0,0 +1,103 @@
{
"maximum_bit_depth": 24,
"image": {
"small": "https://static.qobuz.com/images/covers/32/10/0603497941032_230.jpg",
"thumbnail": "https://static.qobuz.com/images/covers/32/10/0603497941032_50.jpg",
"large": "https://static.qobuz.com/images/covers/32/10/0603497941032_600.jpg",
"back": null
},
"media_count": 1,
"artist": {
"image": null,
"name": "Fleetwood Mac",
"id": 132127,
"albums_count": 424,
"slug": "fleetwood-mac",
"picture": null
},
"artists": [
{ "id": 132127, "name": "Fleetwood Mac", "roles": ["main-artist"] }
],
"upc": "0603497941032",
"released_at": 223858800,
"label": {
"name": "Rhino - Warner Records",
"id": 323970,
"albums_count": 3002,
"supplier_id": 5,
"slug": "rhino-warner-records"
},
"title": "Rumours",
"qobuz_id": 19512572,
"version": "2001 Remaster",
"url": "https://www.qobuz.com/fr-fr/album/rumours-fleetwood-mac/0603497941032",
"duration": 2387,
"parental_warning": false,
"popularity": 0,
"tracks_count": 11,
"genre": {
"path": [112, 119],
"color": "#5eabc1",
"name": "Rock",
"id": 119,
"slug": "rock"
},
"maximum_channel_count": 2,
"id": "0603497941032",
"maximum_sampling_rate": 96,
"articles": [],
"release_date_original": "1977-02-04",
"release_date_download": "1977-02-04",
"release_date_stream": "1977-02-04",
"purchasable": true,
"streamable": true,
"previewable": true,
"sampleable": true,
"downloadable": true,
"displayable": true,
"purchasable_at": 1693551600,
"streamable_at": 1690354800,
"hires": true,
"hires_streamable": true,
"awards": [
{
"name": "The Qobuz Ideal Discography",
"slug": "qobuz",
"award_slug": "discotheque_ideale",
"awarded_at": 1420066800,
"award_id": "70",
"publication_id": "2",
"publication_name": "Qobuz",
"publication_slug": "qobuz"
}
],
"goodies": [],
"area": null,
"catchline": "",
"composer": {
"id": 573076,
"name": "Various Composers",
"slug": "various-composers",
"albums_count": 583621,
"picture": null,
"image": null
},
"created_at": 0,
"genres_list": ["Pop/Rock", "Pop/Rock\u2192Rock"],
"period": null,
"copyright": "\u00a9 1977 Warner Records Inc. \u2117 1977 Warner Records Inc. Marketed by Rhino Entertainment Company, A Warner Music Group Company.",
"is_official": true,
"maximum_technical_specifications": "24 bits / 96.0 kHz - Stereo",
"product_sales_factors_monthly": 0,
"product_sales_factors_weekly": 0,
"product_sales_factors_yearly": 0,
"product_type": "album",
"product_url": "/fr-fr/album/rumours-fleetwood-mac/0603497941032",
"recording_information": "",
"relative_url": "/album/rumours-fleetwood-mac/0603497941032",
"release_tags": ["remaster"],
"release_type": "album",
"slug": "rumours-fleetwood-mac",
"subtitle": "Fleetwood Mac",
"description": ""
}

File diff suppressed because one or more lines are too long

View File

@ -69,7 +69,7 @@ def test_sample_config_data_fields(sample_config_data):
arl="testarl", quality=2, use_deezloader=True, deezloader_warnings=True
),
soundcloud=SoundcloudConfig(
client_id="clientid", app_version="appverison", quality=0
client_id="clientid", app_version="appversion", quality=0
),
youtube=YoutubeConfig(
video_downloads_folder="videodownloadsfolder",
@ -82,10 +82,14 @@ def test_sample_config_data_fields(sample_config_data):
folder_format="{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]",
track_format="{tracknumber}. {artist} - {title}{explicit}",
restrict_characters=False,
truncate=True,
truncate_to=200,
),
artwork=ArtworkConfig(
embed=True, size="large", max_width=-1, max_height=-1, keep_hires_cover=True
embed=True,
embed_size="large",
embed_max_width=-1,
save_artwork=True,
saved_max_width=-1,
),
metadata=MetadataConfig(
set_playlist_to_album=True, new_playlist_tracknumbers=True, exclude=[]
@ -112,6 +116,7 @@ def test_sample_config_data_fields(sample_config_data):
bit_depth=24,
lossy_bitrate=320,
),
misc=MiscConfig(version="2.0"),
_modified=False,
)
assert sample_config_data.downloads == test_config.downloads

View File

@ -11,6 +11,7 @@ concurrency = true
# The maximum number of tracks to download at once
# If you have very fast internet, you will benefit from a higher value,
# A value that is too high for your bandwidth may cause slowdowns
# Set to -1 for no limit
max_connections = 3
# Max number of API requests to handle per minute
# Set to -1 for no limit
@ -70,7 +71,7 @@ deezloader_warnings = true
quality = 0
# This changes periodically, so it needs to be updated
client_id = "clientid"
app_version = "appverison"
app_version = "appversion"
[youtube]
# Only 0 is available for now
@ -125,14 +126,16 @@ embed = true
# The size of the artwork to embed. Options: thumbnail, small, large, original.
# "original" images can be up to 30MB, and may fail embedding.
# Using "large" is recommended.
size = "large"
# Both of these options limit the size of the embedded artwork. If their values
# are larger than the actual dimensions of the image, they will be ignored.
# If either value is -1, the image is left untouched.
max_width = -1
max_height = -1
embed_size = "large"
# If this is set to a value > 0, max(width, height) of the embedded art will be set to this value in pixels
# Proportions of the image will remain the same
embed_max_width = -1
# Save the cover image at the highest quality as a seperate jpg file
keep_hires_cover = true
save_artwork = true
# If this is set to a value > 0, max(width, height) of the saved art will be set to this value in pixels
# Proportions of the image will remain the same
saved_max_width = -1
[metadata]
# Sets the value of the 'ALBUM' field in the metadata to the playlist's name.
@ -150,16 +153,16 @@ exclude = []
# template
add_singles_to_folder = false
# Available keys: "albumartist", "title", "year", "bit_depth", "sampling_rate",
# "container", "id", and "albumcomposer"
# "id", and "albumcomposer"
folder_format = "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "albumcomposer"
# and "albumcomposer", "explicit"
track_format = "{tracknumber}. {artist} - {title}{explicit}"
# Only allow printable ASCII characters in filenames.
restrict_characters = false
# Truncate the filename if it is greater than 120 characters
# Truncate the filename if it is greater than this number of characters
# Setting this to false may cause downloads to fail on some systems
truncate = true
truncate_to = 200
# Last.fm playlists are downloaded by searching for the titles of the tracks
[lastfm]

64
tests/test_meta.py Normal file
View File

@ -0,0 +1,64 @@
import json
from streamrip.metadata import *
with open("tests/qobuz_album_resp.json") as f:
qobuz_album_resp = json.load(f)
with open("tests/qobuz_track_resp.json") as f:
qobuz_track_resp = json.load(f)
def test_album_metadata_qobuz():
m = AlbumMetadata.from_qobuz(qobuz_album_resp)
info = m.info
assert info.id == "19512572"
assert info.quality == 3
assert info.container == "FLAC"
assert info.label == "Rhino - Warner Records"
assert info.explicit == False
assert info.sampling_rate == 96
assert info.bit_depth == 24
assert info.booklets == None
assert m.album == "Rumours"
assert m.albumartist == "Fleetwood Mac"
assert m.year == "1977"
assert "Pop" in m.genre
assert "Rock" in m.genre
assert not m.covers.empty()
assert m.albumcomposer == "Various Composers"
assert m.comment == None
assert m.compilation == None
assert (
m.copyright
== "© 1977 Warner Records Inc. ℗ 1977 Warner Records Inc. Marketed by Rhino Entertainment Company, A Warner Music Group Company."
)
assert m.date == "1977-02-04"
assert m.description == None
assert m.disctotal == 1
assert m.encoder == None
assert m.grouping == None
assert m.lyrics == None
assert m.purchase_date == None
assert m.tracktotal == 11
def test_track_metadata_qobuz():
a = AlbumMetadata.from_qobuz(qobuz_track_resp["album"])
t = TrackMetadata.from_qobuz(a, qobuz_track_resp)
assert t.title == "Dreams (2001 Remaster)"
info = t.info
assert info.id == "19512574"
assert info.quality == 3
assert info.bit_depth == 24
assert info.sampling_rate == 96
assert info.work is None
assert t.title == "Dreams (2001 Remaster)"
assert t.album == a
assert t.artist == "Fleetwood Mac"
assert t.tracknumber == 2
assert t.discnumber == 1
assert t.composer == None

View File

@ -1,7 +1,4 @@
import asyncio
import hashlib
import logging
import os
import pytest
from util import afor, arun
@ -14,21 +11,10 @@ from streamrip.qobuz_client import QobuzClient
logger = logging.getLogger("streamrip")
@pytest.mark.usefixtures("qobuz_client")
@pytest.fixture
def config():
c = Config.defaults()
c.session.qobuz.email_or_userid = os.environ["QOBUZ_EMAIL"]
c.session.qobuz.password_or_token = hashlib.md5(
os.environ["QOBUZ_PASSWORD"].encode("utf-8")
).hexdigest()
return c
@pytest.fixture
def client(config):
c = QobuzClient(config) # type: ignore
arun(c.login())
return c
def client(qobuz_client):
return qobuz_client
def test_client_raises_missing_credentials():

30
tests/test_track.py Normal file
View File

@ -0,0 +1,30 @@
import os
import shutil
from util import arun
from streamrip.downloadable import Downloadable
from streamrip.qobuz_client import QobuzClient
from streamrip.track import PendingSingle, Track
def test_pending_resolve(qobuz_client: QobuzClient):
qobuz_client.config.session.downloads.folder = "./tests"
p = PendingSingle("19512574", qobuz_client, qobuz_client.config)
t = arun(p.resolve())
dir = "tests/Fleetwood Mac - Rumours (1977) [FLAC] [24B-96kHz]"
assert os.path.isdir(dir)
assert os.path.isfile(os.path.join(dir, "cover.jpg"))
assert os.path.isfile(os.path.join(dir, "embed_cover.jpg"))
assert isinstance(t, Track)
assert isinstance(t.downloadable, Downloadable)
assert t.cover_path is not None
shutil.rmtree(dir)
# def test_pending_resolve_mp3(qobuz_client: QobuzClient):
# qobuz_client.config.session.qobuz.quality = 1
# p = PendingSingle("19512574", qobuz_client, qobuz_client.config)
# t = arun(p.resolve())
# assert isinstance(t, Track)
# assert False