fix: retry on empty continuation responses

This commit is contained in:
ThetaDev 2022-10-11 18:49:15 +02:00
parent ef35c48890
commit 562ac2df7e
10 changed files with 142 additions and 88 deletions

View file

@ -281,12 +281,11 @@ impl MapResponse<Paginator<ChannelVideo>> for response::ChannelCont {
_deobf: Option<&crate::deobfuscate::Deobfuscator>, _deobf: Option<&crate::deobfuscate::Deobfuscator>,
) -> Result<MapResult<Paginator<ChannelVideo>>, ExtractionError> { ) -> Result<MapResult<Paginator<ChannelVideo>>, ExtractionError> {
let mut actions = self.on_response_received_actions; let mut actions = self.on_response_received_actions;
let res = some_or_bail!( let res = actions
actions.try_swap_remove(0), .try_swap_remove(0)
Err(ExtractionError::InvalidData("no received action".into())) .ok_or(ExtractionError::Retry)?
) .append_continuation_items_action
.append_continuation_items_action .continuation_items;
.continuation_items;
Ok(map_videos(res, lang)) Ok(map_videos(res, lang))
} }
@ -300,12 +299,11 @@ impl MapResponse<Paginator<ChannelPlaylist>> for response::ChannelCont {
_deobf: Option<&crate::deobfuscate::Deobfuscator>, _deobf: Option<&crate::deobfuscate::Deobfuscator>,
) -> Result<MapResult<Paginator<ChannelPlaylist>>, ExtractionError> { ) -> Result<MapResult<Paginator<ChannelPlaylist>>, ExtractionError> {
let mut actions = self.on_response_received_actions; let mut actions = self.on_response_received_actions;
let res = some_or_bail!( let res = actions
actions.try_swap_remove(0), .try_swap_remove(0)
Err(ExtractionError::InvalidData("no received action".into())) .ok_or(ExtractionError::Retry)?
) .append_continuation_items_action
.append_continuation_items_action .continuation_items;
.continuation_items;
Ok(map_playlists(res)) Ok(map_playlists(res))
} }

View file

@ -169,7 +169,8 @@ struct RustyPipeRef {
http: Client, http: Client,
storage: Option<Box<dyn CacheStorage>>, storage: Option<Box<dyn CacheStorage>>,
reporter: Option<Box<dyn Reporter>>, reporter: Option<Box<dyn Reporter>>,
n_retries: u32, n_http_retries: u32,
n_query_retries: u32,
consent_cookie: String, consent_cookie: String,
cache: CacheHolder, cache: CacheHolder,
default_opts: RustyPipeOpts, default_opts: RustyPipeOpts,
@ -186,7 +187,8 @@ struct RustyPipeOpts {
pub struct RustyPipeBuilder { pub struct RustyPipeBuilder {
storage: Option<Box<dyn CacheStorage>>, storage: Option<Box<dyn CacheStorage>>,
reporter: Option<Box<dyn Reporter>>, reporter: Option<Box<dyn Reporter>>,
n_retries: u32, n_http_retries: u32,
n_query_retries: u32,
user_agent: String, user_agent: String,
default_opts: RustyPipeOpts, default_opts: RustyPipeOpts,
} }
@ -277,7 +279,8 @@ impl RustyPipeBuilder {
default_opts: RustyPipeOpts::default(), default_opts: RustyPipeOpts::default(),
storage: Some(Box::new(FileStorage::default())), storage: Some(Box::new(FileStorage::default())),
reporter: Some(Box::new(FileReporter::default())), reporter: Some(Box::new(FileReporter::default())),
n_retries: 3, n_http_retries: 3,
n_query_retries: 2,
user_agent: DEFAULT_UA.to_owned(), user_agent: DEFAULT_UA.to_owned(),
} }
} }
@ -312,7 +315,8 @@ impl RustyPipeBuilder {
http, http,
storage: self.storage, storage: self.storage,
reporter: self.reporter, reporter: self.reporter,
n_retries: self.n_retries, n_http_retries: self.n_http_retries,
n_query_retries: self.n_query_retries,
consent_cookie: format!( consent_cookie: format!(
"{}={}{}", "{}={}{}",
CONSENT_COOKIE, CONSENT_COOKIE,
@ -367,8 +371,18 @@ impl RustyPipeBuilder {
/// random jitter to be less predictable). /// random jitter to be less predictable).
/// ///
/// **Default value**: 3 /// **Default value**: 3
pub fn n_retries(mut self, n_retries: u32) -> Self { pub fn n_http_retries(mut self, n_retries: u32) -> Self {
self.n_retries = n_retries; self.n_http_retries = n_retries;
self
}
/// Set the number of retries for YouTube API queries.
///
/// If a YouTube API requests returns invalid data, the request is repeated.
///
/// **Default value**: 2
pub fn n_query_retries(mut self, n_retries: u32) -> Self {
self.n_http_retries = n_retries;
self self
} }
@ -458,7 +472,7 @@ impl RustyPipe {
request: Request, request: Request,
) -> core::result::Result<Response, reqwest::Error> { ) -> core::result::Result<Response, reqwest::Error> {
let mut last_res = None; let mut last_res = None;
for n in 0..self.inner.n_retries { for n in 0..self.inner.n_http_retries {
let res = self.inner.http.execute(request.try_clone().unwrap()).await; let res = self.inner.http.execute(request.try_clone().unwrap()).await;
let emsg = match &res { let emsg = match &res {
Ok(response) => { Ok(response) => {
@ -939,6 +953,44 @@ impl RustyPipeQuery {
endpoint: &str, endpoint: &str,
body: &B, body: &B,
deobf: Option<&Deobfuscator>, deobf: Option<&Deobfuscator>,
) -> Result<M> {
for n in 0..self.client.inner.n_query_retries.saturating_sub(1) {
let res = self
._try_execute_request_deobf::<R, M, B>(ctype, operation, id, endpoint, body, deobf)
.await;
let emsg = match res {
Ok(res) => return Ok(res),
Err(error) => match &error {
Error::Extraction(e) => match e {
ExtractionError::Deserialization(_)
| ExtractionError::InvalidData(_)
| ExtractionError::WrongResult(_)
| ExtractionError::Retry => e.to_string(),
_ => return Err(error),
},
_ => return Err(error),
},
};
warn!("{} retry attempt #{}. Error: {}.", operation, n, emsg);
}
self._try_execute_request_deobf::<R, M, B>(ctype, operation, id, endpoint, body, deobf)
.await
}
/// Single try of `execute_request_deobf`
async fn _try_execute_request_deobf<
R: DeserializeOwned + MapResponse<M> + Debug,
M,
B: Serialize + ?Sized,
>(
&self,
ctype: ClientType,
operation: &str,
id: &str,
endpoint: &str,
body: &B,
deobf: Option<&Deobfuscator>,
) -> Result<M> { ) -> Result<M> {
let request = self let request = self
.request_builder(ctype, endpoint) .request_builder(ctype, endpoint)
@ -949,7 +1001,7 @@ impl RustyPipeQuery {
let request_url = request.url().to_string(); let request_url = request.url().to_string();
let request_headers = request.headers().to_owned(); let request_headers = request.headers().to_owned();
let response = self.client.inner.http.execute(request).await?; let response = self.client.http_request(request).await?;
let status = response.status(); let status = response.status();
let resp_str = response.text().await?; let resp_str = response.text().await?;
@ -1013,7 +1065,8 @@ impl RustyPipeQuery {
ExtractionError::VideoUnavailable(_, _) ExtractionError::VideoUnavailable(_, _)
| ExtractionError::VideoAgeRestricted | ExtractionError::VideoAgeRestricted
| ExtractionError::ContentUnavailable(_) | ExtractionError::ContentUnavailable(_)
| ExtractionError::NoData => (), | ExtractionError::NoData
| ExtractionError::Retry => (),
_ => create_report(Level::ERR, Some(e.to_string()), Vec::new()), _ => create_report(Level::ERR, Some(e.to_string()), Vec::new()),
} }
Err(e.into()) Err(e.into())

View file

@ -193,12 +193,7 @@ impl MapResponse<Paginator<PlaylistVideo>> for response::PlaylistCont {
_deobf: Option<&Deobfuscator>, _deobf: Option<&Deobfuscator>,
) -> Result<MapResult<Paginator<PlaylistVideo>>, ExtractionError> { ) -> Result<MapResult<Paginator<PlaylistVideo>>, ExtractionError> {
let mut actions = self.on_response_received_actions; let mut actions = self.on_response_received_actions;
let action = some_or_bail!( let action = actions.try_swap_remove(0).ok_or(ExtractionError::Retry)?;
actions.try_swap_remove(0),
Err(ExtractionError::InvalidData(
"no continuation action".into()
))
);
let (items, ctoken) = let (items, ctoken) =
map_playlist_items(action.append_continuation_items_action.continuation_items.c); map_playlist_items(action.append_continuation_items_action.continuation_items.c);

View file

@ -25,6 +25,8 @@ pub struct Channel {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ChannelCont { pub struct ChannelCont {
#[serde(default)]
#[serde_as(as = "VecSkipError<_>")]
pub on_response_received_actions: Vec<OnResponseReceivedAction>, pub on_response_received_actions: Vec<OnResponseReceivedAction>,
} }

View file

@ -19,6 +19,7 @@ pub struct Playlist {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct PlaylistCont { pub struct PlaylistCont {
#[serde(default)]
#[serde_as(as = "VecSkipError<_>")] #[serde_as(as = "VecSkipError<_>")]
pub on_response_received_actions: Vec<OnResponseReceivedAction>, pub on_response_received_actions: Vec<OnResponseReceivedAction>,
} }

View file

@ -25,6 +25,7 @@ pub struct Search {
pub struct SearchCont { pub struct SearchCont {
#[serde_as(as = "Option<JsonString>")] #[serde_as(as = "Option<JsonString>")]
pub estimated_results: Option<u64>, pub estimated_results: Option<u64>,
#[serde_as(as = "VecSkipError<_>")]
pub on_response_received_commands: Vec<SearchContCommand>, pub on_response_received_commands: Vec<SearchContCommand>,
} }

View file

@ -11,7 +11,8 @@ use crate::serializer::{
}; };
use super::{ use super::{
ContinuationEndpoint, ContinuationItemRenderer, Icon, Thumbnails, VideoListItem, VideoOwner, ContinuationEndpoint, ContinuationItemRenderer, Icon, MusicContinuation, Thumbnails,
VideoListItem, VideoOwner,
}; };
/* /*
@ -282,6 +283,8 @@ pub struct RecommendationResults {
/// Can be `None` for age-restricted videos /// Can be `None` for age-restricted videos
#[serde_as(as = "Option<VecLogError<_>>")] #[serde_as(as = "Option<VecLogError<_>>")]
pub results: Option<MapResult<Vec<VideoListItem>>>, pub results: Option<MapResult<Vec<VideoListItem>>>,
#[serde_as(as = "Option<VecSkipError<_>>")]
pub continuations: Option<Vec<MusicContinuation>>,
} }
/// The engagement panels are displayed below the video and contain chapter markers /// The engagement panels are displayed below the video and contain chapter markers
@ -418,9 +421,12 @@ pub struct CommentItemSectionHeaderMenuItem {
*/ */
/// Video recommendations continuation response /// Video recommendations continuation response
#[serde_as]
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct VideoRecommendations { pub struct VideoRecommendations {
#[serde(default)]
#[serde_as(as = "VecSkipError<_>")]
pub on_response_received_endpoints: Vec<RecommendationsContItem>, pub on_response_received_endpoints: Vec<RecommendationsContItem>,
} }
@ -459,8 +465,8 @@ pub struct VideoComments {
/// - Comment replies: appendContinuationItemsAction /// - Comment replies: appendContinuationItemsAction
/// - n*commentRenderer, continuationItemRenderer: /// - n*commentRenderer, continuationItemRenderer:
/// replies + continuation /// replies + continuation
#[serde_as(as = "VecLogError<_>")] #[serde_as(as = "Option<VecLogError<_>>")]
pub on_response_received_endpoints: MapResult<Vec<CommentsContItem>>, pub on_response_received_endpoints: Option<MapResult<Vec<CommentsContItem>>>,
} }
/// Video comments continuation /// Video comments continuation

View file

@ -251,7 +251,7 @@ impl MapResponse<VideoDetails> for response::VideoDetails {
.secondary_results .secondary_results
.and_then(|sr| { .and_then(|sr| {
sr.secondary_results.results.map(|r| { sr.secondary_results.results.map(|r| {
let mut res = map_recommendations(r, lang); let mut res = map_recommendations(r, sr.secondary_results.continuations, lang);
warnings.append(&mut res.warnings); warnings.append(&mut res.warnings);
res.c res.c
}) })
@ -342,15 +342,11 @@ impl MapResponse<Paginator<RecommendedVideo>> for response::VideoRecommendations
_deobf: Option<&crate::deobfuscate::Deobfuscator>, _deobf: Option<&crate::deobfuscate::Deobfuscator>,
) -> Result<MapResult<Paginator<RecommendedVideo>>, ExtractionError> { ) -> Result<MapResult<Paginator<RecommendedVideo>>, ExtractionError> {
let mut endpoints = self.on_response_received_endpoints; let mut endpoints = self.on_response_received_endpoints;
let cont = some_or_bail!( let cont = endpoints.try_swap_remove(0).ok_or(ExtractionError::Retry)?;
endpoints.try_swap_remove(0),
Err(ExtractionError::InvalidData(
"no continuation endpoint".into()
))
);
Ok(map_recommendations( Ok(map_recommendations(
cont.append_continuation_items_action.continuation_items, cont.append_continuation_items_action.continuation_items,
None,
lang, lang,
)) ))
} }
@ -363,57 +359,54 @@ impl MapResponse<Paginator<Comment>> for response::VideoComments {
lang: Language, lang: Language,
_deobf: Option<&crate::deobfuscate::Deobfuscator>, _deobf: Option<&crate::deobfuscate::Deobfuscator>,
) -> Result<MapResult<Paginator<Comment>>, ExtractionError> { ) -> Result<MapResult<Paginator<Comment>>, ExtractionError> {
let mut warnings = self.on_response_received_endpoints.warnings; let received_endpoints = self
.on_response_received_endpoints
.ok_or(ExtractionError::Retry)?;
let mut warnings = received_endpoints.warnings;
let mut comments = Vec::new(); let mut comments = Vec::new();
let mut comment_count = None; let mut comment_count = None;
let mut ctoken = None; let mut ctoken = None;
self.on_response_received_endpoints received_endpoints.c.into_iter().for_each(|citem| {
.c let mut items = citem.append_continuation_items_action.continuation_items;
.into_iter() warnings.append(&mut items.warnings);
.for_each(|citem| { items.c.into_iter().for_each(|item| match item {
let mut items = citem.append_continuation_items_action.continuation_items; response::video_details::CommentListItem::CommentThreadRenderer {
warnings.append(&mut items.warnings); comment,
items.c.into_iter().for_each(|item| match item { replies,
response::video_details::CommentListItem::CommentThreadRenderer { rendering_priority,
comment, } => {
replies, let mut res = map_comment(
comment.comment_renderer,
Some(replies),
rendering_priority, rendering_priority,
} => { lang,
let mut res = map_comment( );
comment.comment_renderer, comments.push(res.c);
Some(replies), warnings.append(&mut res.warnings)
rendering_priority, }
lang, response::video_details::CommentListItem::CommentRenderer(comment) => {
); let mut res = map_comment(
comments.push(res.c); comment,
warnings.append(&mut res.warnings) None,
} response::video_details::CommentPriority::RenderingPriorityUnknown,
response::video_details::CommentListItem::CommentRenderer(comment) => { lang,
let mut res = map_comment( );
comment, comments.push(res.c);
None, warnings.append(&mut res.warnings)
response::video_details::CommentPriority::RenderingPriorityUnknown, }
lang, response::video_details::CommentListItem::ContinuationItemRenderer {
); continuation_endpoint,
comments.push(res.c); } => {
warnings.append(&mut res.warnings) ctoken = Some(continuation_endpoint.continuation_command.token);
} }
response::video_details::CommentListItem::ContinuationItemRenderer { response::video_details::CommentListItem::CommentsHeaderRenderer { count_text } => {
continuation_endpoint, comment_count = count_text
} => { .and_then(|txt| util::parse_numeric_or_warn::<u64>(&txt, &mut warnings));
ctoken = Some(continuation_endpoint.continuation_command.token); }
}
response::video_details::CommentListItem::CommentsHeaderRenderer {
count_text,
} => {
comment_count = count_text.and_then(|txt| {
util::parse_numeric_or_warn::<u64>(&txt, &mut warnings)
});
}
});
}); });
});
Ok(MapResult { Ok(MapResult {
c: Paginator::new(comment_count, comments, ctoken), c: Paginator::new(comment_count, comments, ctoken),
@ -424,6 +417,7 @@ impl MapResponse<Paginator<Comment>> for response::VideoComments {
fn map_recommendations( fn map_recommendations(
r: MapResult<Vec<response::VideoListItem>>, r: MapResult<Vec<response::VideoListItem>>,
continuations: Option<Vec<response::MusicContinuation>>,
lang: Language, lang: Language,
) -> MapResult<Paginator<RecommendedVideo>> { ) -> MapResult<Paginator<RecommendedVideo>> {
let mut warnings = r.warnings; let mut warnings = r.warnings;
@ -475,6 +469,12 @@ fn map_recommendations(
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if let Some(continuations) = continuations {
continuations.into_iter().for_each(|c| {
ctoken = Some(c.next_continuation_data.continuation);
})
};
MapResult { MapResult {
c: Paginator::new(None, items, ctoken), c: Paginator::new(None, items, ctoken),
warnings, warnings,

View file

@ -87,6 +87,8 @@ pub enum ExtractionError {
WrongResult(String), WrongResult(String),
#[error("Warnings during deserialization/mapping")] #[error("Warnings during deserialization/mapping")]
DeserializationWarnings, DeserializationWarnings,
#[error("Got no data from YouTube, attempt retry")]
Retry,
} }
/// Internal error /// Internal error

View file

@ -19,7 +19,7 @@ use rustypipe::param::{
#[case::tv_html5_embed(ClientType::TvHtml5Embed)] #[case::tv_html5_embed(ClientType::TvHtml5Embed)]
#[case::android(ClientType::Android)] #[case::android(ClientType::Android)]
#[case::ios(ClientType::Ios)] #[case::ios(ClientType::Ios)]
#[test_log::test(tokio::test)] #[tokio::test]
async fn get_player(#[case] client_type: ClientType) { async fn get_player(#[case] client_type: ClientType) {
let rp = RustyPipe::builder().strict().build(); let rp = RustyPipe::builder().strict().build();
let player_data = rp.query().player("n4tK7LYFxI0", client_type).await.unwrap(); let player_data = rp.query().player("n4tK7LYFxI0", client_type).await.unwrap();
@ -179,7 +179,7 @@ async fn get_playlist(
assert!(!playlist.thumbnail.is_empty()); assert!(!playlist.thumbnail.is_empty());
} }
#[test_log::test(tokio::test)] #[tokio::test]
async fn playlist_cont() { async fn playlist_cont() {
let rp = RustyPipe::builder().strict().build(); let rp = RustyPipe::builder().strict().build();
let mut playlist = rp let mut playlist = rp
@ -197,7 +197,7 @@ async fn playlist_cont() {
assert!(playlist.videos.count.unwrap() > 100); assert!(playlist.videos.count.unwrap() > 100);
} }
#[test_log::test(tokio::test)] #[tokio::test]
async fn playlist_cont2() { async fn playlist_cont2() {
let rp = RustyPipe::builder().strict().build(); let rp = RustyPipe::builder().strict().build();
let mut playlist = rp let mut playlist = rp
@ -311,7 +311,6 @@ async fn get_video_details_music() {
assert!(!details.is_live); assert!(!details.is_live);
assert!(!details.is_ccommons); assert!(!details.is_ccommons);
assert!(!details.recommended.items.is_empty());
assert!(!details.recommended.is_exhausted()); assert!(!details.recommended.is_exhausted());
// Comments are disabled for this video // Comments are disabled for this video
@ -369,7 +368,6 @@ async fn get_video_details_ccommons() {
assert!(!details.is_live); assert!(!details.is_live);
assert!(details.is_ccommons); assert!(details.is_ccommons);
assert!(!details.recommended.items.is_empty());
assert!(!details.recommended.is_exhausted()); assert!(!details.recommended.is_exhausted());
assert!( assert!(
@ -506,7 +504,6 @@ async fn get_video_details_chapters() {
] ]
"###); "###);
assert!(!details.recommended.items.is_empty());
assert!(!details.recommended.is_exhausted()); assert!(!details.recommended.is_exhausted());
assert!( assert!(
@ -566,7 +563,6 @@ async fn get_video_details_live() {
assert!(details.is_live); assert!(details.is_live);
assert!(!details.is_ccommons); assert!(!details.is_ccommons);
assert!(!details.recommended.items.is_empty());
assert!(!details.recommended.is_exhausted()); assert!(!details.recommended.is_exhausted());
// No comments because livestream // No comments because livestream