fix: reworked retry system

This commit is contained in:
ThetaDev 2023-05-12 17:19:56 +02:00
parent d128ca4214
commit a2bbc850a7
17 changed files with 273 additions and 121 deletions

View file

@ -322,6 +322,14 @@ struct ClientData {
pub version: String,
}
/// Result of a successful HTTP request
struct RequestResult<T> {
/// Result of the deserialiation/mapping
res: Result<MapResult<T>, Error>,
status: StatusCode,
body: String,
}
impl<T> CacheEntry<T> {
fn get(&self) -> Option<&T> {
match self {
@ -487,8 +495,9 @@ impl RustyPipeBuilder {
/// Set the number of retries for HTTP requests.
///
/// If a HTTP requests fails and retries are enabled,
/// If a HTTP requests fails because of a serverside error and retries are enabled,
/// RustyPipe waits 1 second before the next attempt.
///
/// The waiting time is doubled for subsequent attempts (including a bit of
/// random jitter to be less predictable).
///
@ -591,40 +600,43 @@ impl RustyPipe {
}
/// Execute the given http request.
async fn http_request(&self, request: Request) -> Result<Response, reqwest::Error> {
let mut last_res = None;
async fn http_request(&self, request: &Request) -> Result<Response, reqwest::Error> {
let mut last_resp = None;
for n in 0..=self.inner.n_http_retries {
let res = self.inner.http.execute(request.try_clone().unwrap()).await;
let emsg = match &res {
Ok(response) => {
let status = response.status();
// Immediately return in case of success or unrecoverable status code
if status.is_success() || (!status.is_server_error() && status != 429) {
return res;
}
status.to_string()
}
Err(e) => {
// Immediately return in case of unrecoverable error
if !e.is_timeout() && !e.is_connect() {
return res;
}
e.to_string()
}
};
let resp = self
.inner
.http
.execute(request.try_clone().unwrap())
.await?;
let ms = util::retry_delay(n, 1000, 60000, 3);
log::warn!("Retry attempt #{}. Error: {}. Waiting {} ms", n, emsg, ms);
tokio::time::sleep(Duration::from_millis(ms.into())).await;
let status = resp.status();
// Immediately return in case of success or unrecoverable status code
if status.is_success()
|| (!status.is_server_error() && status != StatusCode::TOO_MANY_REQUESTS)
{
return Ok(resp);
}
last_res = Some(res);
// Retry in case of a recoverable status code (server err, too many requests)
if n != self.inner.n_http_retries {
let ms = util::retry_delay(n, 1000, 60000, 3);
log::warn!(
"Retry attempt #{}. Error: {}. Waiting {} ms",
n + 1,
status,
ms
);
tokio::time::sleep(Duration::from_millis(ms.into())).await;
}
last_resp = Some(resp);
}
last_res.unwrap()
Ok(last_resp.unwrap())
}
/// Execute the given http request, returning an error in case of a
/// non-successful status code.
async fn http_request_estatus(&self, request: Request) -> Result<Response, Error> {
async fn http_request_estatus(&self, request: &Request) -> Result<Response, Error> {
let res = self.http_request(request).await?;
let status = res.status();
@ -636,7 +648,7 @@ impl RustyPipe {
}
/// Execute the given http request, returning the response body as a string.
async fn http_request_txt(&self, request: Request) -> Result<String, Error> {
async fn http_request_txt(&self, request: &Request) -> Result<String, Error> {
Ok(self.http_request_estatus(request).await?.text().await?)
}
@ -672,7 +684,8 @@ impl RustyPipe {
let from_swjs = sw_url.map(|sw_url| async move {
let swjs = self
.http_request_txt(
self.inner
&self
.inner
.http
.get(sw_url)
.header(header::ORIGIN, origin)
@ -696,7 +709,7 @@ impl RustyPipe {
builder = builder.header(header::USER_AGENT, ua);
}
let html = self.http_request_txt(builder.build().unwrap()).await?;
let html = self.http_request_txt(&builder.build().unwrap()).await?;
util::get_cg_from_regexes(CLIENT_VERSION_REGEXES.iter(), &html, 1).ok_or(
Error::Extraction(ExtractionError::InvalidData(Cow::Borrowed(
@ -1069,6 +1082,85 @@ impl RustyPipeQuery {
}
}
async fn yt_request_attempt<R: DeserializeOwned + MapResponse<M> + Debug, M>(
&self,
request: &Request,
id: &str,
deobf: Option<&DeobfData>,
) -> Result<RequestResult<M>, Error> {
let response = self
.client
.inner
.http
.execute(request.try_clone().unwrap())
.await?;
let status = response.status();
let body = response.text().await?;
let res = if status.is_client_error() || status.is_server_error() {
let error_msg = serde_json::from_str::<response::ErrorResponse>(&body)
.map(|r| Cow::from(r.error.message));
Err(match status {
StatusCode::NOT_FOUND => Error::Extraction(ExtractionError::NotFound {
id: id.to_owned(),
msg: error_msg.unwrap_or("404".into()),
}),
StatusCode::BAD_REQUEST => {
Error::Extraction(ExtractionError::BadRequest(error_msg.unwrap_or_default()))
}
_ => Error::HttpStatus(status.as_u16(), error_msg.unwrap_or_default()),
})
} else {
match serde_json::from_str::<R>(&body) {
Ok(deserialized) => match deserialized.map_response(id, self.opts.lang, deobf) {
Ok(mapres) => Ok(mapres),
Err(e) => Err(e.into()),
},
Err(e) => Err(Error::from(ExtractionError::from(e))),
}
};
Ok(RequestResult { res, status, body })
}
async fn yt_request<R: DeserializeOwned + MapResponse<M> + Debug, M>(
&self,
request: &Request,
id: &str,
deobf: Option<&DeobfData>,
) -> Result<RequestResult<M>, Error> {
let mut last_resp = None;
for n in 0..=self.client.inner.n_http_retries {
let resp = self.yt_request_attempt::<R, M>(request, id, deobf).await?;
let err = match &resp.res {
Ok(_) => return Ok(resp),
Err(e) => {
if !e.should_retry() {
return Ok(resp);
}
e
}
};
if n != self.client.inner.n_http_retries {
let ms = util::retry_delay(n, 1000, 60000, 3);
log::warn!(
"Retry attempt #{}. Error: {}. Waiting {} ms",
n + 1,
err,
ms
);
tokio::time::sleep(Duration::from_millis(ms.into())).await;
}
last_resp = Some(resp);
}
Ok(last_resp.unwrap())
}
/// Execute a request to the YouTube API, then deobfuscate and map the response.
///
/// Creates a report in case of failure for easy debugging.
@ -1104,18 +1196,31 @@ impl RustyPipeQuery {
.json(body)
.build()?;
let request_url = request.url().to_string();
let request_headers = request.headers().to_owned();
let response = self.client.http_request(request).await?;
let status = response.status();
let resp_str = response.text().await?;
let req_res = self.yt_request::<R, M>(&request, id, deobf).await?;
// Uncomment to debug response text
// println!("{}", &resp_str);
// println!("{}", &req_res.body);
let create_report = |level: Level, error: Option<String>, msgs: Vec<String>| {
let (level, error, msgs, res) = match req_res.res {
Ok(mapres) => {
let level = if mapres.warnings.is_empty() {
Level::DBG
} else {
Level::WRN
};
(level, None, mapres.warnings, Ok(mapres.c))
}
Err(e) => {
let level = if e.should_report() {
Level::ERR
} else {
Level::DBG
};
(level, Some(e.to_string()), Vec::new(), Err(e))
}
};
if level > Level::DBG || self.opts.report {
if let Some(reporter) = &self.client.inner.reporter {
let report = Report {
info: Default::default(),
@ -1125,75 +1230,29 @@ impl RustyPipeQuery {
msgs,
deobf_data: deobf.cloned(),
http_request: crate::report::HTTPRequest {
url: request_url,
url: request.url().to_string(),
method: "POST".to_string(),
req_header: request_headers
req_header: request
.headers()
.iter()
.map(|(k, v)| {
(k.to_string(), v.to_str().unwrap_or_default().to_owned())
})
.collect(),
req_body: serde_json::to_string(body).unwrap_or_default(),
status: status.into(),
resp_body: resp_str.to_owned(),
status: req_res.status.into(),
resp_body: req_res.body,
},
};
reporter.report(&report);
}
};
if status.is_client_error() || status.is_server_error() {
let error_msg = serde_json::from_str::<response::ErrorResponse>(&resp_str)
.map(|r| Cow::from(r.error.message));
return match status {
StatusCode::NOT_FOUND => Err(Error::Extraction(ExtractionError::NotFound {
id: id.to_owned(),
msg: error_msg.unwrap_or("404".into()),
})),
StatusCode::BAD_REQUEST => Err(Error::Extraction(ExtractionError::BadRequest(
error_msg.unwrap_or_default(),
))),
_ => Err(Error::HttpStatus(
status.as_u16(),
error_msg.unwrap_or_default(),
)),
};
}
match serde_json::from_str::<R>(&resp_str) {
Ok(deserialized) => match deserialized.map_response(id, self.opts.lang, deobf) {
Ok(mapres) => {
if !mapres.warnings.is_empty() {
create_report(
Level::WRN,
Some(ExtractionError::DeserializationWarnings.to_string()),
mapres.warnings,
);
if self.opts.strict {
return Err(Error::Extraction(
ExtractionError::DeserializationWarnings,
));
}
} else if self.opts.report {
create_report(Level::DBG, None, vec![]);
}
Ok(mapres.c)
}
Err(e) => {
if e.should_report() || self.opts.report {
create_report(Level::ERR, Some(e.to_string()), Vec::new());
}
Err(e.into())
}
},
Err(e) => {
create_report(Level::ERR, Some(e.to_string()), Vec::new());
Err(Error::from(ExtractionError::from(e)))
}
if res.is_ok() && level > Level::DBG && self.opts.strict {
return Err(Error::Extraction(ExtractionError::DeserializationWarnings));
}
res
}
/// Execute a request to the YouTube API, then map the response.
@ -1238,7 +1297,7 @@ impl RustyPipeQuery {
.json(body)
.build()?;
self.client.http_request_txt(request).await
self.client.http_request_txt(&request).await
}
}