This commit is contained in:
Nathan Thomas 2023-10-22 00:07:17 -07:00
parent 36fd27c83c
commit 7cbd77edc5
13 changed files with 845 additions and 641 deletions

961
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -40,6 +40,8 @@ aiohttp = "^3.7"
aiodns = "^3.0.0"
aiolimiter = "^1.1.0"
pytest-mock = "^3.11.1"
pytest-asyncio = "^0.21.1"
rich = "^13.6.0"
[tool.poetry.urls]
"Bug Reports" = "https://github.com/nathom/streamrip/issues"
@ -53,7 +55,15 @@ black = "^22"
isort = "^5.9.3"
flake8 = "^3.9.2"
setuptools = "^67.4.0"
pytest = "^6.2.5"
pytest = "^7.4"
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = [ "tests" ]
log_level = "DEBUG"
asyncio_mode = 'auto'
log_cli = true
[build-system]
requires = ["poetry-core>=1.0.0"]

View File

@ -1,5 +1,75 @@
import asyncio
import os
import aiohttp
from PIL import Image
from .config import ArtworkConfig
from .downloadable import BasicDownloadable
from .metadata import Covers
async def download_artwork(
session: aiohttp.ClientSession, folder: str, covers: Covers, config: ArtworkConfig
) -> tuple[str | None, str | None]:
"""Download artwork, which may include a seperate file to keep.
Also updates the passed Covers object with downloaded filepaths.
Because it is a single, we will assume that none of the covers have already been
downloaded, so existing paths in `covers` will be discarded and overwritten.
Args:
covers (Covers): The set of available covers.
Returns:
The path of the cover to embed, or None if there either is no artwork available or
if artwork embedding is turned off.
"""
if (not config.save_artwork and not config.embed) or covers.empty():
# No need to download anything
return None, None
downloadables = []
saved_cover_path = None
if config.save_artwork:
_, l_url, _ = covers.largest()
assert l_url is not None # won't be true unless covers is empty
saved_cover_path = os.path.join(folder, "cover.jpg")
downloadables.append(
BasicDownloadable(session, l_url, "jpg").download(
saved_cover_path, lambda _: None
)
)
embed_cover_path = None
if config.embed:
_, embed_url, _ = covers.get_size(config.embed_size)
assert embed_url is not None
embed_cover_path = os.path.join(folder, "embed_cover.jpg")
downloadables.append(
BasicDownloadable(session, embed_url, "jpg").download(
embed_cover_path, lambda _: None
)
)
await asyncio.gather(*downloadables)
# Update `covers` to reflect the current download state
if config.save_artwork:
assert saved_cover_path is not None
covers.set_largest_path(saved_cover_path)
if config.saved_max_width > 0:
downscale_image(saved_cover_path, config.saved_max_width)
if config.embed:
assert embed_cover_path is not None
covers.set_path(config.embed_size, embed_cover_path)
if config.embed_max_width > 0:
downscale_image(embed_cover_path, config.embed_max_width)
return embed_cover_path, saved_cover_path
def downscale_image(input_image_path: str, max_dimension: int):
"""Downscale an image in place given a maximum allowed dimension.

View File

@ -168,7 +168,7 @@ class FilepathsConfig:
restrict_characters: bool
# Truncate the filename if it is greater than 120 characters
# Setting this to false may cause downloads to fail on some systems
truncate: bool
truncate_to: int
@dataclass(slots=True)
@ -203,6 +203,11 @@ class ThemeConfig:
progress_bar: str
@dataclass(slots=True)
class MiscConfig:
version: str
@dataclass(slots=True)
class ConfigData:
toml: TOMLDocument
@ -224,6 +229,8 @@ class ConfigData:
database: DatabaseConfig
conversion: ConversionConfig
misc: MiscConfig
_modified: bool = False
@classmethod
@ -247,6 +254,7 @@ class ConfigData:
theme = ThemeConfig(**toml["theme"]) # type: ignore
database = DatabaseConfig(**toml["database"]) # type: ignore
conversion = ConversionConfig(**toml["conversion"]) # type: ignore
misc = MiscConfig(**toml["misc"]) # type: ignore
return cls(
toml=toml,
@ -264,6 +272,7 @@ class ConfigData:
theme=theme,
database=database,
conversion=conversion,
misc=misc,
)
@classmethod

View File

@ -155,13 +155,13 @@ add_singles_to_folder = false
# "id", and "albumcomposer"
folder_format = "{albumartist} - {title} ({year}) [{container}] [{bit_depth}B-{sampling_rate}kHz]"
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "albumcomposer"
# and "albumcomposer", "explicit"
track_format = "{tracknumber}. {artist} - {title}{explicit}"
# Only allow printable ASCII characters in filenames.
restrict_characters = false
# Truncate the filename if it is greater than 120 characters
# Truncate the filename if it is greater than this number of characters
# Setting this to false may cause downloads to fail on some systems
truncate = true
truncate_to = 120
# Last.fm playlists are downloaded by searching for the titles of the tracks
[lastfm]

View File

@ -1,23 +0,0 @@
from .client import Client
class DeezloaderClient(Client):
source = "deezer"
max_quality = 2
def __init__(self, config):
self.session = SRSession()
self.global_config = config
self.logged_in = True
async def search(self, query: str, media_type: str, limit: int = 200):
pass
async def login(self):
pass
async def get(self, item_id: str, media_type: str):
pass
async def get_downloadable(self, item_id: str, quality: int):
pass

View File

@ -1,35 +1,13 @@
from string import Formatter, printable
from string import printable
from pathvalidate import sanitize_filename
from pathvalidate import sanitize_filename # type: ignore
ALLOWED_CHARS = set(printable)
def clean_filename(fn: str, restrict=False) -> str:
def clean_filename(fn: str, restrict: bool = False) -> str:
path = str(sanitize_filename(fn))
if restrict:
allowed_chars = set(printable)
path = "".join(c for c in path if c in allowed_chars)
path = "".join(c for c in path if c in ALLOWED_CHARS)
return path
def clean_format(formatter: str, format_info: dict, restrict: bool = False) -> str:
"""Format track or folder names sanitizing every formatter key.
:param formatter:
:type formatter: str
:param kwargs:
"""
fmt_keys = filter(None, (i[1] for i in Formatter().parse(formatter)))
clean_dict = {}
for key in fmt_keys:
if isinstance(format_info.get(key), (str, float)):
clean_dict[key] = clean_filename(str(format_info[key]), restrict=restrict)
elif key == "explicit":
clean_dict[key] = " (Explicit) " if format_info.get(key, False) else ""
elif isinstance(format_info.get(key), int): # track/discnumber
clean_dict[key] = f"{format_info[key]:02}"
else:
clean_dict[key] = "Unknown"
return formatter.format(**clean_dict)

View File

@ -5,20 +5,8 @@ from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from string import Formatter
from typing import Optional, Type, TypeVar
# from .constants import (
# ALBUM_KEYS,
# COPYRIGHT,
# FLAC_KEY,
# MP3_KEY,
# MP4_KEY,
# PHON_COPYRIGHT,
# TIDAL_Q_MAP,
# TRACK_KEYS,
# )
logger = logging.getLogger("streamrip")
@ -29,81 +17,85 @@ def get_album_track_ids(source: str, resp) -> list[str]:
return [track["id"] for track in tracklist]
# (url to cover, downloaded path of cover)
@dataclass(slots=True)
class Covers:
CoverEntry = tuple[str | None, str | None]
thumbnail: CoverEntry
small: CoverEntry
large: CoverEntry
original: CoverEntry
CoverEntry = tuple[str, str | None, str | None]
_covers: list[CoverEntry]
def __init__(self):
# ordered from largest to smallest
self._covers = [
("original", None, None),
("large", None, None),
("small", None, None),
("thumbnail", None, None),
]
def set_cover(self, size: str, url: str | None, path: str | None):
i = self._indexof(size)
self._covers[i] = (size, url, path)
def set_cover_url(self, size: str, url: str):
self.set_cover(size, url, None)
@staticmethod
def _indexof(size: str) -> int:
if size == "original":
return 0
if size == "large":
return 1
if size == "small":
return 2
if size == "thumbnail":
return 3
raise Exception(f"Invalid {size = }")
def empty(self) -> bool:
return all(
url is None
for url, _ in (self.original, self.large, self.small, self.thumbnail)
)
return all(url is None for _, url, _ in self._covers)
def set_largest_path(self, path: str):
for size, url, _ in self._covers:
if url is not None:
self.set_cover(size, url, path)
return
raise Exception(f"No covers found in {self}")
def set_path(self, size: str, path: str):
i = self._indexof(size)
size, url, _ = self._covers[i]
self._covers[i] = (size, url, path)
def largest(self) -> CoverEntry:
# Return first item with url
if self.original[0]:
return self.original
for s, u, p in self._covers:
if u is not None:
return (s, u, p)
if self.large[0]:
return self.large
if self.small[0]:
return self.small
if self.thumbnail[0]:
return self.thumbnail
raise Exception("No covers found")
raise Exception(f"No covers found in {self}")
@classmethod
def from_qobuz(cls, resp):
cover_urls = {k: (v, None) for k, v in resp["image"].items()}
cover_urls["original"] = ("org".join(cover_urls["large"].rsplit("600", 1)), None) # type: ignore
return cls(**cover_urls) # type: ignore
img = resp["image"]
c = cls()
c.set_cover_url("original", "org".join(img["large"].rsplit("600", 1)))
c.set_cover_url("large", img["large"])
c.set_cover_url("small", img["small"])
c.set_cover_url("thumbnail", img["thumbnail"])
return c
def get_size(self, size: str) -> CoverEntry:
"""Get the cover size, or the largest cover smaller than `size`.
i = self._indexof(size)
size, url, path = self._covers[i]
if url is not None:
return (size, url, path)
if i + 1 < len(self._covers):
for s, u, p in self._covers[i + 1 :]:
if u is not None:
return (s, u, p)
raise Exception(f"Cover not found for {size = }. Available: {self}")
Args:
size (str):
Returns:
CoverEntry
Raises:
Exception: If a suitable cover doesn't exist
"""
fallback = False
if size == "original":
if self.original[0] is not None:
return self.original
else:
fallback = True
if fallback or size == "large":
if self.large[0] is not None:
return self.large
else:
fallback = True
if fallback or size == "small":
if self.small[0] is not None:
return self.small
else:
fallback = True
# At this point, either size == 'thumbnail' or nothing else was found
if self.thumbnail[0] is None:
raise Exception(f"No covers found for {size = }. Covers: {self}")
return self.thumbnail
def __repr__(self):
covers = "\n".join(map(repr, self._covers))
return f"Covers({covers})"
COPYRIGHT = "\u2117"
@ -173,18 +165,20 @@ class TrackMetadata:
return cls.from_deezer(album, resp)
raise Exception
def format_track_path(self, formatter: str) -> str:
def format_track_path(self, format_string: str) -> str:
# Available keys: "tracknumber", "artist", "albumartist", "composer", "title",
# and "albumcomposer"
# and "explicit", "albumcomposer"
none_text = "Unknown"
info = {
"title": self.title,
"tracknumber": self.tracknumber,
"artist": self.artist,
"albumartist": self.album.albumartist,
"albumcomposer": self.album.albumcomposer or "None",
"composer": self.composer or "None",
"albumcomposer": self.album.albumcomposer or none_text,
"composer": self.composer or none_text,
"explicit": " (Explicit) " if self.info.explicit else "",
}
return formatter.format(**info)
return format_string.format(**info)
@dataclass(slots=True)
@ -345,14 +339,6 @@ class AlbumInfo:
work: Optional[str] = None
_formatter = Formatter()
def keys_in_format_string(s: str):
"""Returns the items in {} in a format string."""
return [f[1] for f in _formatter.parse(s) if f[1] is not None]
def safe_get(d: dict, *keys, default=None) -> dict | str | int | list | None:
"""Nested __getitem__ calls with a default value.

View File

@ -1,8 +1,9 @@
import re
from .client import Client, NonStreamable
from .client import Client
from .config import Config
from .downloadable import SoundcloudDownloadable
from .exceptions import NonStreamable
BASE = "https://api-v2.soundcloud.com"
SOUNDCLOUD_USER_ID = "672320-86895-162383-801513"
@ -74,7 +75,7 @@ class SoundcloudClient(Client):
resp = await self._api_request(f"tracks/{item['id']}/download")
resp_json = await resp.json()
return SoundcloudDownloadable(
{"url": resp_json["redirectUri"], "type": "original"}
self.session, {"url": resp_json["redirectUri"], "type": "original"}
)
else:
@ -89,7 +90,9 @@ class SoundcloudClient(Client):
resp = await self._request(url)
resp_json = await resp.json()
return SoundcloudDownloadable({"url": resp_json["url"], "type": "mp3"})
return SoundcloudDownloadable(
self.session, {"url": resp_json["url"], "type": "mp3"}
)
async def search(
self, query: str, media_type: str, limit: int = 50, offset: int = 0

View File

@ -3,10 +3,11 @@ import os
from dataclasses import dataclass
from . import converter
from .artwork import downscale_image
from .artwork import download_artwork
from .client import Client
from .config import Config
from .downloadable import BasicDownloadable, Downloadable
from .downloadable import Downloadable
from .filepath_utils import clean_filename
from .media import Media, Pending
from .metadata import AlbumMetadata, Covers, TrackMetadata
from .progress import get_progress_bar
@ -69,9 +70,17 @@ class Track(Media):
self.download_path = engine.final_fn # because the extension changed
def _set_download_path(self):
formatter = self.config.session.filepaths.track_format
track_path = self.meta.format_track_path(formatter)
self.download_path = os.path.join(self.folder, track_path)
c = self.config.session.filepaths
formatter = c.track_format
track_path = clean_filename(
self.meta.format_track_path(formatter), restrict=c.restrict_characters
)
if c.truncate_to > 0 and len(track_path) > c.truncate_to:
track_path = track_path[: c.truncate_to]
self.download_path = os.path.join(
self.folder, f"{track_path}.{self.downloadable.extension}"
)
@dataclass(slots=True)
@ -127,49 +136,7 @@ class PendingSingle(Pending):
return os.path.join(parent, meta.format_folder_path(formatter))
async def _download_cover(self, covers: Covers, folder: str) -> str | None:
"""Download artwork, which may include a seperate file to keep.
Args:
covers (Covers): The set of available covers.
"""
c = self.config.session.artwork
if not c.save_artwork and not c.embed:
# No need to download anything
return None
session = self.client.session
downloadables = []
hires_cover_path = None
if c.save_artwork:
l_url, _ = covers.largest()
assert l_url is not None
hires_cover_path = os.path.join(folder, "cover.jpg")
downloadables.append(
BasicDownloadable(session, l_url, "jpg").download(
hires_cover_path, lambda _: None
)
)
embed_cover_path = None
if c.embed:
embed_url, _ = covers.get_size(c.embed_size)
assert embed_url is not None
embed_cover_path = os.path.join(folder, "embed_cover.jpg")
downloadables.append(
BasicDownloadable(session, embed_url, "jpg").download(
embed_cover_path, lambda _: None
)
)
await asyncio.gather(*downloadables)
if c.embed and c.embed_max_width > 0:
assert embed_cover_path is not None
downscale_image(embed_cover_path, c.embed_max_width)
if c.save_artwork and c.saved_max_width > 0:
assert hires_cover_path is not None
downscale_image(hires_cover_path, c.saved_max_width)
return embed_cover_path
embed_path, _ = await download_artwork(
self.client.session, folder, covers, self.config.session.artwork
)
return embed_path

View File

@ -0,0 +1,41 @@
import pytest
import tomlkit
from streamrip.config import *
@pytest.fixture
def toml():
with open("streamrip/config.toml") as f:
t = tomlkit.parse(f.read())
return t
@pytest.fixture
def config():
return ConfigData.defaults()
def test_toml_subset_of_py(toml, config):
"""Test that all keys in the TOML file are in the config classes."""
for k, v in toml.items():
if k in config.__slots__:
if isinstance(v, TOMLDocument):
test_toml_subset_of_py(v, getattr(config, k))
else:
raise Exception(f"{k} not in {config.__slots__}")
exclude = {"toml", "_modified"}
def test_py_subset_of_toml(toml, config):
"""Test that all keys in the python classes are in the TOML file."""
for item in config.__slots__:
if item in exclude:
continue
if item in toml:
if "Config" in item.__class__.__name__:
test_py_subset_of_toml(toml[item], getattr(config, item))
else:
raise Exception(f"Config field {item} not in {list(toml.keys())}")

71
tests/test_covers.py Normal file
View File

@ -0,0 +1,71 @@
import pytest
from streamrip.metadata import Covers
@pytest.fixture
def covers_all():
c = Covers()
c.set_cover("original", "ourl", None)
c.set_cover("large", "lurl", None)
c.set_cover("small", "surl", None)
c.set_cover("thumbnail", "turl", None)
return c
@pytest.fixture
def covers_none():
return Covers()
@pytest.fixture
def covers_one():
c = Covers()
c.set_cover("small", "surl", None)
return c
@pytest.fixture
def covers_some():
c = Covers()
c.set_cover("large", "lurl", None)
c.set_cover("small", "surl", None)
return c
def test_covers_all(covers_all):
assert covers_all._covers == [
("original", "ourl", None),
("large", "lurl", None),
("small", "surl", None),
("thumbnail", "turl", None),
]
assert covers_all.largest() == ("original", "ourl", None)
assert covers_all.get_size("original") == ("original", "ourl", None)
assert covers_all.get_size("thumbnail") == ("thumbnail", "turl", None)
def test_covers_none(covers_none):
assert covers_none.empty()
with pytest.raises(Exception):
covers_none.largest()
with pytest.raises(Exception):
covers_none.get_size("original")
def test_covers_one(covers_one):
assert not covers_one.empty()
assert covers_one.largest() == ("small", "surl", None)
assert covers_one.get_size("original") == ("small", "surl", None)
with pytest.raises(Exception):
covers_one.get_size("thumbnail")
def test_covers_some(covers_some):
assert not covers_some.empty()
assert covers_some.largest() == ("large", "lurl", None)
assert covers_some.get_size("original") == ("large", "lurl", None)
assert covers_some.get_size("small") == ("small", "surl", None)
with pytest.raises(Exception):
covers_some.get_size("thumbnail")

17
tests/util.py Normal file
View File

@ -0,0 +1,17 @@
import asyncio
loop = asyncio.new_event_loop()
def arun(coro):
return loop.run_until_complete(coro)
def afor(async_gen):
async def _afor(async_gen):
l = []
async for item in async_gen:
l.append(item)
return l
return arun(_afor(async_gen))