fix: limit retry attempts to fetch client versions and deobf data

This commit is contained in:
ThetaDev 2024-12-13 03:47:37 +01:00
parent 5262becca1
commit 44ae456d2c
No known key found for this signature in database
GPG key ID: E319D3C5148D65B6
2 changed files with 112 additions and 73 deletions

View file

@ -470,7 +470,7 @@ impl Default for RustyPipeOpts {
}
}
#[derive(Default, Debug)]
#[derive(Debug)]
struct CacheHolder {
clients: HashMap<ClientType, AsyncRwLock<CacheEntry<ClientData>>>,
deobf: AsyncRwLock<CacheEntry<DeobfData>>,
@ -488,15 +488,20 @@ struct CacheData {
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum CacheEntry<T> {
#[default]
None,
Some {
#[serde(with = "time::serde::rfc3339")]
last_update: OffsetDateTime,
data: T,
},
#[serde(default)]
struct CacheEntry<T> {
#[serde(
with = "time::serde::rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
last_update: Option<OffsetDateTime>,
/// If the entry failed to update, wait until this time before retrying
#[serde(
with = "time::serde::rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
retry_at: Option<OffsetDateTime>,
data: Option<T>,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -515,36 +520,34 @@ struct RequestResult<T> {
impl<T> CacheEntry<T> {
/// Get the content of the cache if it is still fresh
fn get(&self) -> Option<&T> {
match self {
CacheEntry::Some { last_update, data } => {
if last_update < &(OffsetDateTime::now_utc() - time::Duration::hours(24)) {
None
} else {
Some(data)
}
}
CacheEntry::None => None,
}
self.data.as_ref().filter(|_| {
self.last_update.unwrap_or(OffsetDateTime::UNIX_EPOCH)
> (OffsetDateTime::now_utc() - time::Duration::days(1))
})
}
/// Get the content of the cache, even if it is expired
fn get_expired(&self) -> Option<&T> {
match self {
CacheEntry::Some { data, .. } => Some(data),
CacheEntry::None => None,
}
self.data.as_ref()
}
fn is_none(&self) -> bool {
matches!(self, Self::None)
self.data.is_none()
}
fn should_retry(&self) -> bool {
self.retry_at
.map(|d| OffsetDateTime::now_utc() > d)
.unwrap_or(true)
}
}
impl<T> From<T> for CacheEntry<T> {
fn from(f: T) -> Self {
Self::Some {
last_update: util::now_sec(),
data: f,
Self {
last_update: Some(util::now_sec()),
retry_at: None,
data: Some(f),
}
}
}
@ -1004,30 +1007,38 @@ impl RustyPipe {
match client.get() {
Some(cdata) => cdata.version.clone().into(),
None => {
tracing::debug!("getting {client_type:?} client version");
match self.extract_client_version(client_type).await {
Ok(version) => {
*client = CacheEntry::from(ClientData {
version: version.clone(),
});
drop(client);
self.store_cache().await;
version.into()
}
Err(e) => {
tracing::warn!(
"{e}, falling back to hardcoded {client_type:?} client version"
);
match client_type {
ClientType::Desktop => DESKTOP_CLIENT_VERSION,
ClientType::DesktopMusic => DESKTOP_MUSIC_CLIENT_VERSION,
ClientType::Mobile => MOBILE_CLIENT_VERSION,
ClientType::Tv => TV_CLIENT_VERSION,
_ => unreachable!(),
if client.should_retry() {
tracing::debug!("getting {client_type:?} client version");
match self.extract_client_version(client_type).await {
Ok(version) => {
*client = CacheEntry::from(ClientData {
version: version.clone(),
});
drop(client);
self.store_cache().await;
return version.into();
}
Err(e) => {
client.retry_at = Some(util::now_sec() + time::Duration::hours(1));
drop(client);
self.store_cache().await;
tracing::warn!(
"{e}, falling back to hardcoded {client_type:?} client version"
);
}
.into()
}
} else {
tracing::warn!("falling back to hardcoded {client_type:?} client version")
}
match client_type {
ClientType::Desktop => DESKTOP_CLIENT_VERSION,
ClientType::DesktopMusic => DESKTOP_MUSIC_CLIENT_VERSION,
ClientType::Mobile => MOBILE_CLIENT_VERSION,
ClientType::Tv => TV_CLIENT_VERSION,
_ => unreachable!(),
}
.into()
}
}
}
@ -1040,27 +1051,50 @@ impl RustyPipe {
match deobf_data.get() {
Some(deobf_data) => Ok(deobf_data.clone()),
None => {
tracing::debug!("getting deobf data");
// Only attempt to fetch deobf data every 24 hours to avoid a flood of error reports
// if the client JS cannot be parsed
if deobf_data.should_retry() {
tracing::debug!("getting deobf data");
match DeobfData::extract(self.inner.http.clone(), self.inner.reporter.as_deref())
match DeobfData::extract(
self.inner.http.clone(),
self.inner.reporter.as_deref(),
)
.await
{
Ok(new_data) => {
// Write new data to the cache
*deobf_data = CacheEntry::from(new_data.clone());
drop(deobf_data);
self.store_cache().await;
Ok(new_data)
}
Err(e) => {
// Try to fall back to expired cache data if available, otherwise return error
match deobf_data.get_expired() {
Some(d) => {
tracing::warn!("could not get new deobf data ({e}), falling back to expired cache");
Ok(d.clone())
}
None => Err(e),
{
Ok(new_data) => {
// Write new data to the cache
*deobf_data = CacheEntry::from(new_data.clone());
drop(deobf_data);
self.store_cache().await;
Ok(new_data)
}
Err(e) => {
// Try to fall back to expired cache data if available, otherwise return error
deobf_data.retry_at = Some(util::now_sec() + time::Duration::days(1));
let res = match deobf_data.get_expired() {
Some(d) => {
tracing::warn!("could not get new deobf data ({e}), falling back to expired cache");
Ok(d.clone())
}
None => Err(e),
};
drop(deobf_data);
self.store_cache().await;
res
}
}
} else {
match deobf_data.get_expired() {
Some(d) => {
tracing::warn!(
"could not get new deobf data, falling back to expired cache"
);
Ok(d.clone())
}
None => Err(Error::Extraction(ExtractionError::Deobfuscation(
"could not get deobf data".into(),
))),
}
}
}

View file

@ -140,7 +140,7 @@ async fn get_player_from_client(#[case] client_type: ClientType, rp: RustyPipe)
assert_eq!(audio.codec, AudioCodec::Opus);
// Desktop client now requires pot token so the streams cannot be tested here
if client_type != ClientType::Desktop {
if !matches!(client_type, ClientType::Desktop | ClientType::Mobile) {
check_video_stream(video).await;
check_video_stream(audio).await;
}
@ -2673,6 +2673,7 @@ async fn music_genre(#[case] id: &str, #[case] name: &str, rp: RustyPipe, unloca
#[rstest]
#[tokio::test]
#[ignore]
async fn music_genre_not_found(rp: RustyPipe) {
let err = rp
.query()
@ -2710,10 +2711,18 @@ async fn invalid_ctoken(#[case] ep: ContinuationEndpoint, rp: RustyPipe) {
}
}
/// YouTube Music allows searching for ISRC codes
/// This feature does not seem to work with all languages and it has changed in the past.
/// This test is used to check which languages are working
#[rstest]
#[tokio::test]
async fn isrc_search_languages(rp: RustyPipe) {
for lang in LANGUAGES {
// flaky for English, skipping for now
if matches!(lang, Language::EnIn) {
continue;
}
let tracks = rp
.query()
.lang(lang)
@ -2721,11 +2730,7 @@ async fn isrc_search_languages(rp: RustyPipe) {
.await
.unwrap();
let working = tracks.items.items.iter().any(|t| t.id == "g0iRiJ_ck48");
assert_eq!(
working,
!matches!(lang, Language::En | Language::EnGb | Language::EnIn),
"lang: {lang}"
);
assert!(working, "lang: {lang}");
}
}