streamrip/streamrip/soundcloud_client.py

268 lines
9.3 KiB
Python

import asyncio
import itertools
import logging
import re
from .client import Client
from .config import Config
from .downloadable import SoundcloudDownloadable
from .exceptions import NonStreamable
BASE = "https://api-v2.soundcloud.com"
SOUNDCLOUD_USER_ID = "672320-86895-162383-801513"
logger = logging.getLogger("streamrip")
class SoundcloudClient(Client):
source = "soundcloud"
logged_in = False
NON_STREAMABLE = "_non_streamable"
ORIGINAL_DOWNLOAD = "_original_download"
NOT_RESOLVED = "_not_resolved"
def __init__(self, config: Config):
self.global_config = config
self.config = config.session.soundcloud
self.rate_limiter = self.get_rate_limiter(
config.session.downloads.requests_per_minute
)
async def login(self):
self.session = await self.get_session()
client_id, app_version = self.config.client_id, self.config.app_version
if not client_id or not app_version or not (await self._announce()):
client_id, app_version = await self._refresh_tokens()
# update file and session configs and save to disk
cf = self.global_config.file.soundcloud
cs = self.global_config.session.soundcloud
cs.client_id = client_id
cs.app_version = app_version
cf.client_id = client_id
cf.app_version = app_version
self.global_config.file.set_modified()
logger.debug(f"Current valid {client_id=} {app_version=}")
self.logged_in = True
async def get_metadata(self, item_id: str, media_type: str) -> dict:
"""Fetch metadata for an item in Soundcloud API.
Args:
item_id (str): Plain soundcloud item ID (e.g 1633786176)
media_type (str): track or playlist
Returns:
API response.
"""
if media_type == "track":
# parse custom id that we injected
_item_id, _ = item_id.split("|")
return await self._get_track(_item_id)
elif media_type == "playlist":
return await self._get_playlist(item_id)
else:
raise Exception(f"{media_type} not supported")
async def _get_track(self, item_id: str):
resp, status = await self._api_request(f"tracks/{item_id}")
assert status == 200
return resp
async def _get_playlist(self, item_id: str):
original_resp, status = await self._api_request(f"playlists/{item_id}")
assert status == 200
unresolved_tracks = [
track["id"] for track in original_resp["tracks"] if "media" not in track
]
if len(unresolved_tracks) == 0:
return original_resp
MAX_BATCH_SIZE = 50
batches = batched(unresolved_tracks, MAX_BATCH_SIZE)
requests = [
self._api_request(
"tracks",
params={"ids": ",".join(str(id) for id in filter_none(batch))},
)
for batch in batches
]
# (list of track metadata, status code)
responses: list[tuple[list, int]] = await asyncio.gather(*requests)
assert all(status == 200 for _, status in responses)
remaining_tracks = list(itertools.chain(*[resp for resp, _ in responses]))
# Insert the new metadata into the original response
track_map: dict[str, dict] = {track["id"]: track for track in remaining_tracks}
for i, track in enumerate(original_resp["tracks"]):
if "media" in track: # track already has metadata
continue
this_track = track_map.get(track["id"])
if this_track is None:
raise Exception(f"Requested {track['id']} but got no response")
original_resp["tracks"][i] = this_track
# Overwrite all ids in playlist
for track in original_resp["tracks"]:
track["id"] = self._get_custom_id(track)
return original_resp
@classmethod
def _get_custom_id(cls, resp: dict) -> str:
item_id = resp["id"]
assert "media" in resp, f"track {resp} should be resolved"
if not resp["streamable"] or resp["policy"] == "BLOCK":
return f"{item_id}|{cls.NON_STREAMABLE}"
if resp["downloadable"] and resp["has_downloads_left"]:
return f"{item_id}|{cls.ORIGINAL_DOWNLOAD}"
url = None
for tc in resp["media"]["transcodings"]:
fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
url = tc["url"]
break
assert url is not None
return f"{item_id}|{url}"
async def get_downloadable(self, item_info: str, _) -> SoundcloudDownloadable:
# We have `get_metadata` overwrite the "id" field so that it contains
# some extra information we need to download soundcloud tracks
# item_id is the soundcloud ID of the track
# download_url is either the url that points to an mp3 download or ""
# if download_url == '_non_streamable' then we raise an exception
infos: list[str] = item_info.split("|")
logger.debug(f"{infos=}")
assert len(infos) == 2, infos
item_id, download_info = infos
assert re.match(r"\d+", item_id) is not None
if download_info == self.NON_STREAMABLE:
raise NonStreamable(item_info)
if download_info == self.ORIGINAL_DOWNLOAD:
resp_json, status = await self._api_request(f"tracks/{item_id}/download")
assert status == 200
return SoundcloudDownloadable(
self.session, {"url": resp_json["redirectUri"], "type": "original"}
)
if download_info == self.NOT_RESOLVED:
raise NotImplementedError(item_info)
# download_info contains mp3 stream url
resp_json, status = await self._request(download_info)
return SoundcloudDownloadable(
self.session, {"url": resp_json["url"], "type": "mp3"}
)
async def search(
self, query: str, media_type: str, limit: int = 50, offset: int = 0
):
params = {
"q": query,
"facet": "genre",
"user_id": SOUNDCLOUD_USER_ID,
"limit": limit,
"offset": offset,
"linked_partitioning": "1",
}
resp, status = await self._api_request(f"search/{media_type}s", params=params)
assert status == 200
return resp
async def _api_request(self, path, params=None, headers=None):
url = f"{BASE}/{path}"
return await self._request(url, params=params, headers=headers)
async def _request(self, url, params=None, headers=None) -> tuple[dict, int]:
c = self.config
_params = {
"client_id": c.client_id,
"app_version": c.app_version,
"app_locale": "en",
}
if params is not None:
_params.update(params)
logger.debug(f"Requesting {url} with {_params=}, {headers=}")
async with self.session.get(url, params=_params, headers=headers) as resp:
return await resp.json(), resp.status
async def _request_body(self, url, params=None, headers=None):
c = self.config
_params = {
"client_id": c.client_id,
"app_version": c.app_version,
"app_locale": "en",
}
if params is not None:
_params.update(params)
async with self.session.get(url, params=_params, headers=headers) as resp:
return await resp.content.read(), resp.status
async def _resolve_url(self, url: str) -> dict:
resp, status = await self._api_request("resolve", params={"url": url})
assert status == 200
return resp
async def _announce(self):
url = f"{BASE}/announcements"
_, status = await self._request_body(url)
return status == 200
async def _refresh_tokens(self) -> tuple[str, str]:
"""Return a valid client_id, app_version pair."""
STOCK_URL = "https://soundcloud.com/"
async with self.session.get(STOCK_URL) as resp:
page_text = await resp.text(encoding="utf-8")
*_, client_id_url_match = re.finditer(
r"<script\s+crossorigin\s+src=\"([^\"]+)\"", page_text
)
if client_id_url_match is None:
raise Exception("Could not find client ID in %s" % STOCK_URL)
client_id_url = client_id_url_match.group(1)
app_version_match = re.search(
r'<script>window\.__sc_version="(\d+)"</script>', page_text
)
if app_version_match is None:
raise Exception("Could not find app version in %s" % client_id_url_match)
app_version = app_version_match.group(1)
async with self.session.get(client_id_url) as resp:
page_text2 = await resp.text(encoding="utf-8")
client_id_match = re.search(r'client_id:\s*"(\w+)"', page_text2)
assert client_id_match is not None
client_id = client_id_match.group(1)
logger.debug(f"Refreshed soundcloud tokens as {client_id=} {app_version=}")
return client_id, app_version
def batched(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return list(itertools.zip_longest(*args, fillvalue=fillvalue))
def filter_none(iterable):
return (x for x in iterable if x is not None)