streamrip/streamrip/rip/cli.py

329 lines
8.7 KiB
Python

import asyncio
import logging
import os
import shutil
import subprocess
from functools import wraps
import aiofiles
import click
from click_help_colors import HelpColorsGroup # type: ignore
from rich.logging import RichHandler
from rich.prompt import Confirm
from rich.traceback import install
from .. import db
from ..config import DEFAULT_CONFIG_PATH, Config, set_user_defaults
from ..console import console
from .main import Main
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
return asyncio.run(f(*args, **kwargs))
return wrapper
@click.group(
cls=HelpColorsGroup,
help_headers_color="yellow",
help_options_color="green",
)
@click.version_option(version="2.0")
@click.option(
"--config-path",
default=DEFAULT_CONFIG_PATH,
help="Path to the configuration file",
)
@click.option("-f", "--folder", help="The folder to download items into.")
@click.option(
"-ndb",
"--no-db",
help="Download items even if they have been logged in the database",
default=False,
is_flag=True,
)
@click.option("-q", "--quality", help="The maximum quality allowed to download")
@click.option(
"-c",
"--convert",
help="Convert the downloaded files to an audio codec (ALAC, FLAC, MP3, AAC, or OGG)",
)
@click.option(
"--no-progress",
help="Do not show progress bars",
is_flag=True,
default=False,
)
@click.option(
"-v",
"--verbose",
help="Enable verbose output (debug mode)",
is_flag=True,
)
@click.pass_context
def rip(ctx, config_path, folder, no_db, quality, convert, no_progress, verbose):
"""Streamrip: the all in one music downloader."""
global logger
logging.basicConfig(
level="INFO",
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler()],
)
logger = logging.getLogger("streamrip")
if verbose:
install(
console=console,
suppress=[
click,
],
show_locals=True,
locals_hide_sunder=False,
)
logger.setLevel(logging.DEBUG)
logger.debug("Showing all debug logs")
else:
install(console=console, suppress=[click, asyncio], max_frames=1)
logger.setLevel(logging.INFO)
if not os.path.isfile(config_path):
console.print(
f"No file found at [bold cyan]{config_path}[/bold cyan], creating default config.",
)
set_user_defaults(config_path)
# pass to subcommands
ctx.ensure_object(dict)
ctx.obj["config_path"] = config_path
try:
c = Config(config_path)
except Exception as e:
console.print(
f"Error loading config from [bold cyan]{config_path}[/bold cyan]: {e}\n"
"Try running [bold]rip config reset[/bold]",
)
ctx.obj["config"] = None
return
# set session config values to command line args
c.session.database.downloads_enabled = not no_db
if folder is not None:
c.session.downloads.folder = folder
if quality is not None:
c.session.qobuz.quality = quality
c.session.tidal.quality = quality
c.session.deezer.quality = quality
c.session.soundcloud.quality = quality
if convert is not None:
c.session.conversion.enabled = True
assert convert.upper() in ("ALAC", "FLAC", "OGG", "MP3", "AAC")
c.session.conversion.codec = convert.upper()
if no_progress:
c.session.cli.progress_bars = False
ctx.obj["config"] = c
@rip.command()
@click.argument("urls", nargs=-1, required=True)
@click.pass_context
@coro
async def url(ctx, urls):
"""Download content from URLs."""
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
await main.add_all(urls)
await main.resolve()
await main.rip()
@rip.command()
@click.argument("path", required=True)
@click.pass_context
@coro
async def file(ctx, path):
"""Download content from URLs in a file seperated by newlines.
Example usage:
rip file urls.txt
"""
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
async with aiofiles.open(path) as f:
await main.add_all([line async for line in f])
await main.resolve()
await main.rip()
@rip.group()
def config():
"""Manage configuration files."""
@config.command("open")
@click.option("-v", "--vim", help="Open in (Neo)Vim", is_flag=True)
@click.pass_context
def config_open(ctx, vim):
"""Open the config file in a text editor."""
config_path = ctx.obj["config"].path
console.print(f"Opening file at [bold cyan]{config_path}")
if vim:
if shutil.which("nvim") is not None:
subprocess.run(["nvim", config_path])
else:
subprocess.run(["vim", config_path])
else:
click.launch(config_path)
@config.command("reset")
@click.option("-y", "--yes", help="Don't ask for confirmation.", is_flag=True)
@click.pass_context
def config_reset(ctx, yes):
"""Reset the config file."""
config_path = ctx.obj["config_path"]
if not yes:
if not Confirm.ask(
f"Are you sure you want to reset the config file at {config_path}?",
):
console.print("[green]Reset aborted")
return
set_user_defaults(config_path)
console.print(f"Reset the config file at [bold cyan]{config_path}!")
@config.command("path")
@click.pass_context
def config_path(ctx):
"""Display the path of the config file."""
config_path = ctx.obj["config_path"]
console.print(f"Config path: [bold cyan]'{config_path}'")
@rip.group()
def database():
"""View and modify the downloads and failed downloads databases."""
@database.command("browse")
@click.argument("table")
@click.pass_context
def database_browse(ctx, table):
"""Browse the contents of a table.
Available tables:
* Downloads
* Failed
"""
from rich.table import Table
cfg: Config = ctx.obj["config"]
if table.lower() == "downloads":
downloads = db.Downloads(cfg.session.database.downloads_path)
t = Table(title="Downloads database")
t.add_column("Row")
t.add_column("ID")
for i, row in enumerate(downloads.all()):
t.add_row(f"{i:02}", *row)
console.print(t)
elif table.lower() == "failed":
failed = db.Failed(cfg.session.database.failed_downloads_path)
t = Table(title="Failed downloads database")
t.add_column("Source")
t.add_column("Media Type")
t.add_column("ID")
for i, row in enumerate(failed.all()):
t.add_row(f"{i:02}", *row)
console.print(t)
else:
console.print(
f"[red]Invalid database[/red] [bold]{table}[/bold]. [red]Choose[/red] [bold]downloads "
"[red]or[/red] failed[/bold].",
)
@rip.command()
@click.option(
"-f",
"--first",
help="Automatically download the first search result without showing the menu.",
is_flag=True,
)
@click.argument("source", required=True)
@click.argument("media-type", required=True)
@click.argument("query", required=True)
@click.pass_context
@coro
async def search(ctx, first, source, media_type, query):
"""Search for content using a specific source.
Example:
-------
rip search qobuz album 'rumours'
"""
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
if first:
await main.search_take_first(source, media_type, query)
else:
await main.search_interactive(source, media_type, query)
await main.resolve()
await main.rip()
@rip.command()
@click.option("-s", "--source", help="The source to search tracks on.")
@click.option(
"-fs",
"--fallback-source",
help="The source to search tracks on if no results were found with the main source.",
)
@click.argument("url", required=True)
@click.pass_context
@coro
async def lastfm(ctx, source, fallback_source, url):
"""Download tracks from a last.fm playlist using a supported source."""
config = ctx.obj["config"]
if source is not None:
config.session.lastfm.source = source
if fallback_source is not None:
config.session.lastfm.fallback_source = fallback_source
with config as cfg:
async with Main(cfg) as main:
await main.resolve_lastfm(url)
await main.rip()
@rip.command()
@click.argument("source")
@click.argument("media-type")
@click.argument("id")
@click.pass_context
@coro
async def id(ctx, source, media_type, id):
"""Download an item by ID."""
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
await main.add_by_id(source, media_type, id)
await main.resolve()
await main.rip()
if __name__ == "__main__":
rip()