From 17b6844eb00cc23f9239a72f5d0311d5a4430bea Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Sat, 17 Sep 2022 00:41:23 +0200 Subject: [PATCH] add builder to RustyPipe --- Cargo.toml | 6 +- codegen/src/collect_playlist_dates.rs | 2 +- codegen/src/download_testfiles.rs | 28 +- src/client/mod.rs | 667 +++++++++++++++++--------- src/client/player.rs | 13 +- src/client/playlist.rs | 23 +- src/report.rs | 63 +-- src/serializer/mod.rs | 4 + src/serializer/vec_log_err.rs | 5 + src/util.rs | 31 ++ 10 files changed, 526 insertions(+), 316 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1914cfb..255c669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" members = [".", "codegen", "cli"] [features] -default = ["default-tls", "yaml"] +default = ["default-tls"] # Reqwest TLS default-tls = ["reqwest/default-tls"] @@ -15,7 +15,7 @@ rustls-tls-webpki-roots = ["reqwest/rustls-tls-webpki-roots"] rustls-tls-native-roots = ["reqwest/rustls-tls-native-roots"] # Error reports in yaml format -yaml = ["serde_yaml"] +report-yaml = ["serde_yaml"] [dependencies] # quick-js = "0.4.1" @@ -27,7 +27,7 @@ thiserror = "1.0.31" url = "2.2.2" log = "0.4.17" reqwest = {version = "0.11.11", default-features = false, features = ["json", "gzip", "brotli", "stream"]} -tokio = {version = "1.20.0", features = ["macros", "fs", "process"]} +tokio = {version = "1.20.0", features = ["macros", "time", "fs", "process"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.82" serde_yaml = {version = "0.9.11", optional = true} diff --git a/codegen/src/collect_playlist_dates.rs b/codegen/src/collect_playlist_dates.rs index 593d13b..23b7f69 100644 --- a/codegen/src/collect_playlist_dates.rs +++ b/codegen/src/collect_playlist_dates.rs @@ -85,7 +85,7 @@ pub async fn collect_dates(project_root: &Path, concurrency: usize) { (DateCase::Dec, "PL1J-6JOckZtHo91uApeb10Qlf2XhkfM-9"), ]; - let rp = RustyPipe::default(); + let rp = RustyPipe::new(); let collected_dates = stream::iter(LANGUAGES) .map(|lang| { let rp = rp.clone(); diff --git a/codegen/src/download_testfiles.rs b/codegen/src/download_testfiles.rs index 4fbc0f3..58d4910 100644 --- a/codegen/src/download_testfiles.rs +++ b/codegen/src/download_testfiles.rs @@ -4,7 +4,6 @@ use std::{ }; use rustypipe::{ - cache::FileStorage, client::{ClientType, RustyPipe}, report::{Report, Reporter}, }; @@ -43,11 +42,11 @@ impl Reporter for TestFileReporter { fn rp_testfile(json_path: &Path) -> RustyPipe { let reporter = TestFileReporter::new(json_path); - RustyPipe::new( - Some(Box::new(FileStorage::default())), - Some(Box::new(reporter)), - None, - ) + RustyPipe::builder() + .reporter(Box::new(reporter)) + .report() + .strict() + .build() } pub async fn download_testfiles(project_root: &Path) { @@ -74,17 +73,12 @@ async fn player(testfiles: &Path) { } let rp = rp_testfile(&json_path); - rp.query() - .report(true) - .strict(true) - .get_player(video_id, client_type) - .await - .unwrap(); + rp.query().get_player(video_id, client_type).await.unwrap(); } } async fn player_model(testfiles: &Path) { - let rp = RustyPipe::default(); + let rp = RustyPipe::builder().strict().build(); for (name, id) in [("multilanguage", "tVWWp1PqDus"), ("hdr", "LXb3EKWsInQ")] { let mut json_path = testfiles.to_path_buf(); @@ -97,7 +91,6 @@ async fn player_model(testfiles: &Path) { let player_data = rp .query() - .strict(true) .get_player(id, ClientType::Desktop) .await .unwrap(); @@ -122,11 +115,6 @@ async fn playlist(testfiles: &Path) { } let rp = rp_testfile(&json_path); - rp.query() - .report(true) - .strict(true) - .get_playlist(id) - .await - .unwrap(); + rp.query().get_playlist(id).await.unwrap(); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 62e7885..4db6509 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -12,7 +12,9 @@ use fancy_regex::Regex; use log::{error, warn}; use once_cell::sync::Lazy; use rand::Rng; -use reqwest::{header, Client, ClientBuilder, Method, Request, RequestBuilder, Response}; +use reqwest::{ + header, Client, ClientBuilder, Method, Request, RequestBuilder, Response, StatusCode, +}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::sync::Mutex; @@ -20,7 +22,7 @@ use crate::{ cache::{CacheStorage, FileStorage}, deobfuscate::{DeobfData, Deobfuscator}, model::{Country, Language}, - report::{JsonFileReporter, Level, Report, Reporter}, + report::{FileReporter, Level, Report, Reporter}, serializer::MapResult, util, }; @@ -63,9 +65,16 @@ impl ClientType { } } +#[derive(Clone, Debug, Serialize)] +struct YTQuery { + context: YTContext, + #[serde(flatten)] + data: T, +} + #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] -struct ContextYT { +struct YTContext { client: ClientInfo, /// only used on desktop #[serde(skip_serializing_if = "Option::is_none")] @@ -161,9 +170,11 @@ struct RustyPipeRef { http: Client, storage: Option>, reporter: Option>, + n_retries: u32, user_agent: String, consent_cookie: String, cache: Mutex, + default_opts: RustyPipeOpts, } #[derive(Clone)] @@ -174,22 +185,20 @@ struct RustyPipeOpts { strict: bool, } +pub struct RustyPipeBuilder { + storage: Option>, + reporter: Option>, + n_retries: u32, + user_agent: String, + default_opts: RustyPipeOpts, +} + #[derive(Clone)] pub struct RustyPipeQuery { client: RustyPipe, opts: RustyPipeOpts, } -impl Default for RustyPipe { - fn default() -> Self { - Self::new( - Some(Box::new(FileStorage::default())), - Some(Box::new(JsonFileReporter::default())), - None, - ) - } -} - impl Default for RustyPipeOpts { fn default() -> Self { Self { @@ -247,23 +256,36 @@ impl From for CacheEntry { } } -impl RustyPipe { - /// Create a new RustyPipe instance - pub fn new( - storage: Option>, - reporter: Option>, - user_agent: Option, - ) -> Self { - let user_agent = user_agent.unwrap_or_else(|| DEFAULT_UA.to_owned()); +impl Default for RustyPipeBuilder { + fn default() -> Self { + Self::new() + } +} +impl RustyPipeBuilder { + /// Constructs a new `RustyPipeBuilder`. + /// + /// This is the same as `RustyPipe::builder()` + pub fn new() -> Self { + RustyPipeBuilder { + default_opts: RustyPipeOpts::default(), + storage: Some(Box::new(FileStorage::default())), + reporter: Some(Box::new(FileReporter::default())), + n_retries: 3, + user_agent: DEFAULT_UA.to_owned(), + } + } + + /// Returns a new, configured RustyPipe instance. + pub fn build(self) -> RustyPipe { let http = ClientBuilder::new() - .user_agent(user_agent.to_owned()) + .user_agent(self.user_agent.to_owned()) .gzip(true) .brotli(true) .build() - .expect("unable to build the HTTP client"); + .unwrap(); - let cache = if let Some(storage) = &storage { + let cache = if let Some(storage) = &self.storage { if let Some(data) = storage.read() { match serde_json::from_str::(&data) { Ok(data) => data, @@ -282,9 +304,10 @@ impl RustyPipe { RustyPipe { inner: Arc::new(RustyPipeRef { http, - storage, - reporter, - user_agent, + storage: self.storage, + reporter: self.reporter, + n_retries: self.n_retries, + user_agent: self.user_agent, consent_cookie: format!( "{}={}{}", CONSENT_COOKIE, @@ -292,45 +315,341 @@ impl RustyPipe { rand::thread_rng().gen_range(100..1000) ), cache: Mutex::new(cache), + default_opts: RustyPipeOpts::default(), }), } } - /// Create a new RustyPipe instance configured for testing - #[cfg(test)] - #[cfg(feature = "yaml")] - pub fn new_test() -> Self { - Self::new( - Some(Box::new(FileStorage::default())), - Some(Box::new(crate::report::YamlFileReporter::default())), - None, - ) + /// Add a `CacheStorage` backend for persisting cached information + /// (YouTube client versions, deobfuscation code) between + /// program executions. + /// + /// **Default value**: `FileStorage` in `rustypipe_cache.json` + pub fn storage(mut self, storage: Box) -> Self { + self.storage = Some(storage); + self } + /// Disable cache storage + pub fn no_storage(mut self) -> Self { + self.storage = None; + self + } + + /// Add a `Reporter` to collect error details + /// + /// **Default value**: `FileReporter` creating reports in `./rustypipe_reports` + pub fn reporter(mut self, reporter: Box) -> Self { + self.reporter = Some(reporter); + self + } + + /// Disable the creation of report files in case of errors and warnings. + pub fn no_reporter(mut self) -> Self { + self.reporter = None; + self + } + + /// Set the number of retries for HTTP requests. + /// + /// If a HTTP requests fails and retries are enabled, + /// RustyPipe waits 1 second before the next attempt. + /// The waiting time is doubled for subsequent attempts (including a bit of + /// random jitter to be less predictable). + /// + /// **Default value**: 3 + pub fn n_retries(mut self, n_retries: u32) -> Self { + self.n_retries = n_retries; + self + } + + /// Set the user agent used for making requests to the web API. + /// + /// **Default value**: `Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0` + /// (Firefox ESR on Debian) + pub fn user_agent(mut self, user_agent: &str) -> Self { + self.user_agent = user_agent.to_owned(); + self + } + + /// Set the language parameter used when accessing the YouTube API. + /// This will change multilanguage video titles, descriptions and textual dates + /// + /// **Default value**: `Language::En` (English) + /// + /// **Info**: you can set this option for individual queries, too + pub fn lang(mut self, lang: Language) -> Self { + self.default_opts.lang = lang; + self + } + + /// Set the country parameter used when accessing the YouTube API. + /// This will change trends and recommended content. + /// + /// **Default value**: `Country::Us` (USA) + /// + /// **Info**: you can set this option for individual queries, too + pub fn country(mut self, country: Country) -> Self { + self.default_opts.country = country; + self + } + + /// Generate a report on every operation. + /// This should only be used for debugging. + /// + /// **Info**: you can set this option for individual queries, too + pub fn report(mut self) -> Self { + self.default_opts.report = true; + self + } + + /// Enable strict mode, causing operations to fail if there + /// are warnings during deserialization (e.g. invalid items). + /// This should only be used for testing. + /// + /// **Info**: you can set this option for individual queries, too + pub fn strict(mut self) -> Self { + self.default_opts.strict = true; + self + } +} + +impl Default for RustyPipe { + fn default() -> Self { + Self::new() + } +} + +impl RustyPipe { + /// Create a new RustyPipe instance with default settings. + /// + /// To create an instance with custom options, use `RustyPipeBuilder` instead. + pub fn new() -> Self { + RustyPipeBuilder::new().build() + } + + /// Constructs a new `RustyPipeBuilder`. + /// + /// This is the same as `RustyPipeBuilder::new()` + pub fn builder() -> RustyPipeBuilder { + RustyPipeBuilder::new() + } + + /// Constructs a new `RustyPipeQuery`. pub fn query(&self) -> RustyPipeQuery { RustyPipeQuery { client: self.clone(), - opts: RustyPipeOpts { - lang: Language::En, - country: Country::Us, - report: false, - strict: false, + opts: self.inner.default_opts.clone(), + } + } + + /// Execute the given http request. + async fn http_request(&self, request: Request) -> Result { + let mut last_res: Option> = None; + for n in 0..self.inner.n_retries { + let res = self.inner.http.execute(request.try_clone().unwrap()).await; + let emsg = match &res { + Ok(response) => { + let status = response.status(); + // Immediately return in case of success or unrecoverable status code + if status.is_success() || !status.is_server_error() { + return res; + } + status.to_string() + } + Err(e) => { + // Immediately return in case of unrecoverable error + if !e.is_timeout() && !e.is_connect() { + return res; + } + e.to_string() + } + }; + + let ms = util::retry_delay(n, 1000, 60000, 3); + warn!("Retry attempt #{}. Error: {}. Waiting {} ms", n, emsg, ms); + tokio::time::sleep(std::time::Duration::from_millis(ms.into())).await; + + last_res = Some(res); + } + last_res.unwrap() + } + + /// Execute the given http request, returning an error in case of a + /// non-successful status code. + async fn http_request_estatus(&self, request: Request) -> Result { + Ok(self.http_request(request).await?.error_for_status()?) + } + + /// Execute the given http request, returning the response body as a string. + async fn http_request_txt(&self, request: Request) -> Result { + Ok(self.http_request_estatus(request).await?.text().await?) + } + + /// Extract the current version of the YouTube desktop client from the website. + async fn extract_desktop_client_version(&self) -> Result { + let from_swjs = async { + let swjs = self + .http_request_txt( + self.inner + .http + .get("https://www.youtube.com/sw.js") + .header(header::ORIGIN, "https://www.youtube.com") + .header(header::REFERER, "https://www.youtube.com") + .header(header::COOKIE, self.inner.consent_cookie.to_owned()) + .build() + .unwrap(), + ) + .await + .context("Failed to download sw.js")?; + + util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1) + .ok_or_else(|| anyhow!("Could not find desktop client version in sw.js")) + }; + + let from_html = async { + let html = self + .http_request_txt( + self.inner + .http + .get("https://www.youtube.com/results?search_query=") + .build() + .unwrap(), + ) + .await + .context("Failed to get YT Desktop page")?; + + util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1) + .ok_or_else(|| anyhow!("Could not find desktop client version on html page")) + }; + + match from_swjs.await { + Ok(client_version) => Ok(client_version), + Err(_) => from_html.await, + } + } + + /// Extract the current version of the YouTube Music desktop client from the website. + async fn extract_music_client_version(&self) -> Result { + let from_swjs = async { + let swjs = self + .http_request_txt( + self.inner + .http + .get("https://music.youtube.com/sw.js") + .header(header::ORIGIN, "https://music.youtube.com") + .header(header::REFERER, "https://music.youtube.com") + .header(header::COOKIE, self.inner.consent_cookie.to_owned()) + .build() + .unwrap(), + ) + .await + .context("Failed to download sw.js")?; + + util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1) + .ok_or_else(|| anyhow!("Could not find desktop client version in sw.js")) + }; + + let from_html = async { + let html = self + .http_request_txt( + self.inner + .http + .get("https://music.youtube.com") + .build() + .unwrap(), + ) + .await + .context("Failed to get YT Desktop page")?; + + util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1) + .ok_or_else(|| anyhow!("Could not find desktop client version on html page")) + }; + + match from_swjs.await { + Ok(client_version) => Ok(client_version), + Err(_) => from_html.await, + } + } + + /// Get the current version of the YouTube web client from the following sources + /// + /// 1. from cache + /// 2. from YouTube's service worker script (`sw.js`) + /// 3. from the YouTube website + /// 4. fall back to the hardcoded version + async fn get_desktop_client_version(&self) -> String { + let mut cache = self.inner.cache.lock().await; + + match cache.desktop_client.get() { + Some(cdata) => cdata.version.to_owned(), + None => match self.extract_desktop_client_version().await { + Ok(version) => { + cache.desktop_client = CacheEntry::from(ClientData { + version: version.to_owned(), + }); + self.store_cache(&cache); + version + } + Err(e) => { + warn!("{}, falling back to hardcoded version", e); + DESKTOP_CLIENT_VERSION.to_owned() + } }, } } - #[cfg(test)] - pub fn test_query(&self) -> RustyPipeQuery { - RustyPipeQuery { - client: self.clone(), - opts: RustyPipeOpts { - lang: Language::En, - country: Country::Us, - report: false, - strict: true, + /// Get the current version of the YouTube Music web client from the following sources + /// + /// 1. from cache + /// 2. from YouTube Music's service worker script (`sw.js`) + /// 3. from the YouTube Music website + /// 4. fall back to the hardcoded version + async fn get_music_client_version(&self) -> String { + let mut cache = self.inner.cache.lock().await; + + match cache.music_client.get() { + Some(cdata) => cdata.version.to_owned(), + None => match self.extract_music_client_version().await { + Ok(version) => { + cache.music_client = CacheEntry::from(ClientData { + version: version.to_owned(), + }); + self.store_cache(&cache); + version + } + Err(e) => { + warn!("{}, falling back to hardcoded version", e); + DESKTOP_MUSIC_CLIENT_VERSION.to_owned() + } }, } } + + /// Instantiate a new deobfuscator from either cached or extracted YouTube JavaScript code. + async fn get_deobf(&self) -> Result { + let mut cache = self.inner.cache.lock().await; + + match cache.deobf.get() { + Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())), + None => { + let deobf = Deobfuscator::new(self.inner.http.clone()).await?; + cache.deobf = CacheEntry::from(deobf.get_data()); + self.store_cache(&cache); + Ok(deobf) + } + } + } + + /// Write the given cache data to the storage backend. + fn store_cache(&self, cache: &CacheData) { + if let Some(storage) = &self.inner.storage { + match serde_json::to_string(cache) { + Ok(data) => storage.write(&data), + Err(e) => error!("Could not serialize cache. Error: {}", e), + } + } + } } impl RustyPipeQuery { @@ -350,20 +669,26 @@ impl RustyPipeQuery { /// Generate a report on every operation. /// This should only be used for debugging. - pub fn report(mut self, report: bool) -> Self { - self.opts.report = report; + pub fn report(mut self) -> Self { + self.opts.report = true; self } /// Enable strict mode, causing operations to fail if there /// are warnings during deserialization (e.g. invalid items). /// This should only be used for testing. - pub fn strict(mut self, strict: bool) -> Self { - self.opts.strict = strict; + pub fn strict(mut self) -> Self { + self.opts.strict = true; self } - async fn get_context(&self, ctype: ClientType, localized: bool) -> ContextYT { + /// Create a new context object, which is included in every request to + /// the YouTube API and contains language, country and device parameters. + /// + /// # Parameters + /// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...) + /// - `localized`: Whether to include the configured language and country + async fn get_context(&self, ctype: ClientType, localized: bool) -> YTContext { let hl = match localized { true => self.opts.lang, false => Language::En, @@ -374,10 +699,10 @@ impl RustyPipeQuery { }; match ctype { - ClientType::Desktop => ContextYT { + ClientType::Desktop => YTContext { client: ClientInfo { client_name: "WEB".to_owned(), - client_version: self.get_desktop_client_version().await, + client_version: self.client.get_desktop_client_version().await, client_screen: None, device_model: None, platform: "DESKTOP".to_owned(), @@ -389,10 +714,10 @@ impl RustyPipeQuery { user: User::default(), third_party: None, }, - ClientType::DesktopMusic => ContextYT { + ClientType::DesktopMusic => YTContext { client: ClientInfo { client_name: "WEB_REMIX".to_owned(), - client_version: self.get_music_client_version().await, + client_version: self.client.get_music_client_version().await, client_screen: None, device_model: None, platform: "DESKTOP".to_owned(), @@ -404,7 +729,7 @@ impl RustyPipeQuery { user: User::default(), third_party: None, }, - ClientType::TvHtml5Embed => ContextYT { + ClientType::TvHtml5Embed => YTContext { client: ClientInfo { client_name: "TVHTML5_SIMPLY_EMBEDDED_PLAYER".to_owned(), client_version: TVHTML5_CLIENT_VERSION.to_owned(), @@ -421,7 +746,7 @@ impl RustyPipeQuery { embed_url: "https://www.youtube.com/".to_owned(), }), }, - ClientType::Android => ContextYT { + ClientType::Android => YTContext { client: ClientInfo { client_name: "ANDROID".to_owned(), client_version: MOBILE_CLIENT_VERSION.to_owned(), @@ -436,7 +761,7 @@ impl RustyPipeQuery { user: User::default(), third_party: None, }, - ClientType::Ios => ContextYT { + ClientType::Ios => YTContext { client: ClientInfo { client_name: "IOS".to_owned(), client_version: MOBILE_CLIENT_VERSION.to_owned(), @@ -454,6 +779,13 @@ impl RustyPipeQuery { } } + /// Create a new Reqwest HTTP request builder with the URL and headers required + /// for accessing the YouTube API + /// + /// # Parameters + /// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...) + /// - `method`: HTTP method + /// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/?key=...`) async fn request_builder( &self, ctype: ClientType, @@ -478,7 +810,7 @@ impl RustyPipeQuery { .header("X-YouTube-Client-Name", "1") .header( "X-YouTube-Client-Version", - self.get_desktop_client_version().await, + self.client.get_desktop_client_version().await, ), ClientType::DesktopMusic => self .client @@ -500,7 +832,7 @@ impl RustyPipeQuery { .header("X-YouTube-Client-Name", "67") .header( "X-YouTube-Client-Version", - self.get_music_client_version().await, + self.client.get_music_client_version().await, ), ClientType::TvHtml5Embed => self .client @@ -564,6 +896,20 @@ impl RustyPipeQuery { } } + /// Execute a request to the YouTube API, then deobfuscate and map the response. + /// + /// Creates a report in case of failure for easy debugging. + /// + /// # Parameters + /// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...) + /// - `operation`: Name of the RustyPipe operation (only for reporting, e.g. `get_player`) + /// - `id`: ID of the requested entity (Video ID, Channel ID, ...). + /// The ID is included in reports and is also passed to the mapper for validating the response. + /// Set it to an empty string if you are not requesting an entity with an ID. + /// - `method`: HTTP method + /// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/?key=...`) + /// - `body`: Serializable request body to be sent in json format + /// - `deobf`: Deobfuscator (is passed to the mapper to deobfuscate stream URLs). #[allow(clippy::too_many_arguments)] async fn execute_request_deobf< R: DeserializeOwned + MapResponse + Debug, @@ -573,9 +919,9 @@ impl RustyPipeQuery { &self, ctype: ClientType, operation: &str, + id: &str, method: Method, endpoint: &str, - id: &str, body: &B, deobf: Option<&Deobfuscator>, ) -> Result { @@ -600,7 +946,7 @@ impl RustyPipeQuery { version: "0.1.0".to_owned(), date: chrono::Local::now(), level, - operation: operation.to_owned(), + operation: format!("{}({})", operation, id), error, msgs, deobf_data: deobf.map(Deobfuscator::get_data), @@ -661,6 +1007,19 @@ impl RustyPipeQuery { } } + /// Execute a request to the YouTube API, then map the response. + /// + /// Creates a report in case of failure for easy debugging. + /// + /// # Parameters + /// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...) + /// - `operation`: Name of the RustyPipe operation (only for reporting, e.g. `get_player`) + /// - `id`: ID of the requested entity (Video ID, Channel ID, ...). + /// The ID is included in reports and is also passed to the mapper for validating the response. + /// Set it to an empty string if you are not requesting an entity with an ID. + /// - `method`: HTTP method + /// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/?key=...`) + /// - `body`: Serializable request body to be sent in json format async fn execute_request< R: DeserializeOwned + MapResponse + Debug, M, @@ -669,174 +1028,31 @@ impl RustyPipeQuery { &self, ctype: ClientType, operation: &str, + id: &str, method: Method, endpoint: &str, - id: &str, body: &B, ) -> Result { - self.execute_request_deobf::(ctype, operation, method, endpoint, id, body, None) + self.execute_request_deobf::(ctype, operation, id, method, endpoint, body, None) .await } - - async fn get_desktop_client_version(&self) -> String { - let mut cache = self.client.inner.cache.lock().await; - - match cache.desktop_client.get() { - Some(cdata) => cdata.version.to_owned(), - None => match extract_desktop_client_version( - self.client.inner.http.clone(), - self.client.inner.consent_cookie.to_owned(), - ) - .await - { - Ok(version) => { - cache.desktop_client = CacheEntry::from(ClientData { - version: version.to_owned(), - }); - self.write_cache(&cache); - version - } - Err(e) => { - warn!("{}, falling back to hardcoded version", e); - DESKTOP_CLIENT_VERSION.to_owned() - } - }, - } - } - - async fn get_music_client_version(&self) -> String { - let mut cache = self.client.inner.cache.lock().await; - - match cache.music_client.get() { - Some(cdata) => cdata.version.to_owned(), - None => match extract_music_client_version( - self.client.inner.http.clone(), - self.client.inner.consent_cookie.to_owned(), - ) - .await - { - Ok(version) => { - cache.music_client = CacheEntry::from(ClientData { - version: version.to_owned(), - }); - self.write_cache(&cache); - version - } - Err(e) => { - warn!("{}, falling back to hardcoded version", e); - DESKTOP_MUSIC_CLIENT_VERSION.to_owned() - } - }, - } - } - - async fn get_deobf(&self) -> Result { - let mut cache = self.client.inner.cache.lock().await; - - match cache.deobf.get() { - Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())), - None => { - let deobf = Deobfuscator::new(self.client.inner.http.clone()).await?; - cache.deobf = CacheEntry::from(deobf.get_data()); - self.write_cache(&cache); - Ok(deobf) - } - } - } - - fn write_cache(&self, cache: &CacheData) { - if let Some(storage) = &self.client.inner.storage { - match serde_json::to_string(cache) { - Ok(data) => storage.write(&data), - Err(e) => error!("Could not serialize cache. Error: {}", e), - } - } - } -} - -async fn extract_desktop_client_version(http: Client, consent_cookie: String) -> Result { - let from_swjs = async { - let swjs = exec_request_text( - http.clone(), - http.get("https://www.youtube.com/sw.js") - .header(header::ORIGIN, "https://www.youtube.com") - .header(header::REFERER, "https://www.youtube.com") - .header(header::COOKIE, consent_cookie) - .build() - .unwrap(), - ) - .await - .context("Failed to download sw.js")?; - - util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1) - .ok_or_else(|| anyhow!("Could not find desktop client version in sw.js")) - }; - - let from_html = async { - let html = exec_request_text( - http.clone(), - http.get("https://www.youtube.com/results?search_query=") - .build() - .unwrap(), - ) - .await - .context("Failed to get YT Desktop page")?; - - util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1) - .ok_or_else(|| anyhow!("Could not find desktop client version on html page")) - }; - - match from_swjs.await { - Ok(client_version) => Ok(client_version), - Err(_) => from_html.await, - } -} - -async fn extract_music_client_version(http: Client, consent_cookie: String) -> Result { - let from_swjs = async { - let swjs = exec_request_text( - http.clone(), - http.get("https://music.youtube.com/sw.js") - .header(header::ORIGIN, "https://music.youtube.com") - .header(header::REFERER, "https://music.youtube.com") - .header(header::COOKIE, consent_cookie) - .build() - .unwrap(), - ) - .await - .context("Failed to download sw.js")?; - - util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1) - .ok_or_else(|| anyhow!("Could not find desktop client version in sw.js")) - }; - - let from_html = async { - let html = exec_request_text( - http.clone(), - http.get("https://music.youtube.com").build().unwrap(), - ) - .await - .context("Failed to get YT Desktop page")?; - - util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1) - .ok_or_else(|| anyhow!("Could not find desktop client version on html page")) - }; - - match from_swjs.await { - Ok(client_version) => Ok(client_version), - Err(_) => from_html.await, - } -} - -async fn exec_request(http: Client, request: Request) -> Result { - Ok(http.execute(request).await?.error_for_status()?) -} - -async fn exec_request_text(http: Client, request: Request) -> Result { - Ok(exec_request(http, request).await?.text().await?) } +/// Implement this for YouTube API response structs that need to be mapped to +/// RustyPipe models. trait MapResponse { + /// Map the YouTube API response structs to a RustyPipe model. + /// + /// Returns an error if crucial data required for the model could not be extracted. + /// + /// Returns a `MapResult` with warnings if there were issues with the deserializing/mapping, + /// but the resulting data is still usable. + /// + /// # Parameters + /// - `id`: The ID of the requested entity (Video ID, Channel ID, ...). If possible, assert + /// that the returned entity matches this ID and return an error instead. + /// - `lang`: Language of the request. Used for mapping localized information like dates. + /// - `deobf`: Deobfuscator (if passed to the `execute_request_deobf` method) fn map_response( self, id: &str, @@ -846,7 +1062,6 @@ trait MapResponse { } #[cfg(test)] -#[cfg(feature = "yaml")] mod tests { // use super::*; } diff --git a/src/client/player.rs b/src/client/player.rs index 3d6dac3..c91f2b9 100644 --- a/src/client/player.rs +++ b/src/client/player.rs @@ -21,13 +21,13 @@ use crate::{ use super::{ response::{self, player}, - ClientType, ContextYT, MapResponse, MapResult, RustyPipeQuery, + ClientType, MapResponse, MapResult, RustyPipeQuery, YTContext, }; #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] struct QPlayer { - context: ContextYT, + context: YTContext, /// Website playback context #[serde(skip_serializing_if = "Option::is_none")] playback_context: Option, @@ -61,7 +61,7 @@ impl RustyPipeQuery { pub async fn get_player(self, video_id: &str, client_type: ClientType) -> Result { let q1 = self.clone(); let t_context = tokio::spawn(async move { q1.get_context(client_type, false).await }); - let q2 = self.clone(); + let q2 = self.client.clone(); let t_deobf = tokio::spawn(async move { q2.get_deobf().await }); let (context, deobf) = tokio::join!(t_context, t_deobf); @@ -96,9 +96,9 @@ impl RustyPipeQuery { self.execute_request_deobf::( client_type, "get_player", + video_id, Method::POST, "player", - video_id, &request_body, Some(&deobf), ) @@ -559,7 +559,6 @@ fn get_audio_codec(codecs: Vec<&str>) -> AudioCodec { } #[cfg(test)] -#[cfg(feature = "yaml")] mod tests { use std::{fs::File, io::BufReader, path::Path}; @@ -632,9 +631,9 @@ mod tests { #[case::ios(ClientType::Ios)] #[test_log::test(tokio::test)] async fn t_get_player(#[case] client_type: ClientType) { - let rp = RustyPipe::new_test(); + let rp = RustyPipe::builder().strict().build(); let player_data = rp - .test_query() + .query() .get_player("n4tK7LYFxI0", client_type) .await .unwrap(); diff --git a/src/client/playlist.rs b/src/client/playlist.rs index e83cf4c..9056ab7 100644 --- a/src/client/playlist.rs +++ b/src/client/playlist.rs @@ -9,19 +9,19 @@ use crate::{ timeago, util, }; -use super::{response, ClientType, ContextYT, MapResponse, MapResult, RustyPipeQuery}; +use super::{response, ClientType, MapResponse, MapResult, RustyPipeQuery, YTContext}; #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] struct QPlaylist { - context: ContextYT, + context: YTContext, browse_id: String, } #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] struct QPlaylistCont { - context: ContextYT, + context: YTContext, continuation: String, } @@ -36,9 +36,9 @@ impl RustyPipeQuery { self.execute_request::( ClientType::Desktop, "get_playlist", + playlist_id, Method::POST, "browse", - playlist_id, &request_body, ) .await @@ -57,9 +57,9 @@ impl RustyPipeQuery { .execute_request::( ClientType::Desktop, "get_playlist_cont", + &playlist.id, Method::POST, "browse", - &playlist.id, &request_body, ) .await?; @@ -339,8 +339,8 @@ mod tests { #[case] description: Option, #[case] channel: Option, ) { - let rp = RustyPipe::new_test(); - let playlist = rp.test_query().get_playlist(id).await.unwrap(); + let rp = RustyPipe::builder().strict().build(); + let playlist = rp.query().get_playlist(id).await.unwrap(); assert_eq!(playlist.id, id); assert_eq!(playlist.name, name); @@ -380,18 +380,15 @@ mod tests { #[test_log::test(tokio::test)] async fn t_playlist_cont() { - let rp = RustyPipe::new_test(); + let rp = RustyPipe::builder().strict().build(); let mut playlist = rp - .test_query() + .query() .get_playlist("PLbZIPy20-1pN7mqjckepWF78ndb6ci_qi") .await .unwrap(); while playlist.ctoken.is_some() { - rp.test_query() - .get_playlist_cont(&mut playlist) - .await - .unwrap(); + rp.query().get_playlist_cont(&mut playlist).await.unwrap(); } assert!(playlist.videos.len() > 100); diff --git a/src/report.rs b/src/report.rs index e4726f6..795fcb3 100644 --- a/src/report.rs +++ b/src/report.rs @@ -65,11 +65,11 @@ pub trait Reporter { fn report(&self, report: &Report); } -pub struct JsonFileReporter { +pub struct FileReporter { path: PathBuf, } -impl JsonFileReporter { +impl FileReporter { pub fn new>(path: P) -> Self { Self { path: path.as_ref().to_path_buf(), @@ -77,13 +77,21 @@ impl JsonFileReporter { } fn _report(&self, report: &Report) -> Result<()> { - let report_path = get_report_path(&self.path, report)?; - serde_json::to_writer_pretty(&File::create(report_path)?, &report)?; + #[cfg(not(feature = "report-yaml"))] + { + let report_path = get_report_path(&self.path, report, "json")?; + serde_json::to_writer_pretty(&File::create(report_path)?, &report)?; + } + #[cfg(feature = "report-yaml")] + { + let report_path = get_report_path(&self.path, report, "yaml")?; + serde_yaml::to_writer(&File::create(report_path)?, &report)?; + } Ok(()) } } -impl Default for JsonFileReporter { +impl Default for FileReporter { fn default() -> Self { Self { path: Path::new("rustypipe_reports").to_path_buf(), @@ -91,51 +99,14 @@ impl Default for JsonFileReporter { } } -impl Reporter for JsonFileReporter { +impl Reporter for FileReporter { fn report(&self, report: &Report) { self._report(report) .unwrap_or_else(|e| error!("Could not store report file. Err: {}", e)); } } -#[cfg(feature = "yaml")] -pub struct YamlFileReporter { - path: PathBuf, -} - -#[cfg(feature = "yaml")] -impl YamlFileReporter { - pub fn new>(path: P) -> Self { - Self { - path: path.as_ref().to_path_buf(), - } - } - - fn _report(&self, report: &Report) -> Result<()> { - let report_path = get_report_path(&self.path, report)?; - serde_yaml::to_writer(&File::create(report_path)?, &report)?; - Ok(()) - } -} - -#[cfg(feature = "yaml")] -impl Default for YamlFileReporter { - fn default() -> Self { - Self { - path: Path::new("rustypipe_reports").to_path_buf(), - } - } -} - -#[cfg(feature = "yaml")] -impl Reporter for YamlFileReporter { - fn report(&self, report: &Report) { - self._report(report) - .unwrap_or_else(|e| error!("Could not store report file. Err: {}", e)); - } -} - -fn get_report_path(root: &Path, report: &Report) -> Result { +fn get_report_path(root: &Path, report: &Report, ext: &str) -> Result { if !root.is_dir() { std::fs::create_dir_all(root)?; } @@ -143,13 +114,13 @@ fn get_report_path(root: &Path, report: &Report) -> Result { let filename_prefix = format!("{}_{:?}", report.date.format("%F_%H-%M-%S"), report.level); let mut report_path = root.to_path_buf(); - report_path.push(format!("{}.yaml", filename_prefix)); + report_path.push(format!("{}.{}", filename_prefix, ext)); // ensure unique filename for i in 1..u32::MAX { if report_path.exists() { report_path = root.to_path_buf(); - report_path.push(format!("{}_{}.yaml", filename_prefix, i)); + report_path.push(format!("{}_{}.{}", filename_prefix, i, ext)); } else { break; } diff --git a/src/serializer/mod.rs b/src/serializer/mod.rs index f7a2af9..6ac390c 100644 --- a/src/serializer/mod.rs +++ b/src/serializer/mod.rs @@ -8,6 +8,10 @@ pub use vec_log_err::VecLogError; use std::fmt::Debug; +/// This represents a result from a deserializing/mapping operation. +/// It holds the desired content (`c`) and a list of warning messages, +/// if there occurred minor error during the deserializing or mapping +/// (e.g. certain list items could not be deserialized). #[derive(Clone)] pub struct MapResult { pub c: T, diff --git a/src/serializer/vec_log_err.rs b/src/serializer/vec_log_err.rs index 3b557a9..d8cf058 100644 --- a/src/serializer/vec_log_err.rs +++ b/src/serializer/vec_log_err.rs @@ -8,6 +8,11 @@ use serde_with::{de::DeserializeAsWrap, DeserializeAs}; use super::MapResult; +/// Deserializes a list of arbitrary items into a `MapResult`, +/// creating warnings for items that could not be deserialized. +/// +/// This is similar to `VecSkipError`, but it does not silently ignore +/// faulty items. pub struct VecLogError(PhantomData); impl<'de, T, U> DeserializeAs<'de, MapResult>> for VecLogError diff --git a/src/util.rs b/src/util.rs index b5dbe2f..766250b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -89,6 +89,21 @@ where numbers } +pub fn retry_delay( + n_past_retries: u32, + min_retry_interval: u32, + max_retry_interval: u32, + backoff_base: u32, +) -> u32 { + let unjittered_delay = backoff_base.checked_pow(n_past_retries).unwrap_or(u32::MAX); + let jitter_factor = rand::thread_rng().gen_range(800..1500); + let jittered_delay = unjittered_delay + .checked_mul(jitter_factor) + .unwrap_or(u32::MAX); + + min_retry_interval.max(jittered_delay.min(max_retry_interval)) +} + #[cfg(test)] mod tests { use rstest::rstest; @@ -111,4 +126,20 @@ mod tests { let n = parse_numeric_vec::(string); assert_eq!(n, expect); } + + #[rstest] + #[case(0, 800, 1500)] + #[case(1, 2400, 4500)] + #[case(2, 7200, 13500)] + #[case(100, 60000, 60000)] + fn t_retry_delay(#[case] n: u32, #[case] expect_min: u32, #[case] expect_max: u32) { + let res = retry_delay(n, 1000, 60000, 3); + assert!( + res >= expect_min && res <= expect_max, + "res: {} not within {} and {}", + res, + expect_min, + expect_max + ); + } }