Implement Qobuz filters (#529)

* Implement artist filters

* Add warning message for filtering other sources
This commit is contained in:
Nathan Thomas 2024-01-10 13:39:15 -08:00 committed by GitHub
parent d7c1cbf8ae
commit 5b68d7865e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 59 deletions

View File

@ -108,6 +108,7 @@ bit_depth = 24
lossy_bitrate = 320
# Filter a Qobuz artist's discography. Set to 'true' to turn on a filter.
# These filters are also applied to other sources, but are not guaranteed to work correctly there.
[qobuz_filters]
# Remove Collector's Editions, live recordings, etc.
extras = false

View File

@ -50,7 +50,6 @@ class PendingAlbum(Pending):
async def resolve(self) -> Album | None:
resp = await self.client.get_metadata(self.id, "album")
meta = AlbumMetadata.from_album_resp(resp, self.client.source)
if meta is None:
logger.error(

View File

@ -1,48 +0,0 @@
import asyncio
from dataclasses import dataclass
from ..client import Client
from ..config import Config
from .album import PendingAlbum
from .media import Media
@dataclass(slots=True)
class AlbumList(Media):
    """A named collection of pending albums that is downloaded in batches.

    Serves as the shared base for the Artist and Label classes.
    """

    name: str
    albums: list[PendingAlbum]
    client: Client
    config: Config

    async def preprocess(self):
        """No work is needed before downloading."""
        pass

    async def download(self):
        """Resolve and rip every album, one batch at a time."""
        # Albums are resolved and ripped in groups of this size so that
        # downloads can begin before ALL albums and tracks are resolved.
        album_resolve_chunk_size = 10

        async def _resolve_download(item: PendingAlbum):
            resolved = await item.resolve()
            # resolve() yields None when the album cannot be used; skip it.
            if resolved is not None:
                await resolved.rip()

        pending_jobs = [_resolve_download(entry) for entry in self.albums]
        for group in self.batch(pending_jobs, album_resolve_chunk_size):
            await asyncio.gather(*group)

    async def postprocess(self):
        """No work is needed after downloading."""
        pass

    @staticmethod
    def batch(iterable, n=1):
        """Yield consecutive slices of ``iterable`` holding at most ``n`` items."""
        length = len(iterable)
        for start in range(0, length, n):
            stop = min(start + n, length)
            yield iterable[start:stop]

View File

@ -1,16 +1,176 @@
import asyncio
import logging
import re
from dataclasses import dataclass
from ..client import Client
from ..config import Config
from ..config import Config, QobuzDiscographyFilterConfig
from ..console import console
from ..db import Database
from ..metadata import ArtistMetadata
from .album import PendingAlbum
from .album_list import AlbumList
from .media import Pending
from .album import Album, PendingAlbum
from .media import Media, Pending
# Logger registered under the name "streamrip".
logger = logging.getLogger("streamrip")
# Resolve only N albums at a time to avoid
# initial latency of resolving ALL albums and tracks
# before any downloads. N is the batch size handed to `batch()`.
RESOLVE_CHUNK_SIZE = 10
class Artist(AlbumList):
    """An artist's album list; adds nothing on top of AlbumList."""
@dataclass(slots=True)
class Artist(Media):
    """An artist's discography: a named list of pending albums to download.

    Downloading honours the ``qobuz_filters`` section of the session config
    (repeats, extras, features, non-studio albums, non-remasters).
    """

    name: str
    albums: list[PendingAlbum]
    client: Client
    config: Config

    # The patterns below are intentionally left unannotated so that
    # ``@dataclass`` treats them as plain class attributes, not as fields.

    # Captures the part of a title that precedes the first "(" or "[".
    # It does not match titles that START with a bracket; callers must
    # handle a ``None`` match.
    _essence = re.compile(r"([^\(\[]+)")
    # Title keywords that mark an album as an "extra" release.
    _extra_re = re.compile(
        r"(?i)(anniversary|deluxe|live|collector|demo|expanded|remix)"
    )
    # Title keywords that mark an album as a remaster.
    # NOTE(review): this also matches any title merely containing "master".
    _remaster_re = re.compile(r"(?i)(re)?master(ed)?")

    async def preprocess(self):
        """No work is needed before downloading."""
        pass

    async def download(self):
        """Download the discography, applying the configured filters.

        The repeats filter needs every album title up front, so when it is
        enabled all albums are resolved before any download starts.
        Otherwise albums are resolved and ripped batch by batch.
        """
        filter_conf = self.config.session.qobuz_filters
        if filter_conf.repeats:
            console.log(
                "Resolving [purple]ALL[/purple] artist albums to detect repeats. This may take a while."
            )
            await self._resolve_then_download(filter_conf)
        else:
            await self._download_async(filter_conf)

    async def postprocess(self):
        """No work is needed after downloading."""
        pass

    async def _resolve_then_download(self, filters: QobuzDiscographyFilterConfig):
        """Resolve all artist albums, then download.

        This is used if the repeat filter is turned on, since we need the titles
        of all albums to remove repeated items.
        """
        resolved_or_none: list[Album | None] = await asyncio.gather(
            *[album.resolve() for album in self.albums]
        )
        resolved = [a for a in resolved_or_none if a is not None]
        filtered_albums = self._apply_filters(resolved, filters)
        rips = [a.rip() for a in filtered_albums]
        for chunk in self.batch(rips, RESOLVE_CHUNK_SIZE):
            await asyncio.gather(*chunk)

    async def _download_async(self, filters: QobuzDiscographyFilterConfig):
        """Resolve, filter and rip albums in batches of RESOLVE_CHUNK_SIZE."""

        async def _rip(item: PendingAlbum):
            album = await item.resolve()
            # Skip albums that failed to resolve or do not pass the filters.
            if album is None or not self._passes_filters(album, filters):
                return
            await album.rip()

        jobs = [_rip(album) for album in self.albums]
        for chunk in self.batch(jobs, RESOLVE_CHUNK_SIZE):
            await asyncio.gather(*chunk)

    def _passes_filters(self, album: Album, filt: QobuzDiscographyFilterConfig) -> bool:
        """Return True if ``album`` passes every enabled per-album filter.

        The repeats filter is not handled here because it needs the whole
        album list; see `_filter_repeats`.
        """
        checks = (
            (filt.extras, self._extras),
            (filt.features, self._features),
            (filt.non_studio_albums, self._non_studio_albums),
            (filt.non_remaster, self._non_remaster),
        )
        return all(predicate(album) for enabled, predicate in checks if enabled)

    def _apply_filters(
        self, albums: list[Album], filt: QobuzDiscographyFilterConfig
    ) -> list[Album]:
        """Return a new list holding the albums that pass all enabled filters.

        Repeats are removed first (when enabled), then the per-album filters
        are applied to what remains.
        """
        kept = self._filter_repeats(albums) if filt.repeats else albums
        return [a for a in kept if self._passes_filters(a, filt)]

    def _filter_repeats(self, albums: list[Album]) -> list[Album]:
        """When there are different versions of an album on the artist,
        choose the one with the best quality.

        It determines that two albums are identical if they have the same title
        ignoring contents in brackets or parentheses (case-insensitively).
        Quality is compared by bit depth first, then sampling rate; missing
        values count as 0 and ties keep the earliest album.
        """
        groups: dict[str, list[Album]] = {}
        for album in albums:
            full_title = album.meta.album
            match = self._essence.match(full_title)
            base = match.group(1).strip() if match else ""
            # Titles that start with a bracket (or are empty) have no base
            # part, so the whole title is used as the grouping key instead.
            key = (base or full_title.strip()).lower()
            groups.setdefault(key, []).append(album)

        def _quality(album: Album):
            info = album.meta.info
            return (info.bit_depth or 0, info.sampling_rate or 0)

        # Every group holds at least one album, so max() never sees an
        # empty sequence.
        return [max(group, key=_quality) for group in groups.values()]

    # ----- Filter predicates -----
    # Each predicate returns True when the album should be KEPT.

    def _non_studio_albums(self, a: Album) -> bool:
        """Filter out non studio albums."""
        return a.meta.albumartist != "Various Artists" and self._extras(a)

    def _features(self, a: Album) -> bool:
        """Filter out features."""
        return a.meta.albumartist == self.name

    def _extras(self, a: Album) -> bool:
        """Filter out extras.

        See `_extra_re` for criteria.
        """
        return self._extra_re.search(a.meta.album) is None

    def _non_remaster(self, a: Album) -> bool:
        """Filter out albums that are not remasters."""
        return self._remaster_re.search(a.meta.album) is not None

    def _non_albums(self, a: Album) -> bool:
        """Filter out singles."""
        return len(a.tracks) > 1

    @staticmethod
    def batch(iterable, n=1):
        """Yield consecutive slices of ``iterable`` holding at most ``n`` items."""
        total = len(iterable)
        for ndx in range(0, total, n):
            yield iterable[ndx : min(ndx + n, total)]
@dataclass(slots=True)

View File

@ -1,3 +1,4 @@
import asyncio
from dataclasses import dataclass
from ..client import Client
@ -5,12 +6,48 @@ from ..config import Config
from ..db import Database
from ..metadata import LabelMetadata
from .album import PendingAlbum
from .album_list import AlbumList
from .media import Pending
from .media import Media, Pending
class Label(AlbumList):
    """A label's album list; adds nothing on top of AlbumList."""
@dataclass(slots=True)
class Label(Media):
    """A record label's catalogue: a named list of pending albums."""

    name: str
    albums: list[PendingAlbum]
    client: Client
    config: Config

    async def preprocess(self):
        """Nothing to prepare ahead of the download."""
        pass

    async def download(self):
        """Resolve and rip the label's albums in fixed-size batches."""
        # Only this many albums are resolved at once, which avoids the
        # initial latency of resolving ALL albums and tracks before any
        # download begins.
        album_resolve_chunk_size = 10

        async def _resolve_download(item: PendingAlbum):
            album = await item.resolve()
            if album is not None:
                await album.rip()

        tasks = [_resolve_download(pending) for pending in self.albums]
        chunks = self.batch(tasks, album_resolve_chunk_size)
        for chunk in chunks:
            await asyncio.gather(*chunk)

    async def postprocess(self):
        """Nothing to clean up once the download finishes."""
        pass

    @staticmethod
    def batch(iterable, n=1):
        """Generate successive slices of ``iterable``, each at most ``n`` long."""
        count = len(iterable)
        for offset in range(0, count, n):
            upper = min(offset + n, count)
            yield iterable[offset:upper]
@dataclass(slots=True)