fix: allow concurrent cache access

This commit is contained in:
ThetaDev 2022-09-27 21:34:38 +02:00
parent 9866006690
commit 57e13d15a6
2 changed files with 76 additions and 41 deletions

View file

@ -44,6 +44,10 @@ impl CacheStorage for FileStorage {
}
fn read(&self) -> Option<String> {
if !self.path.exists() {
return None;
}
match fs::read_to_string(&self.path) {
Ok(data) => Some(data),
Err(e) => {

View file

@ -17,12 +17,12 @@ use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use chrono::{DateTime, Duration, Utc};
use fancy_regex::Regex;
use log::{error, warn};
use log::{debug, error, warn};
use once_cell::sync::Lazy;
use rand::Rng;
use reqwest::{header, Client, ClientBuilder, Request, RequestBuilder, Response};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use crate::{
cache::{CacheStorage, FileStorage},
@ -179,7 +179,7 @@ struct RustyPipeRef {
n_retries: u32,
user_agent: String,
consent_cookie: String,
cache: Mutex<CacheData>,
cache: CacheHolder,
default_opts: RustyPipeOpts,
}
@ -216,6 +216,13 @@ impl Default for RustyPipeOpts {
}
}
#[derive(Default, Debug)]
struct CacheHolder {
desktop_client: RwLock<CacheEntry<ClientData>>,
music_client: RwLock<CacheEntry<ClientData>>,
deobf: RwLock<CacheEntry<DeobfData>>,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct CacheData {
desktop_client: CacheEntry<ClientData>,
@ -224,6 +231,7 @@ struct CacheData {
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum CacheEntry<T> {
#[default]
None,
@ -291,7 +299,7 @@ impl RustyPipeBuilder {
.build()
.unwrap();
let cache = if let Some(storage) = &self.storage {
let cdata = if let Some(storage) = &self.storage {
if let Some(data) = storage.read() {
match serde_json::from_str::<CacheData>(&data) {
Ok(data) => data,
@ -320,7 +328,11 @@ impl RustyPipeBuilder {
CONSENT_COOKIE_YES,
rand::thread_rng().gen_range(100..1000)
),
cache: Mutex::new(cache),
cache: CacheHolder {
desktop_client: RwLock::new(cdata.desktop_client),
music_client: RwLock::new(cdata.music_client),
deobf: RwLock::new(cdata.deobf),
},
default_opts: self.default_opts,
}),
}
@ -586,23 +598,28 @@ impl RustyPipe {
/// 3. from the YouTube website
/// 4. fall back to the hardcoded version
async fn get_desktop_client_version(&self) -> String {
let mut cache = self.inner.cache.lock().await;
// Write lock here to prevent concurrent tasks from fetching the same data
let mut desktop_client = self.inner.cache.desktop_client.write().await;
match cache.desktop_client.get() {
match desktop_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match self.extract_desktop_client_version().await {
Ok(version) => {
cache.desktop_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.store_cache(&cache);
version
None => {
debug!("getting desktop client version");
match self.extract_desktop_client_version().await {
Ok(version) => {
*desktop_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
drop(desktop_client);
self.store_cache().await;
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_CLIENT_VERSION.to_owned()
}
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_CLIENT_VERSION.to_owned()
}
},
}
}
}
@ -613,45 +630,59 @@ impl RustyPipe {
/// 3. from the YouTube Music website
/// 4. fall back to the hardcoded version
async fn get_music_client_version(&self) -> String {
let mut cache = self.inner.cache.lock().await;
// Write lock here to prevent concurrent tasks from fetching the same data
let mut music_client = self.inner.cache.music_client.write().await;
match cache.music_client.get() {
match music_client.get() {
Some(cdata) => cdata.version.to_owned(),
None => match self.extract_music_client_version().await {
Ok(version) => {
cache.music_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
self.store_cache(&cache);
version
None => {
debug!("getting music client version");
match self.extract_music_client_version().await {
Ok(version) => {
*music_client = CacheEntry::from(ClientData {
version: version.to_owned(),
});
drop(music_client);
self.store_cache().await;
version
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_MUSIC_CLIENT_VERSION.to_owned()
}
}
Err(e) => {
warn!("{}, falling back to hardcoded version", e);
DESKTOP_MUSIC_CLIENT_VERSION.to_owned()
}
},
}
}
}
/// Instantiate a new deobfuscator from either cached or extracted YouTube JavaScript code.
async fn get_deobf(&self) -> Result<Deobfuscator> {
let mut cache = self.inner.cache.lock().await;
// Write lock here to prevent concurrent tasks from fetching the same data
let mut deobf = self.inner.cache.deobf.write().await;
match cache.deobf.get() {
match deobf.get() {
Some(deobf) => Ok(Deobfuscator::from(deobf.to_owned())),
None => {
let deobf = Deobfuscator::new(self.inner.http.clone()).await?;
cache.deobf = CacheEntry::from(deobf.get_data());
self.store_cache(&cache);
Ok(deobf)
debug!("getting deobfuscator");
let new_deobf = Deobfuscator::new(self.inner.http.clone()).await?;
*deobf = CacheEntry::from(new_deobf.get_data());
drop(deobf);
self.store_cache().await;
Ok(new_deobf)
}
}
}
/// Write the given cache data to the storage backend.
fn store_cache(&self, cache: &CacheData) {
async fn store_cache(&self) {
if let Some(storage) = &self.inner.storage {
match serde_json::to_string(cache) {
let cdata = CacheData {
desktop_client: self.inner.cache.desktop_client.read().await.clone(),
music_client: self.inner.cache.music_client.read().await.clone(),
deobf: self.inner.cache.deobf.read().await.clone(),
};
match serde_json::to_string(&cdata) {
Ok(data) => storage.write(&data),
Err(e) => error!("Could not serialize cache. Error: {}", e),
}