Subscriptions — NewPipe-style offline subs + chronological feed

User-curated, no YouTube account. Same posture as Watch Later: you
decide what comes back, no algorithm involved.

Sidecar SubscriptionsFeed op:
  - Takes channel_ids: Vec<String>, per_channel + limit defaults 8/60
  - Caps fan-out at 200 channels per call + each id at 64 chars to
    keep a malicious caller from hammering YouTube via the sidecar
  - Spawns one tokio::task per channel against
    rustypipe::query().channel_videos(), merges results once all
    finish, sorts by publish_date string newest-first
  - A channel that 404s / region-blocks / rustypipe-errors is silently
    dropped — one dead subscription doesn't kill the whole feed; the
    failed list is returned in channels_failed for logging

Addon side:
  - subscriptions.json under addon_data, persisted via same
    _atomic_write_json + _with_lock helpers as Watch Later (no
    repeat of the race + torn-write hazards the audit caught)
  - Two new root menu entries (visible only when subscribed):
    * 'Subscriptions Feed  (N)' — chronological merge of latest uploads
    * 'Subscriptions (channel list)' — per-channel browse
  - Context menu on every video result toggles
    'Subscribe to <channel>' / 'Unsubscribe from <channel>' based on
    current sub state
  - Context menu on each entry in the channel list has its own
    'Unsubscribe from <channel>' for direct removal
  - _unsubscribe_action does Container.Refresh only when
    Container.FolderPath contains 'action=subs' (same guard pattern
    we used for wl_remove)

Live smoke (browse-only, no playback, Leia still safe):
  - Subscribed to LTT (UCXuqSBlHAE6Xw-yeJA0Tunw) + MKBHD
    (UCBJycsmduvYEL83R_U4JriQ) via RunPlugin
  - subscriptions.json correctly holds both
  - action=subs shows MKBHD + Linus Tech Tips as channel folders
  - action=subs_feed returns 16 merged items: MKBHD's recent uploads
    plus LTT's
  - Root menu now includes 'Subscriptions Feed  (2)' and
    'Subscriptions (channel list)'

Addon v0.0.17.
This commit is contained in:
Kayos 2026-05-23 12:59:16 -07:00
parent 03e1eb526a
commit 659e7cf613
4 changed files with 372 additions and 2 deletions

View file

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.torttube"
name="torttube"
version="0.0.16"
version="0.0.17"
provider-name="Sulkta-Coop">
<requires>
<import addon="xbmc.python" version="3.0.0"/>

View file

@ -879,6 +879,77 @@ def _remove_from_watch_later(yt_id: str) -> None:
_log(f"watch later remove lock failed (non-fatal): {e}", xbmc.LOGWARNING)
# Subscriptions — NewPipe-style offline subs. The user adds a channel to a
# local list; the Subscriptions Feed merges latest videos from each into a
# chronological feed. No YouTube account, no recommendation algorithm —
# you decide what feeds back. Same atomic-write + lock infra as WL/history.
SUBSCRIPTIONS_MAX = 500
def _subscriptions_path() -> str:
return _addon_data_path("subscriptions.json")
def _load_subscriptions() -> list[dict[str, Any]]:
try:
with open(_subscriptions_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [
d for d in data
if isinstance(d, dict) and isinstance(d.get("id"), str)
][:SUBSCRIPTIONS_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_subscriptions(items: list[dict[str, Any]]) -> None:
path = _subscriptions_path()
try:
_atomic_write_json(path, items[:SUBSCRIPTIONS_MAX])
except OSError as e:
_log(f"subscriptions save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _subscribe(channel_id: str, channel_name: str = "") -> bool:
"""Add a channel to the local subscriptions list. Returns True if new."""
if not channel_id:
return False
path = _subscriptions_path()
new_sub = False
def _do() -> None:
nonlocal new_sub
items = _load_subscriptions()
if any(i.get("id") == channel_id for i in items):
return
new_sub = True
items.insert(0, {"id": channel_id, "name": channel_name or channel_id})
_save_subscriptions(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"subscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING)
return new_sub
def _unsubscribe(channel_id: str) -> None:
path = _subscriptions_path()
def _do() -> None:
items = _load_subscriptions()
items = [i for i in items if i.get("id") != channel_id]
_save_subscriptions(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"unsubscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _pick_thumbnail(thumbs: Any) -> str:
"""Pick a thumbnail URL from whatever shape rustypipe / yt-dlp / our own
persisted Watch Later records hand us.
@ -913,6 +984,22 @@ def _root_directory() -> None:
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
]
subs = _load_subscriptions()
if subs:
items.append(
(
f"Subscriptions Feed ({len(subs)})",
"DefaultRecentlyAddedEpisodes.png",
_plugin_url(action="subs_feed"),
)
)
items.append(
(
"Subscriptions (channel list)",
"DefaultArtist.png",
_plugin_url(action="subs"),
)
)
wl = _load_watch_later()
if wl:
items.append(
@ -1047,6 +1134,123 @@ def _wl_add_action() -> None:
)
def _subscribe_action() -> None:
"""RunPlugin handler: add a channel to subscriptions."""
params = dict(parse_qsl(_QS.lstrip("?")))
channel_id = params.get("id") or ""
channel_name = params.get("name") or ""
if not channel_id:
xbmcgui.Dialog().notification(
"torttube", "missing channel id", xbmcgui.NOTIFICATION_ERROR, 3000
)
return
is_new = _subscribe(channel_id, channel_name)
msg = (
f"Subscribed to {channel_name or channel_id}"
if is_new
else f"Already subscribed to {channel_name or channel_id}"
)
xbmcgui.Dialog().notification("torttube", msg, xbmcgui.NOTIFICATION_INFO, 2500)
def _unsubscribe_action() -> None:
"""RunPlugin handler: remove a channel from subscriptions."""
params = dict(parse_qsl(_QS.lstrip("?")))
channel_id = params.get("id") or ""
if not channel_id:
return
_unsubscribe(channel_id)
xbmcgui.Dialog().notification(
"torttube", "Unsubscribed", xbmcgui.NOTIFICATION_INFO, 2000
)
container_path = xbmc.getInfoLabel("Container.FolderPath") or ""
if "action=subs" in container_path:
xbmc.executebuiltin("Container.Refresh")
def _subscriptions_directory() -> None:
"""Channel-list view: shows every subscribed channel as a folder you
can drill into. Unsubscribe is in each channel's context menu."""
subs = _load_subscriptions()
if not subs:
xbmcgui.Dialog().notification(
"torttube",
"No subscriptions yet. Right-click a video and 'Subscribe to <channel>'.",
xbmcgui.NOTIFICATION_INFO,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
xbmcplugin.setContent(_HANDLE, "files")
for s in subs:
cid = s.get("id") or ""
cname = s.get("name") or cid
li = xbmcgui.ListItem(label=cname)
li.setArt({"icon": "DefaultArtist.png"})
li.addContextMenuItems(
[
(
f"Unsubscribe from {cname}",
f"RunPlugin({_plugin_url(action='unsubscribe', id=cid)})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE,
_plugin_url(action="channel", id=cid),
li,
isFolder=True,
)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _subscriptions_feed_directory() -> None:
"""The killer NewPipe-style feed: latest videos across all subscribed
channels, merged + sorted newest-first."""
subs = _load_subscriptions()
if not subs:
xbmcgui.Dialog().notification(
"torttube",
"No subscriptions yet. Subscribe to a channel first.",
xbmcgui.NOTIFICATION_INFO,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
channel_ids = [s.get("id") for s in subs if s.get("id")]
try:
# Generous timeout because we're fanning out to all subscribed
# channels — N rustypipe round-trips in parallel.
resp = _call_sidecar(
{"op": "subscriptions_feed", "channel_ids": channel_ids, "per_channel": 8, "limit": 60},
timeout_s=60,
)
except Exception as e:
_log(f"subs_feed failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"feed failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"feed: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
failed = resp.get("channels_failed") or []
if failed:
_log(f"subs_feed: {len(failed)} channel(s) failed: {failed[:3]}", xbmc.LOGWARNING)
_log(f"subs_feed: {len(items)} items across {len(channel_ids)} channels")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _wl_remove_action() -> None:
"""RunPlugin handler: remove a video from Watch Later."""
params = dict(parse_qsl(_QS.lstrip("?")))
@ -1114,8 +1318,9 @@ def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = Fals
except Exception:
pass
# Context menu: jump to channel listing + Watch Later add/remove.
# Context menu: jump to channel listing + Watch Later + subscribe.
ctx: list[tuple[str, str]] = []
subscribed_ids = {s.get("id") for s in _load_subscriptions()}
if channel_id:
ctx.append(
(
@ -1123,6 +1328,20 @@ def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = Fals
f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
)
)
if channel_id in subscribed_ids:
ctx.append(
(
f"Unsubscribe from {channel_name or 'channel'}",
f"RunPlugin({_plugin_url(action='unsubscribe', id=channel_id)})",
)
)
else:
ctx.append(
(
f"Subscribe to {channel_name or 'channel'}",
f"RunPlugin({_plugin_url(action='subscribe', id=channel_id, name=channel_name or '')})",
)
)
if in_watch_later:
ctx.append(
(
@ -1300,6 +1519,14 @@ def main() -> None:
_wl_add_action()
elif action == "wl_remove":
_wl_remove_action()
elif action == "subs":
_subscriptions_directory()
elif action == "subs_feed":
_subscriptions_feed_directory()
elif action == "subscribe":
_subscribe_action()
elif action == "unsubscribe":
_unsubscribe_action()
else:
_root_directory()

View file

@ -62,6 +62,25 @@ enum Request {
#[serde(default = "default_search_limit")]
limit: u32,
},
/// Subscriptions feed: take a list of channel IDs (the user's offline
/// subscriptions), fetch each channel's recent videos in parallel, merge
/// + sort by publish date newest-first, return up to `limit` items.
/// `per_channel` caps how many recent videos we pull from each channel.
SubscriptionsFeed {
channel_ids: Vec<String>,
#[serde(default = "default_per_channel")]
per_channel: u32,
#[serde(default = "default_feed_limit")]
limit: u32,
},
}
fn default_per_channel() -> u32 {
8
}
fn default_feed_limit() -> u32 {
60
}
fn default_search_limit() -> u32 {
@ -272,6 +291,43 @@ async fn handle_line(line: &str) -> Response {
Ok(v) => Response::ok(v),
Err(e) => e.into(),
},
Request::SubscriptionsFeed {
channel_ids,
per_channel,
limit,
} => {
// Cap inputs to prevent a malicious caller from hammering YouTube
// by passing 10k channel ids — limit total fan-out.
const MAX_CHANNELS_PER_FEED: usize = 200;
if channel_ids.len() > MAX_CHANNELS_PER_FEED {
return Response::err(
ErrorKind::BadRequest,
format!(
"too many channels: {} (cap {})",
channel_ids.len(),
MAX_CHANNELS_PER_FEED
),
);
}
for cid in &channel_ids {
if cid.len() > 64 || cid.is_empty() {
return Response::err(
ErrorKind::BadRequest,
"channel id has bad shape".to_string(),
);
}
}
match resolve::subscriptions_feed(
&channel_ids,
per_channel.min(50),
limit.min(MAX_LIMIT),
)
.await
{
Ok(v) => Response::ok(v),
Err(e) => e.into(),
}
}
}
}

View file

@ -76,6 +76,93 @@ pub(crate) async fn channel_videos(channel_id: &str, limit: u32) -> Result<Value
}))
}
/// Fetch a subscriptions feed: pull the most recent N videos from each
/// channel in parallel, merge + sort by publish-date newest-first, cap
/// the returned union at `limit`. A channel that fails (404, region block,
/// rustypipe error) is silently dropped — one dead subscription shouldn't
/// kill the whole feed.
pub(crate) async fn subscriptions_feed(
channel_ids: &[String],
per_channel: u32,
limit: u32,
) -> Result<Value, HandlerError> {
use rustypipe::client::RustyPipe;
use std::collections::HashMap;
let rp = std::sync::Arc::new(RustyPipe::new());
// Spawn one tokio task per channel; merge results once they all finish.
// Fan-out is capped at 200 by main.rs; per_channel capped at 50.
let tasks: Vec<_> = channel_ids
.iter()
.map(|cid| {
let rp = rp.clone();
let cid = cid.clone();
tokio::spawn(async move {
let res = rp.query().channel_videos(&cid).await;
(cid, res)
})
})
.collect();
let mut all_items: Vec<Value> = Vec::new();
let mut channel_names: HashMap<String, String> = HashMap::new();
let mut failed: Vec<String> = Vec::new();
for handle in tasks {
let (cid, res) = match handle.await {
Ok(t) => t,
Err(e) => {
tracing::warn!(error = %e, "subscriptions feed task panicked");
continue;
}
};
match res {
Ok(ch) => {
if !ch.name.is_empty() {
channel_names.insert(cid.clone(), ch.name.clone());
}
for vi in ch.content.items.iter().take(per_channel as usize) {
if let Ok(v) = serde_json::to_value(vi) {
all_items.push(v);
}
}
}
Err(e) => {
tracing::warn!(channel = %cid, error = %e, "subscriptions feed channel failed");
failed.push(cid);
}
}
}
// Sort newest-first by publish_date. rustypipe VideoItem has an optional
// `publish_date` as an RFC3339-ish string when known; fall back to
// `publish_date_txt` (relative) ordering by string is meaningless, so we
// keep channel-order as a stable secondary sort by leaving channel order
// as insertion order and using a stable sort.
all_items.sort_by(|a, b| {
let ad = a.get("publish_date").and_then(Value::as_str).unwrap_or("");
let bd = b.get("publish_date").and_then(Value::as_str).unwrap_or("");
bd.cmp(ad) // descending
});
all_items.truncate(limit as usize);
tracing::info!(
channels = channel_ids.len(),
items = all_items.len(),
failed = failed.len(),
"subscriptions_feed ok"
);
Ok(serde_json::json!({
"source": "rustypipe",
"items": all_items,
"channels_total": channel_ids.len(),
"channels_failed": failed,
"channel_names": channel_names,
}))
}
/// List a playlist's videos. Returns the same VideoItem shape as search/channel.
pub(crate) async fn playlist(playlist_id: &str, limit: u32) -> Result<Value, HandlerError> {
use rustypipe::client::RustyPipe;