Remove old code

This commit is contained in:
Nathan Thomas 2023-11-28 21:44:26 -08:00
parent 6bd2d0cf0e
commit 8aba2a5612
2 changed files with 0 additions and 1762 deletions

View File

@ -1,852 +0,0 @@
import concurrent.futures
import logging
import os
import threading
from typing import Optional
import requests
from cleo.application import Application as BaseApplication
from cleo.commands.command import Command
from cleo.formatters.style import Style
from cleo.helpers import argument, option
from click import launch
from .config import Config
from .user_paths import DEFAULT_CONFIG_PATH
# from . import __version__
# from .core import RipCore
logging.basicConfig(level="WARNING")
logger = logging.getLogger("streamrip")
outdated = False
newest_version: Optional[str] = None
class DownloadCommand(Command):
name = "url"
description = "Download items using urls."
arguments = [
argument(
"urls",
"One or more Qobuz, Tidal, Deezer, or SoundCloud urls",
optional=True,
multiple=True,
)
]
options = [
option(
"file",
"-f",
"Path to a text file containing urls",
flag=False,
default="None",
),
option(
"codec",
"-c",
"Convert the downloaded files to <cmd>ALAC</cmd>, <cmd>FLAC</cmd>, <cmd>MP3</cmd>, <cmd>AAC</cmd>, or <cmd>OGG</cmd>",
flag=False,
default="None",
),
option(
"max-quality",
"m",
"The maximum quality to download. Can be <cmd>0</cmd>, <cmd>1</cmd>, <cmd>2</cmd>, <cmd>3 </cmd>or <cmd>4</cmd>",
flag=False,
default="None",
),
option(
"ignore-db",
"-i",
description="Download items even if they have been logged in the database.",
),
option("config", description="Path to config file.", flag=False),
option("directory", "-d", "Directory to download items into.", flag=False),
]
help = (
"\nDownload <title>Dreams</title> by <title>Fleetwood Mac</title>:\n"
"$ <cmd>rip url https://www.deezer.com/us/track/67549262</cmd>\n\n"
"Batch download urls from a text file named <path>urls.txt</path>:\n"
"$ <cmd>rip url --file urls.txt</cmd>\n\n"
"For more information on Quality IDs, see\n"
"<url>https://github.com/nathom/streamrip/wiki/Quality-IDs</url>\n"
)
def handle(self):
global outdated
global newest_version
# Use a thread so that it doesn't slow down startup
update_check = threading.Thread(target=is_outdated, daemon=True)
update_check.start()
path, quality, no_db, directory, config_path_arg = clean_options(
self.option("file"),
self.option("max-quality"),
self.option("ignore-db"),
self.option("directory"),
self.option("config"),
)
config_path = config_path_arg or DEFAULT_CONFIG_PATH
assert isinstance(config_path, str)
config = Config(config_path)
if directory is not None:
assert isinstance(directory, str)
config.session.downloads.folder = directory
if no_db:
config.session.database.downloads_enabled = False
if quality is not None:
assert isinstance(quality, int)
config.session.qobuz.quality = quality
config.session.tidal.quality = quality
config.session.deezer.quality = quality
core = RipCore(config)
urls = self.argument("urls")
if path is not None:
assert isinstance(path, str)
if os.path.isfile(path):
core.handle_txt(path)
else:
self.line(
f"<error>File <comment>{path}</comment> does not exist.</error>"
)
return 1
if urls:
core.handle_urls(";".join(urls))
if len(core) > 0:
core.download()
elif not urls and path is None:
self.line("<error>Must pass arguments. See </><cmd>rip url -h</cmd>.")
update_check.join()
if outdated:
import re
self.line(
f"\n<info>A new version of streamrip <title>v{newest_version}</title>"
" is available! Run <cmd>pip3 install streamrip --upgrade</cmd>"
" to update.</info>\n"
)
md_header = re.compile(r"#\s+(.+)")
bullet_point = re.compile(r"-\s+(.+)")
code = re.compile(r"`([^`]+)`")
issue_reference = re.compile(r"(#\d+)")
release_notes = requests.get(
"https://api.github.com/repos/nathom/streamrip/releases/latest"
).json()["body"]
release_notes = md_header.sub(r"<header>\1</header>", release_notes)
release_notes = bullet_point.sub(r"<options=bold>•</> \1", release_notes)
release_notes = code.sub(r"<cmd>\1</cmd>", release_notes)
release_notes = issue_reference.sub(r"<options=bold>\1</>", release_notes)
self.line(release_notes)
return 0
# class SearchCommand(Command):
# name = "search"
# description = "Search for an item"
# arguments = [
# argument(
# "query",
# "The name to search for",
# optional=False,
# multiple=False,
# )
# ]
# options = [
# option(
# "source",
# "-s",
# "Qobuz, Tidal, Soundcloud, Deezer, or Deezloader",
# flag=False,
# default="qobuz",
# ),
# option(
# "type",
# "-t",
# "Album, Playlist, Track, or Artist",
# flag=False,
# default="album",
# ),
# ]
#
# help = (
# "\nSearch for <title>Rumours</title> by <title>Fleetwood Mac</title>\n"
# "$ <cmd>rip search 'rumours fleetwood mac'</cmd>\n\n"
# "Search for <title>444</title> by <title>Jay-Z</title> on TIDAL\n"
# "$ <cmd>rip search --source tidal '444'</cmd>\n\n"
# "Search for <title>Bob Dylan</title> on Deezer\n"
# "$ <cmd>rip search --type artist --source deezer 'bob dylan'</cmd>\n"
# )
#
# def handle(self):
# query = self.argument("query")
# source, type = clean_options(self.option("source"), self.option("type"))
# assert isinstance(source, str)
# assert isinstance(type, str)
#
# config = Config()
# core = RipCore(config)
#
# if core.interactive_search(query, source, type):
# core.download()
# else:
# self.line("<error>No items chosen, exiting.</error>")
#
#
# class DiscoverCommand(Command):
# name = "discover"
# description = "Download items from the charts or a curated playlist"
# arguments = [
# argument(
# "list",
# "The list to fetch",
# optional=True,
# multiple=False,
# default="ideal-discography",
# )
# ]
# options = [
# option(
# "scrape",
# description="Download all of the items in the list",
# ),
# option(
# "max-items",
# "-m",
# description="The number of items to fetch",
# flag=False,
# default=50,
# ),
# option(
# "source",
# "-s",
# description="The source to download from (<cmd>qobuz</cmd> or <cmd>deezer</cmd>)",
# flag=False,
# default="qobuz",
# ),
# ]
# help = (
# "\nBrowse the Qobuz ideal-discography list\n"
# "$ <cmd>rip discover</cmd>\n\n"
# "Browse the best-sellers list\n"
# "$ <cmd>rip discover best-sellers</cmd>\n\n"
# "Available options for Qobuz <cmd>list</cmd>:\n\n"
# " • most-streamed\n"
# " • recent-releases\n"
# " • best-sellers\n"
# " • press-awards\n"
# " • ideal-discography\n"
# " • editor-picks\n"
# " • most-featured\n"
# " • qobuzissims\n"
# " • new-releases\n"
# " • new-releases-full\n"
# " • harmonia-mundi\n"
# " • universal-classic\n"
# " • universal-jazz\n"
# " • universal-jeunesse\n"
# " • universal-chanson\n\n"
# "Browse the Deezer editorial releases list\n"
# "$ <cmd>rip discover --source deezer</cmd>\n\n"
# "Browse the Deezer charts\n"
# "$ <cmd>rip discover --source deezer charts</cmd>\n\n"
# "Available options for Deezer <cmd>list</cmd>:\n\n"
# " • releases\n"
# " • charts\n"
# " • selection\n"
# )
#
# def handle(self):
# source = self.option("source")
# scrape = self.option("scrape")
# chosen_list = self.argument("list")
# max_items = self.option("max-items")
#
# if source == "qobuz":
# from streamrip.constants import QOBUZ_FEATURED_KEYS
#
# if chosen_list not in QOBUZ_FEATURED_KEYS:
# self.line(f'<error>Error: list "{chosen_list}" not available</error>')
# self.line(self.help)
# return 1
# elif source == "deezer":
# from streamrip.constants import DEEZER_FEATURED_KEYS
#
# if chosen_list not in DEEZER_FEATURED_KEYS:
# self.line(f'<error>Error: list "{chosen_list}" not available</error>')
# self.line(self.help)
# return 1
#
# else:
# self.line(
# "<error>Invalid source. Choose either <cmd>qobuz</cmd> or <cmd>deezer</cmd></error>"
# )
# return 1
#
# config = Config()
# core = RipCore(config)
#
# if scrape:
# core.scrape(chosen_list, max_items)
# core.download()
# return 0
#
# if core.interactive_search(
# chosen_list, source, "featured", limit=int(max_items)
# ):
# core.download()
# else:
# self.line("<error>No items chosen, exiting.</error>")
#
# return 0
#
#
# class LastfmCommand(Command):
# name = "lastfm"
# description = "Search for tracks from a last.fm playlist and download them."
#
# arguments = [
# argument(
# "urls",
# "Last.fm playlist urls",
# optional=False,
# multiple=True,
# )
# ]
# options = [
# option(
# "source",
# "-s",
# description="The source to search for items on",
# flag=False,
# default="qobuz",
# ),
# ]
# help = (
# "You can use this command to download Spotify, Apple Music, and YouTube "
# "playlists.\nTo get started, create an account at "
# "<url>https://www.last.fm</url>. Once you have\nreached the home page, "
# "go to <path>Profile Icon</path> => <path>View profile</path> => "
# "<path>Playlists</path> => <path>IMPORT</path>\nand paste your url.\n\n"
# "Download the <info>young & free</info> Apple Music playlist (already imported)\n"
# "$ <cmd>rip lastfm https://www.last.fm/user/nathan3895/playlists/12089888</cmd>\n"
# )
#
# def handle(self):
# source = self.option("source")
# urls = self.argument("urls")
#
# config = Config()
# core = RipCore(config)
# config.session["lastfm"]["source"] = source
# core.handle_lastfm_urls(";".join(urls))
# core.download()
#
#
# class ConfigCommand(Command):
# name = "config"
# description = "Manage the configuration file."
#
# options = [
# option(
# "open",
# "-o",
# description="Open the config file in the default application",
# flag=True,
# ),
# option(
# "open-vim",
# "-O",
# description="Open the config file in (neo)vim",
# flag=True,
# ),
# option(
# "directory",
# "-d",
# description="Open the directory that the config file is located in",
# flag=True,
# ),
# option("path", "-p", description="Show the config file's path", flag=True),
# option("qobuz", description="Set the credentials for Qobuz", flag=True),
# option("tidal", description="Log into Tidal", flag=True),
# option("deezer", description="Set the Deezer ARL", flag=True),
# option(
# "music-app",
# description="Configure the config file for usage with the macOS Music App",
# flag=True,
# ),
# option("reset", description="Reset the config file", flag=True),
# option(
# "--update",
# description="Reset the config file, keeping the credentials",
# flag=True,
# ),
# ]
# """
# Manage the configuration file.
#
# config
# {--o|open : Open the config file in the default application}
# {--O|open-vim : Open the config file in (neo)vim}
# {--d|directory : Open the directory that the config file is located in}
# {--p|path : Show the config file's path}
# {--qobuz : Set the credentials for Qobuz}
# {--tidal : Log into Tidal}
# {--deezer : Set the Deezer ARL}
# {--music-app : Configure the config file for usage with the macOS Music App}
# {--reset : Reset the config file}
# {--update : Reset the config file, keeping the credentials}
# """
#
# _config: Config
#
# def handle(self):
# import shutil
#
# from .constants import CONFIG_DIR, CONFIG_PATH
#
# self._config = Config()
#
# if self.option("path"):
# self.line(f"<info>{CONFIG_PATH}</info>")
#
# if self.option("open"):
# self.line(f"Opening <url>{CONFIG_PATH}</url> in default application")
# launch(CONFIG_PATH)
#
# if self.option("reset"):
# self._config.reset()
#
# if self.option("update"):
# self._config.update()
#
# if self.option("open-vim"):
# if shutil.which("nvim") is not None:
# os.system(f"nvim '{CONFIG_PATH}'")
# else:
# os.system(f"vim '{CONFIG_PATH}'")
#
# if self.option("directory"):
# self.line(f"Opening <url>{CONFIG_DIR}</url>")
# launch(CONFIG_DIR)
#
# if self.option("tidal"):
# from streamrip.clients import TidalClient
#
# client = TidalClient()
# client.login()
# self._config.file["tidal"].update(client.get_tokens())
# self._config.save()
# self.line("<info>Credentials saved to config.</info>")
#
# if self.option("deezer"):
# from streamrip.clients import DeezerClient
# from streamrip.exceptions import AuthenticationError
#
# self.line(
# "Follow the instructions at <url>https://github.com"
# "/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie</url>"
# )
#
# given_arl = self.ask("Paste your ARL here: ").strip()
# self.line("<comment>Validating arl...</comment>")
#
# try:
# DeezerClient().login(arl=given_arl)
# self._config.file["deezer"]["arl"] = given_arl
# self._config.save()
# self.line("<b>Sucessfully logged in!</b>")
#
# except AuthenticationError:
# self.line("<error>Could not log in. Double check your ARL</error>")
#
# if self.option("qobuz"):
# import getpass
# import hashlib
#
# self._config.file["qobuz"]["email"] = self.ask("Qobuz email:")
# self._config.file["qobuz"]["password"] = hashlib.md5(
# getpass.getpass("Qobuz password (won't show on screen): ").encode()
# ).hexdigest()
# self._config.save()
#
# if self.option("music-app"):
# self._conf_music_app()
#
# def _conf_music_app(self):
# import subprocess
# import xml.etree.ElementTree as ET
# from pathlib import Path
# from tempfile import mktemp
#
# # Find the Music library folder
# temp_file = mktemp()
# music_pref_plist = Path(Path.home()) / Path(
# "Library/Preferences/com.apple.Music.plist"
# )
# # copy preferences to tempdir
# subprocess.run(["cp", music_pref_plist, temp_file])
# # convert binary to xml for parsing
# subprocess.run(["plutil", "-convert", "xml1", temp_file])
# items = iter(ET.parse(temp_file).getroot()[0])
#
# for item in items:
# if item.text == "NSNavLastRootDirectory":
# break
#
# library_folder = Path(next(items).text)
# os.remove(temp_file)
#
# # cp ~/library/preferences/com.apple.music.plist music.plist
# # plutil -convert xml1 music.plist
# # cat music.plist | pbcopy
#
# self._config.file["downloads"]["folder"] = os.path.join(
# library_folder, "Automatically Add to Music.localized"
# )
#
# conversion_config = self._config.file["conversion"]
# conversion_config["enabled"] = True
# conversion_config["codec"] = "ALAC"
# conversion_config["sampling_rate"] = 48000
# conversion_config["bit_depth"] = 24
#
# self._config.file["filepaths"]["folder_format"] = ""
# self._config.file["artwork"]["keep_hires_cover"] = False
# self._config.save()
#
#
# class ConvertCommand(Command):
# name = "convert"
# description = (
# "A standalone tool that converts audio files to other codecs en masse."
# )
# arguments = [
# argument(
# "codec",
# description="<cmd>FLAC</cmd>, <cmd>ALAC</cmd>, <cmd>OPUS</cmd>, <cmd>MP3</cmd>, or <cmd>AAC</cmd>.",
# ),
# argument(
# "path",
# description="The path to the audio file or a directory that contains audio files.",
# ),
# ]
# options = [
# option(
# "sampling-rate",
# "-s",
# description="Downsample the tracks to this rate, in Hz.",
# default=192000,
# flag=False,
# ),
# option(
# "bit-depth",
# "-b",
# description="Downsample the tracks to this bit depth.",
# default=24,
# flag=False,
# ),
# option(
# "keep-source", "-k", description="Keep the original file after conversion."
# ),
# ]
#
# help = (
# "\nConvert all of the audio files in <path>/my/music</path> to MP3s\n"
# "$ <cmd>rip convert MP3 /my/music</cmd>\n\n"
# "Downsample the audio to 48kHz after converting them to ALAC\n"
# "$ <cmd>rip convert --sampling-rate 48000 ALAC /my/music\n"
# )
#
# def handle(self):
# from streamrip import converter
#
# CODEC_MAP = {
# "FLAC": converter.FLAC,
# "ALAC": converter.ALAC,
# "OPUS": converter.OPUS,
# "MP3": converter.LAME,
# "AAC": converter.AAC,
# }
#
# codec = self.argument("codec")
# path = self.argument("path")
#
# ConverterCls = CODEC_MAP.get(codec.upper())
# if ConverterCls is None:
# self.line(
# f'<error>Invalid codec "{codec}". See </error><cmd>rip convert'
# " -h</cmd>."
# )
# return 1
#
# sampling_rate, bit_depth, keep_source = clean_options(
# self.option("sampling-rate"),
# self.option("bit-depth"),
# self.option("keep-source"),
# )
#
# converter_args = {
# "sampling_rate": sampling_rate,
# "bit_depth": bit_depth,
# "remove_source": not keep_source,
# }
#
# if os.path.isdir(path):
# import itertools
# from pathlib import Path
#
# from tqdm import tqdm
#
# dirname = path
# audio_extensions = ("flac", "m4a", "aac", "opus", "mp3", "ogg")
# path_obj = Path(dirname)
# audio_files = (
# path.as_posix()
# for path in itertools.chain.from_iterable(
# (path_obj.rglob(f"*.{ext}") for ext in audio_extensions)
# )
# )
#
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = []
# for file in audio_files:
# futures.append(
# executor.submit(
# ConverterCls(
# filename=os.path.join(dirname, file),
# **converter_args,
# ).convert
# )
# )
# from streamrip.utils import TQDM_BAR_FORMAT
#
# for future in tqdm(
# concurrent.futures.as_completed(futures),
# total=len(futures),
# desc="Converting",
# unit="track",
# bar_format=TQDM_BAR_FORMAT,
# ):
# # Only show loading bar
# future.result()
#
# elif os.path.isfile(path):
# ConverterCls(filename=path, **converter_args).convert()
# else:
# self.line(
# f'<error>Path <path>"{path}"</path> does not exist.</error>',
# )
#
#
# class RepairCommand(Command):
# name = "repair"
# description = "Retry failed downloads."
#
# options = [
# option(
# "max-items",
# "-m",
# flag=False,
# description="The maximum number of tracks to download}",
# default="None",
# )
# ]
#
# help = "\nRetry up to 20 failed downloads\n$ <cmd>rip repair --max-items 20</cmd>\n"
#
# def handle(self):
# max_items = next(clean_options(self.option("max-items")))
# config = Config()
# RipCore(config).repair(max_items=max_items)
#
#
# class DatabaseCommand(Command):
# name = "db"
# description = "View and manage rip's databases."
#
# arguments = [
# argument(
# "name", description="<cmd>downloads</cmd> or <cmd>failed-downloads</cmd>."
# )
# ]
# options = [
# option("list", "-l", description="Display the contents of the database."),
# option("reset", description="Reset the database."),
# ]
#
# _table_style = "box-double"
#
# def handle(self) -> None:
# from . import db
# from .config import Config
#
# config = Config()
# db_name = self.argument("name").replace("-", "_")
#
# self._path = config.file["database"][db_name]["path"]
# self._db = db.CLASS_MAP[db_name](self._path)
#
# if self.option("list"):
# getattr(self, f"_render_{db_name}")()
#
# if self.option("reset"):
# os.remove(self._path)
#
# def _render_downloads(self):
# from cleo.ui.table import Table
#
# id_table = Table(self._io)
# id_table.set_style(self._table_style)
# id_table.set_header_title("IDs")
# id_table.set_headers(list(self._db.structure.keys()))
# id_table.add_rows(id for id in iter(self._db) if id[0].isalnum())
# if id_table._rows:
# id_table.render()
#
# url_table = Table(self._io)
# url_table.set_style(self._table_style)
# url_table.set_header_title("URLs")
# url_table.set_headers(list(self._db.structure.keys()))
# url_table.add_rows(id for id in iter(self._db) if not id[0].isalnum())
# # prevent wierd formatting
# if url_table._rows:
# url_table.render()
#
# def _render_failed_downloads(self):
# from cleo.ui.table import Table
#
# id_table = Table(self._io)
# id_table.set_style(self._table_style)
# id_table.set_header_title("Failed Downloads")
# id_table.set_headers(["Source", "Media Type", "ID"])
# id_table.add_rows(iter(self._db))
# id_table.render()
#
#
STRING_TO_PRIMITIVE = {
"None": None,
"True": True,
"False": False,
}
class Application(BaseApplication):
def __init__(self):
# TODO: fix version
super().__init__("rip", "2.0")
def _run(self, io):
# if io.is_debug():
# from .constants import CONFIG_DIR
#
# logger.setLevel(logging.DEBUG)
# fh = logging.FileHandler(os.path.join(CONFIG_DIR, "streamrip.log"))
# fh.setLevel(logging.DEBUG)
# logger.addHandler(fh)
super()._run(io)
def create_io(self, input=None, output=None, error_output=None):
io = super().create_io(input, output, error_output)
# Set our own CLI styles
formatter = io.output.formatter
formatter.set_style("url", Style("blue", options=["underline"]))
formatter.set_style("path", Style("green", options=["bold"]))
formatter.set_style("cmd", Style("magenta"))
formatter.set_style("title", Style("yellow", options=["bold"]))
formatter.set_style("header", Style("yellow", options=["bold", "underline"]))
io.output.set_formatter(formatter)
io.error_output.set_formatter(formatter)
self._io = io
return io
@property
def _default_definition(self):
default_globals = super()._default_definition
# as of 1.0.0a3, the descriptions don't wrap properly
# so I'm truncating the description for help as a hack
default_globals._options["help"]._description = (
default_globals._options["help"]._description.split(".")[0] + "."
)
return default_globals
def render_error(self, error, io):
super().render_error(error, io)
io.write_line(
"\n<error>If this was unexpected, please open a <path>Bug Report</path> at </error>"
"<url>https://github.com/nathom/streamrip/issues/new/choose</url>"
)
def clean_options(*opts):
for opt in opts:
if isinstance(opt, str):
if opt.startswith("="):
opt = opt[1:]
opt = opt.strip()
if opt.isdigit():
opt = int(opt)
else:
opt = STRING_TO_PRIMITIVE.get(opt, opt)
yield opt
def is_outdated():
global outdated
global newest_version
r = requests.get("https://pypi.org/pypi/streamrip/json").json()
newest_version = r["info"]["version"]
# Compare versions
curr_version_parsed = map(int, __version__.split("."))
assert isinstance(newest_version, str)
newest_version_parsed = map(int, newest_version.split("."))
outdated = False
for c, n in zip(curr_version_parsed, newest_version_parsed):
outdated = c < n
if c != n:
break
def main():
application = Application()
application.add(DownloadCommand())
# application.add(SearchCommand())
# application.add(DiscoverCommand())
# application.add(LastfmCommand())
# application.add(ConfigCommand())
# application.add(ConvertCommand())
# application.add(RepairCommand())
# application.add(DatabaseCommand())
application.run()
if __name__ == "__main__":
main()

View File

@ -1,910 +0,0 @@
"""The stuff that ties everything together for the CLI to use."""
import asyncio
import concurrent.futures
import html
import logging
import os
import re
from abc import ABC, abstractmethod
from getpass import getpass
from hashlib import md5
from string import Formatter
from typing import Dict, Generator, List, Optional, Tuple, Type, Union
import requests
from click import secho, style
from tqdm import tqdm
from streamrip.constants import MEDIA_TYPES
from streamrip.exceptions import (
AuthenticationError,
IneligibleError,
ItemExists,
MissingCredentials,
NonStreamable,
NoResultsFound,
ParsingError,
PartialFailure,
)
from streamrip.media import (
Album,
Artist,
Label,
Playlist,
Track,
Tracklist,
Video,
YoutubeVideo,
)
from streamrip.utils import TQDM_DEFAULT_THEME, set_progress_bar_theme
from . import db
from .clients import (
Client,
DeezerClient,
DeezloaderClient,
QobuzClient,
SoundcloudClient,
TidalClient,
)
from .config import Config
from .exceptions import DeezloaderFallback
from .media import Media
from .user_paths import DB_PATH, FAILED_DB_PATH
from .utils import extract_deezer_dynamic_link, extract_interpreter_url
from .validation_regexps import (
DEEZER_DYNAMIC_LINK_REGEX,
LASTFM_URL_REGEX,
QOBUZ_INTERPRETER_URL_REGEX,
SOUNDCLOUD_URL_REGEX,
URL_REGEX,
YOUTUBE_URL_REGEX,
)
logger = logging.getLogger("streamrip")
# ---------------- Constants ------------------ #
MEDIA_CLASS: Dict[str, Media] = {
"album": Album,
"playlist": Playlist,
"artist": Artist,
"track": Track,
"label": Label,
"video": Video,
}
DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH}
# ---------------------------------------------- #
class RipCore(list):
def __init__(self, config: Config):
"""Create a RipCore object.
:param config:
:type config: Optional[Config]
"""
self.config = config
self.clients: dict[str, Client] = {
"qobuz": QobuzClient(config),
"tidal": TidalClient(config),
"deezer": DeezerClient(config),
"soundcloud": SoundcloudClient(config),
"deezloader": DeezloaderClient(config),
}
c = self.config.session
theme = c.theme.progress_bar
set_progress_bar_theme(theme)
self.db = db.Downloads(
c.database.downloads_path, dummy=not c.database.downloads_enabled
)
self.failed = db.FailedDownloads(
c.database.failed_downloads_path,
dummy=not c.database.failed_downloads_enabled,
)
def handle_urls(self, urls):
"""Download a url.
:param url:
:type url: str
:raises InvalidSourceError
:raises ParsingError
"""
if isinstance(urls, str):
url = urls
elif isinstance(urls, tuple):
url = " ".join(urls)
else:
raise Exception(f"Urls has invalid type {type(urls)}")
# youtube is handled by youtube-dl, so much of the
# processing is not necessary
youtube_urls = YOUTUBE_URL_REGEX.findall(url)
if youtube_urls != []:
self.extend(YoutubeVideo(u) for u in youtube_urls)
parsed = self.parse_urls(url)
if not parsed and len(self) == 0:
if "last.fm" in url:
message = (
f"For last.fm urls, use the {style('lastfm', fg='yellow')} "
f"command. See {style('rip lastfm --help', fg='yellow')}."
)
else:
message = f"Cannot find urls in text: {url}"
raise ParsingError(message)
for source, url_type, item_id in parsed:
if item_id in self.db:
secho(
f"ID {item_id} already downloaded, use --ignore-db to override.",
fg="magenta",
)
continue
self.handle_item(source, url_type, item_id)
def handle_item(self, source: str, media_type: str, item_id: str):
"""Get info and parse into a Media object.
:param source:
:type source: str
:param media_type:
:type media_type: str
:param item_id:
:type item_id: str
"""
client = self.get_client_and_log_in(source)
if media_type not in MEDIA_TYPES:
if "playlist" in media_type: # for SoundCloud
media_type = "playlist"
assert media_type in MEDIA_TYPES, media_type
item = MEDIA_CLASS[media_type](client=client, id=item_id)
self.append(item)
def _get_download_args(self) -> dict:
"""Get the arguments to pass to Media.download.
:rtype: dict
"""
session = self.config.session
logger.debug(session)
# So that the dictionary isn't searched for the same keys multiple times
artwork, conversion, filepaths, metadata = (
session[key] for key in ("artwork", "conversion", "filepaths", "metadata")
)
concurrency = session["downloads"]["concurrency"]
return {
"restrict_filenames": filepaths["restrict_characters"],
"truncate_filenames": filepaths["truncate"],
"parent_folder": session["downloads"]["folder"],
"folder_format": filepaths["folder_format"],
"track_format": filepaths["track_format"],
"embed_cover": artwork["embed"],
"embed_cover_size": artwork["size"],
"keep_hires_cover": artwork["keep_hires_cover"],
"set_playlist_to_album": metadata["set_playlist_to_album"],
"stay_temp": conversion["enabled"],
"conversion": conversion,
"concurrent_downloads": concurrency["enabled"],
"max_connections": concurrency["max_connections"],
"new_tracknumbers": metadata["new_playlist_tracknumbers"],
"download_videos": session["tidal"]["download_videos"],
"download_booklets": session["qobuz"]["download_booklets"],
"download_youtube_videos": session["youtube"]["download_videos"],
"youtube_video_downloads_folder": session["youtube"][
"video_downloads_folder"
],
"add_singles_to_folder": filepaths["add_singles_to_folder"],
"max_artwork_width": int(artwork["max_width"]),
"max_artwork_height": int(artwork["max_height"]),
"exclude_tags": metadata["exclude"],
}
def repair(self, max_items=None):
"""Iterate through the failed_downloads database and retry them.
:param max_items: The maximum number of items to download.
"""
if max_items is None:
max_items = float("inf")
self.db = db.Downloads("", dummy=True)
if self.failed_db.is_dummy:
secho(
"Failed downloads database must be enabled in the config file "
"to repair!",
fg="red",
)
exit()
for counter, (source, media_type, item_id) in enumerate(self.failed_db):
if counter >= max_items:
break
self.handle_item(source, media_type, item_id)
self.download()
def download(self):
"""Download all the items in self."""
try:
arguments = self._get_download_args()
except KeyError as e:
self._config_updating_message()
self.config.update()
logger.debug("Config update error: %s", e)
exit()
except Exception as err:
self._config_corrupted_message(err)
exit()
logger.debug("Arguments from config: %s", arguments)
source_subdirs = self.config.session["downloads"]["source_subdirectories"]
for item in self:
# Item already checked in database in handle_urls
if source_subdirs:
arguments["parent_folder"] = self.__get_source_subdir(
item.client.source
)
if item is YoutubeVideo:
item.download(**arguments)
continue
arguments["quality"] = self.config.session[item.client.source]["quality"]
if isinstance(item, Artist):
filters_ = tuple(
k for k, v in self.config.session["filters"].items() if v
)
arguments["filters"] = filters_
logger.debug("Added filter argument for artist/label: %s", filters_)
if not isinstance(item, Tracklist) or not item.loaded:
logger.debug("Loading metadata")
try:
item.load_meta(**arguments)
except NonStreamable:
self.failed_db.add((item.client.source, item.type, item.id))
secho(f"{item!s} is not available, skipping.", fg="red")
continue
try:
item.download(**arguments)
for item_id in item.downloaded_ids:
# Add items row by row
self.db.add((item_id,))
except NonStreamable as e:
e.print(item)
self.failed_db.add((item.client.source, item.type, item.id))
continue
except PartialFailure as e:
# add successful downloads to database?
for failed_item_info in e.failed_items:
self.failed_db.add(failed_item_info)
continue
except ItemExists as e:
secho(f'"{e!s}" already exists. Skipping.', fg="yellow")
continue
if hasattr(item, "id"):
self.db.add(str(item.id))
for item_id in item.downloaded_ids:
self.db.add(str(item_id))
if isinstance(item, Track):
item.tag(exclude_tags=arguments["exclude_tags"])
if arguments["conversion"]["enabled"]:
item.convert(**arguments["conversion"])
def scrape(self, featured_list: str, max_items: int = 500):
"""Download all of the items in a Qobuz featured list.
:param featured_list: The name of the list. See `rip discover --help`.
:type featured_list: str
"""
self.extend(self.search("qobuz", featured_list, "featured", limit=max_items))
def get_client_and_log_in(self, source: str) -> Client:
"""Get a client given the source and log in.
:param source:
:type source: str
:rtype: Client
"""
client = self.clients[source]
if not client.logged_in:
try:
self.login(client)
except DeezloaderFallback:
client = self.clients["deezloader"]
return client
async def login(self, client):
"""Log into a client, if applicable.
:param client:
"""
c = self.config.session
if client.source == "deezer" and c.deezer.arl == "":
if c.deezer.deezloader_warnings:
secho(
"Falling back to Deezloader (unstable). If you have a subscription, run ",
nl=False,
fg="yellow",
)
secho("rip config --deezer ", nl=False, bold=True)
secho("to log in.", fg="yellow")
raise DeezloaderFallback
while True:
try:
await client.login()
break
except AuthenticationError:
secho("Invalid credentials, try again.", fg="yellow")
self.prompt_and_set_credentials(client.source)
except MissingCredentials:
if client.source == "qobuz":
get_tokens = asyncio.create_task(client._get_app_id_and_secrets())
self.prompt_and_set_credentials(client.source)
await get_tokens
else:
self.prompt_and_set_credentials(client.source)
if (
client.source == "qobuz"
and not creds.get("secrets")
and not creds.get("app_id")
):
(
self.config.file["qobuz"]["app_id"],
self.config.file["qobuz"]["secrets"],
) = client.get_tokens()
self.config.save()
elif (
client.source == "soundcloud"
and not creds.get("client_id")
and not creds.get("app_version")
):
(
self.config.file["soundcloud"]["client_id"],
self.config.file["soundcloud"]["app_version"],
) = client.get_tokens()
self.config.save()
elif client.source == "tidal":
self.config.file["tidal"].update(client.get_tokens())
self.config.save() # only for the expiry stamp
def parse_urls(self, url: str) -> List[Tuple[str, str, str]]:
"""Return the type of the url and the id.
Compatible with urls of the form:
https://www.qobuz.com/us-en/type/name/id
https://open.qobuz.com/type/id
https://play.qobuz.com/type/id
https://www.deezer.com/us/type/id
https://tidal.com/browse/type/id
:raises exceptions.ParsingError:
"""
parsed: List[Tuple[str, str, str]] = []
interpreter_urls = QOBUZ_INTERPRETER_URL_REGEX.findall(url)
if interpreter_urls:
secho(
"Extracting IDs from Qobuz interpreter urls. Use urls "
"that include the artist ID for faster preprocessing.",
fg="yellow",
)
parsed.extend(
("qobuz", "artist", extract_interpreter_url(u))
for u in interpreter_urls
)
url = QOBUZ_INTERPRETER_URL_REGEX.sub("", url)
dynamic_urls = DEEZER_DYNAMIC_LINK_REGEX.findall(url)
if dynamic_urls:
secho(
"Extracting IDs from Deezer dynamic link. Use urls "
"of the form https://www.deezer.com/{country}/{type}/{id} for "
"faster processing.",
fg="yellow",
)
parsed.extend(
("deezer", *extract_deezer_dynamic_link(url)) for url in dynamic_urls
)
parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Deezer
soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url)
if soundcloud_urls:
soundcloud_client = self.get_client_and_log_in("soundcloud")
assert isinstance(soundcloud_client, SoundcloudClient) # for typing
# TODO: Make this async
soundcloud_items = (
soundcloud_client.resolve_url(u) for u in soundcloud_urls
)
parsed.extend(
("soundcloud", item["kind"], str(item["id"]))
for item in soundcloud_items
)
logger.debug("Parsed urls: %s", parsed)
return parsed
def handle_lastfm_urls(self, urls: str):
"""Get info from lastfm url, and parse into Media objects.
This works by scraping the last.fm page and using a regex to
find the track titles and artists. The information is queried
in a Client.search(query, 'track') call and the first result is
used.
:param urls:
"""
# Available keys: ['artist', 'title']
QUERY_FORMAT: Dict[str, str] = {
"tidal": "{title}",
"qobuz": "{title} {artist}",
"deezer": "{title} {artist}",
"soundcloud": "{title} {artist}",
}
# For testing:
# https://www.last.fm/user/nathan3895/playlists/12058911
user_regex = re.compile(r"https://www\.last\.fm/user/([^/]+)/playlists/\d+")
lastfm_urls = LASTFM_URL_REGEX.findall(urls)
try:
lastfm_source = self.config.session["lastfm"]["source"]
lastfm_fallback_source = self.config.session["lastfm"]["fallback_source"]
except KeyError:
self._config_updating_message()
self.config.update()
exit()
except Exception as err:
self._config_corrupted_message(err)
exit()
# Do not include tracks that have (re)mix, live, karaoke in their titles
# within parentheses or brackets
# This will match somthing like "Test (Person Remix]" though, so its not perfect
banned_words_plain = re.compile(r"(?i)(?:(?:re)?mix|live|karaoke)")
banned_words = re.compile(
r"(?i)[\(\[][^\)\]]*?(?:(?:re)?mix|live|karaoke)[^\)\]]*[\]\)]"
)
def search_query(title, artist, playlist) -> bool:
"""Search for a query and add the first result to playlist.
:param query:
:type query: str
:param playlist:
:type playlist: Playlist
:rtype: bool
"""
def try_search(source) -> Optional[Track]:
try:
query = QUERY_FORMAT[lastfm_source].format(
title=title, artist=artist
)
query_is_clean = banned_words_plain.search(query) is None
search_results = self.search(source, query, media_type="track")
track = next(search_results)
if query_is_clean:
while banned_words.search(track["title"]) is not None:
logger.debug("Track title banned for query=%s", query)
track = next(search_results)
# Because the track is searched as a single we need to set
# this manually
track.part_of_tracklist = True
return track
except (NoResultsFound, StopIteration):
return None
track = try_search(lastfm_source) or try_search(lastfm_fallback_source)
if track is None:
return False
if self.config.session["metadata"]["set_playlist_to_album"]:
# so that the playlist name (actually the album) isn't
# amended to include version and work tags from individual tracks
track.meta.version = track.meta.work = None
playlist.append(track)
return True
from streamrip.utils import TQDM_BAR_FORMAT
for purl in lastfm_urls:
secho(f"Fetching playlist at {purl}", fg="blue")
title, queries = self.get_lastfm_playlist(purl)
pl = Playlist(client=self.get_client_and_log_in(lastfm_source), name=title)
creator_match = user_regex.search(purl)
if creator_match is not None:
pl.creator = creator_match.group(1)
tracks_not_found = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
futures = [
executor.submit(search_query, title, artist, pl)
for title, artist in queries
]
# only for the progress bar
for search_attempt in tqdm(
concurrent.futures.as_completed(futures),
unit="Tracks",
dynamic_ncols=True,
total=len(futures),
desc="Searching...",
bar_format=TQDM_BAR_FORMAT,
):
if not search_attempt.result():
tracks_not_found += 1
pl.loaded = True
if tracks_not_found > 0:
secho(f"{tracks_not_found} tracks not found.", fg="yellow")
self.append(pl)
def handle_txt(self, filepath: str):
"""
Handle a text file containing URLs. Lines starting with `#` are ignored.
:param filepath:
:type filepath: Union[str, os.PathLike]
:raises OSError
:raises exceptions.ParsingError
"""
with open(filepath) as txt:
self.handle_urls(txt.read())
def search(
self,
source: str,
query: str,
media_type: str = "album",
check_db: bool = False,
limit: int = 200,
) -> Generator:
"""Universal search.
:param source:
:type source: str
:param query:
:type query: str
:param media_type:
:type media_type: str
:param limit: Not Implemented
:type limit: int
:rtype: Generator
"""
logger.debug("searching for %s", query)
client = self.get_client_and_log_in(source)
if isinstance(client, DeezloaderClient) and media_type == "featured":
raise IneligibleError(
"Must have premium Deezer account to access editorial lists."
)
results = client.search(query, media_type)
if media_type == "featured":
media_type = "album"
if isinstance(results, Generator): # QobuzClient
for page in results:
tracklist = (
page[f"{media_type}s"]["items"]
if media_type != "featured"
else page["albums"]["items"]
)
for i, item in enumerate(tracklist):
yield MEDIA_CLASS[ # type: ignore
media_type if media_type != "featured" else "album"
].from_api(item, client)
if i >= limit - 1:
return
else:
items = (
results.get("data")
or results.get("items")
or results.get("collection")
or results.get("albums", {}).get("data", False)
)
if not items:
raise NoResultsFound(query)
logger.debug("Number of results: %d", len(items))
for i, item in enumerate(items):
logger.debug(item)
yield MEDIA_CLASS[media_type].from_api(item, client) # type: ignore
if i >= limit - 1:
return
def preview_media(self, media) -> str:
"""Return a preview string of a Media object.
:param media:
"""
if isinstance(media, Album):
fmt = (
"{albumartist} - {album}\n"
"Released on {year}\n{tracktotal} tracks\n"
"{bit_depth} bit / {sampling_rate} Hz\n"
"Version: {version}\n"
"Genre: {genre}"
)
elif isinstance(media, Artist):
fmt = "{name}"
elif isinstance(media, Track):
fmt = "{artist} - {title}\nReleased on {year}"
elif isinstance(media, Playlist):
fmt = (
"{title}\n"
"{tracktotal} tracks\n"
"{popularity}\n"
"Description: {description}"
)
else:
raise NotImplementedError
fields = (fname for _, fname, _, _ in Formatter().parse(fmt) if fname)
ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields})
return ret
def interactive_search(
self,
query: str,
source: str = "qobuz",
media_type: str = "album",
limit: int = 50,
):
"""Show an interactive menu that contains search results.
:param query:
:type query: str
:param source:
:type source: str
:param media_type:
:type media_type: str
"""
results = tuple(self.search(source, query, media_type, limit=limit))
def title(res):
index, item = res
item_no = index + 1
if isinstance(item, Album):
return f"{item_no}. {item.album}"
elif isinstance(item, Track):
return f"{item_no}. {item.meta.title}"
elif isinstance(item, Playlist):
return f"{item_no}. {item.name}"
elif isinstance(item, Artist):
return f"{item_no}. {item.name}"
else:
raise NotImplementedError(item.type)
def from_title(s):
num = []
for char in s:
if char != ".":
num.append(char)
else:
break
return self.preview_media(results[int("".join(num)) - 1])
if os.name == "nt":
from pick import pick
choice = pick(
tuple(enumerate(results)),
title=(
f"{source.capitalize()} {media_type} search.\n"
"Press SPACE to select, RETURN to download, ctrl-C to exit."
),
options_map_func=title,
multiselect=True,
)
if isinstance(choice, list):
for item in choice:
self.append(item[0][1])
elif isinstance(choice, tuple):
self.append(choice[0][1])
return True
else:
from simple_term_menu import TerminalMenu
menu = TerminalMenu(
map(title, enumerate(results)),
preview_command=from_title,
preview_size=0.5,
title=(
f"{source.capitalize()} {media_type} search.\n"
"SPACE - multiselection, ENTER - download, ESC - exit"
),
cycle_cursor=True,
clear_screen=True,
multi_select=True,
)
choice = menu.show()
if choice is None:
return False
else:
if isinstance(choice, int):
self.append(results[choice])
elif isinstance(choice, tuple):
for i in choice:
self.append(results[i])
return True
def get_lastfm_playlist(self, url: str) -> Tuple[str, list]:
"""From a last.fm url, find the playlist title and tracks.
Each page contains 50 results, so `num_tracks // 50 + 1` requests
are sent per playlist.
:param url:
:type url: str
:rtype: Tuple[str, list]
"""
logger.debug("Fetching lastfm playlist")
info = []
words = re.compile(r"[\w\s]+")
title_tags = re.compile(r'<a\s+href="[^"]+"\s+title="([^"]+)"')
def essence(s):
s = re.sub(r"&#\d+;", "", s) # remove HTML entities
# TODO: change to finditer
return "".join(words.findall(s))
def get_titles(s):
titles = title_tags.findall(s) # [2:]
for i in range(0, len(titles) - 1, 2):
info.append((essence(titles[i]), essence(titles[i + 1])))
r = requests.get(url)
get_titles(r.text)
remaining_tracks_match = re.search(
r'data-playlisting-entry-count="(\d+)"', r.text
)
if remaining_tracks_match is None:
raise ParsingError("Error parsing lastfm page: %s", r.text)
total_tracks = int(remaining_tracks_match.group(1))
logger.debug("Total tracks: %d", total_tracks)
remaining_tracks = total_tracks - 50
playlist_title_match = re.search(
r'<h1 class="playlisting-playlist-header-title">([^<]+)</h1>',
r.text,
)
if playlist_title_match is None:
raise ParsingError("Error finding title from response")
playlist_title = html.unescape(playlist_title_match.group(1))
if remaining_tracks > 0:
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
last_page = (
1 + int(remaining_tracks // 50) + int(remaining_tracks % 50 != 0)
)
logger.debug("Fetching up to page %d", last_page)
futures = [
executor.submit(requests.get, f"{url}?page={page}")
for page in range(2, last_page + 1)
]
for future in concurrent.futures.as_completed(futures):
get_titles(future.result().text)
return playlist_title, info
def __get_source_subdir(self, source: str) -> str:
path = self.config.session["downloads"]["folder"]
return os.path.join(path, source.capitalize())
async def prompt_and_set_credentials(self, source: str):
"""Prompt the user for credentials.
:param source:
:type source: str
"""
prompter = PROMPTERS[source]
client = self.clients[source]
while True:
prompter.prompt()
try:
await client.login()
break
except AuthenticationError:
secho("Invalid credentials, try again.", fg="yellow")
except MissingCredentials:
secho("Credentials not found, try again.", fg="yellow")
self.prompt_and_set_credentials(client.source)
if source == "qobuz":
secho("Enter Qobuz email:", fg="green")
self.config.file[source]["email"] = input()
secho(
"Enter Qobuz password (will not show on screen):",
fg="green",
)
self.config.file[source]["password"] = md5(
getpass(prompt="").encode("utf-8")
).hexdigest()
self.config.save()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
elif source == "deezer":
secho(
"If you're not sure how to find the ARL cookie, see the instructions at ",
italic=True,
nl=False,
dim=True,
)
secho(
"https://github.com/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie",
underline=True,
italic=True,
fg="blue",
)
self.config.file["deezer"]["arl"] = input(style("ARL: ", fg="green"))
self.config.save()
secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
else:
raise Exception
def _config_updating_message(self):
secho(
"Updating config file... Some settings may be lost. Please run the "
"command again.",
fg="magenta",
)
def _config_corrupted_message(self, err: Exception):
secho(
"There was a problem with your config file. This happens "
"sometimes after updates. Run ",
nl=False,
fg="red",
)
secho("rip config --reset ", fg="yellow", nl=False)
secho("to reset it. You will need to log in again.", fg="red")
secho(str(err), fg="red")