From dfd03edfadff2657e9cfbf04e5d313ba409520ac Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Thu, 6 Feb 2025 03:12:54 +0100 Subject: [PATCH] feat: rewrite request attempt system, retry with different visitor data --- src/client/mod.rs | 124 ++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 48 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index dfb5140..c1dfc8c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -578,12 +578,14 @@ struct ClientData { pub version: String, } -/// Result of a successful HTTP request +/// Result of a YouTube HTTP request struct RequestResult { /// Result of the deserialiation/mapping res: Result, Error>, status: StatusCode, body: String, + visitor_data: String, + request: Request, } impl CacheEntry { @@ -838,7 +840,7 @@ impl RustyPipeBuilder { self } - /// Set the number of retries for HTTP requests. + /// Set the maximum number of attempts for HTTP requests. /// /// If a HTTP requests fails because of a serverside error and retries are enabled, /// RustyPipe waits 1 second before the next attempt. @@ -849,7 +851,7 @@ impl RustyPipeBuilder { /// **Default value**: 2 #[must_use] pub fn n_http_retries(mut self, n_retries: u32) -> Self { - self.n_http_retries = n_retries; + self.n_http_retries = n_retries.max(1); self } @@ -2178,11 +2180,49 @@ impl RustyPipeQuery { }) } - async fn yt_request_attempt + Debug, M>( + async fn execute_request_attempt< + R: DeserializeOwned + MapResponse + Debug, + M, + B: Serialize + ?Sized, + >( &self, - request: &Request, - ctx: &MapRespCtx<'_>, + ctype: ClientType, + id: &str, + endpoint: &str, + body: &B, + ctx_src: &MapRespOptions<'_>, ) -> Result, Error> { + let visitor_data = match ctx_src + .visitor_data + .or(self.opts.visitor_data.as_deref()) + .map(Cow::Borrowed) + { + Some(vd) => vd, + None => self.client.inner.visitor_data_cache.get().await?.into(), + }; + + let context = self + .get_context(ctype, !ctx_src.unlocalized, &visitor_data) + .await; + let req_body = QBody { context, body }; + + let ctx = MapRespCtx { + id, + lang: self.opts.lang, + deobf: ctx_src.deobf, + visitor_data: Some(&visitor_data), + client_type: ctype, + artist: ctx_src.artist.clone(), + authenticated: self.opts.auth.unwrap_or_default(), + session_po_token: ctx_src.session_po_token.clone(), + }; + + let request = self + .request_builder(ctype, endpoint, ctx.visitor_data) + .await? + .json(&req_body) + .build()?; + let response = self .client .inner @@ -2211,7 +2251,7 @@ impl RustyPipeQuery { }) } else { match serde_json::from_str::(&body) { - Ok(deserialized) => match deserialized.map_response(ctx) { + Ok(deserialized) => match deserialized.map_response(&ctx) { Ok(mapres) => Ok(mapres), Err(e) => Err(e.into()), }, @@ -2220,18 +2260,32 @@ impl RustyPipeQuery { }; tracing::trace!("mapped response"); - Ok(RequestResult { res, status, body }) + Ok(RequestResult { + res, + status, + body, + request, + visitor_data: visitor_data.into_owned(), + }) } - #[tracing::instrument(skip_all)] - async fn yt_request + Debug, M>( + async fn execute_request_inner< + R: DeserializeOwned + MapResponse + Debug, + M, + B: Serialize + ?Sized, + >( &self, - request: &Request, - ctx: &MapRespCtx<'_>, + ctype: ClientType, + id: &str, + endpoint: &str, + body: &B, + ctx_src: &MapRespOptions<'_>, ) -> Result, Error> { let mut last_resp = None; for n in 0..=self.client.inner.n_http_retries { - let resp = self.yt_request_attempt::(request, ctx).await?; + let resp = self + .execute_request_attempt::(ctype, id, endpoint, body, ctx_src) + .await?; let err = match &resp.res { Ok(_) => return Ok(resp), @@ -2243,6 +2297,9 @@ impl RustyPipeQuery { } }; + // Remove the used visitor data from cache if the request resulted in a recoverable error + self.remove_visitor_data(&resp.visitor_data); + if n != self.client.inner.n_http_retries { let ms = util::retry_delay(n, 1000, 60000, 3); tracing::warn!( @@ -2273,7 +2330,6 @@ impl RustyPipeQuery { /// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/?key=...`) /// - `body`: Serializable request body to be sent in json format /// - `ctx_src`: Context source (additional parameters for fetching and mapping, used to build the MapRespCtx) - #[allow(clippy::too_many_arguments)] async fn execute_request_ctx< R: DeserializeOwned + MapResponse + Debug, M, @@ -2289,38 +2345,10 @@ impl RustyPipeQuery { ) -> Result { tracing::debug!("getting {}({})", operation, id); - let visitor_data = match ctx_src - .visitor_data - .or(self.opts.visitor_data.as_deref()) - .map(Cow::Borrowed) - { - Some(vd) => vd, - None => self.client.inner.visitor_data_cache.get().await?.into(), - }; - - let context = self - .get_context(ctype, !ctx_src.unlocalized, &visitor_data) - .await; - let req_body = QBody { context, body }; - - let ctx = MapRespCtx { - id, - lang: self.opts.lang, - deobf: ctx_src.deobf, - visitor_data: Some(&visitor_data), - client_type: ctype, - artist: ctx_src.artist, - authenticated: self.opts.auth.unwrap_or_default(), - session_po_token: ctx_src.session_po_token, - }; - - let request = self - .request_builder(ctype, endpoint, ctx.visitor_data) - .await? - .json(&req_body) - .build()?; - - let req_res = self.yt_request::(&request, &ctx).await?; + let req_res = self + .execute_request_inner::(ctype, id, endpoint, body, &ctx_src) + .await?; + let request = req_res.request; // Uncomment to debug response text // println!("{}", &req_res.body); @@ -2352,7 +2380,7 @@ impl RustyPipeQuery { operation: &format!("{operation}({id})"), error, msgs, - deobf_data: ctx.deobf.cloned(), + deobf_data: ctx_src.deobf.cloned(), http_request: crate::report::HTTPRequest { url: request.url().as_str(), method: request.method().as_str(), @@ -2371,7 +2399,7 @@ impl RustyPipeQuery { }) .collect(), ), - req_body: serde_json::to_string(&req_body).ok(), + req_body: serde_json::to_string(&req_res.body).ok(), status: req_res.status.into(), resp_body: req_res.body, },