From 57e13d15a691f0c81ff73efc55426eacd55ed6b1 Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Tue, 27 Sep 2022 21:34:38 +0200 Subject: [PATCH] fix: allow concurrent cache access --- src/cache.rs | 4 ++ src/client/mod.rs | 113 +++++++++++++++++++++++++++++----------------- 2 files changed, 76 insertions(+), 41 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index b38bf39..633599e 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -44,6 +44,10 @@ impl CacheStorage for FileStorage { } fn read(&self) -> Option { + if !self.path.exists() { + return None; + } + match fs::read_to_string(&self.path) { Ok(data) => Some(data), Err(e) => { diff --git a/src/client/mod.rs b/src/client/mod.rs index cc50473..1232ed9 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -17,12 +17,12 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Context, Result}; use chrono::{DateTime, Duration, Utc}; use fancy_regex::Regex; -use log::{error, warn}; +use log::{debug, error, warn}; use once_cell::sync::Lazy; use rand::Rng; use reqwest::{header, Client, ClientBuilder, Request, RequestBuilder, Response}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::{ cache::{CacheStorage, FileStorage}, @@ -179,7 +179,7 @@ struct RustyPipeRef { n_retries: u32, user_agent: String, consent_cookie: String, - cache: Mutex, + cache: CacheHolder, default_opts: RustyPipeOpts, } @@ -216,6 +216,13 @@ impl Default for RustyPipeOpts { } } +#[derive(Default, Debug)] +struct CacheHolder { + desktop_client: RwLock>, + music_client: RwLock>, + deobf: RwLock>, +} + #[derive(Default, Debug, Clone, Serialize, Deserialize)] struct CacheData { desktop_client: CacheEntry, @@ -224,6 +231,7 @@ struct CacheData { } #[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] enum CacheEntry { #[default] None, @@ -291,7 +299,7 @@ impl RustyPipeBuilder { .build() .unwrap(); - let cache = if let Some(storage) = &self.storage { + let cdata = if let Some(storage) = &self.storage { if let Some(data) = storage.read() { match serde_json::from_str::(&data) { Ok(data) => data, @@ -320,7 +328,11 @@ impl RustyPipeBuilder { CONSENT_COOKIE_YES, rand::thread_rng().gen_range(100..1000) ), - cache: Mutex::new(cache), + cache: CacheHolder { + desktop_client: RwLock::new(cdata.desktop_client), + music_client: RwLock::new(cdata.music_client), + deobf: RwLock::new(cdata.deobf), + }, default_opts: self.default_opts, }), } @@ -586,23 +598,28 @@ impl RustyPipe { /// 3. from the YouTube website /// 4. fall back to the hardcoded version async fn get_desktop_client_version(&self) -> String { - let mut cache = self.inner.cache.lock().await; + // Write lock here to prevent concurrent tasks from fetching the same data + let mut desktop_client = self.inner.cache.desktop_client.write().await; - match cache.desktop_client.get() { + match desktop_client.get() { Some(cdata) => cdata.version.to_owned(), - None => match self.extract_desktop_client_version().await { - Ok(version) => { - cache.desktop_client = CacheEntry::from(ClientData { - version: version.to_owned(), - }); - self.store_cache(&cache); - version + None => { + debug!("getting desktop client version"); + match self.extract_desktop_client_version().await { + Ok(version) => { + *desktop_client = CacheEntry::from(ClientData { + version: version.to_owned(), + }); + drop(desktop_client); + self.store_cache().await; + version + } + Err(e) => { + warn!("{}, falling back to hardcoded version", e); + DESKTOP_CLIENT_VERSION.to_owned() + } } - Err(e) => { - warn!("{}, falling back to hardcoded version", e); - DESKTOP_CLIENT_VERSION.to_owned() - } - }, + } } } @@ -613,45 +630,59 @@ impl RustyPipe { /// 3. from the YouTube Music website /// 4. fall back to the hardcoded version async fn get_music_client_version(&self) -> String { - let mut cache = self.inner.cache.lock().await; + // Write lock here to prevent concurrent tasks from fetching the same data + let mut music_client = self.inner.cache.music_client.write().await; - match cache.music_client.get() { + match music_client.get() { Some(cdata) => cdata.version.to_owned(), - None => match self.extract_music_client_version().await { - Ok(version) => { - cache.music_client = CacheEntry::from(ClientData { - version: version.to_owned(), - }); - self.store_cache(&cache); - version + None => { + debug!("getting music client version"); + match self.extract_music_client_version().await { + Ok(version) => { + *music_client = CacheEntry::from(ClientData { + version: version.to_owned(), + }); + drop(music_client); + self.store_cache().await; + version + } + Err(e) => { + warn!("{}, falling back to hardcoded version", e); + DESKTOP_MUSIC_CLIENT_VERSION.to_owned() + } } - Err(e) => { - warn!("{}, falling back to hardcoded version", e); - DESKTOP_MUSIC_CLIENT_VERSION.to_owned() - } - }, + } } } /// Instantiate a new deobfuscator from either cached or extracted YouTube JavaScript code. async fn get_deobf(&self) -> Result { - let mut cache = self.inner.cache.lock().await; + // Write lock here to prevent concurrent tasks from fetching the same data + let mut deobf = self.inner.cache.deobf.write().await; - match cache.deobf.get() { + match deobf.get() { Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())), None => { - let deobf = Deobfuscator::new(self.inner.http.clone()).await?; - cache.deobf = CacheEntry::from(deobf.get_data()); - self.store_cache(&cache); - Ok(deobf) + debug!("getting deobfuscator"); + let new_deobf = Deobfuscator::new(self.inner.http.clone()).await?; + *deobf = CacheEntry::from(new_deobf.get_data()); + drop(deobf); + self.store_cache().await; + Ok(new_deobf) } } } /// Write the given cache data to the storage backend. - fn store_cache(&self, cache: &CacheData) { + async fn store_cache(&self) { if let Some(storage) = &self.inner.storage { - match serde_json::to_string(cache) { + let cdata = CacheData { + desktop_client: self.inner.cache.desktop_client.read().await.clone(), + music_client: self.inner.cache.music_client.read().await.clone(), + deobf: self.inner.cache.deobf.read().await.clone(), + }; + + match serde_json::to_string(&cdata) { Ok(data) => storage.write(&data), Err(e) => error!("Could not serialize cache. Error: {}", e), }