diff --git a/addon/plugin.video.torttube/addon.xml b/addon/plugin.video.torttube/addon.xml index 6dceed7..7c1cee3 100644 --- a/addon/plugin.video.torttube/addon.xml +++ b/addon/plugin.video.torttube/addon.xml @@ -1,7 +1,7 @@ diff --git a/addon/plugin.video.torttube/main.py b/addon/plugin.video.torttube/main.py index 7b6aea8..83e2d20 100644 --- a/addon/plugin.video.torttube/main.py +++ b/addon/plugin.video.torttube/main.py @@ -879,6 +879,77 @@ def _remove_from_watch_later(yt_id: str) -> None: _log(f"watch later remove lock failed (non-fatal): {e}", xbmc.LOGWARNING) +# Subscriptions — NewPipe-style offline subs. The user adds a channel to a +# local list; the Subscriptions Feed merges latest videos from each into a +# chronological feed. No YouTube account, no recommendation algorithm — +# you decide what feeds back. Same atomic-write + lock infra as WL/history. + +SUBSCRIPTIONS_MAX = 500 + + +def _subscriptions_path() -> str: + return _addon_data_path("subscriptions.json") + + +def _load_subscriptions() -> list[dict[str, Any]]: + try: + with open(_subscriptions_path(), "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, list): + return [ + d for d in data + if isinstance(d, dict) and isinstance(d.get("id"), str) + ][:SUBSCRIPTIONS_MAX] + except (FileNotFoundError, json.JSONDecodeError, OSError): + pass + return [] + + +def _save_subscriptions(items: list[dict[str, Any]]) -> None: + path = _subscriptions_path() + try: + _atomic_write_json(path, items[:SUBSCRIPTIONS_MAX]) + except OSError as e: + _log(f"subscriptions save failed (non-fatal): {e}", xbmc.LOGWARNING) + + +def _subscribe(channel_id: str, channel_name: str = "") -> bool: + """Add a channel to the local subscriptions list. Returns True if new.""" + if not channel_id: + return False + path = _subscriptions_path() + new_sub = False + + def _do() -> None: + nonlocal new_sub + items = _load_subscriptions() + if any(i.get("id") == channel_id for i in items): + return + new_sub = True + items.insert(0, {"id": channel_id, "name": channel_name or channel_id}) + _save_subscriptions(items) + + try: + _with_lock(path, _do) + except OSError as e: + _log(f"subscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING) + return new_sub + + +def _unsubscribe(channel_id: str) -> None: + path = _subscriptions_path() + + def _do() -> None: + items = _load_subscriptions() + items = [i for i in items if i.get("id") != channel_id] + _save_subscriptions(items) + + try: + _with_lock(path, _do) + except OSError as e: + _log(f"unsubscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING) + + def _pick_thumbnail(thumbs: Any) -> str: """Pick a thumbnail URL from whatever shape rustypipe / yt-dlp / our own persisted Watch Later records hand us. @@ -913,6 +984,22 @@ def _root_directory() -> None: ("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")), ("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")), ] + subs = _load_subscriptions() + if subs: + items.append( + ( + f"Subscriptions Feed ({len(subs)})", + "DefaultRecentlyAddedEpisodes.png", + _plugin_url(action="subs_feed"), + ) + ) + items.append( + ( + "Subscriptions (channel list)", + "DefaultArtist.png", + _plugin_url(action="subs"), + ) + ) wl = _load_watch_later() if wl: items.append( @@ -1047,6 +1134,123 @@ def _wl_add_action() -> None: ) +def _subscribe_action() -> None: + """RunPlugin handler: add a channel to subscriptions.""" + params = dict(parse_qsl(_QS.lstrip("?"))) + channel_id = params.get("id") or "" + channel_name = params.get("name") or "" + if not channel_id: + xbmcgui.Dialog().notification( + "torttube", "missing channel id", xbmcgui.NOTIFICATION_ERROR, 3000 + ) + return + is_new = _subscribe(channel_id, channel_name) + msg = ( + f"Subscribed to {channel_name or channel_id}" + if is_new + else f"Already subscribed to {channel_name or channel_id}" + ) + xbmcgui.Dialog().notification("torttube", msg, xbmcgui.NOTIFICATION_INFO, 2500) + + +def _unsubscribe_action() -> None: + """RunPlugin handler: remove a channel from subscriptions.""" + params = dict(parse_qsl(_QS.lstrip("?"))) + channel_id = params.get("id") or "" + if not channel_id: + return + _unsubscribe(channel_id) + xbmcgui.Dialog().notification( + "torttube", "Unsubscribed", xbmcgui.NOTIFICATION_INFO, 2000 + ) + container_path = xbmc.getInfoLabel("Container.FolderPath") or "" + if "action=subs" in container_path: + xbmc.executebuiltin("Container.Refresh") + + +def _subscriptions_directory() -> None: + """Channel-list view: shows every subscribed channel as a folder you + can drill into. Unsubscribe is in each channel's context menu.""" + subs = _load_subscriptions() + if not subs: + xbmcgui.Dialog().notification( + "torttube", + "No subscriptions yet. Right-click a video and 'Subscribe to '.", + xbmcgui.NOTIFICATION_INFO, + 4000, + ) + xbmcplugin.endOfDirectory(_HANDLE, succeeded=False) + return + xbmcplugin.setContent(_HANDLE, "files") + for s in subs: + cid = s.get("id") or "" + cname = s.get("name") or cid + li = xbmcgui.ListItem(label=cname) + li.setArt({"icon": "DefaultArtist.png"}) + li.addContextMenuItems( + [ + ( + f"Unsubscribe from {cname}", + f"RunPlugin({_plugin_url(action='unsubscribe', id=cid)})", + ) + ] + ) + xbmcplugin.addDirectoryItem( + _HANDLE, + _plugin_url(action="channel", id=cid), + li, + isFolder=True, + ) + xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False) + + +def _subscriptions_feed_directory() -> None: + """The killer NewPipe-style feed: latest videos across all subscribed + channels, merged + sorted newest-first.""" + subs = _load_subscriptions() + if not subs: + xbmcgui.Dialog().notification( + "torttube", + "No subscriptions yet. Subscribe to a channel first.", + xbmcgui.NOTIFICATION_INFO, + 4000, + ) + xbmcplugin.endOfDirectory(_HANDLE, succeeded=False) + return + channel_ids = [s.get("id") for s in subs if s.get("id")] + try: + # Generous timeout because we're fanning out to all subscribed + # channels — N rustypipe round-trips in parallel. + resp = _call_sidecar( + {"op": "subscriptions_feed", "channel_ids": channel_ids, "per_channel": 8, "limit": 60}, + timeout_s=60, + ) + except Exception as e: + _log(f"subs_feed failed: {e}", xbmc.LOGERROR) + xbmcgui.Dialog().notification( + "torttube", f"feed failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000 + ) + xbmcplugin.endOfDirectory(_HANDLE, succeeded=False) + return + if not resp.get("ok"): + xbmcgui.Dialog().notification( + "torttube", + f"feed: {resp.get('error', 'unknown')}", + xbmcgui.NOTIFICATION_WARNING, + 4000, + ) + xbmcplugin.endOfDirectory(_HANDLE, succeeded=False) + return + + items = resp.get("items") or [] + failed = resp.get("channels_failed") or [] + if failed: + _log(f"subs_feed: {len(failed)} channel(s) failed: {failed[:3]}…", xbmc.LOGWARNING) + _log(f"subs_feed: {len(items)} items across {len(channel_ids)} channels") + _add_video_items(items) + xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False) + + def _wl_remove_action() -> None: """RunPlugin handler: remove a video from Watch Later.""" params = dict(parse_qsl(_QS.lstrip("?"))) @@ -1114,8 +1318,9 @@ def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = Fals except Exception: pass - # Context menu: jump to channel listing + Watch Later add/remove. + # Context menu: jump to channel listing + Watch Later + subscribe. ctx: list[tuple[str, str]] = [] + subscribed_ids = {s.get("id") for s in _load_subscriptions()} if channel_id: ctx.append( ( @@ -1123,6 +1328,20 @@ def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = Fals f"Container.Update({_plugin_url(action='channel', id=channel_id)})", ) ) + if channel_id in subscribed_ids: + ctx.append( + ( + f"Unsubscribe from {channel_name or 'channel'}", + f"RunPlugin({_plugin_url(action='unsubscribe', id=channel_id)})", + ) + ) + else: + ctx.append( + ( + f"Subscribe to {channel_name or 'channel'}", + f"RunPlugin({_plugin_url(action='subscribe', id=channel_id, name=channel_name or '')})", + ) + ) if in_watch_later: ctx.append( ( @@ -1300,6 +1519,14 @@ def main() -> None: _wl_add_action() elif action == "wl_remove": _wl_remove_action() + elif action == "subs": + _subscriptions_directory() + elif action == "subs_feed": + _subscriptions_feed_directory() + elif action == "subscribe": + _subscribe_action() + elif action == "unsubscribe": + _unsubscribe_action() else: _root_directory() diff --git a/sidecar/crates/torttube-sidecar/src/main.rs b/sidecar/crates/torttube-sidecar/src/main.rs index db5384f..23c9753 100644 --- a/sidecar/crates/torttube-sidecar/src/main.rs +++ b/sidecar/crates/torttube-sidecar/src/main.rs @@ -62,6 +62,25 @@ enum Request { #[serde(default = "default_search_limit")] limit: u32, }, + /// Subscriptions feed: take a list of channel IDs (the user's offline + /// subscriptions), fetch each channel's recent videos in parallel, merge + /// + sort by publish date newest-first, return up to `limit` items. + /// `per_channel` caps how many recent videos we pull from each channel. + SubscriptionsFeed { + channel_ids: Vec, + #[serde(default = "default_per_channel")] + per_channel: u32, + #[serde(default = "default_feed_limit")] + limit: u32, + }, +} + +fn default_per_channel() -> u32 { + 8 +} + +fn default_feed_limit() -> u32 { + 60 } fn default_search_limit() -> u32 { @@ -272,6 +291,43 @@ async fn handle_line(line: &str) -> Response { Ok(v) => Response::ok(v), Err(e) => e.into(), }, + Request::SubscriptionsFeed { + channel_ids, + per_channel, + limit, + } => { + // Cap inputs to prevent a malicious caller from hammering YouTube + // by passing 10k channel ids — limit total fan-out. + const MAX_CHANNELS_PER_FEED: usize = 200; + if channel_ids.len() > MAX_CHANNELS_PER_FEED { + return Response::err( + ErrorKind::BadRequest, + format!( + "too many channels: {} (cap {})", + channel_ids.len(), + MAX_CHANNELS_PER_FEED + ), + ); + } + for cid in &channel_ids { + if cid.len() > 64 || cid.is_empty() { + return Response::err( + ErrorKind::BadRequest, + "channel id has bad shape".to_string(), + ); + } + } + match resolve::subscriptions_feed( + &channel_ids, + per_channel.min(50), + limit.min(MAX_LIMIT), + ) + .await + { + Ok(v) => Response::ok(v), + Err(e) => e.into(), + } + } } } diff --git a/sidecar/crates/torttube-sidecar/src/resolve.rs b/sidecar/crates/torttube-sidecar/src/resolve.rs index af59346..38c848e 100644 --- a/sidecar/crates/torttube-sidecar/src/resolve.rs +++ b/sidecar/crates/torttube-sidecar/src/resolve.rs @@ -76,6 +76,93 @@ pub(crate) async fn channel_videos(channel_id: &str, limit: u32) -> Result Result { + use rustypipe::client::RustyPipe; + use std::collections::HashMap; + + let rp = std::sync::Arc::new(RustyPipe::new()); + + // Spawn one tokio task per channel; merge results once they all finish. + // Fan-out is capped at 200 by main.rs; per_channel capped at 50. + let tasks: Vec<_> = channel_ids + .iter() + .map(|cid| { + let rp = rp.clone(); + let cid = cid.clone(); + tokio::spawn(async move { + let res = rp.query().channel_videos(&cid).await; + (cid, res) + }) + }) + .collect(); + + let mut all_items: Vec = Vec::new(); + let mut channel_names: HashMap = HashMap::new(); + let mut failed: Vec = Vec::new(); + + for handle in tasks { + let (cid, res) = match handle.await { + Ok(t) => t, + Err(e) => { + tracing::warn!(error = %e, "subscriptions feed task panicked"); + continue; + } + }; + match res { + Ok(ch) => { + if !ch.name.is_empty() { + channel_names.insert(cid.clone(), ch.name.clone()); + } + for vi in ch.content.items.iter().take(per_channel as usize) { + if let Ok(v) = serde_json::to_value(vi) { + all_items.push(v); + } + } + } + Err(e) => { + tracing::warn!(channel = %cid, error = %e, "subscriptions feed channel failed"); + failed.push(cid); + } + } + } + + // Sort newest-first by publish_date. rustypipe VideoItem has an optional + // `publish_date` as an RFC3339-ish string when known; fall back to + // `publish_date_txt` (relative) ordering by string is meaningless, so we + // keep channel-order as a stable secondary sort by leaving channel order + // as insertion order and using a stable sort. + all_items.sort_by(|a, b| { + let ad = a.get("publish_date").and_then(Value::as_str).unwrap_or(""); + let bd = b.get("publish_date").and_then(Value::as_str).unwrap_or(""); + bd.cmp(ad) // descending + }); + all_items.truncate(limit as usize); + + tracing::info!( + channels = channel_ids.len(), + items = all_items.len(), + failed = failed.len(), + "subscriptions_feed ok" + ); + + Ok(serde_json::json!({ + "source": "rustypipe", + "items": all_items, + "channels_total": channel_ids.len(), + "channels_failed": failed, + "channel_names": channel_names, + })) +} + /// List a playlist's videos. Returns the same VideoItem shape as search/channel. pub(crate) async fn playlist(playlist_id: &str, limit: u32) -> Result { use rustypipe::client::RustyPipe;