torttube/addon/plugin.video.torttube/main.py
Kayos cb690276d1 Audit fix sprint — CRIT-1, CRIT-2, HIGH-1..7, MED-1,2,7, LOW-1
Full Opus-max adversarial audit found 2 CRIT, 7 HIGH, 9 MED, 9 LOW.
This commit lands all CRIT, all 7 HIGH, the highest-impact MEDs, and
the addon-manifest LOW. The remaining MED/LOW items are cosmetic or
defensive polish — captured in the audit report but not blocking.

CRIT-1: _attach_sponsorblock had a duplicate block stacked on top of
itself. After a video ended the second monitor instantiated, blocked
30s waiting for a player that wasn't there, logged 'timed out' and
returned. Net effect on the family-room TV: Kodi froze for 30s after
every single video. Deleted the duplicate (lines 512–530).

CRIT-2: MPD HTTP server bound to 0.0.0.0 + emitted
Access-Control-Allow-Origin: *. The MPD embeds signed googlevideo
segment URLs — anything on the LAN (guest phones, IoT, malicious sites
loaded in any browser) could scrape the manifest and grab those URLs.
Bind to the LAN IP only with a 127.0.0.1 fallback if that fails; drop
the gratuitous CORS header. Setting is off by default so the live
exposure was small, but locked down before M7 ever flips it on.

HIGH-1: _extract_id accepted arbitrary 'v=' query values without
validating the 11-char [A-Za-z0-9_-] shape. New _validate_id helper +
strict netloc-suffix matching (was: 'youtube.com' in netloc, which let
'myyoutube.com.evil.example' through). Every branch now pipes through
_validate_id. Same shape enforced in the sidecar via
validate_youtube_id() at every op entry point — defense in depth.

HIGH-2: _call_sidecar parser now picks the LAST non-empty stdout line
(robust against future stray println!/log lines), with a dedicated
JSONDecodeError catch that returns a clear 'sidecar stdout was not
JSON: <repr>' rather than a confusing 'sidecar exited 0' message.

HIGH-3: _pv_youtube_installed() now logs the probe exception before
returning False, so silent degradation to 360p is observable in
kodi.log instead of mysterious.

HIGH-4: SponsorBlockMonitor.run()'s getTime() catch widened from
(RuntimeError, OSError) to bare Exception — historically Kodi has
thrown other types when the player goes stale mid-poll, and we don't
want one of those to escape past _play's finally block and leak the
MPD HTTP server.

HIGH-5: Wrapped _attach_sponsorblock in try/except at both call sites
(pv.youtube delegate path + progressive fallback) so a SB monitor bug
can't pop a 'Plugin error' dialog on the TV after successful playback.

HIGH-6: Sidecar Search/ChannelVideos/Playlist now clamp 'limit' at
MAX_LIMIT=200. Prevents an unbounded limit=u32::MAX from OOMing Kodi
on a malicious or buggy addon request.

HIGH-7: Sidecar Rip op now requires dest_dir to be a prefix of an
allowlist (/storage/.kodi/temp/ or
/storage/.kodi/userdata/addon_data/plugin.video.torttube/). Op is
dormant today (no Python caller), but the protocol was a wide-open
arbitrary-write primitive.

MED-1: Bumped search / channel / playlist sidecar timeouts from 15s
to 25s — first-search-after-Kodi-boot on a slow LAN routinely hit 8s+
of rustypipe TLS-handshake + Innertube initial parse.

MED-2: _resolved_listitem now checks urlparse(url).path.endswith('.mpd'
/'.m3u8') instead of substring scan of the whole URL — yt-dlp URLs
have base64 blobs in the query string that can accidentally contain
those substrings.

MED-7: Error classifiers (classify_yt_dlp_error, classify_rustypipe_error)
now match on word-level patterns ('private video', 'age-restrict',
'region-restrict' etc.) instead of bare substrings — fixes
'private network' in a TLS error misclassifying as PrivateVideo,
'package' triggering AgeRestricted, etc.

LOW-1: addon.xml's plugin.video.youtube dep marked optional='true' —
our _pv_youtube_installed() check and fallback paths assumed
optionality; the manifest now matches.

Addon v0.0.13. Verified live via two browse-only smokes:
  - Sidecar bad-id rejection ('../../etc/passwd'):
    {ok:false, error:'invalid youtube id (length 16 != 11)', kind:'bad_request'}
  - Files.GetDirectory ?q=cat: 12 results, formatted labels intact.

NOT VERIFIED VIA PLAYBACK (Leia was watching the TV).

Remaining MED/LOW items (mostly cosmetic): MED-3 endOfDirectory
cacheToDisc, MED-4 0-is-falsy in _format_duration/_format_views, MED-5
codec regex single-quote, MED-6 Response::Ok ok-clobber, MED-8 SB
response-size cap, MED-9 thumbnail dict type check, LOW-2..LOW-9
defensive polish. Tracked in audit report; can hit in a follow-up
sprint.
2026-05-23 12:28:35 -07:00

892 lines
34 KiB
Python

# torttube — Kodi YouTube addon
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Entry point. Plugin URL shapes handled:
plugin://plugin.video.torttube/ (root: M3 will add browse UI)
plugin://plugin.video.torttube/?action=play&id=ID (resolve + play a YouTube ID)
plugin://plugin.video.torttube/?action=play&url=URL (extract ID from URL, then play)
Remote-control / share-to-TV pattern (Kodi JSON-RPC):
POST http://<rpi>:8080/jsonrpc
Basic auth (kodi user)
{"jsonrpc":"2.0","method":"Player.Open","params":{
"item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}
That's how Android / phone / "send to TV" flows hand off — Kodi already
exposes the endpoint, we just need to register the plugin URL.
"""
import http.server
import json
import os
import re
import socket
import subprocess
import sys
import threading
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse
from xml.sax.saxutils import escape as xml_escape
import xbmc
import xbmcaddon
import xbmcgui
import xbmcplugin
ADDON = xbmcaddon.Addon()
ADDON_PATH = ADDON.getAddonInfo("path")
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")
_HANDLE = int(sys.argv[1]) if len(sys.argv) > 1 else -1
_QS = sys.argv[2] if len(sys.argv) > 2 else ""
def _log(msg, level=xbmc.LOGINFO):
xbmc.log(f"[torttube] {msg}", level)
_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")
def _validate_id(candidate: str | None) -> str:
"""Return the candidate iff it matches YouTube's 11-char id shape; raise otherwise.
Every branch of _extract_id MUST pipe its return value through this so a
bogus `v=` query, an empty youtu.be path, or a malformed share link doesn't
leak into _call_sidecar -> yt-dlp / rustypipe / log lines."""
if candidate and _ID_RE.fullmatch(candidate):
return candidate
raise ValueError(f"not a valid YouTube id: {candidate!r}")
def _extract_id(url_or_id: str) -> str:
"""Accept either a bare ID or any common YouTube URL form, return the ID.
Every accepted form is validated through _validate_id so we never hand
untrusted text to the sidecar."""
s = url_or_id.strip()
# Bare 11-char ID (YouTube's canonical length).
if _ID_RE.fullmatch(s):
return s
parsed = urlparse(s)
# https://youtu.be/<id> (strict suffix match — avoid `myyoutu.be` etc.)
if parsed.netloc == "youtu.be" or parsed.netloc.endswith(".youtu.be"):
return _validate_id(parsed.path.lstrip("/").split("/")[0])
# https://www.youtube.com/watch?v=<id> (strict suffix match)
if parsed.netloc == "youtube.com" or parsed.netloc.endswith(".youtube.com"):
for k, v in parse_qsl(parsed.query):
if k == "v":
return _validate_id(v)
# /shorts/<id>, /embed/<id>, /live/<id>
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
if m:
return _validate_id(m.group(2))
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
"""Invoke the sidecar with one JSON request, parse one JSON response.
Puts the addon's bin/ on PATH so the sidecar's `yt-dlp` shell-outs find
the bundled zipapp (LibreELEC has no system yt-dlp).
"""
if not os.path.isfile(SIDECAR_BIN):
raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
if not os.access(SIDECAR_BIN, os.X_OK):
raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")
env = os.environ.copy()
addon_bin = os.path.join(ADDON_PATH, "bin")
env["PATH"] = addon_bin + os.pathsep + env.get("PATH", "")
proc = subprocess.run(
[SIDECAR_BIN],
input=(json.dumps(request) + "\n").encode("utf-8"),
capture_output=True,
timeout=timeout_s,
env=env,
)
if proc.returncode != 0:
raise RuntimeError(
f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
)
# The sidecar writes its JSON response as the LAST line of stdout (after any
# log lines it might have written via println!/eprintln/tracing if the route
# config ever changes). Pick the last non-empty line and decode robustly.
last_line: bytes | None = None
for line in proc.stdout.splitlines():
line = line.strip()
if line:
last_line = line
if last_line is None:
raise RuntimeError("sidecar produced no response")
try:
return json.loads(last_line.decode("utf-8", errors="replace"))
except json.JSONDecodeError as e:
raise RuntimeError(
f"sidecar stdout was not JSON: {e}; line={last_line[:200]!r}"
) from e
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
li = xbmcgui.ListItem(label=title or "torttube")
li.setPath(stream_url)
li.setProperty("IsPlayable", "true")
# Tell inputstream.adaptive to handle DASH/HLS based on URL path/suffix.
# Check the URL *path* extension, not a substring of the whole URL —
# progressive yt-dlp URLs have base64 blobs in the query string that
# can accidentally contain ".mpd" or ".m3u8".
parsed = urlparse(stream_url)
if parsed.path.endswith(".mpd"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "mpd")
# googlevideo rejects segment GETs that don't carry an Origin/Referer
# from www.youtube.com — 403 Forbidden otherwise. Same Mozilla UA
# rustypipe / yt-dlp use when minting the URL.
ytdl_ua = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
seg_headers = (
f"User-Agent={ytdl_ua}"
"&Origin=https://www.youtube.com"
"&Referer=https://www.youtube.com/"
)
li.setProperty("inputstream.adaptive.stream_headers", seg_headers)
li.setProperty("inputstream.adaptive.manifest_headers", seg_headers)
elif parsed.path.endswith(".m3u8"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "hls")
return li
class _MpdHandler(http.server.BaseHTTPRequestHandler):
"""Serves the per-video MPD file. The bytes come from the closure-captured
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
to clean up early. One handler per HTTPServer instance."""
mpd_bytes: bytes = b""
def do_GET(self) -> None: # noqa: N802 — http.server convention
self.send_response(200)
self.send_header("Content-Type", "application/dash+xml")
self.send_header("Content-Length", str(len(self.mpd_bytes)))
self.end_headers()
self.wfile.write(self.mpd_bytes)
def log_message(self, *args: Any, **kwargs: Any) -> None:
# Silence the default request log — Kodi's log is verbose enough.
return
def _has_setting(setting_id: str) -> bool:
"""Best-effort check that a setting exists in resources/settings.xml.
Returns False if the lookup throws (older builds, missing schema)."""
try:
ADDON.getSetting(setting_id)
return True
except Exception:
return False
def _lan_ip() -> str:
"""Detect this host's LAN IP by opening a UDP socket toward an external
address (no packets actually sent — just lets the kernel pick the source IP).
plugin.video.youtube uses this same trick because inputstream.adaptive's
libcurl in Kodi 20 has trouble fetching from `127.0.0.1` reliably."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
finally:
s.close()
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
"""Spin up a one-shot HTTP server that serves `mpd_bytes` at any path.
Binds to the LAN IP so inputstream.adaptive can fetch via the same code
path it uses for real network URLs. Returns (url, server) — caller is
responsible for `server.shutdown()` once playback ends."""
handler_cls = type(
"_MpdHandlerInstance",
(_MpdHandler,),
{"mpd_bytes": mpd_bytes},
)
lan_ip = _lan_ip()
# Bind to the LAN IP (not 0.0.0.0) so the MPD is reachable only on the
# interface inputstream.adaptive uses to connect, not exposed to every
# device on the LAN. The MPD embeds signed googlevideo segment URLs that
# we don't want walkable from a guest phone or IoT device.
try:
server = http.server.ThreadingHTTPServer((lan_ip, 0), handler_cls)
except OSError:
# LAN-IP bind can fail in rare network states (interface down,
# IP not yet assigned). Fall back to loopback — inputstream.adaptive
# on the same host can still reach it.
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler_cls)
lan_ip = "127.0.0.1"
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]
url = f"http://{lan_ip}:{port}/manifest.mpd"
_log(f"MPD HTTP server up on {url}")
return url, server
_MIME_CODEC_RE = re.compile(r'codecs="([^"]+)"')
def _codec_from_mime(stream: dict[str, Any]) -> str:
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
"""
mime = stream.get("mime", "")
m = _MIME_CODEC_RE.search(mime)
return m.group(1) if m else (stream.get("codec") or "")
def _build_dash_mpd(
details: dict[str, Any],
video_streams: list[dict[str, Any]],
audio_streams: list[dict[str, Any]],
) -> str | None:
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
Returns the MPD XML, or None if no compatible streams were found.
"""
duration_s = float(details.get("duration") or 0)
if duration_s <= 0:
return None
# Filter video: H.264 only (avc1.*), 720p <= height <= 1080p.
# Floor at 720p so inputstream.adaptive's conservative-start chooser doesn't
# land us on a low-quality rep first. Ceiling at 1080p because that's the
# RPi 4's H.264 hardware-decode sweet spot.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and 720 <= (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
# Fallback: drop the 720p floor if the video has no HD streams at all.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
return None
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
audio_pool = aac or opus
if not audio_pool:
return None
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
return None
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
parts = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
" <Period>",
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
]
for v in h264:
codec = _codec_from_mime(v) or "avc1.4d401f"
ir = v["index_range"]
init = v["init_range"]
parts.append(
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
f' startWithSAP="1">'
)
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
# Audio adaptation set.
a = best_audio
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
a_ir = a["index_range"]
a_init = a["init_range"]
parts.append(
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
)
# NOTE: NOT setting audioSamplingRate here on purpose. rustypipe doesn't
# expose the sample rate, and hard-coding 44100 caused a ~9% playback-rate
# mismatch (= growing audio-vs-video desync) for content at 48000 Hz.
# inputstream.adaptive reads the actual rate from the audio init segment's
# mdhd box when this attribute is omitted, which is correct for any source.
parts.append(
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
f' bandwidth="{a.get("bitrate", 0)}" startWithSAP="1">'
)
parts.append(
' <AudioChannelConfiguration'
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
f' value="{a.get("channels", 2)}"/>'
)
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
parts.append(" </Period>")
parts.append("</MPD>")
return "\n".join(parts)
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
"""Resolve via rustypipe + build DASH MPD. Returns (mpd_bytes, resp).
On any failure returns (None, resp) so the caller can fall back to
the progressive path. We serve the MPD over localhost HTTP because
inputstream.adaptive's libcurl can't open file:// URLs.
"""
try:
resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
except Exception as e:
_log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
return None, {}
if not resp.get("ok"):
return None, resp
mpd = _build_dash_mpd(
resp.get("details") or {},
resp.get("video_only_streams") or [],
resp.get("audio_streams") or [],
)
if not mpd:
_log("DASH build returned no compatible streams; falling back to progressive")
return None, resp
return mpd.encode("utf-8"), resp
def _delegate_to_pv_youtube(yt_id: str) -> bool:
"""Hand playback off to plugin.video.youtube via its play URL. They have
the proper SegmentTimeline-aware MPD construction (sidx-parsed) that
unlocks HD without the audio-sync drift our naive MPD has. Returns True
if delegation succeeded (Kodi will chain-resolve)."""
if not _pv_youtube_installed():
return False
target = f"plugin://plugin.video.youtube/play/?video_id={yt_id}"
_log(f"delegating playback to plugin.video.youtube: {target}")
li = xbmcgui.ListItem(label=yt_id)
li.setPath(target)
li.setProperty("IsPlayable", "true")
xbmcplugin.setResolvedUrl(_HANDLE, True, li)
return True
def _pv_youtube_installed() -> bool:
"""Check whether plugin.video.youtube is installed + enabled. We don't
enable it ourselves — if the user removed it, we fall back to our own
paths. Log any probe failure so silent degradation to 360p is observable."""
try:
return bool(xbmc.getCondVisibility("System.HasAddon(plugin.video.youtube)"))
except Exception as e:
_log(f"pv.youtube probe failed: {e}", xbmc.LOGWARNING)
return False
def _play(yt_id: str) -> None:
"""Resolve playback in order of preference:
1. plugin.video.youtube delegation — HD via their proven DASH MPD with
sidx-parsed SegmentTimeline. Default, when available.
2. Our DASH path — only if `dash_enabled` setting / dash.on marker / env
are set (WIP, partial — see M7 milestone).
3. yt-dlp progressive — last-resort 360p, always works.
SponsorBlock attaches regardless of which path we took.
"""
_log(f"play id={yt_id}")
# Tier 0: delegate to plugin.video.youtube if installed. Our SponsorBlock
# monitor still attaches because xbmc.Player() works regardless of which
# addon initiated playback. If pv.youtube ALSO has SB enabled the user
# might see double skips; rare in practice (pv.youtube's SB is off by
# default and theirs is more conservative).
use_pv_youtube = True
try:
use_pv_youtube = ADDON.getSettingBool("prefer_pv_youtube")
except Exception:
pass
if use_pv_youtube and _delegate_to_pv_youtube(yt_id):
try:
_attach_sponsorblock(yt_id)
except Exception as e:
# Don't let a SB monitor bug pop a 'Plugin error' dialog on the TV
# after a successful delegate-to-pv.youtube hand-off.
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
return
mpd_bytes: bytes | None = None
dash_resp: dict[str, Any] = {}
# DASH path: read setting first; fall back to env-var; OR honor a magic file
# at /storage/.kodi/userdata/addon_data/plugin.video.torttube/dash.on as a
# last-ditch trigger that doesn't depend on Kodi's settings cache.
dash_enabled = False
try:
dash_enabled = ADDON.getSettingBool("dash_enabled")
except Exception:
pass
env_dash = os.environ.get("TORTTUBE_DASH") == "1"
addon_data = ""
try:
import xbmcvfs
addon_data = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
pass
file_dash = bool(addon_data) and os.path.isfile(os.path.join(addon_data, "dash.on"))
_log(f"play id={yt_id} dash_enabled={dash_enabled} env_dash={env_dash} file_dash={file_dash}")
if dash_enabled or env_dash or file_dash:
mpd_bytes, dash_resp = _try_dash(yt_id)
_log(f"_try_dash returned mpd_bytes={'<%d bytes>' % len(mpd_bytes) if mpd_bytes else None}")
if mpd_bytes:
details = dash_resp.get("details") or {}
title = details.get("name")
mpd_url, server = _start_mpd_server(mpd_bytes)
_log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
try:
_attach_sponsorblock(yt_id)
finally:
# Shut down the MPD server cleanly once playback ends or aborts.
# _attach_sponsorblock blocks while playback is active.
try:
server.shutdown()
server.server_close()
_log("MPD HTTP server shut down")
except Exception as e:
_log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
return
# Fallback: progressive single-URL via yt-dlp (360p).
try:
resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
except Exception as e:
_log(f"sidecar resolve_play failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"resolve failed: {e}", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
if not resp.get("ok"):
kind = resp.get("kind", "unknown")
err = resp.get("error", "unknown")
_log(f"sidecar returned not-ok: {kind}: {err}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube", f"{kind}: {err}", xbmcgui.NOTIFICATION_WARNING, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
stream_url = resp.get("stream_url")
title = resp.get("title")
if not stream_url:
_log("no usable stream URL in sidecar response", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", "no stream URL", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_log(f"resolved via yt-dlp progressive fallback, playing")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(stream_url, title))
try:
_attach_sponsorblock(yt_id)
except Exception as e:
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
def _attach_sponsorblock(yt_id: str) -> None:
"""Fetch SponsorBlock segments and block on the monitor loop. Always blocks
until playback ends (or 30s if playback never starts) so the caller can
use this as a 'wait for playback to finish' signal — needed to keep the
MPD HTTP server alive throughout playback.
Non-fatal on segment fetch error.
"""
skip_segments: list[dict[str, Any]] = []
try:
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
if sb_resp.get("ok"):
segs = sb_resp.get("segments") or []
skip_segments = [s for s in segs if s.get("actionType") == "skip"]
_log(f"sponsorblock: {len(skip_segments)} skip segments")
except Exception as e:
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
# Always run the watcher even with zero segments — it doubles as the
# 'block until playback ends' signal that gates MPD-server shutdown.
SponsorBlockMonitor(skip_segments).run()
class SponsorBlockMonitor(xbmc.Monitor):
"""Polls Kodi's player position and seeks past each SponsorBlock skip
segment exactly once. Exits on playback stop or Kodi shutdown.
Each segment has shape {"segment": [start_s, end_s], "category": str,
"UUID": str, "actionType": "skip"}. We sort by start time so we can
short-circuit the scan; we also dedupe via the UUID set so the same
segment doesn't trigger twice if the user manually rewinds into it.
"""
POLL_S = 0.5
MAX_WAIT_FOR_PLAYBACK_S = 30.0
def __init__(self, segments: list[dict[str, Any]]):
super().__init__()
self.segments = sorted(
segments, key=lambda s: float(s.get("segment", [0, 0])[0])
)
self.skipped: set[str] = set()
def run(self) -> None:
player = xbmc.Player()
# Wait for playback to actually begin — setResolvedUrl is async,
# Kodi takes a beat to demux + start the streams.
waited = 0.0
while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
if self.abortRequested():
return
if player.isPlaying():
break
if self.waitForAbort(0.25):
return
waited += 0.25
else:
_log("sponsorblock: timed out waiting for playback to start")
return
while not self.abortRequested() and player.isPlaying():
try:
pos = float(player.getTime())
except Exception:
# getTime raises various exception types when the player goes
# away mid-poll (Kodi shutdown, plugin reload, etc). Wider catch
# so an exception path doesn't escape into _play's finally and
# leak the MPD HTTP server.
return
for seg in self.segments:
uuid = seg.get("UUID", "")
if uuid in self.skipped:
continue
start, end = float(seg["segment"][0]), float(seg["segment"][1])
if pos < start:
# Segments are sorted; nothing past here can match either.
break
if pos < end:
duration = end - start
category = seg.get("category", "?")
_log(
f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
f"({duration:.0f}s)"
)
player.seekTime(end)
self.skipped.add(uuid)
xbmcgui.Dialog().notification(
"SponsorBlock",
f"Skipped {category} ({duration:.0f}s)",
xbmcgui.NOTIFICATION_INFO,
2500,
)
break
if self.waitForAbort(self.POLL_S):
return
def _plugin_url(**kwargs: Any) -> str:
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
def _format_duration(seconds: int | float | None) -> str:
"""Format seconds as `H:MM:SS` or `M:SS`."""
if not seconds:
return ""
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def _format_views(views: int | None) -> str:
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'."""
if not views:
return ""
if views >= 1_000_000_000:
return f"{views / 1_000_000_000:.1f}B"
if views >= 1_000_000:
return f"{views / 1_000_000:.1f}M"
if views >= 1_000:
return f"{views / 1_000:.1f}K"
return str(views)
def _pick_thumbnail(thumbs: list[dict[str, Any]] | None) -> str:
"""Pick the largest thumbnail from rustypipe's thumbnail list."""
if not thumbs:
return ""
return max(thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)).get(
"url", ""
)
def _root_directory() -> None:
"""Top-level addon menu. Search lands first because that's how most TV-side
YouTube use actually works. Remote-control via JSON-RPC still works."""
items = [
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
]
for label, icon, url in items:
li = xbmcgui.ListItem(label=label)
li.setArt({"icon": icon})
xbmcplugin.addDirectoryItem(_HANDLE, url, li, isFolder=True)
xbmcplugin.endOfDirectory(_HANDLE)
def _add_video_items(items: list[dict[str, Any]]) -> None:
"""Add VideoItem dicts to the current plugin directory, formatted for Kodi.
Each item gets a play-action plugin URL, channel + duration + view-count
metadata in the label, thumbnail art, video InfoLabels for skin support,
and a 'Go to channel' context menu entry when the channel id is known.
"""
xbmcplugin.setContent(_HANDLE, "videos")
for item in items:
yt_id = item.get("id") or ""
if not yt_id:
continue
name = item.get("name") or "(no title)"
channel = item.get("channel") or {}
if isinstance(channel, dict):
channel_name = channel.get("name") or ""
channel_id = channel.get("id") or ""
else:
channel_name = ""
channel_id = ""
duration = item.get("duration")
views = item.get("view_count")
# Label: "Title · Channel · Duration · ViewCount"
meta_bits = [b for b in (channel_name, _format_duration(duration), _format_views(views)) if b]
label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name
li = xbmcgui.ListItem(label=label)
li.setProperty("IsPlayable", "true")
thumb_url = _pick_thumbnail(item.get("thumbnail"))
if thumb_url:
li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})
info: dict[str, Any] = {"title": name, "mediatype": "video"}
if duration:
info["duration"] = int(duration)
if channel_name:
info["studio"] = channel_name
if item.get("description"):
info["plot"] = item["description"]
try:
li.setInfo("video", info)
except Exception:
pass
# Context menu: jump to channel listing if we have the id.
if channel_id:
li.addContextMenuItems(
[
(
f"Go to {channel_name or 'channel'}",
f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
)
def _search_directory(query: str | None = None) -> None:
"""Hit sidecar `search`, list results as playable items.
`query` arg: if provided (via `?action=search&q=...`), skip the keyboard
prompt — used by JSON-RPC clients (phones, share-to-TV) and tests.
"""
if not query:
query = xbmcgui.Dialog().input("Search YouTube")
if not query:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
resp = _call_sidecar({"op": "search", "query": query, "limit": 30}, timeout_s=25)
except Exception as e:
_log(f"search failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
_log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube",
f"search: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
_log(f"search '{query}'{len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _playlist_directory(playlist_id: str) -> None:
"""List a playlist's videos."""
try:
resp = _call_sidecar(
{"op": "playlist", "id": playlist_id, "limit": 100}, timeout_s=25
)
except Exception as e:
_log(f"playlist failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"playlist failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"playlist: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
pl = resp.get("playlist") or {}
_log(f"playlist {pl.get('name') or playlist_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _channel_directory(channel_id: str) -> None:
"""List a channel's recent videos."""
try:
resp = _call_sidecar(
{"op": "channel_videos", "id": channel_id, "limit": 50}, timeout_s=25
)
except Exception as e:
_log(f"channel_videos failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"channel failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"channel: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
ch = resp.get("channel") or {}
_log(f"channel {ch.get('name') or channel_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _play_by_url_prompt() -> None:
"""Manual URL/ID entry for tap-to-play from the Kodi UI."""
s = xbmcgui.Dialog().input("Paste YouTube URL or ID")
if not s:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
yt_id = _extract_id(s)
except ValueError as e:
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
# Open via Player.Open-equivalent: run the plugin URL through Kodi
# rather than calling _play() in directory context.
xbmc.executebuiltin(f"PlayMedia({_plugin_url(action='play', id=yt_id)})")
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
def main() -> None:
params = dict(parse_qsl(_QS.lstrip("?")))
action = params.get("action")
if action == "play":
try:
yt_id = _extract_id(params.get("url") or params.get("id") or "")
except ValueError as e:
_log(str(e), xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_play(yt_id)
elif action == "search":
_search_directory(query=params.get("q"))
elif action == "channel":
_channel_directory(params.get("id") or "")
elif action == "playlist":
_playlist_directory(params.get("id") or "")
elif action == "play_by_url":
_play_by_url_prompt()
else:
_root_directory()
if __name__ == "__main__":
main()