feat: overhauled downloader

This commit is contained in:
ThetaDev 2024-07-27 04:00:11 +02:00
parent fb7af3b966
commit 11a0038350
No known key found for this signature in database
GPG key ID: E319D3C5148D65B6
5 changed files with 839 additions and 337 deletions

View file

@ -1,19 +1,31 @@
#![warn(clippy::todo, clippy::dbg_macro)]
#![warn(missing_docs, clippy::todo, clippy::dbg_macro)]
//! # YouTube audio/video downloader
mod util;
use std::{borrow::Cow, cmp::Ordering, ffi::OsString, ops::Range, path::PathBuf, time::Duration};
use std::{
borrow::Cow,
cmp::Ordering,
ffi::OsString,
ops::Range,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use futures::stream::{self, StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use once_cell::sync::Lazy;
use rand::Rng;
use regex::Regex;
use reqwest::{header, Client};
use reqwest::{header, Client, StatusCode};
use rustypipe::{
model::{traits::FileFormat, AudioCodec, VideoCodec, VideoPlayer},
client::{ClientType, RustyPipe},
model::{
traits::{FileFormat, YtEntity},
AudioCodec, VideoCodec, VideoPlayer,
},
param::StreamFilter,
};
use tokio::{
@ -21,7 +33,6 @@ use tokio::{
io::AsyncWriteExt,
process::Command,
};
use tracing::{debug, info};
use util::DownloadError;
@ -30,6 +41,569 @@ type Result<T> = core::result::Result<T, DownloadError>;
const CHUNK_SIZE_MIN: u64 = 9_000_000;
const CHUNK_SIZE_MAX: u64 = 10_000_000;
/// RustyPipe audio/video downloader
///
/// The downloader uses an [`Arc`] internally, so if you are using the client
/// at multiple locations, you can just clone it.
#[derive(Clone)]
pub struct Downloader {
i: Arc<DownloaderInner>,
}
/// Builder to construct a new downloader
pub struct DownloaderBuilder {
rp: Option<RustyPipe>,
ffmpeg: String,
multi: Option<MultiProgress>,
filter: StreamFilter,
video_format: DownloadVideoFormat,
n_retries: u32,
path_precheck: bool,
}
struct DownloaderInner {
/// YT client
rp: RustyPipe,
/// Path to the ffmpeg binary
ffmpeg: String,
/// Global progress
multi: Option<MultiProgress>,
/// Default stream filter
filter: StreamFilter,
/// Default video format
video_format: DownloadVideoFormat,
/// Number of retries in case of 403 error
n_retries: u32,
/// Check if destination path exists before player is fetched
path_precheck: bool,
}
/// Download query
pub struct DownloadQuery {
/// RustyPipe Downloader
dl: Downloader,
/// Video to download
video: DownloadVideo,
/// Destination
dest: DownloadDest,
/// Progress bar
multi: Option<MultiProgress>,
/// Stream filter
filter: Option<StreamFilter>,
/// Target video format
video_format: Option<DownloadVideoFormat>,
/// ClientType type for fetching videos
player_type: Option<ClientType>,
}
#[derive(Default)]
struct DownloadVideo {
id: String,
name: Option<String>,
channel_id: Option<String>,
channel_name: Option<String>,
}
impl DownloadVideo {
fn from_video(video: &impl YtEntity) -> Self {
DownloadVideo {
id: video.id().to_owned(),
name: Some(video.name().to_owned()),
channel_id: video.channel_id().map(str::to_owned),
channel_name: video
.channel_name()
.map(|n| n.strip_suffix(" - Topic").unwrap_or(n).to_owned()),
}
}
}
#[derive(Clone)]
enum DownloadDest {
Default,
File(PathBuf),
Dir(PathBuf),
Template(PathBuf),
}
fn video_filename(v: &DownloadVideo) -> String {
filenamify_lim(&format!(
"{} [{}]",
v.name.as_deref().unwrap_or_default(),
v.id
))
}
/// Video container format for downloading
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum DownloadVideoFormat {
/// .mp4
#[default]
Mp4,
/// .mkv
Mkv,
/// .webm
Webm,
}
impl DownloadVideoFormat {
/// Get the video format file extension
pub fn extension(&self) -> &'static str {
match self {
DownloadVideoFormat::Mp4 => "mp4",
DownloadVideoFormat::Mkv => "mkv",
DownloadVideoFormat::Webm => "webm",
}
}
/// Get the video format from the given file extension
pub fn from_extension(ext: &str) -> Option<Self> {
match ext {
"mp4" => Some(Self::Mp4),
"mkv" => Some(Self::Mkv),
"webm" => Some(Self::Webm),
_ => None,
}
}
}
impl DownloadDest {
fn get_dest_path(&self, v: &DownloadVideo) -> PathBuf {
match self {
DownloadDest::Default => PathBuf::from(video_filename(v)),
DownloadDest::File(p) => p.clone(),
DownloadDest::Dir(p) => p.join(video_filename(v)),
DownloadDest::Template(t) => t
.iter()
.map(|part| {
let s = part.to_string_lossy();
let mut s = s.replace("{id}", &v.id);
if let Some(name) = &v.name {
s = s.replace("{title}", name)
}
if let Some(channel) = &v.channel_name {
s = s.replace("{channel}", channel)
}
if let Some(id) = &v.channel_id {
s = s.replace("{channelId}", id);
}
filenamify_lim(&s)
})
.collect(),
}
}
}
impl Default for DownloaderBuilder {
fn default() -> Self {
Self {
rp: None,
ffmpeg: "ffmpeg".to_owned(),
multi: None,
filter: StreamFilter::new(),
video_format: DownloadVideoFormat::Mp4,
n_retries: 3,
path_precheck: false,
}
}
}
impl DownloaderBuilder {
/// Create a new [`DownloaderBuilder`]
///
/// This is the same as [`Downloader::builder`]
pub fn new() -> Self {
Self::default()
}
/// Use a custom [`RustyPipe`] client
#[must_use]
pub fn client(mut self, rp: &RustyPipe) -> Self {
self.rp = Some(rp.clone());
self
}
/// Set the path to ffmpeg, used to join video and audio files
///
/// The default system-wide `ffmpeg` binary is used by default.
#[must_use]
pub fn ffmpeg<S: Into<String>>(mut self, ffmpeg: S) -> Self {
self.ffmpeg = ffmpeg.into();
self
}
/// Set the indicatif [`MultiProgress`] used to show download progress
/// for all downloads
#[must_use]
pub fn progress_bar(mut self, progress: MultiProgress) -> Self {
self.multi = Some(progress);
self
}
/// Set the default [`StreamFilter`] for all downloads.
///
/// The filter can be overridden for individual download queries.
#[must_use]
pub fn stream_filter(mut self, filter: StreamFilter) -> Self {
self.filter = filter;
self
}
/// Set the [`VideoFormat`] of downloaded videos
#[must_use]
pub fn video_format(mut self, video_format: DownloadVideoFormat) -> Self {
self.video_format = video_format;
self
}
/// Set the number of retries in case a download fails with a 403 error
#[must_use]
pub fn n_retries(mut self, n_retries: u32) -> Self {
self.n_retries = n_retries;
self
}
/// Enable path precheck
///
/// The downloader will check if the destination path
/// (predicted from the entity to download and the StreamFilter) exists and
/// skips the download with [`DownloadError::Exists`] without fetching any player data.
///
/// This allows fast resumption of playlist downloads.
#[must_use]
pub fn path_precheck(mut self) -> Self {
self.path_precheck = true;
self
}
/// Create a new, configured [`Downloader`] instance
pub fn build(self) -> Downloader {
Downloader {
i: Arc::new(DownloaderInner {
rp: self.rp.unwrap_or_default(),
ffmpeg: self.ffmpeg,
multi: self.multi,
filter: self.filter,
video_format: self.video_format,
n_retries: self.n_retries,
path_precheck: self.path_precheck,
}),
}
}
}
impl Default for Downloader {
fn default() -> Self {
DownloaderBuilder::new().build()
}
}
impl Downloader {
/// Create a new [`Downloader`] using the given [`RustyPipe`] instance
pub fn new(rp: &RustyPipe) -> Self {
DownloaderBuilder::new().client(rp).build()
}
/// Create a new [`DownloaderBuilder`]
///
/// This is the same as [`DownloaderBuilder::new`]
pub fn builder() -> DownloaderBuilder {
DownloaderBuilder::default()
}
fn query(&self, video: DownloadVideo) -> DownloadQuery {
DownloadQuery {
dl: self.clone(),
video,
dest: DownloadDest::Default,
multi: None,
filter: None,
video_format: None,
player_type: None,
}
}
/// Download a video with the given ID
pub fn download_id<S: Into<String>>(&self, video_id: S) -> DownloadQuery {
self.query(DownloadVideo {
id: video_id.into(),
..Default::default()
})
}
/// Download a video from a [`YtEntity`] object (e.g. playlist/channel video)
///
/// Providing an entity has the advantage that the download path can be determined before the video
/// is fetched, so already downloaded videos get skipped right away.
pub fn download_entity(&self, video: &impl YtEntity) -> DownloadQuery {
self.query(DownloadVideo::from_video(video))
}
}
/// Output data from downloading a video
pub struct DownloadResult {
/// Download destination path
pub dest: PathBuf,
/// Fetched vvideo player data
pub player_data: VideoPlayer,
}
impl DownloadQuery {
/// Update the video format from the given path extension
///
/// The video format is not updated if it was already manually set
fn update_video_format(&mut self, path: &Path) {
if self.video_format.is_none() {
self.video_format = path
.extension()
.and_then(|ext| ext.to_str())
.and_then(DownloadVideoFormat::from_extension);
}
}
/// Download to the given file
///
/// Note that the file extension may be changed to fit the reuested video/audio format.
/// Refer to the [`DownloadResult`] to get the actual path after downloading.
pub fn to_file<P: Into<PathBuf>>(mut self, file: P) -> Self {
let file = file.into();
self.update_video_format(&file);
self.dest = DownloadDest::File(file);
self
}
/// Download to the given directory
///
/// The filename is created by this template: `{title} [{id}]`.
///
/// You can use a custom filename template using [`DownloadQuery::to_template`]
pub fn to_dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
self.dest = DownloadDest::Dir(dir.into());
self
}
/// Download to the given filename template
///
/// Templates are paths that may contain variables for video metadata.
///
/// ## Variables
/// - `{id}` Video ID
/// - `{title}` Video title
/// - `{channel}` Channel name
/// - `{channel_id}` Channel ID
///
/// Note that the file extension may be changed to fit the reuested video/audio format.
/// Refer to the [`DownloadResult`] to get the actual path after downloading.
pub fn to_template<P: Into<PathBuf>>(mut self, tmpl: P) -> Self {
let tmpl = tmpl.into();
self.update_video_format(&tmpl);
self.dest = DownloadDest::Template(tmpl);
self
}
/// Use a [`MultiProgress`] progress bar for all downloads
pub fn progress_bar(mut self, progress: MultiProgress) -> Self {
self.multi = Some(progress);
self
}
/// Set a [`StreamFilter`] for choosing a stream to be downloaded
pub fn stream_filter(mut self, filter: StreamFilter) -> Self {
self.filter = Some(filter);
self
}
/// Set the [`VideoFormat`] of downloaded videos
pub fn video_format(mut self, video_format: DownloadVideoFormat) -> Self {
self.video_format = Some(video_format);
self
}
/// Set the [`ClientType`] used to fetch the YT player
pub fn player_type(mut self, player_type: ClientType) -> Self {
self.player_type = Some(player_type);
self
}
/// Download the video
#[tracing::instrument(skip(self), fields(id = self.video.id))]
pub async fn download(&self) -> Result<DownloadResult> {
let mut last_err = None;
// Progress bar
let multi = self.multi.clone().or_else(|| self.dl.i.multi.clone());
let pb = multi.map(|m| {
let pb = ProgressBar::new(1);
pb.set_style(ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap()
.progress_chars("#>-"));
m.add(pb)
});
for n in 0..=self.dl.i.n_retries {
let err = match self.download_attempt(&pb, n).await {
Ok(res) => return Ok(res),
Err(DownloadError::Http(e)) => {
if e.status() != Some(StatusCode::FORBIDDEN) {
return Err(DownloadError::Http(e));
}
DownloadError::Http(e)
}
Err(e) => return Err(e),
};
if n != self.dl.i.n_retries {
tracing::warn!("Retry attempt #{}. Error: {}", n + 1, err);
tokio::time::sleep(Duration::from_secs(1)).await;
}
last_err = Some(err);
}
Err(last_err.unwrap())
}
async fn download_attempt(&self, pb: &Option<ProgressBar>, n: u32) -> Result<DownloadResult> {
let filter = self.filter.as_ref().unwrap_or(&self.dl.i.filter);
let video_format = self.video_format.unwrap_or(self.dl.i.video_format);
// Check if already downloaded
if self.video.name.is_some() && self.dl.i.path_precheck {
let op = self.dest.get_dest_path(&self.video);
if filter.is_video_none() {
for ext in ["m4a", "opus"] {
let p = op.with_extension(ext);
if p.is_file() {
return Err(DownloadError::Exists(p));
}
}
} else {
let p = op.with_extension(video_format.extension());
if p.is_file() {
return Err(DownloadError::Exists(p));
}
}
}
let attempt_suffix = if n > 0 {
format!(" (retry #{n})")
} else {
String::new()
};
if let Some(pb) = pb {
pb.set_message(format!(
"Fetching player data for {}{}",
self.video.name.as_deref().unwrap_or_default(),
attempt_suffix
))
}
let q = self.dl.i.rp.query();
let player_data = match self.player_type {
Some(player_type) => q.player_from_client(&self.video.id, player_type).await?,
None => q.player(&self.video.id).await?,
};
let user_agent = q.user_agent(player_data.client_type);
// Select streams to download
let (video, audio) = player_data.select_video_audio_stream(filter);
if video.is_none() && audio.is_none() {
return Err(DownloadError::Input("no stream found".into()));
}
let extension = match video {
Some(_) => video_format.extension(),
None => match audio {
Some(audio) => match audio.codec {
AudioCodec::Mp4a => "m4a",
AudioCodec::Opus => "opus",
_ => return Err(DownloadError::Input("unknown audio codec".into())),
},
None => unreachable!(),
},
};
let pv = DownloadVideo::from_video(&player_data);
let output_path = self.dest.get_dest_path(&pv).with_extension(extension);
if output_path.exists() {
return Err(DownloadError::Exists(output_path));
}
if let Some(parent) = output_path.parent() {
std::fs::create_dir_all(parent)?;
}
let mut downloads: Vec<StreamDownload> = Vec::new();
if let Some(v) = video {
downloads.push(StreamDownload {
file: output_path.with_extension(format!("video{}", v.format.extension())),
url: v.url.clone(),
video_codec: Some(v.codec),
audio_codec: None,
});
}
if let Some(a) = audio {
downloads.push(StreamDownload {
file: output_path.with_extension(format!("audio{}", a.format.extension())),
url: a.url.clone(),
video_codec: None,
audio_codec: Some(a.codec),
});
}
if let Some(pb) = pb {
pb.set_message(format!(
"Downloading {}{}",
player_data.name(),
attempt_suffix
))
}
download_streams(
&downloads,
self.dl.i.rp.http_client(),
&user_agent,
pb.clone(),
)
.await?;
if let Some(pb) = &pb {
pb.set_message(format!("Converting {}", player_data.name()));
pb.set_style(
ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}]")
.unwrap(),
);
pb.enable_steady_tick(Duration::from_millis(500));
}
convert_streams(
&downloads,
&output_path,
&self.dl.i.ffmpeg,
player_data.name(),
)
.await?;
if let Some(pb) = pb {
pb.disable_steady_tick();
}
// Delete original files
stream::iter(&downloads)
.map(|d| fs::remove_file(d.file.clone()))
.buffer_unordered(downloads.len())
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<core::result::Result<_, _>>()?;
if let Some(pb) = pb {
pb.finish_and_clear();
}
Ok(DownloadResult {
dest: output_path,
player_data,
})
}
}
fn get_download_range(offset: u64, size: Option<u64>) -> Range<u64> {
let mut rng = rand::thread_rng();
let chunk_size = rng.gen_range(CHUNK_SIZE_MIN..CHUNK_SIZE_MAX);
@ -64,11 +638,26 @@ fn parse_cr_header(cr_header: &str) -> Result<(u64, u64)> {
))
}
fn filenamify_lim(name: &str) -> String {
let lim = 200;
let n = filenamify::filenamify(name);
if n.len() > lim {
n.char_indices()
.take_while(|(i, _)| i < &lim)
.map(|(_, c)| c)
.collect::<String>()
} else {
n
}
}
async fn download_single_file<P: Into<PathBuf>>(
url: &str,
output: P,
http: Client,
pb: ProgressBar,
http: &Client,
user_agent: &str,
pb: Option<ProgressBar>,
) -> Result<()> {
// Check if file is already downloaded
let output_path: PathBuf = output.into();
@ -99,6 +688,7 @@ async fn download_single_file<P: Into<PathBuf>>(
let res = http
.head(url.to_owned())
.header(header::USER_AGENT, user_agent)
.header(header::RANGE, "bytes=0-0")
.send()
.await?
@ -125,8 +715,10 @@ async fn download_single_file<P: Into<PathBuf>>(
size = Some(original_size);
offset = file_size;
pb.inc_length(original_size);
pb.inc(offset);
if let Some(pb) = &pb {
pb.inc_length(original_size);
pb.inc(offset);
}
}
Ordering::Equal => {
// Already downloaded
@ -153,9 +745,10 @@ async fn download_single_file<P: Into<PathBuf>>(
.await?;
if is_gvideo && size.is_some() {
download_chunks_by_param(http, &mut file, url, size.unwrap(), offset, pb).await?;
download_chunks_by_param(http, &mut file, url, size.unwrap(), offset, user_agent, pb)
.await?;
} else {
download_chunks_by_header(http, &mut file, url, size, offset, pb).await?;
download_chunks_by_header(http, &mut file, url, size, offset, user_agent, pb).await?;
}
fs::rename(&output_path_tmp, &output_path).await?;
@ -166,22 +759,24 @@ async fn download_single_file<P: Into<PathBuf>>(
// This is the standardized method that works on all web servers,
// but I have observed throttling using this method.
async fn download_chunks_by_header(
http: Client,
http: &Client,
file: &mut File,
url: &str,
size: Option<u64>,
offset: u64,
pb: ProgressBar,
user_agent: &str,
pb: Option<ProgressBar>,
) -> Result<()> {
let mut offset = offset;
let mut size = size;
loop {
let range = get_download_range(offset, size);
debug!("Fetching range {}-{}", range.start, range.end);
tracing::debug!("Fetching range {}-{}", range.start, range.end);
let res = http
.get(url.to_owned())
.header(header::USER_AGENT, user_agent)
.header(header::ORIGIN, "https://www.youtube.com")
.header(header::REFERER, "https://www.youtube.com/")
.header(
@ -211,15 +806,19 @@ async fn download_chunks_by_header(
offset = parsed_offset + 1;
if size.is_none() {
size = Some(parsed_size);
pb.inc_length(parsed_size);
if let Some(pb) = &pb {
pb.inc_length(parsed_size);
}
}
debug!("Retrieving chunks...");
tracing::debug!("Retrieving chunks...");
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
// Retrieve chunk.
let mut chunk = item?;
pb.inc(chunk.len() as u64);
if let Some(pb) = &pb {
pb.inc(chunk.len() as u64);
}
file.write_all_buf(&mut chunk).await?;
}
@ -234,22 +833,26 @@ async fn download_chunks_by_header(
// This ist used by YouTube's web player. The file size
// must be known beforehand (it is included in the stream url).
async fn download_chunks_by_param(
http: Client,
http: &Client,
file: &mut File,
url: &str,
size: u64,
offset: u64,
pb: ProgressBar,
user_agent: &str,
pb: Option<ProgressBar>,
) -> Result<()> {
let mut offset = offset;
pb.inc_length(size);
if let Some(pb) = &pb {
pb.inc_length(size);
}
loop {
let range = get_download_range(offset, Some(size));
debug!("Fetching range {}-{}", range.start, range.end);
tracing::debug!("Fetching range {}-{}", range.start, range.end);
let res = http
.get(format!("{}&range={}-{}", url, range.start, range.end))
.header(header::USER_AGENT, user_agent)
.header(header::ORIGIN, "https://www.youtube.com")
.header(header::REFERER, "https://www.youtube.com/")
.send()
@ -258,17 +861,19 @@ async fn download_chunks_by_param(
let clen = res.content_length().unwrap();
debug!("Retrieving chunks...");
tracing::debug!("Retrieving chunks...");
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
// Retrieve chunk.
let mut chunk = item?;
pb.inc(chunk.len() as u64);
if let Some(pb) = &pb {
pb.inc(chunk.len() as u64);
}
file.write_all_buf(&mut chunk).await?;
}
offset += clen;
debug!("offset inc by {}, new: {}", clen, offset);
tracing::debug!("offset inc by {}, new: {}", clen, offset);
if offset >= size {
break;
}
@ -279,146 +884,21 @@ async fn download_chunks_by_param(
#[allow(dead_code)]
struct StreamDownload {
file: PathBuf,
// track_name: String TODO: add for multiple audio languages,
url: String,
audio_codec: Option<AudioCodec>,
video_codec: Option<VideoCodec>,
}
#[allow(clippy::too_many_arguments)]
pub async fn download_video(
player_data: &VideoPlayer,
output_dir: &str,
output_fname: Option<String>,
output_format: Option<String>,
filter: &StreamFilter<'_>,
ffmpeg: &str,
http: Client,
pb: ProgressBar,
) -> Result<()> {
// Download filepath
let download_dir = PathBuf::from(output_dir);
let title = player_data.details.name.clone();
let output_fname_set = output_fname.is_some();
let output_fname = output_fname.unwrap_or_else(|| {
filenamify::filenamify(format!("{} [{}]", title, player_data.details.id))
});
// Select streams to download
let (video, audio) = player_data.select_video_audio_stream(filter);
if video.is_none() && audio.is_none() {
return Err(DownloadError::Input("no stream found".into()));
}
let format = output_format.unwrap_or(
match video {
Some(_) => "mp4",
None => match audio {
Some(audio) => match audio.codec {
AudioCodec::Mp4a => "m4a",
AudioCodec::Opus => "opus",
_ => return Err(DownloadError::Input("unknown audio codec".into())),
},
None => unreachable!(),
},
}
.to_owned(),
);
let output_path = download_dir.join(&output_fname).with_extension(&format);
if output_path.exists() {
// If the downloaded video already exists, only error if the download path was
// chosen explicitly.
if output_fname_set {
return Err(DownloadError::Input(
format!("File {} already exists", output_path.to_string_lossy()).into(),
))?;
}
info!(
"Downloaded video {} already exists",
output_path.to_string_lossy()
);
return Ok(());
}
match (video, audio) {
// Downloading combined video/audio stream (no conversion)
(Some(video), None) => {
pb.set_message(format!("Downloading {title}"));
download_single_file(
&video.url,
download_dir.join(output_fname).with_extension(&format),
http,
pb.clone(),
)
.await?;
}
// Downloading split video/audio streams (requires conversion with ffmpeg)
_ => {
let mut downloads: Vec<StreamDownload> = Vec::new();
if let Some(v) = video {
downloads.push(StreamDownload {
file: download_dir.join(format!(
"{}.video{}",
output_fname,
v.format.extension()
)),
url: v.url.clone(),
video_codec: Some(v.codec),
audio_codec: None,
});
}
if let Some(a) = audio {
downloads.push(StreamDownload {
file: download_dir.join(format!(
"{}.audio{}",
output_fname,
a.format.extension()
)),
url: a.url.clone(),
video_codec: None,
audio_codec: Some(a.codec),
});
}
pb.set_message(format!("Downloading {title}"));
download_streams(&downloads, http, pb.clone()).await?;
pb.set_message(format!("Converting {title}"));
pb.set_style(
ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}]")
.unwrap(),
);
pb.enable_steady_tick(Duration::from_millis(100));
convert_streams(&downloads, output_path, ffmpeg).await?;
pb.disable_steady_tick();
// Delete original files
stream::iter(&downloads)
.map(|d| fs::remove_file(d.file.clone()))
.buffer_unordered(downloads.len())
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<core::result::Result<_, _>>()?;
}
}
pb.finish_and_clear();
Ok(())
}
async fn download_streams(
downloads: &Vec<StreamDownload>,
http: Client,
pb: ProgressBar,
http: &Client,
user_agent: &str,
pb: Option<ProgressBar>,
) -> Result<()> {
let n = downloads.len();
stream::iter(downloads)
.map(|d| download_single_file(&d.url, d.file.clone(), http.clone(), pb.clone()))
.map(|d| download_single_file(&d.url, d.file.clone(), http, user_agent, pb.clone()))
.buffer_unordered(n)
.collect::<Vec<_>>()
.await
@ -432,6 +912,7 @@ async fn convert_streams<P: Into<PathBuf>>(
downloads: &[StreamDownload],
output: P,
ffmpeg: &str,
title: &str,
) -> Result<()> {
let output_path: PathBuf = output.into();
@ -451,6 +932,9 @@ async fn convert_streams<P: Into<PathBuf>>(
args.push("-c".into());
args.push("copy".into());
args.push("-metadata".into());
args.push(format!("title={title}").into());
args.push(output_path.into());
let res = Command::new(ffmpeg).args(args).output().await?;

View file

@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::BTreeMap};
use std::{borrow::Cow, collections::BTreeMap, path::PathBuf};
use reqwest::Url;
@ -6,18 +6,28 @@ use reqwest::Url;
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum DownloadError {
/// RustyPipe error
#[error("{0}")]
RustyPipe(#[from] rustypipe::error::Error),
/// Error from the HTTP client
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
/// File IO error
#[error(transparent)]
Io(#[from] std::io::Error),
/// FFmpeg returned an error
#[error("FFmpeg error: {0}")]
Ffmpeg(Cow<'static, str>),
/// Error parsing ranges for progressive download
#[error("Progressive download error: {0}")]
Progressive(Cow<'static, str>),
/// Video could not be downloaded because of invalid player data
#[error("input error: {0}")]
Input(Cow<'static, str>),
/// Download target already exists
#[error("file {0} already exists")]
Exists(PathBuf),
/// Other error
#[error("error: {0}")]
Other(Cow<'static, str>),
}