yt-dlp/yt_dlp/downloader/websocket.py

54 lines
1.8 KiB
Python
Raw Normal View History

import contextlib
import os
import signal
import threading
from .common import FileDownloader
from .external import FFmpegFD
from ..compat import asyncio
from ..dependencies import websockets
class FFmpegSinkFD(FileDownloader):
""" A sink to ffmpeg for downloading fragments in any form """
def real_download(self, filename, info_dict):
info_copy = info_dict.copy()
info_copy['url'] = '-'
async def call_conn(proc, stdin):
try:
await self.real_connection(stdin, info_dict)
except (BrokenPipeError, OSError):
pass
finally:
with contextlib.suppress(OSError):
stdin.flush()
stdin.close()
os.kill(os.getpid(), signal.SIGINT)
class FFmpegStdinFD(FFmpegFD):
@classmethod
def get_basename(cls):
return FFmpegFD.get_basename()
def on_process_started(self, proc, stdin):
thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
thread.start()
return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)
async def real_connection(self, sink, info_dict):
""" Override this in subclasses """
raise NotImplementedError('This method must be implemented by subclasses')
class WebSocketFragmentFD(FFmpegSinkFD):
async def real_connection(self, sink, info_dict):
async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws:
while True:
recv = await ws.recv()
if isinstance(recv, str):
recv = recv.encode('utf8')
sink.write(recv)