diff --git a/Cargo.toml b/Cargo.toml index 2eac990..a5257ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ tracing = { version = "0.1.37", features = ["log"] } indicatif = "0.17.0" anyhow = "1.0" clap = { version = "4.0.29", features = ["derive"] } -tracing-subscriber = "0.3.17" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } serde_yaml = "0.9.19" dirs = "5.0.0" filenamify = "0.1.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index c5021aa..a4cfc9d 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -52,6 +52,7 @@ serde_json.workspace = true indicatif.workspace = true anyhow.workspace = true clap.workspace = true +tracing.workspace = true tracing-subscriber.workspace = true serde_yaml.workspace = true dirs.workspace = true diff --git a/cli/src/main.rs b/cli/src/main.rs index 138e915..cd5c6fe 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,18 +1,19 @@ #![warn(clippy::todo, clippy::dbg_macro)] -use std::{path::PathBuf, str::FromStr, time::Duration}; +use std::{path::PathBuf, str::FromStr}; -use anyhow::{Context, Result}; use clap::{Parser, Subcommand, ValueEnum}; use futures::stream::{self, StreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use reqwest::{Client, ClientBuilder}; use rustypipe::{ client::{ClientType, RustyPipe}, model::{UrlTarget, VideoId, YouTubeItem}, param::{search_filter, ChannelVideoTab, Country, Language, StreamFilter}, }; +use rustypipe_downloader::{DownloadQuery, DownloaderBuilder}; use serde::Serialize; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{fmt::MakeWriter, EnvFilter}; #[derive(Parser)] #[clap(author, version, about, long_about = None)] @@ -33,6 +34,41 @@ struct Cli { country: Option, } +#[derive(Parser)] +#[group(multiple = false)] +struct DownloadTarget { + #[clap(short, long)] + output: Option, + #[clap(long)] + output_file: Option, + #[clap(long)] + template: Option, +} + +impl DownloadTarget { + fn assert_dir(&self) { + if self.output_file.is_some() { + panic!("Cannot download multiple videos to a single file") + } else if let Some(template) = &self.template { + if !template.contains("{id}") && !template.contains("{title}") { + panic!("Template must contain {{id}} or {{title}} variables") + } + } + } + + fn apply(&self, q: DownloadQuery) -> DownloadQuery { + if let Some(output_file) = &self.output_file { + q.to_file(output_file) + } else if let Some(output) = &self.output { + q.to_dir(output) + } else if let Some(template) = &self.template { + q.to_template(template) + } else { + q + } + } +} + #[derive(Subcommand)] enum Commands { /// Download a video, playlist, album or channel @@ -40,18 +76,22 @@ enum Commands { Download { /// ID or URL id: String, - /// Output path - #[clap(short, default_value = ".")] - output: PathBuf, + #[clap(flatten)] + target: DownloadTarget, /// Video resolution (e.g. 720, 1080). Set to 0 for audio-only. #[clap(short, long)] resolution: Option, /// Number of videos downloaded in parallel #[clap(short, long, default_value_t = 8)] parallel: usize, + /// Use YouTube Music for downloading playlists + #[clap(long)] + music: bool, /// Limit the number of videos to download #[clap(long, default_value_t = 1000)] limit: usize, + #[clap(long)] + player_type: Option, }, /// Extract video, playlist, album or channel data Get { @@ -116,6 +156,7 @@ enum Commands { #[clap(long)] music: Option, }, + Vdata, } #[derive(Copy, Clone, ValueEnum)] @@ -252,64 +293,6 @@ impl From for ClientType { } } -#[allow(clippy::too_many_arguments)] -async fn download_single_video( - video_id: &str, - video_title: &str, - output_dir: &str, - output_fname: Option, - resolution: Option, - ffmpeg: &str, - rp: &RustyPipe, - http: Client, - multi: MultiProgress, - main: Option, -) -> Result<()> { - let pb = multi.add(ProgressBar::new(1)); - pb.set_style(ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap() - .progress_chars("#>-")); - pb.set_message(format!("Fetching player data for {video_title}")); - - let res = async { - let player_data = rp - .query() - .player(video_id) - .await - .context(format!("Failed to fetch player data for video {video_id}"))?; - - let mut filter = StreamFilter::new(); - if let Some(res) = resolution { - if res == 0 { - filter = filter.no_video(); - } else { - filter = filter.video_max_res(res); - } - } - - rustypipe_downloader::download_video( - &player_data, - output_dir, - output_fname, - None, - &filter, - ffmpeg, - http, - pb, - ) - .await - .context(format!( - "Failed to download video '{}' [{}]", - player_data.details.name, video_id - )) - } - .await; - - if let Some(main) = main { - main.inc(1); - } - res -} - fn print_data(data: &T, format: Format, pretty: bool) { let stdout = std::io::stdout().lock(); match format { @@ -327,55 +310,59 @@ fn print_data(data: &T, format: Format, pretty: bool) { async fn download_video( rp: &RustyPipe, id: &str, - output_dir: &str, - output_fname: Option, + target: &DownloadTarget, resolution: Option, + player_type: Option, + multi: MultiProgress, ) { - let http = ClientBuilder::new() - .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; rv:107.0) Gecko/20100101 Firefox/107.0") - .gzip(true) - .brotli(true) - .timeout(Duration::from_secs(10)) - .build() - .expect("unable to build the HTTP client"); - - // Indicatif setup - let multi = MultiProgress::new(); - - download_single_video( - id, - id, - output_dir, - output_fname, - resolution, - "ffmpeg", - rp, - http, - multi, - None, - ) - .await - .unwrap_or_else(|e| println!("ERROR: {e:?}")); + let mut filter = StreamFilter::new(); + if let Some(res) = resolution { + if res == 0 { + filter = filter.no_video(); + } else { + filter = filter.video_max_res(res); + } + } + let dl = DownloaderBuilder::new() + .client(rp) + .stream_filter(filter) + .progress_bar(multi) + .build(); + let mut q = target.apply(dl.download_id(id)); + if let Some(player_type) = player_type { + q = q.player_type(player_type.into()); + } + let res = q.download().await; + if let Err(e) = res { + tracing::error!("{e}") + } } async fn download_videos( rp: &RustyPipe, videos: &[VideoId], - output_dir: &str, - output_fname: Option, + target: &DownloadTarget, resolution: Option, parallel: usize, + player_type: Option, + multi: MultiProgress, ) { - let http = ClientBuilder::new() - .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; rv:107.0) Gecko/20100101 Firefox/107.0") - .gzip(true) - .brotli(true) - .timeout(Duration::from_secs(10)) - .build() - .expect("unable to build the HTTP client"); + let mut filter = StreamFilter::new(); + if let Some(res) = resolution { + if res == 0 { + filter = filter.no_video(); + } else { + filter = filter.video_max_res(res); + } + } + let dl = DownloaderBuilder::new() + .client(rp) + .stream_filter(filter) + .progress_bar(multi.clone()) + .path_precheck() + .build(); // Indicatif setup - let multi = MultiProgress::new(); let main = multi.add(ProgressBar::new( videos.len().try_into().unwrap_or_default(), )); @@ -389,38 +376,61 @@ async fn download_videos( main.tick(); stream::iter(videos) - .map(|video| { - download_single_video( - &video.id, - &video.name, - output_dir, - output_fname.clone(), - resolution, - "ffmpeg", - rp, - http.clone(), - multi.clone(), - Some(main.clone()), - ) - }) - .buffer_unordered(parallel) - .collect::>() - .await - .into_iter() - .for_each(|res| match res { - Ok(_) => {} - Err(e) => { - println!("ERROR: {e:?}"); + .for_each_concurrent(parallel, |video| { + let dl = dl.clone(); + let main = main.clone(); + + let mut q = target.apply(dl.download_entity(video)); + if let Some(player_type) = player_type { + q = q.player_type(player_type.into()); } - }); + + async move { + if let Err(e) = q.download().await { + tracing::error!("{e:?}"); + } else { + main.inc(1); + } + } + }) + .await; +} + +/// Stderr writer that suspends the progress bars before printing logs +#[derive(Clone)] +struct ProgWriter(MultiProgress); + +impl<'a> MakeWriter<'a> for ProgWriter { + type Writer = ProgWriter; + + fn make_writer(&'a self) -> Self::Writer { + self.clone() + } +} + +impl std::io::Write for ProgWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.suspend(|| std::io::stderr().write(buf)) + } + + fn flush(&mut self) -> std::io::Result<()> { + std::io::stderr().flush() + } } #[tokio::main] async fn main() { - // env_logger::builder().format_timestamp_micros().init(); - tracing_subscriber::fmt::init(); - let cli = Cli::parse(); + let multi = MultiProgress::new(); + + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .with_writer(ProgWriter(multi.clone())) + .init(); let mut rp = RustyPipe::builder().visitor_data_opt(cli.vdata); if cli.report { @@ -442,48 +452,20 @@ async fn main() { match cli.command { Commands::Download { id, - output, + target, resolution, parallel, + music, limit, + player_type, } => { - // Cases: Existing folder, non-existing file with existing parent folder, - // Error cases: non-existing parent folder, existing file - let output_path = std::fs::canonicalize(output).unwrap(); - if output_path.is_file() { - println!("Output file already exists"); - return; - } - let (output_dir, output_fname) = if output_path.is_dir() { - (output_path.to_string_lossy().to_string(), None) - } else { - let output_dir_parent = output_path.parent().unwrap(); - if !output_dir_parent.is_dir() { - println!( - "Parent folder {} does not exist", - output_dir_parent.to_string_lossy() - ); - return; - } - - ( - output_dir_parent.to_string_lossy().to_string(), - Some( - output_path - .file_name() - .unwrap() - .to_string_lossy() - .to_string(), - ), - ) - }; - - let target = rp.query().resolve_string(&id, false).await.unwrap(); - match target { + let url_target = rp.query().resolve_string(&id, false).await.unwrap(); + match url_target { UrlTarget::Video { id, .. } => { - download_video(&rp, &id, &output_dir, output_fname, resolution).await; + download_video(&rp, &id, &target, resolution, player_type, multi).await; } UrlTarget::Channel { id } => { + target.assert_dir(); let mut channel = rp.query().channel_videos(id).await.unwrap(); channel .content @@ -500,38 +482,58 @@ async fn main() { download_videos( &rp, &videos, - &output_dir, - output_fname, + &target, resolution, parallel, + player_type, + multi, ) .await; } UrlTarget::Playlist { id } => { - let mut playlist = rp.query().playlist(id).await.unwrap(); - playlist - .videos - .extend_limit(&rp.query(), limit) - .await - .unwrap(); - let videos: Vec = playlist - .videos - .items - .into_iter() - .take(limit) - .map(VideoId::from) - .collect(); + target.assert_dir(); + let videos: Vec = if music { + let mut playlist = rp.query().music_playlist(id).await.unwrap(); + playlist + .tracks + .extend_limit(&rp.query(), limit) + .await + .unwrap(); + playlist + .tracks + .items + .into_iter() + .take(limit) + .map(VideoId::from) + .collect() + } else { + let mut playlist = rp.query().playlist(id).await.unwrap(); + playlist + .videos + .extend_limit(&rp.query(), limit) + .await + .unwrap(); + playlist + .videos + .items + .into_iter() + .take(limit) + .map(VideoId::from) + .collect() + }; download_videos( &rp, &videos, - &output_dir, - output_fname, + &target, resolution, parallel, + player_type, + multi, ) .await; } UrlTarget::Album { id } => { + target.assert_dir(); let album = rp.query().music_album(id).await.unwrap(); let videos: Vec = album .tracks @@ -542,10 +544,11 @@ async fn main() { download_videos( &rp, &videos, - &output_dir, - output_fname, + &target, resolution, parallel, + player_type, + multi, ) .await; } @@ -740,5 +743,9 @@ async fn main() { print_data(&res, format, pretty); } }, + Commands::Vdata => { + let vd = rp.query().get_visitor_data().await.unwrap(); + println!("{vd}"); + } }; } diff --git a/downloader/src/lib.rs b/downloader/src/lib.rs index a706c3c..0429ef2 100644 --- a/downloader/src/lib.rs +++ b/downloader/src/lib.rs @@ -1,19 +1,31 @@ -#![warn(clippy::todo, clippy::dbg_macro)] +#![warn(missing_docs, clippy::todo, clippy::dbg_macro)] //! # YouTube audio/video downloader mod util; -use std::{borrow::Cow, cmp::Ordering, ffi::OsString, ops::Range, path::PathBuf, time::Duration}; +use std::{ + borrow::Cow, + cmp::Ordering, + ffi::OsString, + ops::Range, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use futures::stream::{self, StreamExt}; -use indicatif::{ProgressBar, ProgressStyle}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use once_cell::sync::Lazy; use rand::Rng; use regex::Regex; -use reqwest::{header, Client}; +use reqwest::{header, Client, StatusCode}; use rustypipe::{ - model::{traits::FileFormat, AudioCodec, VideoCodec, VideoPlayer}, + client::{ClientType, RustyPipe}, + model::{ + traits::{FileFormat, YtEntity}, + AudioCodec, VideoCodec, VideoPlayer, + }, param::StreamFilter, }; use tokio::{ @@ -21,7 +33,6 @@ use tokio::{ io::AsyncWriteExt, process::Command, }; -use tracing::{debug, info}; use util::DownloadError; @@ -30,6 +41,569 @@ type Result = core::result::Result; const CHUNK_SIZE_MIN: u64 = 9_000_000; const CHUNK_SIZE_MAX: u64 = 10_000_000; +/// RustyPipe audio/video downloader +/// +/// The downloader uses an [`Arc`] internally, so if you are using the client +/// at multiple locations, you can just clone it. +#[derive(Clone)] +pub struct Downloader { + i: Arc, +} + +/// Builder to construct a new downloader +pub struct DownloaderBuilder { + rp: Option, + ffmpeg: String, + multi: Option, + filter: StreamFilter, + video_format: DownloadVideoFormat, + n_retries: u32, + path_precheck: bool, +} + +struct DownloaderInner { + /// YT client + rp: RustyPipe, + /// Path to the ffmpeg binary + ffmpeg: String, + /// Global progress + multi: Option, + /// Default stream filter + filter: StreamFilter, + /// Default video format + video_format: DownloadVideoFormat, + /// Number of retries in case of 403 error + n_retries: u32, + /// Check if destination path exists before player is fetched + path_precheck: bool, +} + +/// Download query +pub struct DownloadQuery { + /// RustyPipe Downloader + dl: Downloader, + /// Video to download + video: DownloadVideo, + /// Destination + dest: DownloadDest, + /// Progress bar + multi: Option, + /// Stream filter + filter: Option, + /// Target video format + video_format: Option, + /// ClientType type for fetching videos + player_type: Option, +} + +#[derive(Default)] +struct DownloadVideo { + id: String, + name: Option, + channel_id: Option, + channel_name: Option, +} + +impl DownloadVideo { + fn from_video(video: &impl YtEntity) -> Self { + DownloadVideo { + id: video.id().to_owned(), + name: Some(video.name().to_owned()), + channel_id: video.channel_id().map(str::to_owned), + channel_name: video + .channel_name() + .map(|n| n.strip_suffix(" - Topic").unwrap_or(n).to_owned()), + } + } +} + +#[derive(Clone)] +enum DownloadDest { + Default, + File(PathBuf), + Dir(PathBuf), + Template(PathBuf), +} + +fn video_filename(v: &DownloadVideo) -> String { + filenamify_lim(&format!( + "{} [{}]", + v.name.as_deref().unwrap_or_default(), + v.id + )) +} + +/// Video container format for downloading +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub enum DownloadVideoFormat { + /// .mp4 + #[default] + Mp4, + /// .mkv + Mkv, + /// .webm + Webm, +} + +impl DownloadVideoFormat { + /// Get the video format file extension + pub fn extension(&self) -> &'static str { + match self { + DownloadVideoFormat::Mp4 => "mp4", + DownloadVideoFormat::Mkv => "mkv", + DownloadVideoFormat::Webm => "webm", + } + } + + /// Get the video format from the given file extension + pub fn from_extension(ext: &str) -> Option { + match ext { + "mp4" => Some(Self::Mp4), + "mkv" => Some(Self::Mkv), + "webm" => Some(Self::Webm), + _ => None, + } + } +} + +impl DownloadDest { + fn get_dest_path(&self, v: &DownloadVideo) -> PathBuf { + match self { + DownloadDest::Default => PathBuf::from(video_filename(v)), + DownloadDest::File(p) => p.clone(), + DownloadDest::Dir(p) => p.join(video_filename(v)), + DownloadDest::Template(t) => t + .iter() + .map(|part| { + let s = part.to_string_lossy(); + let mut s = s.replace("{id}", &v.id); + if let Some(name) = &v.name { + s = s.replace("{title}", name) + } + if let Some(channel) = &v.channel_name { + s = s.replace("{channel}", channel) + } + if let Some(id) = &v.channel_id { + s = s.replace("{channelId}", id); + } + filenamify_lim(&s) + }) + .collect(), + } + } +} + +impl Default for DownloaderBuilder { + fn default() -> Self { + Self { + rp: None, + ffmpeg: "ffmpeg".to_owned(), + multi: None, + filter: StreamFilter::new(), + video_format: DownloadVideoFormat::Mp4, + n_retries: 3, + path_precheck: false, + } + } +} + +impl DownloaderBuilder { + /// Create a new [`DownloaderBuilder`] + /// + /// This is the same as [`Downloader::builder`] + pub fn new() -> Self { + Self::default() + } + + /// Use a custom [`RustyPipe`] client + #[must_use] + pub fn client(mut self, rp: &RustyPipe) -> Self { + self.rp = Some(rp.clone()); + self + } + + /// Set the path to ffmpeg, used to join video and audio files + /// + /// The default system-wide `ffmpeg` binary is used by default. + #[must_use] + pub fn ffmpeg>(mut self, ffmpeg: S) -> Self { + self.ffmpeg = ffmpeg.into(); + self + } + + /// Set the indicatif [`MultiProgress`] used to show download progress + /// for all downloads + #[must_use] + pub fn progress_bar(mut self, progress: MultiProgress) -> Self { + self.multi = Some(progress); + self + } + + /// Set the default [`StreamFilter`] for all downloads. + /// + /// The filter can be overridden for individual download queries. + #[must_use] + pub fn stream_filter(mut self, filter: StreamFilter) -> Self { + self.filter = filter; + self + } + + /// Set the [`VideoFormat`] of downloaded videos + #[must_use] + pub fn video_format(mut self, video_format: DownloadVideoFormat) -> Self { + self.video_format = video_format; + self + } + + /// Set the number of retries in case a download fails with a 403 error + #[must_use] + pub fn n_retries(mut self, n_retries: u32) -> Self { + self.n_retries = n_retries; + self + } + + /// Enable path precheck + /// + /// The downloader will check if the destination path + /// (predicted from the entity to download and the StreamFilter) exists and + /// skips the download with [`DownloadError::Exists`] without fetching any player data. + /// + /// This allows fast resumption of playlist downloads. + #[must_use] + pub fn path_precheck(mut self) -> Self { + self.path_precheck = true; + self + } + + /// Create a new, configured [`Downloader`] instance + pub fn build(self) -> Downloader { + Downloader { + i: Arc::new(DownloaderInner { + rp: self.rp.unwrap_or_default(), + ffmpeg: self.ffmpeg, + multi: self.multi, + filter: self.filter, + video_format: self.video_format, + n_retries: self.n_retries, + path_precheck: self.path_precheck, + }), + } + } +} + +impl Default for Downloader { + fn default() -> Self { + DownloaderBuilder::new().build() + } +} + +impl Downloader { + /// Create a new [`Downloader`] using the given [`RustyPipe`] instance + pub fn new(rp: &RustyPipe) -> Self { + DownloaderBuilder::new().client(rp).build() + } + + /// Create a new [`DownloaderBuilder`] + /// + /// This is the same as [`DownloaderBuilder::new`] + pub fn builder() -> DownloaderBuilder { + DownloaderBuilder::default() + } + + fn query(&self, video: DownloadVideo) -> DownloadQuery { + DownloadQuery { + dl: self.clone(), + video, + dest: DownloadDest::Default, + multi: None, + filter: None, + video_format: None, + player_type: None, + } + } + + /// Download a video with the given ID + pub fn download_id>(&self, video_id: S) -> DownloadQuery { + self.query(DownloadVideo { + id: video_id.into(), + ..Default::default() + }) + } + + /// Download a video from a [`YtEntity`] object (e.g. playlist/channel video) + /// + /// Providing an entity has the advantage that the download path can be determined before the video + /// is fetched, so already downloaded videos get skipped right away. + pub fn download_entity(&self, video: &impl YtEntity) -> DownloadQuery { + self.query(DownloadVideo::from_video(video)) + } +} + +/// Output data from downloading a video +pub struct DownloadResult { + /// Download destination path + pub dest: PathBuf, + /// Fetched vvideo player data + pub player_data: VideoPlayer, +} + +impl DownloadQuery { + /// Update the video format from the given path extension + /// + /// The video format is not updated if it was already manually set + fn update_video_format(&mut self, path: &Path) { + if self.video_format.is_none() { + self.video_format = path + .extension() + .and_then(|ext| ext.to_str()) + .and_then(DownloadVideoFormat::from_extension); + } + } + + /// Download to the given file + /// + /// Note that the file extension may be changed to fit the reuested video/audio format. + /// Refer to the [`DownloadResult`] to get the actual path after downloading. + pub fn to_file>(mut self, file: P) -> Self { + let file = file.into(); + self.update_video_format(&file); + self.dest = DownloadDest::File(file); + self + } + + /// Download to the given directory + /// + /// The filename is created by this template: `{title} [{id}]`. + /// + /// You can use a custom filename template using [`DownloadQuery::to_template`] + pub fn to_dir>(mut self, dir: P) -> Self { + self.dest = DownloadDest::Dir(dir.into()); + self + } + + /// Download to the given filename template + /// + /// Templates are paths that may contain variables for video metadata. + /// + /// ## Variables + /// - `{id}` Video ID + /// - `{title}` Video title + /// - `{channel}` Channel name + /// - `{channel_id}` Channel ID + /// + /// Note that the file extension may be changed to fit the reuested video/audio format. + /// Refer to the [`DownloadResult`] to get the actual path after downloading. + pub fn to_template>(mut self, tmpl: P) -> Self { + let tmpl = tmpl.into(); + self.update_video_format(&tmpl); + self.dest = DownloadDest::Template(tmpl); + self + } + + /// Use a [`MultiProgress`] progress bar for all downloads + pub fn progress_bar(mut self, progress: MultiProgress) -> Self { + self.multi = Some(progress); + self + } + + /// Set a [`StreamFilter`] for choosing a stream to be downloaded + pub fn stream_filter(mut self, filter: StreamFilter) -> Self { + self.filter = Some(filter); + self + } + + /// Set the [`VideoFormat`] of downloaded videos + pub fn video_format(mut self, video_format: DownloadVideoFormat) -> Self { + self.video_format = Some(video_format); + self + } + + /// Set the [`ClientType`] used to fetch the YT player + pub fn player_type(mut self, player_type: ClientType) -> Self { + self.player_type = Some(player_type); + self + } + + /// Download the video + #[tracing::instrument(skip(self), fields(id = self.video.id))] + pub async fn download(&self) -> Result { + let mut last_err = None; + + // Progress bar + let multi = self.multi.clone().or_else(|| self.dl.i.multi.clone()); + let pb = multi.map(|m| { + let pb = ProgressBar::new(1); + pb.set_style(ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap() + .progress_chars("#>-")); + m.add(pb) + }); + + for n in 0..=self.dl.i.n_retries { + let err = match self.download_attempt(&pb, n).await { + Ok(res) => return Ok(res), + Err(DownloadError::Http(e)) => { + if e.status() != Some(StatusCode::FORBIDDEN) { + return Err(DownloadError::Http(e)); + } + DownloadError::Http(e) + } + Err(e) => return Err(e), + }; + + if n != self.dl.i.n_retries { + tracing::warn!("Retry attempt #{}. Error: {}", n + 1, err); + tokio::time::sleep(Duration::from_secs(1)).await; + } + last_err = Some(err); + } + Err(last_err.unwrap()) + } + + async fn download_attempt(&self, pb: &Option, n: u32) -> Result { + let filter = self.filter.as_ref().unwrap_or(&self.dl.i.filter); + let video_format = self.video_format.unwrap_or(self.dl.i.video_format); + + // Check if already downloaded + if self.video.name.is_some() && self.dl.i.path_precheck { + let op = self.dest.get_dest_path(&self.video); + + if filter.is_video_none() { + for ext in ["m4a", "opus"] { + let p = op.with_extension(ext); + if p.is_file() { + return Err(DownloadError::Exists(p)); + } + } + } else { + let p = op.with_extension(video_format.extension()); + if p.is_file() { + return Err(DownloadError::Exists(p)); + } + } + } + + let attempt_suffix = if n > 0 { + format!(" (retry #{n})") + } else { + String::new() + }; + if let Some(pb) = pb { + pb.set_message(format!( + "Fetching player data for {}{}", + self.video.name.as_deref().unwrap_or_default(), + attempt_suffix + )) + } + + let q = self.dl.i.rp.query(); + let player_data = match self.player_type { + Some(player_type) => q.player_from_client(&self.video.id, player_type).await?, + None => q.player(&self.video.id).await?, + }; + let user_agent = q.user_agent(player_data.client_type); + + // Select streams to download + let (video, audio) = player_data.select_video_audio_stream(filter); + + if video.is_none() && audio.is_none() { + return Err(DownloadError::Input("no stream found".into())); + } + + let extension = match video { + Some(_) => video_format.extension(), + None => match audio { + Some(audio) => match audio.codec { + AudioCodec::Mp4a => "m4a", + AudioCodec::Opus => "opus", + _ => return Err(DownloadError::Input("unknown audio codec".into())), + }, + None => unreachable!(), + }, + }; + + let pv = DownloadVideo::from_video(&player_data); + let output_path = self.dest.get_dest_path(&pv).with_extension(extension); + + if output_path.exists() { + return Err(DownloadError::Exists(output_path)); + } + if let Some(parent) = output_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let mut downloads: Vec = Vec::new(); + + if let Some(v) = video { + downloads.push(StreamDownload { + file: output_path.with_extension(format!("video{}", v.format.extension())), + url: v.url.clone(), + video_codec: Some(v.codec), + audio_codec: None, + }); + } + if let Some(a) = audio { + downloads.push(StreamDownload { + file: output_path.with_extension(format!("audio{}", a.format.extension())), + url: a.url.clone(), + video_codec: None, + audio_codec: Some(a.codec), + }); + } + + if let Some(pb) = pb { + pb.set_message(format!( + "Downloading {}{}", + player_data.name(), + attempt_suffix + )) + } + download_streams( + &downloads, + self.dl.i.rp.http_client(), + &user_agent, + pb.clone(), + ) + .await?; + + if let Some(pb) = &pb { + pb.set_message(format!("Converting {}", player_data.name())); + pb.set_style( + ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}]") + .unwrap(), + ); + pb.enable_steady_tick(Duration::from_millis(500)); + } + + convert_streams( + &downloads, + &output_path, + &self.dl.i.ffmpeg, + player_data.name(), + ) + .await?; + if let Some(pb) = pb { + pb.disable_steady_tick(); + } + + // Delete original files + stream::iter(&downloads) + .map(|d| fs::remove_file(d.file.clone())) + .buffer_unordered(downloads.len()) + .collect::>() + .await + .into_iter() + .collect::>()?; + + if let Some(pb) = pb { + pb.finish_and_clear(); + } + Ok(DownloadResult { + dest: output_path, + player_data, + }) + } +} + fn get_download_range(offset: u64, size: Option) -> Range { let mut rng = rand::thread_rng(); let chunk_size = rng.gen_range(CHUNK_SIZE_MIN..CHUNK_SIZE_MAX); @@ -64,11 +638,26 @@ fn parse_cr_header(cr_header: &str) -> Result<(u64, u64)> { )) } +fn filenamify_lim(name: &str) -> String { + let lim = 200; + let n = filenamify::filenamify(name); + + if n.len() > lim { + n.char_indices() + .take_while(|(i, _)| i < &lim) + .map(|(_, c)| c) + .collect::() + } else { + n + } +} + async fn download_single_file>( url: &str, output: P, - http: Client, - pb: ProgressBar, + http: &Client, + user_agent: &str, + pb: Option, ) -> Result<()> { // Check if file is already downloaded let output_path: PathBuf = output.into(); @@ -99,6 +688,7 @@ async fn download_single_file>( let res = http .head(url.to_owned()) + .header(header::USER_AGENT, user_agent) .header(header::RANGE, "bytes=0-0") .send() .await? @@ -125,8 +715,10 @@ async fn download_single_file>( size = Some(original_size); offset = file_size; - pb.inc_length(original_size); - pb.inc(offset); + if let Some(pb) = &pb { + pb.inc_length(original_size); + pb.inc(offset); + } } Ordering::Equal => { // Already downloaded @@ -153,9 +745,10 @@ async fn download_single_file>( .await?; if is_gvideo && size.is_some() { - download_chunks_by_param(http, &mut file, url, size.unwrap(), offset, pb).await?; + download_chunks_by_param(http, &mut file, url, size.unwrap(), offset, user_agent, pb) + .await?; } else { - download_chunks_by_header(http, &mut file, url, size, offset, pb).await?; + download_chunks_by_header(http, &mut file, url, size, offset, user_agent, pb).await?; } fs::rename(&output_path_tmp, &output_path).await?; @@ -166,22 +759,24 @@ async fn download_single_file>( // This is the standardized method that works on all web servers, // but I have observed throttling using this method. async fn download_chunks_by_header( - http: Client, + http: &Client, file: &mut File, url: &str, size: Option, offset: u64, - pb: ProgressBar, + user_agent: &str, + pb: Option, ) -> Result<()> { let mut offset = offset; let mut size = size; loop { let range = get_download_range(offset, size); - debug!("Fetching range {}-{}", range.start, range.end); + tracing::debug!("Fetching range {}-{}", range.start, range.end); let res = http .get(url.to_owned()) + .header(header::USER_AGENT, user_agent) .header(header::ORIGIN, "https://www.youtube.com") .header(header::REFERER, "https://www.youtube.com/") .header( @@ -211,15 +806,19 @@ async fn download_chunks_by_header( offset = parsed_offset + 1; if size.is_none() { size = Some(parsed_size); - pb.inc_length(parsed_size); + if let Some(pb) = &pb { + pb.inc_length(parsed_size); + } } - debug!("Retrieving chunks..."); + tracing::debug!("Retrieving chunks..."); let mut stream = res.bytes_stream(); while let Some(item) = stream.next().await { // Retrieve chunk. let mut chunk = item?; - pb.inc(chunk.len() as u64); + if let Some(pb) = &pb { + pb.inc(chunk.len() as u64); + } file.write_all_buf(&mut chunk).await?; } @@ -234,22 +833,26 @@ async fn download_chunks_by_header( // This ist used by YouTube's web player. The file size // must be known beforehand (it is included in the stream url). async fn download_chunks_by_param( - http: Client, + http: &Client, file: &mut File, url: &str, size: u64, offset: u64, - pb: ProgressBar, + user_agent: &str, + pb: Option, ) -> Result<()> { let mut offset = offset; - pb.inc_length(size); + if let Some(pb) = &pb { + pb.inc_length(size); + } loop { let range = get_download_range(offset, Some(size)); - debug!("Fetching range {}-{}", range.start, range.end); + tracing::debug!("Fetching range {}-{}", range.start, range.end); let res = http .get(format!("{}&range={}-{}", url, range.start, range.end)) + .header(header::USER_AGENT, user_agent) .header(header::ORIGIN, "https://www.youtube.com") .header(header::REFERER, "https://www.youtube.com/") .send() @@ -258,17 +861,19 @@ async fn download_chunks_by_param( let clen = res.content_length().unwrap(); - debug!("Retrieving chunks..."); + tracing::debug!("Retrieving chunks..."); let mut stream = res.bytes_stream(); while let Some(item) = stream.next().await { // Retrieve chunk. let mut chunk = item?; - pb.inc(chunk.len() as u64); + if let Some(pb) = &pb { + pb.inc(chunk.len() as u64); + } file.write_all_buf(&mut chunk).await?; } offset += clen; - debug!("offset inc by {}, new: {}", clen, offset); + tracing::debug!("offset inc by {}, new: {}", clen, offset); if offset >= size { break; } @@ -279,146 +884,21 @@ async fn download_chunks_by_param( #[allow(dead_code)] struct StreamDownload { file: PathBuf, - // track_name: String TODO: add for multiple audio languages, url: String, audio_codec: Option, video_codec: Option, } -#[allow(clippy::too_many_arguments)] -pub async fn download_video( - player_data: &VideoPlayer, - output_dir: &str, - output_fname: Option, - output_format: Option, - filter: &StreamFilter<'_>, - ffmpeg: &str, - http: Client, - pb: ProgressBar, -) -> Result<()> { - // Download filepath - let download_dir = PathBuf::from(output_dir); - let title = player_data.details.name.clone(); - let output_fname_set = output_fname.is_some(); - let output_fname = output_fname.unwrap_or_else(|| { - filenamify::filenamify(format!("{} [{}]", title, player_data.details.id)) - }); - - // Select streams to download - let (video, audio) = player_data.select_video_audio_stream(filter); - - if video.is_none() && audio.is_none() { - return Err(DownloadError::Input("no stream found".into())); - } - - let format = output_format.unwrap_or( - match video { - Some(_) => "mp4", - None => match audio { - Some(audio) => match audio.codec { - AudioCodec::Mp4a => "m4a", - AudioCodec::Opus => "opus", - _ => return Err(DownloadError::Input("unknown audio codec".into())), - }, - None => unreachable!(), - }, - } - .to_owned(), - ); - - let output_path = download_dir.join(&output_fname).with_extension(&format); - if output_path.exists() { - // If the downloaded video already exists, only error if the download path was - // chosen explicitly. - if output_fname_set { - return Err(DownloadError::Input( - format!("File {} already exists", output_path.to_string_lossy()).into(), - ))?; - } - info!( - "Downloaded video {} already exists", - output_path.to_string_lossy() - ); - return Ok(()); - } - - match (video, audio) { - // Downloading combined video/audio stream (no conversion) - (Some(video), None) => { - pb.set_message(format!("Downloading {title}")); - download_single_file( - &video.url, - download_dir.join(output_fname).with_extension(&format), - http, - pb.clone(), - ) - .await?; - } - // Downloading split video/audio streams (requires conversion with ffmpeg) - _ => { - let mut downloads: Vec = Vec::new(); - - if let Some(v) = video { - downloads.push(StreamDownload { - file: download_dir.join(format!( - "{}.video{}", - output_fname, - v.format.extension() - )), - url: v.url.clone(), - video_codec: Some(v.codec), - audio_codec: None, - }); - } - if let Some(a) = audio { - downloads.push(StreamDownload { - file: download_dir.join(format!( - "{}.audio{}", - output_fname, - a.format.extension() - )), - url: a.url.clone(), - video_codec: None, - audio_codec: Some(a.codec), - }); - } - - pb.set_message(format!("Downloading {title}")); - download_streams(&downloads, http, pb.clone()).await?; - - pb.set_message(format!("Converting {title}")); - pb.set_style( - ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}]") - .unwrap(), - ); - pb.enable_steady_tick(Duration::from_millis(100)); - convert_streams(&downloads, output_path, ffmpeg).await?; - pb.disable_steady_tick(); - - // Delete original files - stream::iter(&downloads) - .map(|d| fs::remove_file(d.file.clone())) - .buffer_unordered(downloads.len()) - .collect::>() - .await - .into_iter() - .collect::>()?; - } - } - - pb.finish_and_clear(); - Ok(()) -} - async fn download_streams( downloads: &Vec, - http: Client, - pb: ProgressBar, + http: &Client, + user_agent: &str, + pb: Option, ) -> Result<()> { let n = downloads.len(); stream::iter(downloads) - .map(|d| download_single_file(&d.url, d.file.clone(), http.clone(), pb.clone())) + .map(|d| download_single_file(&d.url, d.file.clone(), http, user_agent, pb.clone())) .buffer_unordered(n) .collect::>() .await @@ -432,6 +912,7 @@ async fn convert_streams>( downloads: &[StreamDownload], output: P, ffmpeg: &str, + title: &str, ) -> Result<()> { let output_path: PathBuf = output.into(); @@ -451,6 +932,9 @@ async fn convert_streams>( args.push("-c".into()); args.push("copy".into()); + args.push("-metadata".into()); + args.push(format!("title={title}").into()); + args.push(output_path.into()); let res = Command::new(ffmpeg).args(args).output().await?; diff --git a/downloader/src/util.rs b/downloader/src/util.rs index c805afd..d95b8f0 100644 --- a/downloader/src/util.rs +++ b/downloader/src/util.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::BTreeMap}; +use std::{borrow::Cow, collections::BTreeMap, path::PathBuf}; use reqwest::Url; @@ -6,18 +6,28 @@ use reqwest::Url; #[derive(thiserror::Error, Debug)] #[non_exhaustive] pub enum DownloadError { + /// RustyPipe error + #[error("{0}")] + RustyPipe(#[from] rustypipe::error::Error), /// Error from the HTTP client #[error("http error: {0}")] Http(#[from] reqwest::Error), /// File IO error #[error(transparent)] Io(#[from] std::io::Error), + /// FFmpeg returned an error #[error("FFmpeg error: {0}")] Ffmpeg(Cow<'static, str>), + /// Error parsing ranges for progressive download #[error("Progressive download error: {0}")] Progressive(Cow<'static, str>), + /// Video could not be downloaded because of invalid player data #[error("input error: {0}")] Input(Cow<'static, str>), + /// Download target already exists + #[error("file {0} already exists")] + Exists(PathBuf), + /// Other error #[error("error: {0}")] Other(Cow<'static, str>), }