feat: rewrite request attempt system, retry with different visitor data

This commit is contained in:
ThetaDev 2025-02-06 03:12:54 +01:00
parent 8385b87c63
commit dfd03edfad
No known key found for this signature in database
GPG key ID: E319D3C5148D65B6

View file

@ -578,12 +578,14 @@ struct ClientData {
pub version: String,
}
/// Result of a successful HTTP request
/// Result of a YouTube HTTP request
struct RequestResult<T> {
/// Result of the deserialiation/mapping
res: Result<MapResult<T>, Error>,
status: StatusCode,
body: String,
visitor_data: String,
request: Request,
}
impl<T> CacheEntry<T> {
@ -838,7 +840,7 @@ impl RustyPipeBuilder {
self
}
/// Set the number of retries for HTTP requests.
/// Set the maximum number of attempts for HTTP requests.
///
/// If a HTTP requests fails because of a serverside error and retries are enabled,
/// RustyPipe waits 1 second before the next attempt.
@ -849,7 +851,7 @@ impl RustyPipeBuilder {
/// **Default value**: 2
#[must_use]
pub fn n_http_retries(mut self, n_retries: u32) -> Self {
self.n_http_retries = n_retries;
self.n_http_retries = n_retries.max(1);
self
}
@ -2178,11 +2180,49 @@ impl RustyPipeQuery {
})
}
async fn yt_request_attempt<R: DeserializeOwned + MapResponse<M> + Debug, M>(
async fn execute_request_attempt<
R: DeserializeOwned + MapResponse<M> + Debug,
M,
B: Serialize + ?Sized,
>(
&self,
request: &Request,
ctx: &MapRespCtx<'_>,
ctype: ClientType,
id: &str,
endpoint: &str,
body: &B,
ctx_src: &MapRespOptions<'_>,
) -> Result<RequestResult<M>, Error> {
let visitor_data = match ctx_src
.visitor_data
.or(self.opts.visitor_data.as_deref())
.map(Cow::Borrowed)
{
Some(vd) => vd,
None => self.client.inner.visitor_data_cache.get().await?.into(),
};
let context = self
.get_context(ctype, !ctx_src.unlocalized, &visitor_data)
.await;
let req_body = QBody { context, body };
let ctx = MapRespCtx {
id,
lang: self.opts.lang,
deobf: ctx_src.deobf,
visitor_data: Some(&visitor_data),
client_type: ctype,
artist: ctx_src.artist.clone(),
authenticated: self.opts.auth.unwrap_or_default(),
session_po_token: ctx_src.session_po_token.clone(),
};
let request = self
.request_builder(ctype, endpoint, ctx.visitor_data)
.await?
.json(&req_body)
.build()?;
let response = self
.client
.inner
@ -2211,7 +2251,7 @@ impl RustyPipeQuery {
})
} else {
match serde_json::from_str::<R>(&body) {
Ok(deserialized) => match deserialized.map_response(ctx) {
Ok(deserialized) => match deserialized.map_response(&ctx) {
Ok(mapres) => Ok(mapres),
Err(e) => Err(e.into()),
},
@ -2220,18 +2260,32 @@ impl RustyPipeQuery {
};
tracing::trace!("mapped response");
Ok(RequestResult { res, status, body })
Ok(RequestResult {
res,
status,
body,
request,
visitor_data: visitor_data.into_owned(),
})
}
#[tracing::instrument(skip_all)]
async fn yt_request<R: DeserializeOwned + MapResponse<M> + Debug, M>(
async fn execute_request_inner<
R: DeserializeOwned + MapResponse<M> + Debug,
M,
B: Serialize + ?Sized,
>(
&self,
request: &Request,
ctx: &MapRespCtx<'_>,
ctype: ClientType,
id: &str,
endpoint: &str,
body: &B,
ctx_src: &MapRespOptions<'_>,
) -> Result<RequestResult<M>, Error> {
let mut last_resp = None;
for n in 0..=self.client.inner.n_http_retries {
let resp = self.yt_request_attempt::<R, M>(request, ctx).await?;
let resp = self
.execute_request_attempt::<R, M, B>(ctype, id, endpoint, body, ctx_src)
.await?;
let err = match &resp.res {
Ok(_) => return Ok(resp),
@ -2243,6 +2297,9 @@ impl RustyPipeQuery {
}
};
// Remove the used visitor data from cache if the request resulted in a recoverable error
self.remove_visitor_data(&resp.visitor_data);
if n != self.client.inner.n_http_retries {
let ms = util::retry_delay(n, 1000, 60000, 3);
tracing::warn!(
@ -2273,7 +2330,6 @@ impl RustyPipeQuery {
/// - `endpoint`: YouTube API endpoint (`https://www.youtube.com/youtubei/v1/<XYZ>?key=...`)
/// - `body`: Serializable request body to be sent in json format
/// - `ctx_src`: Context source (additional parameters for fetching and mapping, used to build the MapRespCtx)
#[allow(clippy::too_many_arguments)]
async fn execute_request_ctx<
R: DeserializeOwned + MapResponse<M> + Debug,
M,
@ -2289,38 +2345,10 @@ impl RustyPipeQuery {
) -> Result<M, Error> {
tracing::debug!("getting {}({})", operation, id);
let visitor_data = match ctx_src
.visitor_data
.or(self.opts.visitor_data.as_deref())
.map(Cow::Borrowed)
{
Some(vd) => vd,
None => self.client.inner.visitor_data_cache.get().await?.into(),
};
let context = self
.get_context(ctype, !ctx_src.unlocalized, &visitor_data)
.await;
let req_body = QBody { context, body };
let ctx = MapRespCtx {
id,
lang: self.opts.lang,
deobf: ctx_src.deobf,
visitor_data: Some(&visitor_data),
client_type: ctype,
artist: ctx_src.artist,
authenticated: self.opts.auth.unwrap_or_default(),
session_po_token: ctx_src.session_po_token,
};
let request = self
.request_builder(ctype, endpoint, ctx.visitor_data)
.await?
.json(&req_body)
.build()?;
let req_res = self.yt_request::<R, M>(&request, &ctx).await?;
let req_res = self
.execute_request_inner::<R, M, B>(ctype, id, endpoint, body, &ctx_src)
.await?;
let request = req_res.request;
// Uncomment to debug response text
// println!("{}", &req_res.body);
@ -2352,7 +2380,7 @@ impl RustyPipeQuery {
operation: &format!("{operation}({id})"),
error,
msgs,
deobf_data: ctx.deobf.cloned(),
deobf_data: ctx_src.deobf.cloned(),
http_request: crate::report::HTTPRequest {
url: request.url().as_str(),
method: request.method().as_str(),
@ -2371,7 +2399,7 @@ impl RustyPipeQuery {
})
.collect(),
),
req_body: serde_json::to_string(&req_body).ok(),
req_body: serde_json::to_string(&req_res.body).ok(),
status: req_res.status.into(),
resp_body: req_res.body,
},