feat: add session po token cache

This commit is contained in:
ThetaDev 2025-02-05 15:55:32 +01:00
parent 29c854b20d
commit b72b501b6d
No known key found for this signature in database
GPG key ID: E319D3C5148D65B6
15 changed files with 234 additions and 61 deletions

View file

@ -1,12 +1,16 @@
use std::sync::{atomic::AtomicU32, Arc, RwLock};
use std::{
collections::HashMap,
sync::{atomic::AtomicU32, Arc, RwLock},
};
use once_cell::sync::Lazy;
use rand::Rng;
use regex::Regex;
use reqwest::{header, Client};
use time::OffsetDateTime;
use crate::{
client::{CONSENT_COOKIE, YOUTUBE_MUSIC_HOME_URL},
client::{PoToken, CONSENT_COOKIE, YOUTUBE_MUSIC_HOME_URL},
error::{Error, ExtractionError},
util,
};
@ -29,23 +33,27 @@ pub struct VisitorDataCache {
struct VisitorDataCacheRef {
req_counter: AtomicU32,
visitor_data: RwLock<Vec<String>>,
session_potoken: RwLock<HashMap<String, PoToken>>,
http: Client,
/// Number of requests after which a new token is requested
req_limit: u32,
/// Maximum size of the cache
max_size: usize,
}
static VISITOR_DATA_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r#""visitorData":"([\w\d_\-%]+?)""#).unwrap());
/// Number of requests after which a new token is requested
const REQ_LIMIT: u32 = 50;
/// Maximum size of the cache
const MAX_SIZE: usize = 20;
impl VisitorDataCache {
pub fn new(http: Client) -> Self {
pub fn new(http: Client, req_limit: u32, max_size: usize) -> Self {
Self {
inner: VisitorDataCacheRef {
req_counter: Default::default(),
visitor_data: Default::default(),
session_potoken: Default::default(),
http,
req_limit,
max_size: max_size - 1,
}
.into(),
}
@ -107,8 +115,12 @@ impl VisitorDataCache {
.req_counter
.store(0, std::sync::atomic::Ordering::Relaxed);
let mut vds = self.inner.visitor_data.write().unwrap();
for _ in 0..(vds.len().saturating_sub(MAX_SIZE)) {
for _ in 0..(vds.len().saturating_sub(self.inner.max_size)) {
let rem = vds.remove(0);
{
let mut pots = self.inner.session_potoken.write().unwrap();
pots.remove(&rem);
}
tracing::debug!("visitor data {rem} removed from cache");
}
vds.push(vd.to_owned());
@ -122,7 +134,7 @@ impl VisitorDataCache {
.inner
.req_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
>= REQ_LIMIT
>= self.inner.req_limit
{
self.inner
.req_counter
@ -148,9 +160,26 @@ impl VisitorDataCache {
let mut vds = self.inner.visitor_data.write().unwrap();
if let Some(i) = vds.iter().position(|x| x == visitor_data) {
vds.remove(i);
let mut pots = self.inner.session_potoken.write().unwrap();
pots.remove(visitor_data);
tracing::debug!("visitor data {visitor_data} removed from cache");
}
}
pub fn store_pot(&self, visitor_data: &str, po_token: PoToken) {
let mut pots = self.inner.session_potoken.write().unwrap();
pots.insert(visitor_data.to_owned(), po_token);
}
pub fn get_pot(&self, visitor_data: &str) -> Option<PoToken> {
let pots = self.inner.session_potoken.read().unwrap();
if let Some(entry) = pots.get(visitor_data) {
if entry.valid_until > OffsetDateTime::now_utc() {
return Some(entry.clone());
}
}
None
}
}
#[cfg(test)]
@ -166,13 +195,16 @@ mod tests {
#[tokio::test]
#[traced_test]
async fn get_visitor_data() {
let cache =
VisitorDataCache::new(Client::builder().user_agent(DEFAULT_UA).build().unwrap());
let cache = VisitorDataCache::new(
Client::builder().user_agent(DEFAULT_UA).build().unwrap(),
2,
2,
);
// Get initial visitor data
let v1 = cache.get().await.unwrap();
// Run as many request as necessary to fetch second visitor data
for _ in 0..=REQ_LIMIT {
for _ in 0..=cache.inner.req_limit {
let got = cache.get().await.unwrap();
assert_eq!(got, v1);
}
@ -186,4 +218,32 @@ mod tests {
let vds_len = cache.inner.visitor_data.read().unwrap().len();
assert_eq!(vds_len, 2);
}
#[tokio::test]
#[traced_test]
async fn cache_potoken() {
let cache = VisitorDataCache::new(
Client::builder().user_agent(DEFAULT_UA).build().unwrap(),
1,
2,
);
let v1 = cache.get().await.unwrap();
let pot1 = PoToken {
po_token: "pot1".to_owned(),
valid_until: OffsetDateTime::now_utc() + time::Duration::hours(1),
};
cache.store_pot(&v1, pot1.clone());
assert_eq!(cache.get_pot(&v1).unwrap(), pot1);
for _ in 0..4 {
cache.get().await.unwrap();
}
tokio::time::sleep(Duration::from_millis(1000)).await;
{
let vd = cache.inner.visitor_data.read().unwrap();
assert!(!vd.contains(&v1), "first token still present");
}
assert_eq!(cache.get_pot(&v1), None);
}
}