add builder to RustyPipe

This commit is contained in:
ThetaDev 2022-09-17 00:41:23 +02:00
parent d6cfc7e914
commit 17b6844eb0
10 changed files with 526 additions and 316 deletions

View file

@ -7,7 +7,7 @@ edition = "2021"
members = [".", "codegen", "cli"]
[features]
default = ["default-tls", "yaml"]
default = ["default-tls"]
# Reqwest TLS
default-tls = ["reqwest/default-tls"]
@ -15,7 +15,7 @@ rustls-tls-webpki-roots = ["reqwest/rustls-tls-webpki-roots"]
rustls-tls-native-roots = ["reqwest/rustls-tls-native-roots"]
# Error reports in yaml format
yaml = ["serde_yaml"]
report-yaml = ["serde_yaml"]
[dependencies]
# quick-js = "0.4.1"
@ -27,7 +27,7 @@ thiserror = "1.0.31"
url = "2.2.2"
log = "0.4.17"
reqwest = {version = "0.11.11", default-features = false, features = ["json", "gzip", "brotli", "stream"]}
tokio = {version = "1.20.0", features = ["macros", "fs", "process"]}
tokio = {version = "1.20.0", features = ["macros", "time", "fs", "process"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.82"
serde_yaml = {version = "0.9.11", optional = true}

View file

@ -85,7 +85,7 @@ pub async fn collect_dates(project_root: &Path, concurrency: usize) {
(DateCase::Dec, "PL1J-6JOckZtHo91uApeb10Qlf2XhkfM-9"),
];
let rp = RustyPipe::default();
let rp = RustyPipe::new();
let collected_dates = stream::iter(LANGUAGES)
.map(|lang| {
let rp = rp.clone();

View file

@ -4,7 +4,6 @@ use std::{
};
use rustypipe::{
cache::FileStorage,
client::{ClientType, RustyPipe},
report::{Report, Reporter},
};
@ -43,11 +42,11 @@ impl Reporter for TestFileReporter {
fn rp_testfile(json_path: &Path) -> RustyPipe {
let reporter = TestFileReporter::new(json_path);
RustyPipe::new(
Some(Box::new(FileStorage::default())),
Some(Box::new(reporter)),
None,
)
RustyPipe::builder()
.reporter(Box::new(reporter))
.report()
.strict()
.build()
}
pub async fn download_testfiles(project_root: &Path) {
@ -74,17 +73,12 @@ async fn player(testfiles: &Path) {
}
let rp = rp_testfile(&json_path);
rp.query()
.report(true)
.strict(true)
.get_player(video_id, client_type)
.await
.unwrap();
rp.query().get_player(video_id, client_type).await.unwrap();
}
}
async fn player_model(testfiles: &Path) {
let rp = RustyPipe::default();
let rp = RustyPipe::builder().strict().build();
for (name, id) in [("multilanguage", "tVWWp1PqDus"), ("hdr", "LXb3EKWsInQ")] {
let mut json_path = testfiles.to_path_buf();
@ -97,7 +91,6 @@ async fn player_model(testfiles: &Path) {
let player_data = rp
.query()
.strict(true)
.get_player(id, ClientType::Desktop)
.await
.unwrap();
@ -122,11 +115,6 @@ async fn playlist(testfiles: &Path) {
}
let rp = rp_testfile(&json_path);
rp.query()
.report(true)
.strict(true)
.get_playlist(id)
.await
.unwrap();
rp.query().get_playlist(id).await.unwrap();
}
}

View file

@ -12,7 +12,9 @@ use fancy_regex::Regex;
use log::{error, warn};
use once_cell::sync::Lazy;
use rand::Rng;
use reqwest::{header, Client, ClientBuilder, Method, Request, RequestBuilder, Response};
use reqwest::{
header, Client, ClientBuilder, Method, Request, RequestBuilder, Response, StatusCode,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::sync::Mutex;
@ -20,7 +22,7 @@ use crate::{
cache::{CacheStorage, FileStorage},
deobfuscate::{DeobfData, Deobfuscator},
model::{Country, Language},
report::{JsonFileReporter, Level, Report, Reporter},
report::{FileReporter, Level, Report, Reporter},
serializer::MapResult,
util,
};
@ -63,9 +65,16 @@ impl ClientType {
}
}
#[derive(Clone, Debug, Serialize)]
struct YTQuery<T> {
context: YTContext,
#[serde(flatten)]
data: T,
}
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ContextYT {
struct YTContext {
client: ClientInfo,
/// only used on desktop
#[serde(skip_serializing_if = "Option::is_none")]
@ -161,9 +170,11 @@ struct RustyPipeRef {
http: Client,
storage: Option<Box<dyn CacheStorage + Sync + Send>>,
reporter: Option<Box<dyn Reporter + Sync + Send>>,
n_retries: u32,
user_agent: String,
consent_cookie: String,
cache: Mutex<CacheData>,
default_opts: RustyPipeOpts,
}
#[derive(Clone)]
@ -174,22 +185,20 @@ struct RustyPipeOpts {
strict: bool,
}
pub struct RustyPipeBuilder {
storage: Option<Box<dyn CacheStorage + Sync + Send>>,
reporter: Option<Box<dyn Reporter + Sync + Send>>,
n_retries: u32,
user_agent: String,
default_opts: RustyPipeOpts,
}
#[derive(Clone)]
pub struct RustyPipeQuery {
client: RustyPipe,
opts: RustyPipeOpts,
}
impl Default for RustyPipe {
fn default() -> Self {
Self::new(
Some(Box::new(FileStorage::default())),
Some(Box::new(JsonFileReporter::default())),
None,
)
}
}
impl Default for RustyPipeOpts {
fn default() -> Self {
Self {
@ -247,23 +256,36 @@ impl<T> From<T> for CacheEntry<T> {
}
}
impl RustyPipe {
/// Create a new RustyPipe instance
pub fn new(
storage: Option<Box<dyn CacheStorage + Sync + Send>>,
reporter: Option<Box<dyn Reporter + Sync + Send>>,
user_agent: Option<String>,
) -> Self {
let user_agent = user_agent.unwrap_or_else(|| DEFAULT_UA.to_owned());
impl Default for RustyPipeBuilder {
fn default() -> Self {
Self::new()
}
}
impl RustyPipeBuilder {
/// Constructs a new `RustyPipeBuilder`.
///
/// This is the same as `RustyPipe::builder()`
pub fn new() -> Self {
RustyPipeBuilder {
default_opts: RustyPipeOpts::default(),
storage: Some(Box::new(FileStorage::default())),
reporter: Some(Box::new(FileReporter::default())),
n_retries: 3,
user_agent: DEFAULT_UA.to_owned(),
}
}
/// Returns a new, configured RustyPipe instance.
pub fn build(self) -> RustyPipe {
let http = ClientBuilder::new()
.user_agent(user_agent.to_owned())
.user_agent(self.user_agent.to_owned())
.gzip(true)
.brotli(true)
.build()
.expect("unable to build the HTTP client");
.unwrap();
let cache = if let Some(storage) = &storage {
let cache = if let Some(storage) = &self.storage {
if let Some(data) = storage.read() {
match serde_json::from_str::<CacheData>(&data) {
Ok(data) => data,
@ -282,9 +304,10 @@ impl RustyPipe {
RustyPipe {
inner: Arc::new(RustyPipeRef {
http,
storage,
reporter,
user_agent,
storage: self.storage,
reporter: self.reporter,
n_retries: self.n_retries,
user_agent: self.user_agent,
consent_cookie: format!(
"{}={}{}",
CONSENT_COOKIE,
@ -292,45 +315,341 @@ impl RustyPipe {
rand::thread_rng().gen_range(100..1000)
),
cache: Mutex::new(cache),
default_opts: RustyPipeOpts::default(),
}),
}
}
/// Create a new RustyPipe instance configured for testing
#[cfg(test)]
#[cfg(feature = "yaml")]
pub fn new_test() -> Self {
Self::new(
Some(Box::new(FileStorage::default())),
Some(Box::new(crate::report::YamlFileReporter::default())),
None,
)
/// Add a `CacheStorage` backend for persisting cached information
/// (YouTube client versions, deobfuscation code) between
/// program executions.
///
/// **Default value**: `FileStorage` in `rustypipe_cache.json`
pub fn storage(mut self, storage: Box<dyn CacheStorage + Sync + Send>) -> Self {
self.storage = Some(storage);
self
}
/// Disable cache storage
pub fn no_storage(mut self) -> Self {
self.storage = None;
self
}
/// Add a `Reporter` to collect error details
///
/// **Default value**: `FileReporter` creating reports in `./rustypipe_reports`
pub fn reporter(mut self, reporter: Box<dyn Reporter + Sync + Send>) -> Self {
self.reporter = Some(reporter);
self
}
/// Disable the creation of report files in case of errors and warnings.
pub fn no_reporter(mut self) -> Self {
self.reporter = None;
self
}
/// Set the number of retries for HTTP requests.
///
/// If a HTTP requests fails and retries are enabled,
/// RustyPipe waits 1 second before the next attempt.
/// The waiting time is doubled for subsequent attempts (including a bit of
/// random jitter to be less predictable).
///
/// **Default value**: 3
pub fn n_retries(mut self, n_retries: u32) -> Self {
self.n_retries = n_retries;
self
}
/// Set the user agent used for making requests to the web API.
///
/// **Default value**: `Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0`
/// (Firefox ESR on Debian)
pub fn user_agent(mut self, user_agent: &str) -> Self {
self.user_agent = user_agent.to_owned();
self
}
/// Set the language parameter used when accessing the YouTube API.
/// This will change multilanguage video titles, descriptions and textual dates
///
/// **Default value**: `Language::En` (English)
///
/// **Info**: you can set this option for individual queries, too
pub fn lang(mut self, lang: Language) -> Self {
self.default_opts.lang = lang;
self
}
/// Set the country parameter used when accessing the YouTube API.
/// This will change trends and recommended content.
///
/// **Default value**: `Country::Us` (USA)
///
/// **Info**: you can set this option for individual queries, too
pub fn country(mut self, country: Country) -> Self {
self.default_opts.country = country;
self
}
/// Generate a report on every operation.
/// This should only be used for debugging.
///
/// **Info**: you can set this option for individual queries, too
pub fn report(mut self) -> Self {
self.default_opts.report = true;
self
}
/// Enable strict mode, causing operations to fail if there
/// are warnings during deserialization (e.g. invalid items).
/// This should only be used for testing.
///
/// **Info**: you can set this option for individual queries, too
pub fn strict(mut self) -> Self {
self.default_opts.strict = true;
self
}
}
impl Default for RustyPipe {
fn default() -> Self {
Self::new()
}
}
impl RustyPipe {
/// Create a new RustyPipe instance with default settings.
///
/// To create an instance with custom options, use `RustyPipeBuilder` instead.
pub fn new() -> Self {
RustyPipeBuilder::new().build()
}
/// Constructs a new `RustyPipeBuilder`.
///
/// This is the same as `RustyPipeBuilder::new()`
pub fn builder() -> RustyPipeBuilder {
RustyPipeBuilder::new()
}
/// Constructs a new `RustyPipeQuery`.
pub fn query(&self) -> RustyPipeQuery {
RustyPipeQuery {
client: self.clone(),
opts: RustyPipeOpts {
lang: Language::En,
country: Country::Us,
report: false,
strict: false,
opts: self.inner.default_opts.clone(),
}
}
/// Execute the given http request.
async fn http_request(&self, request: Request) -> Result<Response, reqwest::Error> {
let mut last_res: Option<Result<Response, reqwest::Error>> = None;
for n in 0..self.inner.n_retries {
let res = self.inner.http.execute(request.try_clone().unwrap()).await;
let emsg = match &res {
Ok(response) => {
let status = response.status();
// Immediately return in case of success or unrecoverable status code
if status.is_success() || !status.is_server_error() {
return res;
}
status.to_string()
}
Err(e) => {
// Immediately return in case of unrecoverable error
if !e.is_timeout() && !e.is_connect() {
return res;
}
e.to_string()
}
};
let ms = util::retry_delay(n, 1000, 60000, 3);
warn!("Retry attempt #{}. Error: {}. Waiting {} ms", n, emsg, ms);
tokio::time::sleep(std::time::Duration::from_millis(ms.into())).await;
last_res = Some(res);
}
last_res.unwrap()
}
/// Execute the given http request, returning an error in case of a
/// non-successful status code.
async fn http_request_estatus(&self, request: Request) -> Result<Response> {
Ok(self.http_request(request).await?.error_for_status()?)
}
/// Execute the given http request, returning the response body as a string.
async fn http_request_txt(&self, request: Request) -> Result<String> {
Ok(self.http_request_estatus(request).await?.text().await?)
}
/// Extract the current version of the YouTube desktop client from the website.
async fn extract_desktop_client_version(&self) -> Result<String> {
let from_swjs = async {
let swjs = self
.http_request_txt(
self.inner
.http
.get("https://www.youtube.com/sw.js")
.header(header::ORIGIN, "https://www.youtube.com")
.header(header::REFERER, "https://www.youtube.com")
.header(header::COOKIE, self.inner.consent_cookie.to_owned())
.build()
.unwrap(),
)
.await
.context("Failed to download sw.js")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version in sw.js"))
};
let from_html = async {
let html = self
.http_request_txt(
self.inner
.http
.get("https://www.youtube.com/results?search_query=")
.build()
.unwrap(),
)
.await
.context("Failed to get YT Desktop page")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version on html page"))
};
match from_swjs.await {
Ok(client_version) => Ok(client_version),
Err(_) => from_html.await,
}
}
/// Extract the current version of the YouTube Music desktop client from the website.
async fn extract_music_client_version(&self) -> Result<String> {
let from_swjs = async {
let swjs = self
.http_request_txt(
self.inner
.http
.get("https://music.youtube.com/sw.js")
.header(header::ORIGIN, "https://music.youtube.com")
.header(header::REFERER, "https://music.youtube.com")
.header(header::COOKIE, self.inner.consent_cookie.to_owned())
.build()
.unwrap(),
)
.await
.context("Failed to download sw.js")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version in sw.js"))
};
let from_html = async {
let html = self
.http_request_txt(
self.inner
.http
.get("https://music.youtube.com")
.build()
.unwrap(),
)
.await
.context("Failed to get YT Desktop page")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version on html page"))
};
match from_swjs.await {
Ok(client_version) => Ok(client_version),
Err(_) => from_html.await,
}
}
/// Get the current version of the YouTube web client from the following sources
///
/// 1. from cache
/// 2. from YouTube's service worker script (`sw.js`)
/// 3. from the YouTube website
/// 4. fall back to the hardcoded version
async fn get_desktop_client_version(&self) -> String {
let mut cache = self.inner.cache.lock().await;
match cache.desktop_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match self.extract_desktop_client_version().await {
Ok(version) => {
cache.desktop_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.store_cache(&cache);
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_CLIENT_VERSION.to_owned()
}
},
}
}
#[cfg(test)]
pub fn test_query(&self) -> RustyPipeQuery {
RustyPipeQuery {
client: self.clone(),
opts: RustyPipeOpts {
lang: Language::En,
country: Country::Us,
report: false,
strict: true,
/// Get the current version of the YouTube Music web client from the following sources
///
/// 1. from cache
/// 2. from YouTube Music's service worker script (`sw.js`)
/// 3. from the YouTube Music website
/// 4. fall back to the hardcoded version
async fn get_music_client_version(&self) -> String {
let mut cache = self.inner.cache.lock().await;
match cache.music_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match self.extract_music_client_version().await {
Ok(version) => {
cache.music_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.store_cache(&cache);
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_MUSIC_CLIENT_VERSION.to_owned()
}
},
}
}
/// Instantiate a new deobfuscator from either cached or extracted YouTube JavaScript code.
async fn get_deobf(&self) -> Result<Deobfuscator> {
let mut cache = self.inner.cache.lock().await;
match cache.deobf.get() {
Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())),
None => {
let deobf = Deobfuscator::new(self.inner.http.clone()).await?;
cache.deobf = CacheEntry::from(deobf.get_data());
self.store_cache(&cache);
Ok(deobf)
}
}
}
/// Write the given cache data to the storage backend.
fn store_cache(&self, cache: &CacheData) {
if let Some(storage) = &self.inner.storage {
match serde_json::to_string(cache) {
Ok(data) => storage.write(&data),
Err(e) => error!("Could not serialize cache. Error: {}", e),
}
}
}
}
impl RustyPipeQuery {
@ -350,20 +669,26 @@ impl RustyPipeQuery {
/// Generate a report on every operation.
/// This should only be used for debugging.
pub fn report(mut self, report: bool) -> Self {
self.opts.report = report;
pub fn report(mut self) -> Self {
self.opts.report = true;
self
}
/// Enable strict mode, causing operations to fail if there
/// are warnings during deserialization (e.g. invalid items).
/// This should only be used for testing.
pub fn strict(mut self, strict: bool) -> Self {
self.opts.strict = strict;
pub fn strict(mut self) -> Self {
self.opts.strict = true;
self
}
async fn get_context(&self, ctype: ClientType, localized: bool) -> ContextYT {
/// Create a new context object, which is included in every request to
/// the YouTube API and contains language, country and device parameters.
///
/// # Parameters
/// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...)
/// - `localized`: Whether to include the configured language and country
async fn get_context(&self, ctype: ClientType, localized: bool) -> YTContext {
let hl = match localized {
true => self.opts.lang,
false => Language::En,
@ -374,10 +699,10 @@ impl RustyPipeQuery {
};
match ctype {
ClientType::Desktop => ContextYT {
ClientType::Desktop => YTContext {
client: ClientInfo {
client_name: "WEB".to_owned(),
client_version: self.get_desktop_client_version().await,
client_version: self.client.get_desktop_client_version().await,
client_screen: None,
device_model: None,
platform: "DESKTOP".to_owned(),
@ -389,10 +714,10 @@ impl RustyPipeQuery {
user: User::default(),
third_party: None,
},
ClientType::DesktopMusic => ContextYT {
ClientType::DesktopMusic => YTContext {
client: ClientInfo {
client_name: "WEB_REMIX".to_owned(),
client_version: self.get_music_client_version().await,
client_version: self.client.get_music_client_version().await,
client_screen: None,
device_model: None,
platform: "DESKTOP".to_owned(),
@ -404,7 +729,7 @@ impl RustyPipeQuery {
user: User::default(),
third_party: None,
},
ClientType::TvHtml5Embed => ContextYT {
ClientType::TvHtml5Embed => YTContext {
client: ClientInfo {
client_name: "TVHTML5_SIMPLY_EMBEDDED_PLAYER".to_owned(),
client_version: TVHTML5_CLIENT_VERSION.to_owned(),
@ -421,7 +746,7 @@ impl RustyPipeQuery {
embed_url: "https://www.youtube.com/".to_owned(),
}),
},
ClientType::Android => ContextYT {
ClientType::Android => YTContext {
client: ClientInfo {
client_name: "ANDROID".to_owned(),
client_version: MOBILE_CLIENT_VERSION.to_owned(),
@ -436,7 +761,7 @@ impl RustyPipeQuery {
user: User::default(),
third_party: None,
},
ClientType::Ios => ContextYT {
ClientType::Ios => YTContext {
client: ClientInfo {
client_name: "IOS".to_owned(),
client_version: MOBILE_CLIENT_VERSION.to_owned(),
@ -454,6 +779,13 @@ impl RustyPipeQuery {
}
}
/// Create a new Reqwest HTTP request builder with the URL and headers required
/// for accessing the YouTube API
///
/// # Parameters
/// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...)
/// - `method`: HTTP method
/// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/<XYZ>?key=...`)
async fn request_builder(
&self,
ctype: ClientType,
@ -478,7 +810,7 @@ impl RustyPipeQuery {
.header("X-YouTube-Client-Name", "1")
.header(
"X-YouTube-Client-Version",
self.get_desktop_client_version().await,
self.client.get_desktop_client_version().await,
),
ClientType::DesktopMusic => self
.client
@ -500,7 +832,7 @@ impl RustyPipeQuery {
.header("X-YouTube-Client-Name", "67")
.header(
"X-YouTube-Client-Version",
self.get_music_client_version().await,
self.client.get_music_client_version().await,
),
ClientType::TvHtml5Embed => self
.client
@ -564,6 +896,20 @@ impl RustyPipeQuery {
}
}
/// Execute a request to the YouTube API, then deobfuscate and map the response.
///
/// Creates a report in case of failure for easy debugging.
///
/// # Parameters
/// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...)
/// - `operation`: Name of the RustyPipe operation (only for reporting, e.g. `get_player`)
/// - `id`: ID of the requested entity (Video ID, Channel ID, ...).
/// The ID is included in reports and is also passed to the mapper for validating the response.
/// Set it to an empty string if you are not requesting an entity with an ID.
/// - `method`: HTTP method
/// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/<XYZ>?key=...`)
/// - `body`: Serializable request body to be sent in json format
/// - `deobf`: Deobfuscator (is passed to the mapper to deobfuscate stream URLs).
#[allow(clippy::too_many_arguments)]
async fn execute_request_deobf<
R: DeserializeOwned + MapResponse<M> + Debug,
@ -573,9 +919,9 @@ impl RustyPipeQuery {
&self,
ctype: ClientType,
operation: &str,
id: &str,
method: Method,
endpoint: &str,
id: &str,
body: &B,
deobf: Option<&Deobfuscator>,
) -> Result<M> {
@ -600,7 +946,7 @@ impl RustyPipeQuery {
version: "0.1.0".to_owned(),
date: chrono::Local::now(),
level,
operation: operation.to_owned(),
operation: format!("{}({})", operation, id),
error,
msgs,
deobf_data: deobf.map(Deobfuscator::get_data),
@ -661,6 +1007,19 @@ impl RustyPipeQuery {
}
}
/// Execute a request to the YouTube API, then map the response.
///
/// Creates a report in case of failure for easy debugging.
///
/// # Parameters
/// - `ctype`: Client type (`Desktop`, `DesktopMusic`, `Android`, ...)
/// - `operation`: Name of the RustyPipe operation (only for reporting, e.g. `get_player`)
/// - `id`: ID of the requested entity (Video ID, Channel ID, ...).
/// The ID is included in reports and is also passed to the mapper for validating the response.
/// Set it to an empty string if you are not requesting an entity with an ID.
/// - `method`: HTTP method
/// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/<XYZ>?key=...`)
/// - `body`: Serializable request body to be sent in json format
async fn execute_request<
R: DeserializeOwned + MapResponse<M> + Debug,
M,
@ -669,174 +1028,31 @@ impl RustyPipeQuery {
&self,
ctype: ClientType,
operation: &str,
id: &str,
method: Method,
endpoint: &str,
id: &str,
body: &B,
) -> Result<M> {
self.execute_request_deobf::<R, M, B>(ctype, operation, method, endpoint, id, body, None)
self.execute_request_deobf::<R, M, B>(ctype, operation, id, method, endpoint, body, None)
.await
}
async fn get_desktop_client_version(&self) -> String {
let mut cache = self.client.inner.cache.lock().await;
match cache.desktop_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match extract_desktop_client_version(
self.client.inner.http.clone(),
self.client.inner.consent_cookie.to_owned(),
)
.await
{
Ok(version) => {
cache.desktop_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.write_cache(&cache);
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_CLIENT_VERSION.to_owned()
}
},
}
}
async fn get_music_client_version(&self) -> String {
let mut cache = self.client.inner.cache.lock().await;
match cache.music_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match extract_music_client_version(
self.client.inner.http.clone(),
self.client.inner.consent_cookie.to_owned(),
)
.await
{
Ok(version) => {
cache.music_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.write_cache(&cache);
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_MUSIC_CLIENT_VERSION.to_owned()
}
},
}
}
async fn get_deobf(&self) -> Result<Deobfuscator> {
let mut cache = self.client.inner.cache.lock().await;
match cache.deobf.get() {
Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())),
None => {
let deobf = Deobfuscator::new(self.client.inner.http.clone()).await?;
cache.deobf = CacheEntry::from(deobf.get_data());
self.write_cache(&cache);
Ok(deobf)
}
}
}
fn write_cache(&self, cache: &CacheData) {
if let Some(storage) = &self.client.inner.storage {
match serde_json::to_string(cache) {
Ok(data) => storage.write(&data),
Err(e) => error!("Could not serialize cache. Error: {}", e),
}
}
}
}
async fn extract_desktop_client_version(http: Client, consent_cookie: String) -> Result<String> {
let from_swjs = async {
let swjs = exec_request_text(
http.clone(),
http.get("https://www.youtube.com/sw.js")
.header(header::ORIGIN, "https://www.youtube.com")
.header(header::REFERER, "https://www.youtube.com")
.header(header::COOKIE, consent_cookie)
.build()
.unwrap(),
)
.await
.context("Failed to download sw.js")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version in sw.js"))
};
let from_html = async {
let html = exec_request_text(
http.clone(),
http.get("https://www.youtube.com/results?search_query=")
.build()
.unwrap(),
)
.await
.context("Failed to get YT Desktop page")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version on html page"))
};
match from_swjs.await {
Ok(client_version) => Ok(client_version),
Err(_) => from_html.await,
}
}
async fn extract_music_client_version(http: Client, consent_cookie: String) -> Result<String> {
let from_swjs = async {
let swjs = exec_request_text(
http.clone(),
http.get("https://music.youtube.com/sw.js")
.header(header::ORIGIN, "https://music.youtube.com")
.header(header::REFERER, "https://music.youtube.com")
.header(header::COOKIE, consent_cookie)
.build()
.unwrap(),
)
.await
.context("Failed to download sw.js")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &swjs, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version in sw.js"))
};
let from_html = async {
let html = exec_request_text(
http.clone(),
http.get("https://music.youtube.com").build().unwrap(),
)
.await
.context("Failed to get YT Desktop page")?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1)
.ok_or_else(|| anyhow!("Could not find desktop client version on html page"))
};
match from_swjs.await {
Ok(client_version) => Ok(client_version),
Err(_) => from_html.await,
}
}
async fn exec_request(http: Client, request: Request) -> Result<Response> {
Ok(http.execute(request).await?.error_for_status()?)
}
async fn exec_request_text(http: Client, request: Request) -> Result<String> {
Ok(exec_request(http, request).await?.text().await?)
}
/// Implement this for YouTube API response structs that need to be mapped to
/// RustyPipe models.
trait MapResponse<T> {
/// Map the YouTube API response structs to a RustyPipe model.
///
/// Returns an error if crucial data required for the model could not be extracted.
///
/// Returns a `MapResult` with warnings if there were issues with the deserializing/mapping,
/// but the resulting data is still usable.
///
/// # Parameters
/// - `id`: The ID of the requested entity (Video ID, Channel ID, ...). If possible, assert
/// that the returned entity matches this ID and return an error instead.
/// - `lang`: Language of the request. Used for mapping localized information like dates.
/// - `deobf`: Deobfuscator (if passed to the `execute_request_deobf` method)
fn map_response(
self,
id: &str,
@ -846,7 +1062,6 @@ trait MapResponse<T> {
}
#[cfg(test)]
#[cfg(feature = "yaml")]
mod tests {
// use super::*;
}

View file

@ -21,13 +21,13 @@ use crate::{
use super::{
response::{self, player},
ClientType, ContextYT, MapResponse, MapResult, RustyPipeQuery,
ClientType, MapResponse, MapResult, RustyPipeQuery, YTContext,
};
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct QPlayer {
context: ContextYT,
context: YTContext,
/// Website playback context
#[serde(skip_serializing_if = "Option::is_none")]
playback_context: Option<QPlaybackContext>,
@ -61,7 +61,7 @@ impl RustyPipeQuery {
pub async fn get_player(self, video_id: &str, client_type: ClientType) -> Result<VideoPlayer> {
let q1 = self.clone();
let t_context = tokio::spawn(async move { q1.get_context(client_type, false).await });
let q2 = self.clone();
let q2 = self.client.clone();
let t_deobf = tokio::spawn(async move { q2.get_deobf().await });
let (context, deobf) = tokio::join!(t_context, t_deobf);
@ -96,9 +96,9 @@ impl RustyPipeQuery {
self.execute_request_deobf::<response::Player, _, _>(
client_type,
"get_player",
video_id,
Method::POST,
"player",
video_id,
&request_body,
Some(&deobf),
)
@ -559,7 +559,6 @@ fn get_audio_codec(codecs: Vec<&str>) -> AudioCodec {
}
#[cfg(test)]
#[cfg(feature = "yaml")]
mod tests {
use std::{fs::File, io::BufReader, path::Path};
@ -632,9 +631,9 @@ mod tests {
#[case::ios(ClientType::Ios)]
#[test_log::test(tokio::test)]
async fn t_get_player(#[case] client_type: ClientType) {
let rp = RustyPipe::new_test();
let rp = RustyPipe::builder().strict().build();
let player_data = rp
.test_query()
.query()
.get_player("n4tK7LYFxI0", client_type)
.await
.unwrap();

View file

@ -9,19 +9,19 @@ use crate::{
timeago, util,
};
use super::{response, ClientType, ContextYT, MapResponse, MapResult, RustyPipeQuery};
use super::{response, ClientType, MapResponse, MapResult, RustyPipeQuery, YTContext};
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct QPlaylist {
context: ContextYT,
context: YTContext,
browse_id: String,
}
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct QPlaylistCont {
context: ContextYT,
context: YTContext,
continuation: String,
}
@ -36,9 +36,9 @@ impl RustyPipeQuery {
self.execute_request::<response::Playlist, _, _>(
ClientType::Desktop,
"get_playlist",
playlist_id,
Method::POST,
"browse",
playlist_id,
&request_body,
)
.await
@ -57,9 +57,9 @@ impl RustyPipeQuery {
.execute_request::<response::PlaylistCont, _, _>(
ClientType::Desktop,
"get_playlist_cont",
&playlist.id,
Method::POST,
"browse",
&playlist.id,
&request_body,
)
.await?;
@ -339,8 +339,8 @@ mod tests {
#[case] description: Option<String>,
#[case] channel: Option<Channel>,
) {
let rp = RustyPipe::new_test();
let playlist = rp.test_query().get_playlist(id).await.unwrap();
let rp = RustyPipe::builder().strict().build();
let playlist = rp.query().get_playlist(id).await.unwrap();
assert_eq!(playlist.id, id);
assert_eq!(playlist.name, name);
@ -380,18 +380,15 @@ mod tests {
#[test_log::test(tokio::test)]
async fn t_playlist_cont() {
let rp = RustyPipe::new_test();
let rp = RustyPipe::builder().strict().build();
let mut playlist = rp
.test_query()
.query()
.get_playlist("PLbZIPy20-1pN7mqjckepWF78ndb6ci_qi")
.await
.unwrap();
while playlist.ctoken.is_some() {
rp.test_query()
.get_playlist_cont(&mut playlist)
.await
.unwrap();
rp.query().get_playlist_cont(&mut playlist).await.unwrap();
}
assert!(playlist.videos.len() > 100);

View file

@ -65,11 +65,11 @@ pub trait Reporter {
fn report(&self, report: &Report);
}
pub struct JsonFileReporter {
pub struct FileReporter {
path: PathBuf,
}
impl JsonFileReporter {
impl FileReporter {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
Self {
path: path.as_ref().to_path_buf(),
@ -77,13 +77,21 @@ impl JsonFileReporter {
}
fn _report(&self, report: &Report) -> Result<()> {
let report_path = get_report_path(&self.path, report)?;
serde_json::to_writer_pretty(&File::create(report_path)?, &report)?;
#[cfg(not(feature = "report-yaml"))]
{
let report_path = get_report_path(&self.path, report, "json")?;
serde_json::to_writer_pretty(&File::create(report_path)?, &report)?;
}
#[cfg(feature = "report-yaml")]
{
let report_path = get_report_path(&self.path, report, "yaml")?;
serde_yaml::to_writer(&File::create(report_path)?, &report)?;
}
Ok(())
}
}
impl Default for JsonFileReporter {
impl Default for FileReporter {
fn default() -> Self {
Self {
path: Path::new("rustypipe_reports").to_path_buf(),
@ -91,51 +99,14 @@ impl Default for JsonFileReporter {
}
}
impl Reporter for JsonFileReporter {
impl Reporter for FileReporter {
fn report(&self, report: &Report) {
self._report(report)
.unwrap_or_else(|e| error!("Could not store report file. Err: {}", e));
}
}
#[cfg(feature = "yaml")]
pub struct YamlFileReporter {
path: PathBuf,
}
#[cfg(feature = "yaml")]
impl YamlFileReporter {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
Self {
path: path.as_ref().to_path_buf(),
}
}
fn _report(&self, report: &Report) -> Result<()> {
let report_path = get_report_path(&self.path, report)?;
serde_yaml::to_writer(&File::create(report_path)?, &report)?;
Ok(())
}
}
#[cfg(feature = "yaml")]
impl Default for YamlFileReporter {
fn default() -> Self {
Self {
path: Path::new("rustypipe_reports").to_path_buf(),
}
}
}
#[cfg(feature = "yaml")]
impl Reporter for YamlFileReporter {
fn report(&self, report: &Report) {
self._report(report)
.unwrap_or_else(|e| error!("Could not store report file. Err: {}", e));
}
}
fn get_report_path(root: &Path, report: &Report) -> Result<PathBuf> {
fn get_report_path(root: &Path, report: &Report, ext: &str) -> Result<PathBuf> {
if !root.is_dir() {
std::fs::create_dir_all(root)?;
}
@ -143,13 +114,13 @@ fn get_report_path(root: &Path, report: &Report) -> Result<PathBuf> {
let filename_prefix = format!("{}_{:?}", report.date.format("%F_%H-%M-%S"), report.level);
let mut report_path = root.to_path_buf();
report_path.push(format!("{}.yaml", filename_prefix));
report_path.push(format!("{}.{}", filename_prefix, ext));
// ensure unique filename
for i in 1..u32::MAX {
if report_path.exists() {
report_path = root.to_path_buf();
report_path.push(format!("{}_{}.yaml", filename_prefix, i));
report_path.push(format!("{}_{}.{}", filename_prefix, i, ext));
} else {
break;
}

View file

@ -8,6 +8,10 @@ pub use vec_log_err::VecLogError;
use std::fmt::Debug;
/// This represents a result from a deserializing/mapping operation.
/// It holds the desired content (`c`) and a list of warning messages,
/// if there occurred minor error during the deserializing or mapping
/// (e.g. certain list items could not be deserialized).
#[derive(Clone)]
pub struct MapResult<T> {
pub c: T,

View file

@ -8,6 +8,11 @@ use serde_with::{de::DeserializeAsWrap, DeserializeAs};
use super::MapResult;
/// Deserializes a list of arbitrary items into a `MapResult`,
/// creating warnings for items that could not be deserialized.
///
/// This is similar to `VecSkipError`, but it does not silently ignore
/// faulty items.
pub struct VecLogError<T>(PhantomData<T>);
impl<'de, T, U> DeserializeAs<'de, MapResult<Vec<T>>> for VecLogError<U>

View file

@ -89,6 +89,21 @@ where
numbers
}
pub fn retry_delay(
n_past_retries: u32,
min_retry_interval: u32,
max_retry_interval: u32,
backoff_base: u32,
) -> u32 {
let unjittered_delay = backoff_base.checked_pow(n_past_retries).unwrap_or(u32::MAX);
let jitter_factor = rand::thread_rng().gen_range(800..1500);
let jittered_delay = unjittered_delay
.checked_mul(jitter_factor)
.unwrap_or(u32::MAX);
min_retry_interval.max(jittered_delay.min(max_retry_interval))
}
#[cfg(test)]
mod tests {
use rstest::rstest;
@ -111,4 +126,20 @@ mod tests {
let n = parse_numeric_vec::<u32>(string);
assert_eq!(n, expect);
}
#[rstest]
#[case(0, 800, 1500)]
#[case(1, 2400, 4500)]
#[case(2, 7200, 13500)]
#[case(100, 60000, 60000)]
fn t_retry_delay(#[case] n: u32, #[case] expect_min: u32, #[case] expect_max: u32) {
let res = retry_delay(n, 1000, 60000, 3);
assert!(
res >= expect_min && res <= expect_max,
"res: {} not within {} and {}",
res,
expect_min,
expect_max
);
}
}