strawcore/src/youtube/stream_extractor.rs
Cobb Hayes c8dfc8a34a Public-flip audit: scrub audit-ticket prefixes + LAN refs + tighten README
URLs → git.sulkta.com. Audit-ticket prefixes (SPEC §N, audit Track X, vc=N
audit-fix, FIX (audit ...), PORT DEVIATION) stripped from comments — technical
reasoning retained. Crafting-table LAN refs softened to 'Sulkta build host'.
README sheds marketing scaffolding + stale status tables.
2026-05-27 13:29:52 -07:00

853 lines
29 KiB
Rust

// YoutubeStreamExtractor — orchestrator. Mirrors NPE
// services/youtube/extractors/YoutubeStreamExtractor.java:onFetchPage().
//
// Order:
// 1. Optional Android po_token: caller-supplied via ExtractOptions, else
// from the registered PoTokenProvider (with neither we go anonymous →
// reel endpoint).
// 2. Android `/player` (if po_token) or `/reel/reel_item_watch` (anon).
// checkPlayabilityStatus → typed ContentUnavailable variants.
// isPlayerResponseNotValid → reject the "you're a bot" decoy.
// 3. Optional iOS `/player` (best-effort, all exceptions swallowed).
// 4. WEB `/player?$fields=microformat...` — metadata + better thumbnails.
// Exceptions swallowed → falls back to Android-response thumbnails.
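// 5. WEB `/next` — description + related + chapters. Mandatory in NPE;
// `stream_info_with` below does not issue this request (the description
// is taken from `videoDetails.shortDescription` instead).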
//
// Per-format URL post-processing:
// * If format has `url` → use as-is (Android + iOS path).
// * Else parse `signatureCipher` → deobfuscate `s` → assemble
// `url&sp=<decoded>` (WEB path; not exercised in the current
// onFetchPage flow but kept for completeness).
// * Run `url_with_throttling_parameter_deobfuscated` UNCONDITIONALLY.
// * Append `&cpn=<client_cpn>`.
// * Append `&pot=<streamingDataPoToken>` if set.
use serde_json::Value;
use crate::exceptions::{ContentUnavailable, ExtractionError, ParsingError};
use crate::image::{Image, ResolutionLevel};
use crate::localization::{ContentCountry, Localization};
use crate::newpipe::NewPipe;
use crate::stream::{
AudioStream, DeliveryMethod, StreamInfo, StreamType, SubtitlesStream, VideoStream,
};
use crate::youtube::itag::{lookup as itag_lookup, ItagType};
#[cfg(test)]
use crate::youtube::itag::MediaFormat;
use crate::youtube::js::PlayerManager;
use crate::youtube::potoken::{po_token_provider, PoTokenResult};
use crate::youtube::stream_helper::{self, generate_content_playback_nonce};
/// Names the two Android fetch paths described in the file header.
///
/// NOTE(review): nothing in the visible part of this file reads this enum;
/// `fetch_android` picks its path from whether a po_token is present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchPolicy {
    /// Presumably the anonymous reel-endpoint path — confirm against callers.
    AnonymousAndroidReel,
    /// Presumably the `/player` path that carries a po_token — confirm against callers.
    AndroidWithPoToken,
}
/// Caller-supplied knobs for `stream_info_with`.
///
/// For each client, the player-request token, streaming token and visitor
/// data are used together only when all three are present; otherwise the
/// registered po_token provider (if any) is consulted instead.
#[derive(Debug, Clone, Default)]
pub struct ExtractOptions {
    /// When `true`, the iOS player response is fetched as well (best-effort).
    pub fetch_ios_client: bool,
    /// Token appended to Android stream/manifest URLs as `pot=`; also used
    /// as the fallback when no Android token triple could be resolved.
    pub android_streaming_pot: Option<String>,
    /// iOS counterpart of `android_streaming_pot`.
    pub ios_streaming_pot: Option<String>,
    /// Visitor data sent with the Android player request.
    pub android_visitor_data: Option<String>,
    /// Visitor data sent with the iOS player request.
    pub ios_visitor_data: Option<String>,
    /// po_token sent with the Android player request.
    pub android_player_request_pot: Option<String>,
    /// po_token sent with the iOS player request.
    pub ios_player_request_pot: Option<String>,
}
/// Builds the `StreamInfo` for `video_id` with default [`ExtractOptions`]
/// (no iOS fetch, no caller-supplied tokens).
///
/// Convenience wrapper around [`stream_info_with`]; see it for the fetch
/// order and error behaviour.
pub fn stream_info(video_id: &str) -> Result<StreamInfo, ExtractionError> {
    let defaults = ExtractOptions::default();
    stream_info_with(video_id, defaults)
}
/// Builds the `StreamInfo` for `video_id`, honouring caller-supplied `options`.
///
/// Steps, in order:
/// 1. Resolve an Android po_token (caller-supplied triple, else the
///    registered provider).
/// 2. Fetch the Android player response; reject a non-OK playability status
///    and any response whose `videoDetails.videoId` differs from `video_id`.
/// 3. If `options.fetch_ios_client` is set, fetch the iOS player response
///    best-effort: an error or a mismatching response yields no iOS data.
/// 4. Fetch the WEB metadata response best-effort.
/// 5. Populate details, microformat, streams, manifests and captions.
///
/// # Errors
/// Propagates failures from the Android fetch, the playability check and
/// per-format URL processing; returns `ExtractionError::Other` when the
/// Android response is a decoy for a different video.
pub fn stream_info_with(
    video_id: &str,
    options: ExtractOptions,
) -> Result<StreamInfo, ExtractionError> {
    let localization = NewPipe::preferred_localization();
    let content_country = NewPipe::preferred_content_country();

    // Resolve po_token via caller-supplied options first, then the
    // registered provider. `.ok().flatten()` collapses both "provider
    // declined" (Ok(None)) and "provider errored" (Err) into `None`, so
    // either way we continue anonymously; the error itself is discarded
    // here, not reported.
    let provider = po_token_provider();
    let android_token: Option<PoTokenResult> = options_or_provider(
        options.android_player_request_pot.as_deref(),
        options.android_streaming_pot.as_deref(),
        options.android_visitor_data.as_deref(),
        || {
            provider
                .as_ref()
                .and_then(|p| p.get_android_client_po_token(video_id).ok().flatten())
        },
    );

    let android_cpn = generate_content_playback_nonce();
    let mut player_response = fetch_android(
        video_id,
        &localization,
        &content_country,
        &android_cpn,
        android_token
            .as_ref()
            .map(|t| t.player_request_po_token.as_str()),
        android_token.as_ref().map(|t| t.visitor_data.as_str()),
    )?;
    check_playability_status(&player_response)?;
    if is_player_response_not_valid(&player_response, video_id) {
        return Err(ExtractionError::Other(
            "ANDROID player response is not valid (decoy detected)".into(),
        ));
    }
    // Move `streamingData` out instead of deep-cloning it: the remaining
    // readers of `player_response` (video details, captions) never look at
    // that key, so leaving `null` behind is unobservable.
    let android_streaming_data = player_response
        .get_mut("streamingData")
        .map(Value::take)
        .unwrap_or(Value::Null);

    // Optional iOS — best-effort.
    let ios_token: Option<PoTokenResult> = if options.fetch_ios_client {
        options_or_provider(
            options.ios_player_request_pot.as_deref(),
            options.ios_streaming_pot.as_deref(),
            options.ios_visitor_data.as_deref(),
            || {
                provider
                    .as_ref()
                    .and_then(|p| p.get_ios_client_po_token(video_id).ok().flatten())
            },
        )
    } else {
        None
    };
    let (ios_streaming_data, ios_cpn) = if options.fetch_ios_client {
        let ios_cpn = generate_content_playback_nonce();
        match stream_helper::get_ios_player_response(
            video_id,
            &localization,
            &content_country,
            &ios_cpn,
            ios_token.as_ref().map(|t| t.player_request_po_token.as_str()),
            ios_token.as_ref().map(|t| t.visitor_data.as_str()),
        ) {
            // The iOS response is dropped right after this arm, so its
            // streaming data can be moved out rather than cloned.
            Ok(mut r) if !is_player_response_not_valid(&r, video_id) => (
                r.get_mut("streamingData")
                    .map(Value::take)
                    .unwrap_or(Value::Null),
                Some(ios_cpn),
            ),
            // Fetch error or decoy response: carry on without iOS data.
            _ => (Value::Null, None),
        }
    } else {
        (Value::Null, None)
    };

    // The streaming token comes from the resolved token triple when there
    // is one, otherwise from the caller's standalone streaming token.
    let android_streaming_pot = android_token
        .as_ref()
        .map(|t| t.streaming_data_po_token.clone())
        .or_else(|| options.android_streaming_pot.clone());
    let ios_streaming_pot = ios_token
        .as_ref()
        .map(|t| t.streaming_data_po_token.clone())
        .or_else(|| options.ios_streaming_pot.clone());

    // A failed signature-timestamp lookup falls back to 0.
    let signature_timestamp = PlayerManager::instance()
        .signature_timestamp(video_id)
        .unwrap_or(0);
    let web_metadata =
        fetch_web_metadata(video_id, &localization, &content_country, signature_timestamp);

    let mut info = StreamInfo {
        service_id: 0,
        url: format!("https://www.youtube.com/watch?v={video_id}"),
        video_id: video_id.to_string(),
        stream_type: Some(StreamType::VideoStream),
        ..StreamInfo::default()
    };
    populate_video_details(&mut info, &player_response);
    populate_microformat(&mut info, &web_metadata);
    populate_streams(
        &mut info,
        &android_streaming_data,
        &ios_streaming_data,
        video_id,
        &android_cpn,
        ios_cpn.as_deref(),
        android_streaming_pot.as_deref(),
        ios_streaming_pot.as_deref(),
    )?;
    populate_manifests(
        &mut info,
        &android_streaming_data,
        &ios_streaming_data,
        android_streaming_pot.as_deref(),
        ios_streaming_pot.as_deref(),
    );
    populate_captions(&mut info, &player_response);
    Ok(info)
}
/// Picks the po_token triple to use for one client.
///
/// Returns a `PoTokenResult` built from the caller-supplied values when the
/// player token, streaming token and visitor data are ALL present; in every
/// other case the result of `provider_fn` is returned (and `provider_fn` is
/// only invoked in that case).
fn options_or_provider(
    opt_player_token: Option<&str>,
    opt_streaming_token: Option<&str>,
    opt_visitor: Option<&str>,
    provider_fn: impl FnOnce() -> Option<PoTokenResult>,
) -> Option<PoTokenResult> {
    match (opt_player_token, opt_streaming_token, opt_visitor) {
        (Some(player), Some(streaming), Some(visitor)) => {
            Some(PoTokenResult::new(player, streaming, visitor))
        }
        // Any missing piece means the partial caller input is ignored.
        _ => provider_fn(),
    }
}
fn fetch_android(
video_id: &str,
localization: &Localization,
content_country: &ContentCountry,
cpn: &str,
po_token: Option<&str>,
visitor_data: Option<&str>,
) -> Result<Value, ExtractionError> {
let result = if po_token.is_some() {
stream_helper::get_android_player_response(
video_id,
localization,
content_country,
cpn,
po_token,
visitor_data,
)
} else {
let r = stream_helper::get_android_reel_player_response(
video_id,
localization,
content_country,
cpn,
)?;
// The reel endpoint returns the `playerResponse` nested one level.
Ok(r.get("playerResponse").cloned().unwrap_or(r))
};
result
}
/// Fetches the WEB metadata player response, best-effort.
///
/// Any error from the request is swallowed and reported as `Value::Null`,
/// so callers simply find no microformat data in that case.
fn fetch_web_metadata(
    video_id: &str,
    localization: &Localization,
    content_country: &ContentCountry,
    signature_timestamp: i32,
) -> Value {
    let fetched = stream_helper::get_web_metadata_player_response(
        video_id,
        localization,
        content_country,
        signature_timestamp,
    );
    match fetched {
        Ok(metadata) => metadata,
        Err(_) => Value::Null,
    }
}
fn check_playability_status(player_response: &Value) -> Result<(), ExtractionError> {
let status = player_response.get("playabilityStatus");
let Some(status) = status else { return Ok(()) };
let status_code = status.get("status").and_then(|v| v.as_str()).unwrap_or("");
if status_code == "OK" {
return Ok(());
}
let reason = status.get("reason").and_then(|v| v.as_str()).unwrap_or("");
let reason_lc = reason.to_ascii_lowercase();
let mapped = match status_code {
"LOGIN_REQUIRED" => {
if reason_lc.contains("a bot") {
ContentUnavailable::Other("sign in to confirm you're not a bot".into())
} else if reason_lc.contains("inappropriate") {
ContentUnavailable::AgeRestricted
} else if reason_lc.contains("private") {
ContentUnavailable::Private
} else {
ContentUnavailable::Other(reason.into())
}
}
"UNPLAYABLE" | "ERROR" => {
if reason_lc.contains("music premium") {
ContentUnavailable::YoutubeMusicPremium
} else if reason_lc.contains("payment") || reason_lc.contains("members") {
ContentUnavailable::Paid
} else if reason_lc.contains("country") {
ContentUnavailable::GeoRestricted
} else if reason_lc.contains("closed") || reason_lc.contains("terminated") {
ContentUnavailable::AccountTerminated
} else {
ContentUnavailable::Other(reason.into())
}
}
_ => ContentUnavailable::Other(format!("{status_code}: {reason}")),
};
Err(ExtractionError::ContentUnavailable(mapped))
}
fn is_player_response_not_valid(player_response: &Value, video_id: &str) -> bool {
let returned = player_response
.get("videoDetails")
.and_then(|v| v.get("videoId"))
.and_then(|v| v.as_str());
returned.map(|r| r != video_id).unwrap_or(false)
}
fn populate_video_details(info: &mut StreamInfo, player_response: &Value) {
let Some(vd) = player_response.get("videoDetails") else {
return;
};
if let Some(s) = vd.get("title").and_then(|v| v.as_str()) {
info.name = s.to_string();
}
if let Some(s) = vd.get("shortDescription").and_then(|v| v.as_str()) {
info.description = s.to_string();
}
if let Some(s) = vd.get("lengthSeconds").and_then(|v| v.as_str()) {
info.duration_seconds = s.parse().unwrap_or(0);
}
if let Some(s) = vd.get("viewCount").and_then(|v| v.as_str()) {
info.view_count = s.parse().unwrap_or(0);
}
if let Some(s) = vd.get("author").and_then(|v| v.as_str()) {
info.uploader_name = s.to_string();
}
if let Some(s) = vd.get("channelId").and_then(|v| v.as_str()) {
info.uploader_id = s.to_string();
info.uploader_url = format!("https://www.youtube.com/channel/{s}");
}
if let Some(thumbs) = vd
.get("thumbnail")
.and_then(|v| v.get("thumbnails"))
.and_then(|v| v.as_array())
{
for t in thumbs {
if let Some(url) = t.get("url").and_then(|v| v.as_str()) {
let h = t.get("height").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
let w = t.get("width").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
info.thumbnails.push(Image::new(
url,
h,
w,
ResolutionLevel::from_height(h),
));
}
}
}
if vd
.get("isLive")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
info.stream_type = Some(StreamType::VideoLiveStream);
} else if vd
.get("isPostLiveDvr")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
info.stream_type = Some(StreamType::PostLiveStream);
}
}
fn populate_microformat(info: &mut StreamInfo, web_metadata: &Value) {
let Some(mfr) = web_metadata
.get("microformat")
.and_then(|v| v.get("playerMicroformatRenderer"))
else {
return;
};
if let Some(s) = mfr
.get("uploadDate")
.and_then(|v| v.as_str())
.or_else(|| mfr.get("publishDate").and_then(|v| v.as_str()))
{
info.upload_date_iso = Some(s.to_string());
}
if let Some(s) = mfr.get("category").and_then(|v| v.as_str()) {
info.category = s.to_string();
}
// The microformat has higher-quality thumbnails — prepend over the
// videoDetails set we already populated.
if let Some(thumbs) = mfr
.get("thumbnail")
.and_then(|v| v.get("thumbnails"))
.and_then(|v| v.as_array())
{
let mut higher = Vec::new();
for t in thumbs {
if let Some(url) = t.get("url").and_then(|v| v.as_str()) {
let h = t.get("height").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
let w = t.get("width").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
higher.push(Image::new(url, h, w, ResolutionLevel::from_height(h)));
}
}
if !higher.is_empty() {
higher.extend(std::mem::take(&mut info.thumbnails));
info.thumbnails = higher;
}
}
}
#[allow(clippy::too_many_arguments)]
fn populate_streams(
info: &mut StreamInfo,
android: &Value,
ios: &Value,
video_id: &str,
android_cpn: &str,
ios_cpn: Option<&str>,
android_pot: Option<&str>,
ios_pot: Option<&str>,
) -> Result<(), ExtractionError> {
let merge = |fmt_array_key: &str| -> Vec<(Value, &'static str, &str, Option<&str>)> {
let mut out = Vec::new();
if let Some(arr) = android.get(fmt_array_key).and_then(|v| v.as_array()) {
for f in arr {
out.push((f.clone(), "ANDROID", android_cpn, android_pot));
}
}
if let Some(arr) = ios.get(fmt_array_key).and_then(|v| v.as_array()) {
for f in arr {
let cpn = ios_cpn.unwrap_or("");
out.push((f.clone(), "IOS", cpn, ios_pot));
}
}
out
};
// Progressive: streamingData.formats[]
for (fmt, _client, cpn, pot) in merge("formats") {
if let Some(stream) = build_video_progressive(&fmt, video_id, cpn, pot)? {
push_video_dedup(&mut info.video_streams, stream);
}
}
// Adaptive: streamingData.adaptiveFormats[]
for (fmt, _client, cpn, pot) in merge("adaptiveFormats") {
let mime = fmt
.get("mimeType")
.and_then(|v| v.as_str())
.unwrap_or("");
if mime.starts_with("audio/") {
if let Some(audio) = build_audio(&fmt, video_id, cpn, pot)? {
push_audio_dedup(&mut info.audio_streams, audio);
}
} else if mime.starts_with("video/") {
if let Some(video) = build_video_only(&fmt, video_id, cpn, pot)? {
push_video_dedup(&mut info.video_only_streams, video);
}
}
}
Ok(())
}
fn populate_manifests(
info: &mut StreamInfo,
android: &Value,
ios: &Value,
android_pot: Option<&str>,
ios_pot: Option<&str>,
) {
// DASH is Android-only.
if let Some(url) = android.get("dashManifestUrl").and_then(|v| v.as_str()) {
info.dash_manifest_url = Some(append_pot_to_manifest(url, android_pot));
}
// HLS prefers iOS, falls back to Android.
if let Some(url) = ios.get("hlsManifestUrl").and_then(|v| v.as_str()) {
info.hls_manifest_url = Some(append_pot_to_manifest(url, ios_pot));
} else if let Some(url) = android.get("hlsManifestUrl").and_then(|v| v.as_str()) {
info.hls_manifest_url = Some(append_pot_to_manifest(url, android_pot));
}
}
/// Appends `pot=<token>&mpd_version=7` to a manifest URL.
///
/// Uses `&` as the joiner when `url` already contains a `?`, otherwise `?`.
/// Without a token the URL is returned unchanged. The token is inserted
/// as-is (no percent-encoding).
fn append_pot_to_manifest(url: &str, pot: Option<&str>) -> String {
    let Some(token) = pot else {
        return url.to_string();
    };
    let joiner = if url.contains('?') { '&' } else { '?' };
    format!("{url}{joiner}pot={token}&mpd_version=7")
}
fn populate_captions(info: &mut StreamInfo, player_response: &Value) {
let Some(tracks) = player_response
.get("captions")
.and_then(|v| v.get("playerCaptionsTracklistRenderer"))
.and_then(|v| v.get("captionTracks"))
.and_then(|v| v.as_array())
else {
return;
};
for t in tracks {
let Some(url) = t.get("baseUrl").and_then(|v| v.as_str()) else {
continue;
};
let lang = t
.get("languageCode")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let name = t
.get("name")
.and_then(|v| v.get("simpleText"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let auto = t.get("kind").and_then(|v| v.as_str()) == Some("asr");
info.subtitles.push(SubtitlesStream {
url: url.to_string(),
language_code: lang,
name,
is_auto_generated: auto,
mime: "application/ttml+xml".into(),
});
}
}
fn process_url(
raw_format: &Value,
video_id: &str,
cpn: &str,
pot: Option<&str>,
) -> Result<Option<String>, ExtractionError> {
let mut url = if let Some(u) = raw_format.get("url").and_then(|v| v.as_str()) {
u.to_string()
} else {
// signatureCipher path — WEB-family only; not exercised in the
// Android-primary flow but mirror NPE's behavior for completeness.
let cipher_str = raw_format
.get("signatureCipher")
.or_else(|| raw_format.get("cipher"))
.and_then(|v| v.as_str())
.unwrap_or("");
if cipher_str.is_empty() {
return Ok(None);
}
let cipher = parse_cipher_string(cipher_str);
let s = cipher.get("s").map(String::as_str).unwrap_or("");
let sp = cipher.get("sp").map(String::as_str).unwrap_or("sig");
let base = cipher.get("url").map(String::as_str).unwrap_or("");
if base.is_empty() {
return Ok(None);
}
let deobf = PlayerManager::instance()
.deobfuscate_signature(video_id, s)
.map_err(|e| {
ExtractionError::Parsing(ParsingError::Invalid(format!("sig deobf: {e}")))
})?;
format!("{base}&{sp}={deobf}")
};
// nsig deobf — unconditional. Quick-exit if no `n=` present.
url = PlayerManager::instance()
.url_with_throttling_parameter_deobfuscated(video_id, &url)
.map_err(|e| {
ExtractionError::Parsing(ParsingError::Invalid(format!("nsig deobf: {e}")))
})?;
let sep_cpn = if url.contains('?') { '&' } else { '?' };
url = format!("{url}{sep_cpn}cpn={cpn}");
if let Some(token) = pot {
url = format!("{url}&pot={token}");
}
Ok(Some(url))
}
/// Parses a `signatureCipher` string (form-urlencoded `key=value` pairs)
/// into a key → value map. When a key repeats, the last value wins.
fn parse_cipher_string(s: &str) -> std::collections::BTreeMap<String, String> {
    // `url::form_urlencoded::parse` decodes percent-escapes as UTF-8
    // multi-byte sequences and handles `+` → space, which a naive
    // per-`%XX` decoder gets wrong (e.g. `%E2%9C%93` must become ✓, not
    // three separate characters). YT cipher strings are typically
    // ASCII-only, but the canonical parser covers the general case.
    let mut fields = std::collections::BTreeMap::new();
    for (key, value) in url::form_urlencoded::parse(s.as_bytes()) {
        fields.insert(key.into_owned(), value.into_owned());
    }
    fields
}
fn build_video_progressive(
fmt: &Value,
video_id: &str,
cpn: &str,
pot: Option<&str>,
) -> Result<Option<VideoStream>, ExtractionError> {
let itag_id = fmt.get("itag").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let Some(itag) = itag_lookup(itag_id) else {
return Ok(None);
};
let Some(url) = process_url(fmt, video_id, cpn, pot)? else {
return Ok(None);
};
Ok(Some(VideoStream {
itag: itag.id,
url,
format: itag.format,
delivery: DeliveryMethod::Progressive,
resolution: itag.resolution.unwrap_or("").to_string(),
fps: fmt.get("fps").and_then(|v| v.as_u64()).unwrap_or(itag.fps as u64) as u32,
bandwidth: fmt.get("bitrate").and_then(|v| v.as_u64()).map(|n| n as u32),
codec: codec_from_mime(fmt),
content_length_bytes: fmt
.get("contentLength")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok()),
width: fmt.get("width").and_then(|v| v.as_u64()).map(|n| n as u32),
height: fmt.get("height").and_then(|v| v.as_u64()).map(|n| n as u32),
video_only: false,
}))
}
fn build_video_only(
fmt: &Value,
video_id: &str,
cpn: &str,
pot: Option<&str>,
) -> Result<Option<VideoStream>, ExtractionError> {
let itag_id = fmt.get("itag").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let Some(itag) = itag_lookup(itag_id) else {
return Ok(None);
};
if itag.item_type != ItagType::VideoOnly {
return Ok(None);
}
let Some(url) = process_url(fmt, video_id, cpn, pot)? else {
return Ok(None);
};
Ok(Some(VideoStream {
itag: itag.id,
url,
format: itag.format,
delivery: DeliveryMethod::Dash,
resolution: itag.resolution.unwrap_or("").to_string(),
fps: fmt.get("fps").and_then(|v| v.as_u64()).unwrap_or(itag.fps as u64) as u32,
bandwidth: fmt.get("bitrate").and_then(|v| v.as_u64()).map(|n| n as u32),
codec: codec_from_mime(fmt),
content_length_bytes: fmt
.get("contentLength")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok()),
width: fmt.get("width").and_then(|v| v.as_u64()).map(|n| n as u32),
height: fmt.get("height").and_then(|v| v.as_u64()).map(|n| n as u32),
video_only: true,
}))
}
fn build_audio(
fmt: &Value,
video_id: &str,
cpn: &str,
pot: Option<&str>,
) -> Result<Option<AudioStream>, ExtractionError> {
let itag_id = fmt.get("itag").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let Some(itag) = itag_lookup(itag_id) else {
return Ok(None);
};
if itag.item_type != ItagType::Audio {
return Ok(None);
}
let Some(url) = process_url(fmt, video_id, cpn, pot)? else {
return Ok(None);
};
let audio_track = fmt.get("audioTrack");
Ok(Some(AudioStream {
itag: itag.id,
url,
format: itag.format,
delivery: DeliveryMethod::Dash,
average_bitrate_kbps: fmt
.get("averageBitrate")
.and_then(|v| v.as_u64())
.map(|n| (n / 1000) as u32)
.or(itag.avg_bitrate_kbps),
codec: codec_from_mime(fmt),
content_length_bytes: fmt
.get("contentLength")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok()),
audio_track_id: audio_track
.and_then(|t| t.get("id"))
.and_then(|v| v.as_str())
.map(String::from),
audio_track_name: audio_track
.and_then(|t| t.get("displayName"))
.and_then(|v| v.as_str())
.map(String::from),
audio_locale: audio_track
.and_then(|t| t.get("id"))
.and_then(|v| v.as_str())
.and_then(|s| s.split('.').next())
.map(String::from),
is_descriptive: audio_track
.and_then(|t| t.get("audioIsDefault"))
.and_then(|v| v.as_bool())
.map(|b| !b)
.unwrap_or(false),
itag_url_format: None,
}))
}
/// Extracts the quoted `codecs="…"` value from a format's `mimeType`.
///
/// Returns `None` when `mimeType` is missing or not a string, when it has no
/// `codecs="` marker, or when the closing quote is missing.
fn codec_from_mime(fmt: &Value) -> Option<String> {
    const MARKER: &str = "codecs=\"";
    let mime = fmt.get("mimeType")?.as_str()?;
    // Text after the first marker, up to the next double quote.
    let (_, after_marker) = mime.split_once(MARKER)?;
    let (codecs, _) = after_marker.split_once('"')?;
    Some(codecs.to_string())
}
/// Pushes `candidate` unless `list` already holds a stream with the same
/// itag id AND delivery method (first one in wins).
///
/// The key is deliberately itag + delivery, NOT `mediaFormat.id` — NPE's
/// dedup collides itag 140 and 141 because both are M4A.
fn push_audio_dedup(list: &mut Vec<AudioStream>, candidate: AudioStream) {
    let already_present = list
        .iter()
        .any(|existing| existing.itag == candidate.itag && existing.delivery == candidate.delivery);
    if !already_present {
        list.push(candidate);
    }
}
/// Pushes `candidate` unless `list` already holds a stream with the same
/// itag id AND delivery method (first one in wins).
fn push_video_dedup(list: &mut Vec<VideoStream>, candidate: VideoStream) {
    let already_present = list
        .iter()
        .any(|existing| existing.itag == candidate.itag && existing.delivery == candidate.delivery);
    if !already_present {
        list.push(candidate);
    }
}
// Unit tests for the pure helpers in this file; none of them performs a
// network request.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// A status of "OK" passes the playability check.
#[test]
fn playability_ok_returns_ok() {
let resp = json!({"playabilityStatus": {"status": "OK"}});
assert!(check_playability_status(&resp).is_ok());
}
// LOGIN_REQUIRED whose reason mentions "inappropriate" maps to AgeRestricted.
#[test]
fn playability_login_required_age() {
let resp = json!({
"playabilityStatus": {
"status": "LOGIN_REQUIRED",
"reason": "Sign in to confirm your age. This video may be inappropriate for some users."
}
});
let err = check_playability_status(&resp).unwrap_err();
match err {
ExtractionError::ContentUnavailable(ContentUnavailable::AgeRestricted) => (),
other => panic!("expected AgeRestricted, got {other:?}"),
}
}
// UNPLAYABLE whose reason mentions "country" maps to GeoRestricted.
#[test]
fn playability_geo_restricted() {
let resp = json!({
"playabilityStatus": {
"status": "UNPLAYABLE",
"reason": "This video is not available in your country"
}
});
let err = check_playability_status(&resp).unwrap_err();
match err {
ExtractionError::ContentUnavailable(ContentUnavailable::GeoRestricted) => (),
other => panic!("expected GeoRestricted, got {other:?}"),
}
}
// UNPLAYABLE whose reason mentions "members" maps to Paid.
#[test]
fn playability_paid_members() {
let resp = json!({
"playabilityStatus": {
"status": "UNPLAYABLE",
"reason": "This video is available to this channel's members on level: Tier 1"
}
});
match check_playability_status(&resp).unwrap_err() {
ExtractionError::ContentUnavailable(ContentUnavailable::Paid) => (),
other => panic!("expected Paid, got {other:?}"),
}
}
// A response for a different video id is flagged; a matching id is not.
#[test]
fn decoy_detected() {
let resp = json!({"videoDetails": {"videoId": "DIFFERENT_ID"}});
assert!(is_player_response_not_valid(&resp, "REQUESTED_ID"));
let resp = json!({"videoDetails": {"videoId": "MATCHING"}});
assert!(!is_player_response_not_valid(&resp, "MATCHING"));
}
// Cipher fields are split on `&` and percent-decoded.
#[test]
fn cipher_string_parsed() {
let s = "s=AAA%3D&sp=sig&url=https%3A%2F%2Fexample.com%2Fpath%3Fa%3D1";
let m = parse_cipher_string(s);
assert_eq!(m.get("s").map(String::as_str), Some("AAA="));
assert_eq!(m.get("sp").map(String::as_str), Some("sig"));
assert_eq!(
m.get("url").map(String::as_str),
Some("https://example.com/path?a=1")
);
}
// The joiner is `?` or `&` depending on an existing query; no token
// leaves the URL unchanged.
#[test]
fn manifest_pot_appended() {
assert_eq!(
append_pot_to_manifest("https://x/path", Some("tok")),
"https://x/path?pot=tok&mpd_version=7"
);
assert_eq!(
append_pot_to_manifest("https://x/path?foo=bar", Some("tok")),
"https://x/path?foo=bar&pot=tok&mpd_version=7"
);
assert_eq!(
append_pot_to_manifest("https://x/path", None),
"https://x/path"
);
}
// The quoted codecs value is extracted; a mimeType without one yields None.
#[test]
fn codec_extracted_from_mime() {
let fmt = json!({"mimeType": "video/mp4; codecs=\"avc1.4d401f\""});
assert_eq!(codec_from_mime(&fmt).as_deref(), Some("avc1.4d401f"));
let fmt = json!({"mimeType": "audio/mp4; codecs=\"mp4a.40.2\""});
assert_eq!(codec_from_mime(&fmt).as_deref(), Some("mp4a.40.2"));
let fmt = json!({"mimeType": "video/webm"});
assert!(codec_from_mime(&fmt).is_none());
}
// Same itag + delivery is dropped; a different itag is kept.
#[test]
fn dedup_by_itag_plus_delivery() {
let mut list = vec![];
let s = VideoStream {
itag: 137,
url: "u1".into(),
format: MediaFormat::Mpeg4,
delivery: DeliveryMethod::Dash,
resolution: "1080p".into(),
fps: 30,
bandwidth: None,
codec: None,
content_length_bytes: None,
width: None,
height: None,
video_only: true,
};
push_video_dedup(&mut list, s.clone());
push_video_dedup(&mut list, s.clone()); // duplicate
assert_eq!(list.len(), 1);
let mut s2 = s.clone();
s2.itag = 299;
push_video_dedup(&mut list, s2);
assert_eq!(list.len(), 2);
}
}