torttube/addon/plugin.video.torttube/main.py
Kayos 5f2145e5fe v1.0.1 — pv.youtube service preflight + sidecar runtime revert
Two fixes after a 2026-05-23 regression where a Kodi restart left
plugin.video.youtube's service.py un-started, breaking every delegated
play with a silent "Service IPC - Monitor has not started" error.

- Addon: probe pv.youtube's localhost httpd (50152/50153) before
  delegating. If nothing is listening, show a "YouTube service down —
  restart Kodi" notification and fall back to our DASH / progressive
  paths instead of letting setResolvedUrl fail silently.

- Sidecar: revert tokio runtime from current_thread back to
  multi_thread with 2 worker threads so subscriptions_feed's per-channel
  tokio::spawn fan-out runs in parallel. The current_thread RSS savings
  weren't worth the behavioral change.
2026-05-23 13:56:04 -07:00

1703 lines
62 KiB
Python

# torttube — Kodi YouTube addon
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Entry point. Plugin URL shapes handled:
plugin://plugin.video.torttube/ (root: M3 will add browse UI)
plugin://plugin.video.torttube/?action=play&id=ID (resolve + play a YouTube ID)
plugin://plugin.video.torttube/?action=play&url=URL (extract ID from URL, then play)
Remote-control / share-to-TV pattern (Kodi JSON-RPC):
POST http://<rpi>:8080/jsonrpc
Basic auth (kodi user)
{"jsonrpc":"2.0","method":"Player.Open","params":{
"item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}
That's how Android / phone / "send to TV" flows hand off — Kodi already
exposes the endpoint, we just need to register the plugin URL.
"""
import fcntl
import http.server
import json
import os
import re
import socket
import subprocess
import sys
import tempfile
import threading
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse
from xml.sax.saxutils import escape as xml_escape
import xbmc
import xbmcaddon
import xbmcgui
import xbmcplugin
ADDON = xbmcaddon.Addon()
ADDON_PATH = ADDON.getAddonInfo("path")
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")
def _safe_handle() -> int:
"""Kodi always passes an int as sys.argv[1] per the plugin contract, but
if someone runs main.py directly (testing) or Kodi has a bad day we'd
crash with a ValueError before _log is even defined. -1 means 'no handle';
setResolvedUrl / endOfDirectory are no-ops with that."""
if len(sys.argv) <= 1:
return -1
try:
return int(sys.argv[1])
except ValueError:
return -1
_HANDLE = _safe_handle()
_QS = sys.argv[2] if len(sys.argv) > 2 else ""
def _log(msg, level=xbmc.LOGINFO):
xbmc.log(f"[torttube] {msg}", level)
_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")
def _validate_id(candidate: str | None) -> str:
"""Return the candidate iff it matches YouTube's 11-char id shape; raise otherwise.
Every branch of _extract_id MUST pipe its return value through this so a
bogus `v=` query, an empty youtu.be path, or a malformed share link doesn't
leak into _call_sidecar -> yt-dlp / rustypipe / log lines."""
if candidate and _ID_RE.fullmatch(candidate):
return candidate
raise ValueError(f"not a valid YouTube id: {candidate!r}")
def _extract_id(url_or_id: str) -> str:
"""Accept either a bare ID or any common YouTube URL form, return the ID.
Every accepted form is validated through _validate_id so we never hand
untrusted text to the sidecar."""
s = url_or_id.strip()
# Bare 11-char ID (YouTube's canonical length).
if _ID_RE.fullmatch(s):
return s
parsed = urlparse(s)
# https://youtu.be/<id> (strict suffix match — avoid `myyoutu.be` etc.)
if parsed.netloc == "youtu.be" or parsed.netloc.endswith(".youtu.be"):
return _validate_id(parsed.path.lstrip("/").split("/")[0])
# https://www.youtube.com/watch?v=<id> (strict suffix match)
if parsed.netloc == "youtube.com" or parsed.netloc.endswith(".youtube.com"):
for k, v in parse_qsl(parsed.query):
if k == "v":
return _validate_id(v)
# /shorts/<id>, /embed/<id>, /live/<id>
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
if m:
return _validate_id(m.group(2))
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
"""Invoke the sidecar with one JSON request, parse one JSON response.
Puts the addon's bin/ on PATH so the sidecar's `yt-dlp` shell-outs find
the bundled zipapp (LibreELEC has no system yt-dlp).
"""
if not os.path.isfile(SIDECAR_BIN):
raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
if not os.access(SIDECAR_BIN, os.X_OK):
raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")
env = os.environ.copy()
addon_bin = os.path.join(ADDON_PATH, "bin")
env["PATH"] = addon_bin + os.pathsep + env.get("PATH", "")
proc = subprocess.run(
[SIDECAR_BIN],
input=(json.dumps(request) + "\n").encode("utf-8"),
capture_output=True,
timeout=timeout_s,
env=env,
)
if proc.returncode != 0:
raise RuntimeError(
f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
)
# The sidecar writes its JSON response as the LAST line of stdout (after any
# log lines it might have written via println!/eprintln/tracing if the route
# config ever changes). Pick the last non-empty line and decode robustly.
last_line: bytes | None = None
for line in proc.stdout.splitlines():
line = line.strip()
if line:
last_line = line
if last_line is None:
raise RuntimeError("sidecar produced no response")
try:
return json.loads(last_line.decode("utf-8", errors="replace"))
except json.JSONDecodeError as e:
raise RuntimeError(
f"sidecar stdout was not JSON: {e}; line={last_line[:200]!r}"
) from e
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
li = xbmcgui.ListItem(label=title or "torttube")
li.setPath(stream_url)
li.setProperty("IsPlayable", "true")
# Tell inputstream.adaptive to handle DASH/HLS based on URL path/suffix.
# Check the URL *path* extension, not a substring of the whole URL —
# progressive yt-dlp URLs have base64 blobs in the query string that
# can accidentally contain ".mpd" or ".m3u8".
parsed = urlparse(stream_url)
if parsed.path.endswith(".mpd"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "mpd")
# googlevideo rejects segment GETs that don't carry an Origin/Referer
# from www.youtube.com — 403 Forbidden otherwise. Same Mozilla UA
# rustypipe / yt-dlp use when minting the URL.
ytdl_ua = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
seg_headers = (
f"User-Agent={ytdl_ua}"
"&Origin=https://www.youtube.com"
"&Referer=https://www.youtube.com/"
)
li.setProperty("inputstream.adaptive.stream_headers", seg_headers)
li.setProperty("inputstream.adaptive.manifest_headers", seg_headers)
elif parsed.path.endswith(".m3u8"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "hls")
return li
class _MpdHandler(http.server.BaseHTTPRequestHandler):
"""Serves the per-video MPD file. The bytes come from the closure-captured
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
to clean up early. One handler per HTTPServer instance."""
mpd_bytes: bytes = b""
def do_GET(self) -> None: # noqa: N802 — http.server convention
self.send_response(200)
self.send_header("Content-Type", "application/dash+xml")
self.send_header("Content-Length", str(len(self.mpd_bytes)))
self.end_headers()
self.wfile.write(self.mpd_bytes)
def log_message(self, *args: Any, **kwargs: Any) -> None:
# Silence the default request log — Kodi's log is verbose enough.
return
def _has_setting(setting_id: str) -> bool:
"""Best-effort check that a setting exists in resources/settings.xml.
Returns False if the lookup throws (older builds, missing schema)."""
try:
ADDON.getSetting(setting_id)
return True
except Exception:
return False
def _lan_ip() -> str:
"""Detect this host's LAN IP.
First preference: a private-range IPv4 on a local interface
(192.168.x.x / 10.x.x.x / 172.16-31.x.x). This wins over the
connect-trick on multi-NIC hosts where the default route points
at a VPN (tailscale, OpenVPN). inputstream.adaptive's libcurl
has historically had trouble reaching VPN-tunnel IPs.
Fallback: open a UDP socket toward 8.8.8.8 (no packets actually
sent — kernel picks the source IP). Same trick plugin.video.youtube
uses, fine when the host has a single default route.
Last resort: 127.0.0.1. inputstream.adaptive in Kodi 20 was flaky
against loopback but it's the only thing that always exists.
"""
private_prefixes = ("192.168.", "10.")
private_172 = lambda ip: (
ip.startswith("172.")
and len(ip.split(".")) >= 2
and 16 <= int(ip.split(".")[1]) <= 31
)
try:
_, _, all_ips = socket.gethostbyname_ex(socket.gethostname())
for ip in all_ips:
if ip == "127.0.0.1":
continue
if ip.startswith(private_prefixes) or private_172(ip):
return ip
except (socket.gaierror, ValueError, OSError):
pass
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
finally:
s.close()
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
"""Spin up a one-shot HTTP server that serves `mpd_bytes` at any path.
Binds to the LAN IP so inputstream.adaptive can fetch via the same code
path it uses for real network URLs. Returns (url, server) — caller is
responsible for `server.shutdown()` once playback ends."""
handler_cls = type(
"_MpdHandlerInstance",
(_MpdHandler,),
{"mpd_bytes": mpd_bytes},
)
lan_ip = _lan_ip()
# Bind to the LAN IP (not 0.0.0.0) so the MPD is reachable only on the
# interface inputstream.adaptive uses to connect, not exposed to every
# device on the LAN. The MPD embeds signed googlevideo segment URLs that
# we don't want walkable from a guest phone or IoT device.
try:
server = http.server.ThreadingHTTPServer((lan_ip, 0), handler_cls)
except OSError:
# LAN-IP bind can fail in rare network states (interface down,
# IP not yet assigned). Fall back to loopback — inputstream.adaptive
# on the same host can still reach it.
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler_cls)
lan_ip = "127.0.0.1"
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]
url = f"http://{lan_ip}:{port}/manifest.mpd"
_log(f"MPD HTTP server up on {url}")
return url, server
_MIME_CODEC_RE = re.compile(r'codecs=["\']([^"\']+)["\']')
def _codec_from_mime(stream: dict[str, Any]) -> str:
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
"""
mime = stream.get("mime", "")
m = _MIME_CODEC_RE.search(mime)
return m.group(1) if m else (stream.get("codec") or "")
def _build_dash_mpd(
details: dict[str, Any],
video_streams: list[dict[str, Any]],
audio_streams: list[dict[str, Any]],
) -> str | None:
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
Returns the MPD XML, or None if no compatible streams were found.
"""
duration_s = float(details.get("duration") or 0)
if duration_s <= 0:
return None
# Filter video: H.264 only (avc1.*), 720p <= height <= 1080p.
# Floor at 720p so inputstream.adaptive's conservative-start chooser doesn't
# land us on a low-quality rep first. Ceiling at 1080p because that's the
# RPi 4's H.264 hardware-decode sweet spot.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and 720 <= (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
# Fallback: drop the 720p floor if the video has no HD streams at all.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
return None
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
audio_pool = aac or opus
if not audio_pool:
return None
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
return None
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
parts = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
" <Period>",
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
]
for v in h264:
codec = _codec_from_mime(v) or "avc1.4d401f"
ir = v["index_range"]
init = v["init_range"]
parts.append(
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
f' startWithSAP="1">'
)
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
# Audio adaptation set.
a = best_audio
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
a_ir = a["index_range"]
a_init = a["init_range"]
parts.append(
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
)
# NOTE: NOT setting audioSamplingRate here on purpose. rustypipe doesn't
# expose the sample rate, and hard-coding 44100 caused a ~9% playback-rate
# mismatch (= growing audio-vs-video desync) for content at 48000 Hz.
# inputstream.adaptive reads the actual rate from the audio init segment's
# mdhd box when this attribute is omitted, which is correct for any source.
parts.append(
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
f' bandwidth="{a.get("bitrate", 0)}" startWithSAP="1">'
)
parts.append(
' <AudioChannelConfiguration'
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
f' value="{a.get("channels", 2)}"/>'
)
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
parts.append(" </Period>")
parts.append("</MPD>")
return "\n".join(parts)
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
"""Resolve via rustypipe + build DASH MPD. Returns (mpd_bytes, resp).
On any failure returns (None, resp) so the caller can fall back to
the progressive path. We serve the MPD over localhost HTTP because
inputstream.adaptive's libcurl can't open file:// URLs.
"""
try:
resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
except Exception as e:
_log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
return None, {}
if not resp.get("ok"):
return None, resp
mpd = _build_dash_mpd(
resp.get("details") or {},
resp.get("video_only_streams") or [],
resp.get("audio_streams") or [],
)
if not mpd:
_log("DASH build returned no compatible streams; falling back to progressive")
return None, resp
return mpd.encode("utf-8"), resp
def _delegate_to_pv_youtube(yt_id: str) -> bool:
"""Hand playback off to plugin.video.youtube via its play URL. They have
the proper SegmentTimeline-aware MPD construction (sidx-parsed) that
unlocks HD without the audio-sync drift our naive MPD has. Returns True
if delegation succeeded (Kodi will chain-resolve)."""
if not _pv_youtube_installed():
return False
if not _pv_youtube_service_alive():
_log(
"pv.youtube installed but service httpd not responding — restart Kodi to recover",
xbmc.LOGERROR,
)
try:
xbmcgui.Dialog().notification(
"YouTube service down",
"Restart Kodi to fix playback",
xbmcgui.NOTIFICATION_ERROR,
6000,
)
except Exception:
pass
return False
target = f"plugin://plugin.video.youtube/play/?video_id={yt_id}"
_log(f"delegating playback to plugin.video.youtube: {target}")
li = xbmcgui.ListItem(label=yt_id)
li.setPath(target)
li.setProperty("IsPlayable", "true")
xbmcplugin.setResolvedUrl(_HANDLE, True, li)
return True
def _pv_youtube_installed() -> bool:
"""Check whether plugin.video.youtube is installed + enabled. We don't
enable it ourselves — if the user removed it, we fall back to our own
paths. Log any probe failure so silent degradation to 360p is observable."""
try:
return bool(xbmc.getCondVisibility("System.HasAddon(plugin.video.youtube)"))
except Exception as e:
_log(f"pv.youtube probe failed: {e}", xbmc.LOGWARNING)
return False
def _pv_youtube_service_alive() -> bool:
# Their service.py runs an httpd on 50152 (default) for MPD/segment serving.
# If it isn't listening, every delegated play silently fails with
# "Service IPC - Monitor has not started" in kodi.log and the user just sees
# nothing happen. Probe with a 500ms localhost connect — cheap and reliable.
# Custom-port users may get a false negative; addon then falls back to its
# own paths, so degradation is graceful either way.
for port in (50152, 50153):
try:
with socket.create_connection(("127.0.0.1", port), timeout=0.5):
return True
except OSError:
continue
return False
def _play(yt_id: str) -> None:
"""Resolve playback in order of preference:
1. plugin.video.youtube delegation — HD via their proven DASH MPD with
sidx-parsed SegmentTimeline. Default, when available.
2. Our DASH path — only if `dash_enabled` setting / dash.on marker / env
are set (WIP, partial — see M7 milestone).
3. yt-dlp progressive — last-resort 360p, always works.
SponsorBlock attaches regardless of which path we took.
"""
_log(f"play id={yt_id}")
# Tier 0: delegate to plugin.video.youtube if installed. Our SponsorBlock
# monitor still attaches because xbmc.Player() works regardless of which
# addon initiated playback. If pv.youtube ALSO has SB enabled the user
# might see double skips; rare in practice (pv.youtube's SB is off by
# default and theirs is more conservative).
use_pv_youtube = True
try:
use_pv_youtube = ADDON.getSettingBool("prefer_pv_youtube")
except Exception:
pass
if use_pv_youtube and _delegate_to_pv_youtube(yt_id):
try:
_attach_sponsorblock(yt_id)
except Exception as e:
# Don't let a SB monitor bug pop a 'Plugin error' dialog on the TV
# after a successful delegate-to-pv.youtube hand-off.
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
return
mpd_bytes: bytes | None = None
dash_resp: dict[str, Any] = {}
# DASH path: read setting first; fall back to env-var; OR honor a magic file
# at /storage/.kodi/userdata/addon_data/plugin.video.torttube/dash.on as a
# last-ditch trigger that doesn't depend on Kodi's settings cache.
dash_enabled = False
try:
dash_enabled = ADDON.getSettingBool("dash_enabled")
except Exception:
pass
env_dash = os.environ.get("TORTTUBE_DASH") == "1"
addon_data = ""
try:
import xbmcvfs
addon_data = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
pass
file_dash = bool(addon_data) and os.path.isfile(os.path.join(addon_data, "dash.on"))
_log(f"play id={yt_id} dash_enabled={dash_enabled} env_dash={env_dash} file_dash={file_dash}")
if dash_enabled or env_dash or file_dash:
mpd_bytes, dash_resp = _try_dash(yt_id)
_log(f"_try_dash returned mpd_bytes={'<%d bytes>' % len(mpd_bytes) if mpd_bytes else None}")
if mpd_bytes:
details = dash_resp.get("details") or {}
title = details.get("name")
mpd_url, server = _start_mpd_server(mpd_bytes)
_log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
try:
_attach_sponsorblock(yt_id)
finally:
# Shut down the MPD server cleanly once playback ends or aborts.
# _attach_sponsorblock blocks while playback is active.
try:
server.shutdown()
server.server_close()
_log("MPD HTTP server shut down")
except Exception as e:
_log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
return
# Fallback: progressive single-URL via yt-dlp (360p).
try:
resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
except Exception as e:
_log(f"sidecar resolve_play failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"resolve failed: {e}", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
if not resp.get("ok"):
kind = resp.get("kind", "unknown")
err = resp.get("error", "unknown")
_log(f"sidecar returned not-ok: {kind}: {err}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube", f"{kind}: {err}", xbmcgui.NOTIFICATION_WARNING, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
stream_url = resp.get("stream_url")
title = resp.get("title")
if not stream_url:
_log("no usable stream URL in sidecar response", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", "no stream URL", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_log(f"resolved via yt-dlp progressive fallback, playing")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(stream_url, title))
try:
_attach_sponsorblock(yt_id)
except Exception as e:
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
def _attach_sponsorblock(yt_id: str) -> None:
"""Fetch SponsorBlock segments and block on the monitor loop. Always blocks
until playback ends (or 30s if playback never starts) so the caller can
use this as a 'wait for playback to finish' signal — needed to keep the
MPD HTTP server alive throughout playback.
Non-fatal on segment fetch error.
"""
skip_segments: list[dict[str, Any]] = []
try:
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
if sb_resp.get("ok"):
segs = sb_resp.get("segments") or []
skip_segments = [s for s in segs if s.get("actionType") == "skip"]
_log(f"sponsorblock: {len(skip_segments)} skip segments")
except Exception as e:
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
# Always run the watcher even with zero segments — it doubles as the
# 'block until playback ends' signal that gates MPD-server shutdown.
SponsorBlockMonitor(skip_segments).run()
class SponsorBlockMonitor(xbmc.Monitor):
"""Polls Kodi's player position and seeks past each SponsorBlock skip
segment exactly once. Exits on playback stop or Kodi shutdown.
Each segment has shape {"segment": [start_s, end_s], "category": str,
"UUID": str, "actionType": "skip"}. We sort by start time so we can
short-circuit the scan; we also dedupe via the UUID set so the same
segment doesn't trigger twice if the user manually rewinds into it.
"""
POLL_S = 0.5
MAX_WAIT_FOR_PLAYBACK_S = 30.0
def __init__(self, segments: list[dict[str, Any]]):
super().__init__()
self.segments = sorted(
segments, key=lambda s: float(s.get("segment", [0, 0])[0])
)
self.skipped: set[str] = set()
def run(self) -> None:
player = xbmc.Player()
# Wait for playback to actually begin — setResolvedUrl is async,
# Kodi takes a beat to demux + start the streams.
waited = 0.0
while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
if self.abortRequested():
return
if player.isPlaying():
break
if self.waitForAbort(0.25):
return
waited += 0.25
else:
_log("sponsorblock: timed out waiting for playback to start")
return
# Capture the file path that's actually playing now; bail if it changes
# mid-monitor (delegate-to-pv.youtube means our SponsorBlockMonitor is
# alive in our plugin's context while pv.youtube drives playback —
# if the user starts a different video, our skip-segments are stale).
#
try:
initial_file = player.getPlayingFile()
except Exception:
initial_file = ""
while not self.abortRequested() and player.isPlaying():
try:
pos = float(player.getTime())
current_file = player.getPlayingFile()
except Exception:
# getTime raises various exception types when the player goes
# away mid-poll (Kodi shutdown, plugin reload, etc). Wider catch
# so an exception path doesn't escape into _play's finally and
# leak the MPD HTTP server.
return
if initial_file and current_file and current_file != initial_file:
# User started a different video — our skip segments are for the
# old one, abort instead of spurious-skipping the new content.
_log("sponsorblock: playing file changed, monitor exiting")
return
for seg in self.segments:
uuid = seg.get("UUID", "")
if uuid in self.skipped:
continue
start, end = float(seg["segment"][0]), float(seg["segment"][1])
if pos < start:
# Segments are sorted; nothing past here can match either.
break
if pos < end:
duration = end - start
category = seg.get("category", "?")
_log(
f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
f"({duration:.0f}s)"
)
player.seekTime(end)
self.skipped.add(uuid)
xbmcgui.Dialog().notification(
"SponsorBlock",
f"Skipped {category} ({duration:.0f}s)",
xbmcgui.NOTIFICATION_INFO,
2500,
)
break
if self.waitForAbort(self.POLL_S):
return
def _plugin_url(**kwargs: Any) -> str:
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
def _format_duration(seconds: int | float | None) -> str:
"""Format seconds as `H:MM:SS` or `M:SS`. None / negative → empty;
0 is a legitimate duration (livestream pre-roll) so we render '0:00'."""
if seconds is None or seconds < 0:
return ""
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def _format_views(views: int | None) -> str:
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'. None →
empty; 0 is rendered as '0' (brand-new upload)."""
if views is None or views < 0:
return ""
if views >= 1_000_000_000:
return f"{views / 1_000_000_000:.1f}B"
if views >= 1_000_000:
return f"{views / 1_000_000:.1f}M"
if views >= 1_000:
return f"{views / 1_000:.1f}K"
return str(views)
# Persistence helpers (atomic write + advisory lock).
# Both search history and Watch Later go through these to survive concurrent
# RunPlugin invocations (each is a fresh Python interpreter — load→mutate→save
# races dropped updates without locking) and unclean shutdowns (open(w) truncates
# immediately — Kodi crash mid-write leaves the user's pinned-videos list as a
# zero-byte file that load() silently treats as empty).
def _addon_data_path(filename: str) -> str:
try:
import xbmcvfs
base = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
# Fallback for non-Kodi tests / generic Linux Kodi installs.
base = os.path.join(
os.path.expanduser("~"), ".kodi", "userdata",
"addon_data", "plugin.video.torttube",
)
return os.path.join(base, filename)
def _atomic_write_json(path: str, data: Any) -> None:
"""Write JSON to `path` via a same-filesystem tempfile + os.replace.
Crash-safe: at any moment, `path` either has the old contents or the new
contents — never a torn write."""
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp = tempfile.mkstemp(
dir=os.path.dirname(path), prefix=".tmp.", suffix=".json"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
except Exception:
try:
os.remove(tmp)
except OSError:
pass
raise
def _with_lock(path: str, fn):
"""Run `fn()` under an exclusive advisory lock on a sibling .lock file.
fcntl.flock is per-process — exactly what we need to serialize concurrent
plugin interpreters that are each modifying the same JSON state file."""
lock_path = path + ".lock"
os.makedirs(os.path.dirname(lock_path), exist_ok=True)
with open(lock_path, "w") as lf:
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
try:
return fn()
finally:
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
_CHANNEL_ID_RE = re.compile(r"UC[A-Za-z0-9_-]{22}")
def _redact_query(q: str) -> str:
"""Truncate long queries before logging — kodi.log is shared with anyone
who can read /storage/.kodi/temp on the Pi. Search queries aren't
audit-grade secrets but there's no reason to put a 2KB query verbatim
in a log line either."""
return q if len(q) <= 24 else f"{q[:24]}…({len(q)} chars)"
# Search history persistence — stored as a plain JSON list of recent queries.
# Newest first, deduplicated, capped at SEARCH_HISTORY_MAX.
SEARCH_HISTORY_MAX = 12
def _search_history_path() -> str:
return _addon_data_path("search_history.json")
def _load_search_history() -> list[str]:
try:
with open(_search_history_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [s for s in data if isinstance(s, str)][:SEARCH_HISTORY_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_search_history(items: list[str]) -> None:
"""Atomic, no-lock save. Caller MUST already hold the lock if doing a
load→mutate→save sequence (see _record_search)."""
path = _search_history_path()
try:
_atomic_write_json(path, items[:SEARCH_HISTORY_MAX])
except OSError as e:
_log(f"search history save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _record_search(query: str) -> None:
q = " ".join(query.split()) # collapse whitespace
if not q:
return
path = _search_history_path()
def _do() -> None:
history = _load_search_history()
# Dedupe case-insensitively, keep newest at the front.
history = [h for h in history if h.lower() != q.lower()]
history.insert(0, q)
_save_search_history(history)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"search history lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _clear_search_history() -> None:
path = _search_history_path()
def _do() -> None:
try:
os.remove(path)
except (FileNotFoundError, OSError):
pass
try:
_with_lock(path, _do)
except OSError as e:
_log(f"search history clear failed (non-fatal): {e}", xbmc.LOGWARNING)
# Watch Later — user-curated list of saved videos. The anti-algorithm answer
# to YouTube's recommendation cancer: you decide what comes back. Items are
# stored as {id, name, channel, duration, thumbnail} dicts so we don't need
# to re-fetch metadata when rendering the list. Newest first, no cap.
WATCH_LATER_MAX = 500
def _watch_later_path() -> str:
return _addon_data_path("watch_later.json")
def _load_watch_later() -> list[dict[str, Any]]:
try:
with open(_watch_later_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [
d for d in data
if isinstance(d, dict) and isinstance(d.get("id"), str)
][:WATCH_LATER_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_watch_later(items: list[dict[str, Any]]) -> None:
"""Atomic, no-lock save. Caller MUST hold the lock (see _add_to / _remove_from)."""
path = _watch_later_path()
try:
_atomic_write_json(path, items[:WATCH_LATER_MAX])
except OSError as e:
_log(f"watch later save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _add_to_watch_later(item: dict[str, Any]) -> bool:
"""Pin a video to the local Watch Later list. Returns True if the list
was already at WATCH_LATER_MAX before this call (caller can surface a
'oldest dropped' notification —"""
yt_id = item.get("id")
if not yt_id or not _ID_RE.fullmatch(str(yt_id)):
return False
path = _watch_later_path()
was_full = False
def _do() -> None:
nonlocal was_full
items = _load_watch_later()
items = [i for i in items if i.get("id") != yt_id]
if len(items) >= WATCH_LATER_MAX:
was_full = True
items.insert(0, item)
_save_watch_later(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"watch later add lock failed (non-fatal): {e}", xbmc.LOGWARNING)
return was_full
def _refresh_watch_later_item(yt_id: str, fresh: dict[str, Any]) -> bool:
"""Replace one item's metadata in place. Returns True if it was found."""
if not _ID_RE.fullmatch(str(yt_id)):
return False
path = _watch_later_path()
replaced = False
def _do() -> None:
nonlocal replaced
items = _load_watch_later()
for i, it in enumerate(items):
if it.get("id") == yt_id:
items[i] = fresh
replaced = True
break
if replaced:
_save_watch_later(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"watch later refresh lock failed (non-fatal): {e}", xbmc.LOGWARNING)
return replaced
def _remove_from_watch_later(yt_id: str) -> bool:
"""Returns True if an item was actually removed (False = no-op)."""
path = _watch_later_path()
removed = False
def _do() -> None:
nonlocal removed
items = _load_watch_later()
new_items = [i for i in items if i.get("id") != yt_id]
if len(new_items) != len(items):
removed = True
_save_watch_later(new_items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"watch later remove lock failed (non-fatal): {e}", xbmc.LOGWARNING)
return removed
# Subscriptions — NewPipe-style offline subs. The user adds a channel to a
# local list; the Subscriptions Feed merges latest videos from each into a
# chronological feed. No YouTube account, no recommendation algorithm —
# you decide what feeds back. Same atomic-write + lock infra as WL/history.
SUBSCRIPTIONS_MAX = 500
def _subscriptions_path() -> str:
return _addon_data_path("subscriptions.json")
def _load_subscriptions() -> list[dict[str, Any]]:
try:
with open(_subscriptions_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [
d for d in data
if isinstance(d, dict) and isinstance(d.get("id"), str)
][:SUBSCRIPTIONS_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_subscriptions(items: list[dict[str, Any]]) -> None:
path = _subscriptions_path()
try:
_atomic_write_json(path, items[:SUBSCRIPTIONS_MAX])
except OSError as e:
_log(f"subscriptions save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _subscribe(channel_id: str, channel_name: str = "") -> bool:
"""Add a channel to the local subscriptions list. Returns True if new."""
if not channel_id:
return False
path = _subscriptions_path()
new_sub = False
def _do() -> None:
nonlocal new_sub
items = _load_subscriptions()
if any(i.get("id") == channel_id for i in items):
return
new_sub = True
items.insert(0, {"id": channel_id, "name": channel_name or channel_id})
_save_subscriptions(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"subscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING)
return new_sub
def _unsubscribe(channel_id: str) -> None:
path = _subscriptions_path()
def _do() -> None:
items = _load_subscriptions()
items = [i for i in items if i.get("id") != channel_id]
_save_subscriptions(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"unsubscribe lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _pick_thumbnail(thumbs: Any) -> str:
"""Pick a thumbnail URL from whatever shape rustypipe / yt-dlp / our own
persisted Watch Later records hand us.
rustypipe `Player.details.thumbnail` is a single URL **string**, while
`VideoItem.thumbnail` is a `list[dict]`. The original implementation
crashed on the string form (`max(str, ...)` iterates chars and calls
`.get` on each — `AttributeError`). Watch Later persists details-shaped
items, so this is a live crash-on-render once any video is pinned.
"""
if not thumbs:
return ""
if isinstance(thumbs, str):
return thumbs
if isinstance(thumbs, dict):
return thumbs.get("url", "") or ""
if isinstance(thumbs, list):
dict_thumbs = [t for t in thumbs if isinstance(t, dict)]
if not dict_thumbs:
return ""
best = max(
dict_thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)
)
return best.get("url", "") or ""
return ""
def _root_directory() -> None:
"""Top-level addon menu. Search lands first because that's how most TV-side
YouTube use actually works. Remote-control via JSON-RPC still works."""
items: list[tuple[str, str, str]] = [
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
]
subs = _load_subscriptions()
if subs:
items.append(
(
f"Subscriptions Feed ({len(subs)})",
"DefaultRecentlyAddedEpisodes.png",
_plugin_url(action="subs_feed"),
)
)
items.append(
(
"Subscriptions (channel list)",
"DefaultArtist.png",
_plugin_url(action="subs"),
)
)
wl = _load_watch_later()
if wl:
items.append(
(
f"Watch Later ({len(wl)})",
"DefaultPlaylist.png",
_plugin_url(action="watch_later"),
)
)
else:
items.append(
("Watch Later", "DefaultPlaylist.png", _plugin_url(action="watch_later"))
)
history = _load_search_history()
if history:
items.append(
(
"[I]Recent searches[/I]",
"DefaultAddonsRecentlyUpdated.png",
_plugin_url(action="recent"),
)
)
for label, icon, url in items:
li = xbmcgui.ListItem(label=label)
li.setArt({"icon": icon})
xbmcplugin.addDirectoryItem(_HANDLE, url, li, isFolder=True)
# Don't cache the root — it might gain/lose 'Recent searches' between visits.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _recent_directory() -> None:
"""Show stored recent searches as quick-pick items + a 'Clear history' tail."""
history = _load_search_history()
if not history:
xbmcgui.Dialog().notification(
"torttube",
"No recent searches yet.",
xbmcgui.NOTIFICATION_INFO,
2500,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
for q in history:
li = xbmcgui.ListItem(label=q)
li.setArt({"icon": "DefaultAddonsSearch.png"})
li.addContextMenuItems(
[
(
"Clear history",
f"Container.Update({_plugin_url(action='clear_history')})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="search", q=q), li, isFolder=True
)
# Tail entry: clear all history.
clear_li = xbmcgui.ListItem(label="[B]Clear search history[/B]")
clear_li.setArt({"icon": "DefaultAddonNone.png"})
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="clear_history"), clear_li, isFolder=True
)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _clear_history_action() -> None:
_clear_search_history()
xbmcgui.Dialog().notification(
"torttube", "Search history cleared", xbmcgui.NOTIFICATION_INFO, 2000
)
# succeeded=False so Kodi doesn't enter an empty directory before the
# Container.Update lands — avoids the half-tick flicker the audit caught.
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
xbmc.executebuiltin("Container.Update(plugin://plugin.video.torttube/,replace)")
def _watch_later_directory() -> None:
"""Browse the user's curated Watch Later list."""
items = _load_watch_later()
if not items:
xbmcgui.Dialog().notification(
"torttube",
"Watch Later is empty. Right-click any video to add.",
xbmcgui.NOTIFICATION_INFO,
3500,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
_add_video_items(items, in_watch_later=True)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _resolve_video_metadata(yt_id: str) -> dict[str, Any]:
"""Fetch fresh metadata for a video id via the sidecar resolve op.
Returns {id, name, channel, duration, thumbnail}. On error returns
the id-only fallback dict so callers still have something to persist."""
item: dict[str, Any] = {"id": yt_id}
try:
resp = _call_sidecar({"op": "resolve", "id": yt_id}, timeout_s=15)
if resp.get("ok"):
details = resp.get("details") or {}
item = {
"id": yt_id,
"name": details.get("name") or details.get("title") or yt_id,
"channel": {
"id": details.get("channel_id"),
"name": details.get("channel_name"),
},
"duration": details.get("duration"),
"thumbnail": details.get("thumbnail"),
}
except Exception as e:
_log(f"video metadata fetch failed (id-only fallback): {e}", xbmc.LOGWARNING)
return item
def _wl_add_action() -> None:
"""RunPlugin handler: add a video to Watch Later.
Called from the context-menu Container action, so we don't have full
item metadata in the URL. We do a sidecar `resolve` to fetch metadata
fresh — slower than caching the item from the listing, but reliable
and works across navigation paths.
"""
params = dict(parse_qsl(_QS.lstrip("?")))
try:
yt_id = _validate_id(params.get("id"))
except ValueError as e:
xbmcgui.Dialog().notification("torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 3000)
return
item = _resolve_video_metadata(yt_id)
was_full = _add_to_watch_later(item)
if was_full:
xbmcgui.Dialog().notification(
"torttube",
f"Watch Later at cap ({WATCH_LATER_MAX}) — dropped oldest",
xbmcgui.NOTIFICATION_WARNING,
3500,
)
else:
xbmcgui.Dialog().notification(
"torttube",
f"Added to Watch Later: {item.get('name') or yt_id}",
xbmcgui.NOTIFICATION_INFO,
2500,
)
def _wl_refresh_action() -> None:
"""RunPlugin handler: re-fetch fresh metadata for a single WL item.
Channel renames + thumbnail rotation + title edits get picked up.
"""
params = dict(parse_qsl(_QS.lstrip("?")))
try:
yt_id = _validate_id(params.get("id"))
except ValueError as e:
xbmcgui.Dialog().notification("torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 3000)
return
fresh = _resolve_video_metadata(yt_id)
if not _refresh_watch_later_item(yt_id, fresh):
xbmcgui.Dialog().notification(
"torttube", "Item not in Watch Later", xbmcgui.NOTIFICATION_WARNING, 2500
)
return
xbmcgui.Dialog().notification(
"torttube",
f"Refreshed: {fresh.get('name') or yt_id}",
xbmcgui.NOTIFICATION_INFO,
2500,
)
container_path = xbmc.getInfoLabel("Container.FolderPath") or ""
if "action=watch_later" in container_path:
xbmc.executebuiltin("Container.Refresh")
def _subscribe_action() -> None:
"""RunPlugin handler: add a channel to subscriptions."""
params = dict(parse_qsl(_QS.lstrip("?")))
channel_id = params.get("id") or ""
channel_name = params.get("name") or ""
if not channel_id:
xbmcgui.Dialog().notification(
"torttube", "missing channel id", xbmcgui.NOTIFICATION_ERROR, 3000
)
return
is_new = _subscribe(channel_id, channel_name)
msg = (
f"Subscribed to {channel_name or channel_id}"
if is_new
else f"Already subscribed to {channel_name or channel_id}"
)
xbmcgui.Dialog().notification("torttube", msg, xbmcgui.NOTIFICATION_INFO, 2500)
def _unsubscribe_action() -> None:
"""RunPlugin handler: remove a channel from subscriptions."""
params = dict(parse_qsl(_QS.lstrip("?")))
channel_id = params.get("id") or ""
if not channel_id:
return
_unsubscribe(channel_id)
xbmcgui.Dialog().notification(
"torttube", "Unsubscribed", xbmcgui.NOTIFICATION_INFO, 2000
)
container_path = xbmc.getInfoLabel("Container.FolderPath") or ""
if "action=subs" in container_path:
xbmc.executebuiltin("Container.Refresh")
def _subscriptions_directory() -> None:
"""Channel-list view: shows every subscribed channel as a folder you
can drill into. Unsubscribe is in each channel's context menu."""
subs = _load_subscriptions()
if not subs:
xbmcgui.Dialog().notification(
"torttube",
"No subscriptions yet. Right-click a video and 'Subscribe to <channel>'.",
xbmcgui.NOTIFICATION_INFO,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
xbmcplugin.setContent(_HANDLE, "files")
for s in subs:
cid = s.get("id") or ""
cname = s.get("name") or cid
li = xbmcgui.ListItem(label=cname)
li.setArt({"icon": "DefaultArtist.png"})
li.addContextMenuItems(
[
(
f"Unsubscribe from {cname}",
f"RunPlugin({_plugin_url(action='unsubscribe', id=cid)})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE,
_plugin_url(action="channel", id=cid),
li,
isFolder=True,
)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _subscriptions_feed_directory() -> None:
"""The killer NewPipe-style feed: latest videos across all subscribed
channels, merged + sorted newest-first."""
subs = _load_subscriptions()
if not subs:
xbmcgui.Dialog().notification(
"torttube",
"No subscriptions yet. Subscribe to a channel first.",
xbmcgui.NOTIFICATION_INFO,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
channel_ids = [s.get("id") for s in subs if s.get("id")]
try:
# Generous timeout because we're fanning out to all subscribed
# channels — N rustypipe round-trips in parallel.
resp = _call_sidecar(
{"op": "subscriptions_feed", "channel_ids": channel_ids, "per_channel": 8, "limit": 60},
timeout_s=60,
)
except Exception as e:
_log(f"subs_feed failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"feed failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"feed: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
failed = resp.get("channels_failed") or []
if failed:
_log(f"subs_feed: {len(failed)} channel(s) failed: {failed[:3]}", xbmc.LOGWARNING)
_log(f"subs_feed: {len(items)} items across {len(channel_ids)} channels")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _wl_remove_action() -> None:
"""RunPlugin handler: remove a video from Watch Later."""
params = dict(parse_qsl(_QS.lstrip("?")))
try:
yt_id = _validate_id(params.get("id"))
except ValueError as e:
xbmcgui.Dialog().notification("torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 3000)
return
removed = _remove_from_watch_later(yt_id)
if removed:
msg = "Removed from Watch Later"
else:
msg = "Item was not in Watch Later"
xbmcgui.Dialog().notification(
"torttube", msg, xbmcgui.NOTIFICATION_INFO, 2000
)
# Refresh ONLY if we're currently inside the Watch Later listing. If the
# user invoked remove from somewhere else (future refactor, stale
# context-menu state), refreshing the wrong container is jarring.
#
container_path = xbmc.getInfoLabel("Container.FolderPath") or ""
if "action=watch_later" in container_path:
xbmc.executebuiltin("Container.Refresh")
def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = False) -> None:
"""Add VideoItem dicts to the current plugin directory, formatted for Kodi.
Each item gets a play-action plugin URL, channel + duration + view-count
metadata in the label, thumbnail art, video InfoLabels for skin support,
and context-menu entries: 'Go to channel' and either 'Add to Watch Later'
or 'Remove from Watch Later' depending on `in_watch_later`.
"""
xbmcplugin.setContent(_HANDLE, "videos")
for item in items:
yt_id = item.get("id") or ""
if not yt_id:
continue
name = item.get("name") or "(no title)"
channel = item.get("channel") or {}
if isinstance(channel, dict):
channel_name = channel.get("name") or ""
channel_id = channel.get("id") or ""
else:
channel_name = ""
channel_id = ""
duration = item.get("duration")
views = item.get("view_count")
# Label: "Title · Channel · Duration · ViewCount"
meta_bits = [b for b in (channel_name, _format_duration(duration), _format_views(views)) if b]
label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name
li = xbmcgui.ListItem(label=label)
li.setProperty("IsPlayable", "true")
thumb_url = _pick_thumbnail(item.get("thumbnail"))
if thumb_url:
li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})
info: dict[str, Any] = {"title": name, "mediatype": "video"}
if duration:
info["duration"] = int(duration)
if channel_name:
info["studio"] = channel_name
if item.get("description"):
info["plot"] = item["description"]
try:
li.setInfo("video", info)
except Exception:
pass
# Context menu: jump to channel listing + Watch Later + subscribe.
ctx: list[tuple[str, str]] = []
subscribed_ids = {s.get("id") for s in _load_subscriptions()}
# Defense-in-depth: drop a channel_id that doesn't match YouTube's
# canonical shape ('UC' + 22 chars). urlencode percent-encodes
# everything that could break our RunPlugin / Container.Update
# parsers, but if rustypipe ever hands us a garbage channel_id we
# don't want to render a 'Go to channel' entry that points nowhere.
#
if channel_id and not _CHANNEL_ID_RE.fullmatch(channel_id):
channel_id = ""
if channel_id:
ctx.append(
(
f"Go to {channel_name or 'channel'}",
f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
)
)
if channel_id in subscribed_ids:
ctx.append(
(
f"Unsubscribe from {channel_name or 'channel'}",
f"RunPlugin({_plugin_url(action='unsubscribe', id=channel_id)})",
)
)
else:
ctx.append(
(
f"Subscribe to {channel_name or 'channel'}",
f"RunPlugin({_plugin_url(action='subscribe', id=channel_id, name=channel_name or '')})",
)
)
if in_watch_later:
ctx.append(
(
"Refresh metadata",
f"RunPlugin({_plugin_url(action='wl_refresh', id=yt_id)})",
)
)
ctx.append(
(
"Remove from Watch Later",
f"RunPlugin({_plugin_url(action='wl_remove', id=yt_id)})",
)
)
else:
ctx.append(
(
"Add to Watch Later",
f"RunPlugin({_plugin_url(action='wl_add', id=yt_id)})",
)
)
if ctx:
li.addContextMenuItems(ctx)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
)
def _search_directory(query: str | None = None) -> None:
"""Hit sidecar `search`, list results as playable items.
`query` arg: if provided (via `?action=search&q=...`), skip the keyboard
prompt — used by JSON-RPC clients (phones, share-to-TV) and tests.
"""
if not query:
query = xbmcgui.Dialog().input("Search YouTube")
if not query:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
resp = _call_sidecar({"op": "search", "query": query, "limit": 30}, timeout_s=25)
except Exception as e:
_log(f"search failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
_log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube",
f"search: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
_log(f"search '{_redact_query(query)}'{len(items)} items")
_record_search(query)
_add_video_items(items)
# cacheToDisc=False so the user's next search isn't shadowed by the
# previous query's cached result-set.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _playlist_directory(playlist_id: str) -> None:
"""List a playlist's videos."""
try:
resp = _call_sidecar(
{"op": "playlist", "id": playlist_id, "limit": 100}, timeout_s=25
)
except Exception as e:
_log(f"playlist failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"playlist failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"playlist: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
pl = resp.get("playlist") or {}
_log(f"playlist {pl.get('name') or playlist_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _channel_directory(channel_id: str) -> None:
"""List a channel's recent videos."""
try:
resp = _call_sidecar(
{"op": "channel_videos", "id": channel_id, "limit": 50}, timeout_s=25
)
except Exception as e:
_log(f"channel_videos failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"channel failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"channel: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
ch = resp.get("channel") or {}
_log(f"channel {ch.get('name') or channel_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _play_by_url_prompt() -> None:
"""Manual URL/ID entry for tap-to-play from the Kodi UI."""
s = xbmcgui.Dialog().input("Paste YouTube URL or ID")
if not s:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
yt_id = _extract_id(s)
except ValueError as e:
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
# Open via Player.Open-equivalent: run the plugin URL through Kodi
# rather than calling _play() in directory context.
xbmc.executebuiltin(f"PlayMedia({_plugin_url(action='play', id=yt_id)})")
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
def main() -> None:
params = dict(parse_qsl(_QS.lstrip("?")))
action = params.get("action")
if action == "play":
try:
yt_id = _extract_id(params.get("url") or params.get("id") or "")
except ValueError as e:
_log(str(e), xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_play(yt_id)
elif action == "search":
_search_directory(query=params.get("q"))
elif action == "channel":
_channel_directory(params.get("id") or "")
elif action == "playlist":
_playlist_directory(params.get("id") or "")
elif action == "play_by_url":
_play_by_url_prompt()
elif action == "recent":
_recent_directory()
elif action == "clear_history":
_clear_history_action()
elif action == "watch_later":
_watch_later_directory()
elif action == "wl_add":
_wl_add_action()
elif action == "wl_remove":
_wl_remove_action()
elif action == "wl_refresh":
_wl_refresh_action()
elif action == "subs":
_subscriptions_directory()
elif action == "subs_feed":
_subscriptions_feed_directory()
elif action == "subscribe":
_subscribe_action()
elif action == "unsubscribe":
_unsubscribe_action()
else:
_root_directory()
if __name__ == "__main__":
main()