torttube/addon/plugin.video.torttube/main.py
Kayos 03e1eb526a Second audit fix sprint — 2 CRIT + 5 HIGH + 5 MED
Second adversarial Opus audit found 2 CRIT + 7 HIGH + 9 MED + 9 LOW
against HEAD 503dbef. This sprint lands all CRIT, the 5 highest-value
HIGH items, and the most impactful MEDs. Remaining items deferred (see
notes at end).

CRIT-1 (race + torn write on Watch Later / search-history JSON state):
The persistence layer was load->mutate->save with plain open(w). Two
real failure modes were live:
  - Lost-update race: phone fires two RunPlugin wl_add back-to-back,
    each in a fresh Python interpreter. Interpreter A reads [], inserts
    AAA, writes [AAA]; B was already past _load_watch_later (also read
    []) and writes [BBB] — AAA lost.
  - Torn-write loss: open(w) truncates immediately. Kodi crash / Pi
    yank between truncate and json.dump finish leaves a zero-byte or
    partial file. _load* catches JSONDecodeError and returns [],
    silently wiping the user's pinned-videos list on next read.
Fix: _atomic_write_json (mkstemp + fsync + os.replace) + _with_lock
(fcntl.flock on a sibling .lock file). All four persistence functions
(_record_search, _clear_search_history, _add_to_watch_later,
_remove_from_watch_later) refactored to wrap load->mutate->save under
the lock. os.replace is atomic on POSIX same-filesystem; flock is
per-process advisory across the concurrent plugin interpreters.

CRIT-2 (rip allowlist bypass via ..): RIP_DEST_ALLOWLIST check was
string-level starts_with. '/storage/.kodi/temp/../../etc/cron.d' passed
the allowlist but escaped to anywhere. Dormant op (no Python caller
today) but the protocol is a wide-open arbitrary-write primitive
under the sidecar UID (root on LibreELEC). Fix: literal-prefix check
first (cheap reject), then create_dir_all + tokio::fs::canonicalize,
then a second check against the same allowlist on the canonical
result. Defeats .. and symlink escape.

HIGH-1 (SponsorBlock 1 MiB cap was post-hoc): The cap from the first
audit landed AFTER resp.bytes().await — which buffers the entire body
unbounded. A hostile mirror returning multi-GB would OOM the Pi before
the cap fired. Fix: stream via resp.chunk() in a loop, bail as soon as
accumulated bytes > cap. Defends OOM during ingest, not after.

HIGH-2 (Container.Refresh fires in wrong context): _wl_remove_action
unconditionally fired Refresh. Future refactors that expose the Remove
context menu outside the Watch Later listing (or stale context-menu
state across navigation) would refresh the wrong container — search
results would reload pointlessly when the user just hit Remove in WL.
Fix: only Refresh when Container.FolderPath contains
'action=watch_later'.

HIGH-3 (thumbnail shape — promoted from first-audit MED-9): The new
Watch Later code persists rustypipe Player.details which has a
thumbnail string in many versions. _pick_thumbnail called max() on a
string — iterates chars and crashes with AttributeError on the .get
lookup. Live crash-on-render of Watch Later. Fix: _pick_thumbnail now
accepts Any and dispatches on isinstance: empty/None -> '', str ->
str, dict -> .url, list -> filter for dicts then max-by-area.

HIGH-4 (search query unvalidated — length, control chars, surrogates):
Search took an arbitrary string from the addon and shipped it to
rustypipe. Three failure modes:
  - 10 MB query allocates 10 MB on stdin pipe + another 10 MB in
    serde_json::from_str + materializes in rustypipe's HTTP call. ~30
    MB blip per request on the 1 GB Pi.
  - Newlines in query produce multi-line entries in
    tracing::info!(query, ...) — log injection (mild on this stack,
    but obfuscates real entries).
  - Lone UTF-16 surrogates aren't a crash but produce a confusing
    error path.
Fix: validate_query() at the sidecar dispatch — 2 KB length cap,
reject control chars (TAB allowed since YouTube treats it as
whitespace). Addon-side _record_search now also collapses whitespace
via ' '.join(query.split()).

MED-1 (clear_history flicker): _clear_history_action finalized with
succeeded=True THEN Container.Update — caused a half-tick of empty
directory before the nav replaced it. Fix: succeeded=False first,
then the replace navigates cleanly.

MED-3 (LibreELEC-only fallback): _addon_data_path's xbmcvfs fallback
hardcoded /storage/.kodi/userdata/... — non-existent on Linux desktop
Kodi etc. Fix: fall back to ~/.kodi/userdata/addon_data/... which is
portable. Tests + dev rigs now persist correctly.

MED-4 (Response::ok clobber): Added debug_assert! so any future op
returning its own 'ok' key is caught in debug builds. Not a fix
per se — but a tripwire.

MED-5 (codec regex single-quote): Made _MIME_CODEC_RE accept either
quote style. Belt-and-braces against upstream MIME format drift.

MED-8 (SponsorBlockMonitor orphaned across plugin boundary): When we
delegate to plugin.video.youtube, our monitor runs in our plugin's
context against xbmc.Player()'s global state. If the user starts a
different video mid-monitor, the segments are for the old one — would
spuriously skip the new content. Fix: capture player.getPlayingFile()
at start, bail if it changes mid-loop.

MED-9 / LOW-2 carried over from first audit and tracked here too.

Smoke-verified via JSON-RPC (browse-only, no playback, Leia still
watching the TV):
- 3000-byte query rejected: 'query too long: 3000 bytes (cap 2048)'
- newline-in-query rejected: 'query contains control characters'
- legit search returns expected results
- wl_add + watch_later directory renders without thumbnail crash

Remaining deferred (cosmetic / no current impact):
- MED-2 (WL staleness): accept for v0.1, add lazy refresh later
- MED-6 (byte-length validate_youtube_id error message): cosmetic
- MED-7 (_lan_ip multi-NIC): not currently a problem on the family
  LAN, document and revisit if Tailscale lands on the Pi
- LOW-1..9: defensive polish, no real risk
- First-audit's deferred MED/LOWs: still defer-able

Addon v0.0.16.
2026-05-23 12:52:49 -07:00

1308 lines
49 KiB
Python

# torttube — Kodi YouTube addon
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Entry point. Plugin URL shapes handled:
plugin://plugin.video.torttube/ (root: M3 will add browse UI)
plugin://plugin.video.torttube/?action=play&id=ID (resolve + play a YouTube ID)
plugin://plugin.video.torttube/?action=play&url=URL (extract ID from URL, then play)
Remote-control / share-to-TV pattern (Kodi JSON-RPC):
POST http://<rpi>:8080/jsonrpc
Basic auth (kodi user)
{"jsonrpc":"2.0","method":"Player.Open","params":{
"item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}
That's how Android / phone / "send to TV" flows hand off — Kodi already
exposes the endpoint, we just need to register the plugin URL.
"""
import fcntl
import http.server
import json
import os
import re
import socket
import subprocess
import sys
import tempfile
import threading
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse
from xml.sax.saxutils import escape as xml_escape
import xbmc
import xbmcaddon
import xbmcgui
import xbmcplugin
ADDON = xbmcaddon.Addon()
ADDON_PATH = ADDON.getAddonInfo("path")
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")
def _safe_handle() -> int:
"""Kodi always passes an int as sys.argv[1] per the plugin contract, but
if someone runs main.py directly (testing) or Kodi has a bad day we'd
crash with a ValueError before _log is even defined. -1 means 'no handle';
setResolvedUrl / endOfDirectory are no-ops with that."""
if len(sys.argv) <= 1:
return -1
try:
return int(sys.argv[1])
except ValueError:
return -1
_HANDLE = _safe_handle()
_QS = sys.argv[2] if len(sys.argv) > 2 else ""
def _log(msg, level=xbmc.LOGINFO):
xbmc.log(f"[torttube] {msg}", level)
_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")
def _validate_id(candidate: str | None) -> str:
"""Return the candidate iff it matches YouTube's 11-char id shape; raise otherwise.
Every branch of _extract_id MUST pipe its return value through this so a
bogus `v=` query, an empty youtu.be path, or a malformed share link doesn't
leak into _call_sidecar -> yt-dlp / rustypipe / log lines."""
if candidate and _ID_RE.fullmatch(candidate):
return candidate
raise ValueError(f"not a valid YouTube id: {candidate!r}")
def _extract_id(url_or_id: str) -> str:
"""Accept either a bare ID or any common YouTube URL form, return the ID.
Every accepted form is validated through _validate_id so we never hand
untrusted text to the sidecar."""
s = url_or_id.strip()
# Bare 11-char ID (YouTube's canonical length).
if _ID_RE.fullmatch(s):
return s
parsed = urlparse(s)
# https://youtu.be/<id> (strict suffix match — avoid `myyoutu.be` etc.)
if parsed.netloc == "youtu.be" or parsed.netloc.endswith(".youtu.be"):
return _validate_id(parsed.path.lstrip("/").split("/")[0])
# https://www.youtube.com/watch?v=<id> (strict suffix match)
if parsed.netloc == "youtube.com" or parsed.netloc.endswith(".youtube.com"):
for k, v in parse_qsl(parsed.query):
if k == "v":
return _validate_id(v)
# /shorts/<id>, /embed/<id>, /live/<id>
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
if m:
return _validate_id(m.group(2))
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
"""Invoke the sidecar with one JSON request, parse one JSON response.
Puts the addon's bin/ on PATH so the sidecar's `yt-dlp` shell-outs find
the bundled zipapp (LibreELEC has no system yt-dlp).
"""
if not os.path.isfile(SIDECAR_BIN):
raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
if not os.access(SIDECAR_BIN, os.X_OK):
raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")
env = os.environ.copy()
addon_bin = os.path.join(ADDON_PATH, "bin")
env["PATH"] = addon_bin + os.pathsep + env.get("PATH", "")
proc = subprocess.run(
[SIDECAR_BIN],
input=(json.dumps(request) + "\n").encode("utf-8"),
capture_output=True,
timeout=timeout_s,
env=env,
)
if proc.returncode != 0:
raise RuntimeError(
f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
)
# The sidecar writes its JSON response as the LAST line of stdout (after any
# log lines it might have written via println!/eprintln/tracing if the route
# config ever changes). Pick the last non-empty line and decode robustly.
last_line: bytes | None = None
for line in proc.stdout.splitlines():
line = line.strip()
if line:
last_line = line
if last_line is None:
raise RuntimeError("sidecar produced no response")
try:
return json.loads(last_line.decode("utf-8", errors="replace"))
except json.JSONDecodeError as e:
raise RuntimeError(
f"sidecar stdout was not JSON: {e}; line={last_line[:200]!r}"
) from e
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
li = xbmcgui.ListItem(label=title or "torttube")
li.setPath(stream_url)
li.setProperty("IsPlayable", "true")
# Tell inputstream.adaptive to handle DASH/HLS based on URL path/suffix.
# Check the URL *path* extension, not a substring of the whole URL —
# progressive yt-dlp URLs have base64 blobs in the query string that
# can accidentally contain ".mpd" or ".m3u8".
parsed = urlparse(stream_url)
if parsed.path.endswith(".mpd"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "mpd")
# googlevideo rejects segment GETs that don't carry an Origin/Referer
# from www.youtube.com — 403 Forbidden otherwise. Same Mozilla UA
# rustypipe / yt-dlp use when minting the URL.
ytdl_ua = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
seg_headers = (
f"User-Agent={ytdl_ua}"
"&Origin=https://www.youtube.com"
"&Referer=https://www.youtube.com/"
)
li.setProperty("inputstream.adaptive.stream_headers", seg_headers)
li.setProperty("inputstream.adaptive.manifest_headers", seg_headers)
elif parsed.path.endswith(".m3u8"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "hls")
return li
class _MpdHandler(http.server.BaseHTTPRequestHandler):
"""Serves the per-video MPD file. The bytes come from the closure-captured
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
to clean up early. One handler per HTTPServer instance."""
mpd_bytes: bytes = b""
def do_GET(self) -> None: # noqa: N802 — http.server convention
self.send_response(200)
self.send_header("Content-Type", "application/dash+xml")
self.send_header("Content-Length", str(len(self.mpd_bytes)))
self.end_headers()
self.wfile.write(self.mpd_bytes)
def log_message(self, *args: Any, **kwargs: Any) -> None:
# Silence the default request log — Kodi's log is verbose enough.
return
def _has_setting(setting_id: str) -> bool:
"""Best-effort check that a setting exists in resources/settings.xml.
Returns False if the lookup throws (older builds, missing schema)."""
try:
ADDON.getSetting(setting_id)
return True
except Exception:
return False
def _lan_ip() -> str:
"""Detect this host's LAN IP by opening a UDP socket toward an external
address (no packets actually sent — just lets the kernel pick the source IP).
plugin.video.youtube uses this same trick because inputstream.adaptive's
libcurl in Kodi 20 has trouble fetching from `127.0.0.1` reliably."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
finally:
s.close()
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
"""Spin up a one-shot HTTP server that serves `mpd_bytes` at any path.
Binds to the LAN IP so inputstream.adaptive can fetch via the same code
path it uses for real network URLs. Returns (url, server) — caller is
responsible for `server.shutdown()` once playback ends."""
handler_cls = type(
"_MpdHandlerInstance",
(_MpdHandler,),
{"mpd_bytes": mpd_bytes},
)
lan_ip = _lan_ip()
# Bind to the LAN IP (not 0.0.0.0) so the MPD is reachable only on the
# interface inputstream.adaptive uses to connect, not exposed to every
# device on the LAN. The MPD embeds signed googlevideo segment URLs that
# we don't want walkable from a guest phone or IoT device.
try:
server = http.server.ThreadingHTTPServer((lan_ip, 0), handler_cls)
except OSError:
# LAN-IP bind can fail in rare network states (interface down,
# IP not yet assigned). Fall back to loopback — inputstream.adaptive
# on the same host can still reach it.
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler_cls)
lan_ip = "127.0.0.1"
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]
url = f"http://{lan_ip}:{port}/manifest.mpd"
_log(f"MPD HTTP server up on {url}")
return url, server
_MIME_CODEC_RE = re.compile(r'codecs=["\']([^"\']+)["\']')
def _codec_from_mime(stream: dict[str, Any]) -> str:
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
"""
mime = stream.get("mime", "")
m = _MIME_CODEC_RE.search(mime)
return m.group(1) if m else (stream.get("codec") or "")
def _build_dash_mpd(
details: dict[str, Any],
video_streams: list[dict[str, Any]],
audio_streams: list[dict[str, Any]],
) -> str | None:
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
Returns the MPD XML, or None if no compatible streams were found.
"""
duration_s = float(details.get("duration") or 0)
if duration_s <= 0:
return None
# Filter video: H.264 only (avc1.*), 720p <= height <= 1080p.
# Floor at 720p so inputstream.adaptive's conservative-start chooser doesn't
# land us on a low-quality rep first. Ceiling at 1080p because that's the
# RPi 4's H.264 hardware-decode sweet spot.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and 720 <= (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
# Fallback: drop the 720p floor if the video has no HD streams at all.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
return None
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
audio_pool = aac or opus
if not audio_pool:
return None
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
return None
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
parts = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
" <Period>",
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
]
for v in h264:
codec = _codec_from_mime(v) or "avc1.4d401f"
ir = v["index_range"]
init = v["init_range"]
parts.append(
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
f' startWithSAP="1">'
)
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
# Audio adaptation set.
a = best_audio
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
a_ir = a["index_range"]
a_init = a["init_range"]
parts.append(
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
)
# NOTE: NOT setting audioSamplingRate here on purpose. rustypipe doesn't
# expose the sample rate, and hard-coding 44100 caused a ~9% playback-rate
# mismatch (= growing audio-vs-video desync) for content at 48000 Hz.
# inputstream.adaptive reads the actual rate from the audio init segment's
# mdhd box when this attribute is omitted, which is correct for any source.
parts.append(
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
f' bandwidth="{a.get("bitrate", 0)}" startWithSAP="1">'
)
parts.append(
' <AudioChannelConfiguration'
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
f' value="{a.get("channels", 2)}"/>'
)
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
parts.append(" </Period>")
parts.append("</MPD>")
return "\n".join(parts)
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
"""Resolve via rustypipe + build DASH MPD. Returns (mpd_bytes, resp).
On any failure returns (None, resp) so the caller can fall back to
the progressive path. We serve the MPD over localhost HTTP because
inputstream.adaptive's libcurl can't open file:// URLs.
"""
try:
resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
except Exception as e:
_log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
return None, {}
if not resp.get("ok"):
return None, resp
mpd = _build_dash_mpd(
resp.get("details") or {},
resp.get("video_only_streams") or [],
resp.get("audio_streams") or [],
)
if not mpd:
_log("DASH build returned no compatible streams; falling back to progressive")
return None, resp
return mpd.encode("utf-8"), resp
def _delegate_to_pv_youtube(yt_id: str) -> bool:
"""Hand playback off to plugin.video.youtube via its play URL. They have
the proper SegmentTimeline-aware MPD construction (sidx-parsed) that
unlocks HD without the audio-sync drift our naive MPD has. Returns True
if delegation succeeded (Kodi will chain-resolve)."""
if not _pv_youtube_installed():
return False
target = f"plugin://plugin.video.youtube/play/?video_id={yt_id}"
_log(f"delegating playback to plugin.video.youtube: {target}")
li = xbmcgui.ListItem(label=yt_id)
li.setPath(target)
li.setProperty("IsPlayable", "true")
xbmcplugin.setResolvedUrl(_HANDLE, True, li)
return True
def _pv_youtube_installed() -> bool:
"""Check whether plugin.video.youtube is installed + enabled. We don't
enable it ourselves — if the user removed it, we fall back to our own
paths. Log any probe failure so silent degradation to 360p is observable."""
try:
return bool(xbmc.getCondVisibility("System.HasAddon(plugin.video.youtube)"))
except Exception as e:
_log(f"pv.youtube probe failed: {e}", xbmc.LOGWARNING)
return False
def _play(yt_id: str) -> None:
"""Resolve playback in order of preference:
1. plugin.video.youtube delegation — HD via their proven DASH MPD with
sidx-parsed SegmentTimeline. Default, when available.
2. Our DASH path — only if `dash_enabled` setting / dash.on marker / env
are set (WIP, partial — see M7 milestone).
3. yt-dlp progressive — last-resort 360p, always works.
SponsorBlock attaches regardless of which path we took.
"""
_log(f"play id={yt_id}")
# Tier 0: delegate to plugin.video.youtube if installed. Our SponsorBlock
# monitor still attaches because xbmc.Player() works regardless of which
# addon initiated playback. If pv.youtube ALSO has SB enabled the user
# might see double skips; rare in practice (pv.youtube's SB is off by
# default and theirs is more conservative).
use_pv_youtube = True
try:
use_pv_youtube = ADDON.getSettingBool("prefer_pv_youtube")
except Exception:
pass
if use_pv_youtube and _delegate_to_pv_youtube(yt_id):
try:
_attach_sponsorblock(yt_id)
except Exception as e:
# Don't let a SB monitor bug pop a 'Plugin error' dialog on the TV
# after a successful delegate-to-pv.youtube hand-off.
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
return
mpd_bytes: bytes | None = None
dash_resp: dict[str, Any] = {}
# DASH path: read setting first; fall back to env-var; OR honor a magic file
# at /storage/.kodi/userdata/addon_data/plugin.video.torttube/dash.on as a
# last-ditch trigger that doesn't depend on Kodi's settings cache.
dash_enabled = False
try:
dash_enabled = ADDON.getSettingBool("dash_enabled")
except Exception:
pass
env_dash = os.environ.get("TORTTUBE_DASH") == "1"
addon_data = ""
try:
import xbmcvfs
addon_data = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
pass
file_dash = bool(addon_data) and os.path.isfile(os.path.join(addon_data, "dash.on"))
_log(f"play id={yt_id} dash_enabled={dash_enabled} env_dash={env_dash} file_dash={file_dash}")
if dash_enabled or env_dash or file_dash:
mpd_bytes, dash_resp = _try_dash(yt_id)
_log(f"_try_dash returned mpd_bytes={'<%d bytes>' % len(mpd_bytes) if mpd_bytes else None}")
if mpd_bytes:
details = dash_resp.get("details") or {}
title = details.get("name")
mpd_url, server = _start_mpd_server(mpd_bytes)
_log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
try:
_attach_sponsorblock(yt_id)
finally:
# Shut down the MPD server cleanly once playback ends or aborts.
# _attach_sponsorblock blocks while playback is active.
try:
server.shutdown()
server.server_close()
_log("MPD HTTP server shut down")
except Exception as e:
_log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
return
# Fallback: progressive single-URL via yt-dlp (360p).
try:
resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
except Exception as e:
_log(f"sidecar resolve_play failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"resolve failed: {e}", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
if not resp.get("ok"):
kind = resp.get("kind", "unknown")
err = resp.get("error", "unknown")
_log(f"sidecar returned not-ok: {kind}: {err}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube", f"{kind}: {err}", xbmcgui.NOTIFICATION_WARNING, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
stream_url = resp.get("stream_url")
title = resp.get("title")
if not stream_url:
_log("no usable stream URL in sidecar response", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", "no stream URL", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_log(f"resolved via yt-dlp progressive fallback, playing")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(stream_url, title))
try:
_attach_sponsorblock(yt_id)
except Exception as e:
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
def _attach_sponsorblock(yt_id: str) -> None:
"""Fetch SponsorBlock segments and block on the monitor loop. Always blocks
until playback ends (or 30s if playback never starts) so the caller can
use this as a 'wait for playback to finish' signal — needed to keep the
MPD HTTP server alive throughout playback.
Non-fatal on segment fetch error.
"""
skip_segments: list[dict[str, Any]] = []
try:
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
if sb_resp.get("ok"):
segs = sb_resp.get("segments") or []
skip_segments = [s for s in segs if s.get("actionType") == "skip"]
_log(f"sponsorblock: {len(skip_segments)} skip segments")
except Exception as e:
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
# Always run the watcher even with zero segments — it doubles as the
# 'block until playback ends' signal that gates MPD-server shutdown.
SponsorBlockMonitor(skip_segments).run()
class SponsorBlockMonitor(xbmc.Monitor):
"""Polls Kodi's player position and seeks past each SponsorBlock skip
segment exactly once. Exits on playback stop or Kodi shutdown.
Each segment has shape {"segment": [start_s, end_s], "category": str,
"UUID": str, "actionType": "skip"}. We sort by start time so we can
short-circuit the scan; we also dedupe via the UUID set so the same
segment doesn't trigger twice if the user manually rewinds into it.
"""
POLL_S = 0.5
MAX_WAIT_FOR_PLAYBACK_S = 30.0
def __init__(self, segments: list[dict[str, Any]]):
super().__init__()
self.segments = sorted(
segments, key=lambda s: float(s.get("segment", [0, 0])[0])
)
self.skipped: set[str] = set()
def run(self) -> None:
player = xbmc.Player()
# Wait for playback to actually begin — setResolvedUrl is async,
# Kodi takes a beat to demux + start the streams.
waited = 0.0
while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
if self.abortRequested():
return
if player.isPlaying():
break
if self.waitForAbort(0.25):
return
waited += 0.25
else:
_log("sponsorblock: timed out waiting for playback to start")
return
# Capture the file path that's actually playing now; bail if it changes
# mid-monitor (delegate-to-pv.youtube means our SponsorBlockMonitor is
# alive in our plugin's context while pv.youtube drives playback —
# if the user starts a different video, our skip-segments are stale).
# Audit MED-8 (2nd pass).
try:
initial_file = player.getPlayingFile()
except Exception:
initial_file = ""
while not self.abortRequested() and player.isPlaying():
try:
pos = float(player.getTime())
current_file = player.getPlayingFile()
except Exception:
# getTime raises various exception types when the player goes
# away mid-poll (Kodi shutdown, plugin reload, etc). Wider catch
# so an exception path doesn't escape into _play's finally and
# leak the MPD HTTP server.
return
if initial_file and current_file and current_file != initial_file:
# User started a different video — our skip segments are for the
# old one, abort instead of spurious-skipping the new content.
_log("sponsorblock: playing file changed, monitor exiting")
return
for seg in self.segments:
uuid = seg.get("UUID", "")
if uuid in self.skipped:
continue
start, end = float(seg["segment"][0]), float(seg["segment"][1])
if pos < start:
# Segments are sorted; nothing past here can match either.
break
if pos < end:
duration = end - start
category = seg.get("category", "?")
_log(
f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
f"({duration:.0f}s)"
)
player.seekTime(end)
self.skipped.add(uuid)
xbmcgui.Dialog().notification(
"SponsorBlock",
f"Skipped {category} ({duration:.0f}s)",
xbmcgui.NOTIFICATION_INFO,
2500,
)
break
if self.waitForAbort(self.POLL_S):
return
def _plugin_url(**kwargs: Any) -> str:
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
def _format_duration(seconds: int | float | None) -> str:
"""Format seconds as `H:MM:SS` or `M:SS`. None / negative → empty;
0 is a legitimate duration (livestream pre-roll) so we render '0:00'."""
if seconds is None or seconds < 0:
return ""
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def _format_views(views: int | None) -> str:
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'. None →
empty; 0 is rendered as '0' (brand-new upload)."""
if views is None or views < 0:
return ""
if views >= 1_000_000_000:
return f"{views / 1_000_000_000:.1f}B"
if views >= 1_000_000:
return f"{views / 1_000_000:.1f}M"
if views >= 1_000:
return f"{views / 1_000:.1f}K"
return str(views)
# Persistence helpers (atomic write + advisory lock).
# Both search history and Watch Later go through these to survive concurrent
# RunPlugin invocations (each is a fresh Python interpreter — load→mutate→save
# races dropped updates without locking) and unclean shutdowns (open(w) truncates
# immediately — Kodi crash mid-write leaves the user's pinned-videos list as a
# zero-byte file that load() silently treats as empty). Audit CRIT-1 (2nd pass).
def _addon_data_path(filename: str) -> str:
try:
import xbmcvfs
base = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
# Fallback for non-Kodi tests / generic Linux Kodi installs.
base = os.path.join(
os.path.expanduser("~"), ".kodi", "userdata",
"addon_data", "plugin.video.torttube",
)
return os.path.join(base, filename)
def _atomic_write_json(path: str, data: Any) -> None:
"""Write JSON to `path` via a same-filesystem tempfile + os.replace.
Crash-safe: at any moment, `path` either has the old contents or the new
contents — never a torn write."""
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp = tempfile.mkstemp(
dir=os.path.dirname(path), prefix=".tmp.", suffix=".json"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
except Exception:
try:
os.remove(tmp)
except OSError:
pass
raise
def _with_lock(path: str, fn):
"""Run `fn()` under an exclusive advisory lock on a sibling .lock file.
fcntl.flock is per-process — exactly what we need to serialize concurrent
plugin interpreters that are each modifying the same JSON state file."""
lock_path = path + ".lock"
os.makedirs(os.path.dirname(lock_path), exist_ok=True)
with open(lock_path, "w") as lf:
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
try:
return fn()
finally:
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
# Search history persistence — stored as a plain JSON list of recent queries.
# Newest first, deduplicated, capped at SEARCH_HISTORY_MAX.
SEARCH_HISTORY_MAX = 12
def _search_history_path() -> str:
return _addon_data_path("search_history.json")
def _load_search_history() -> list[str]:
try:
with open(_search_history_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [s for s in data if isinstance(s, str)][:SEARCH_HISTORY_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_search_history(items: list[str]) -> None:
"""Atomic, no-lock save. Caller MUST already hold the lock if doing a
load→mutate→save sequence (see _record_search)."""
path = _search_history_path()
try:
_atomic_write_json(path, items[:SEARCH_HISTORY_MAX])
except OSError as e:
_log(f"search history save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _record_search(query: str) -> None:
q = " ".join(query.split()) # collapse whitespace
if not q:
return
path = _search_history_path()
def _do() -> None:
history = _load_search_history()
# Dedupe case-insensitively, keep newest at the front.
history = [h for h in history if h.lower() != q.lower()]
history.insert(0, q)
_save_search_history(history)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"search history lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _clear_search_history() -> None:
path = _search_history_path()
def _do() -> None:
try:
os.remove(path)
except (FileNotFoundError, OSError):
pass
try:
_with_lock(path, _do)
except OSError as e:
_log(f"search history clear failed (non-fatal): {e}", xbmc.LOGWARNING)
# Watch Later — user-curated list of saved videos. The anti-algorithm answer
# to YouTube's recommendation cancer: you decide what comes back. Items are
# stored as {id, name, channel, duration, thumbnail} dicts so we don't need
# to re-fetch metadata when rendering the list. Newest first, no cap.
WATCH_LATER_MAX = 500
def _watch_later_path() -> str:
return _addon_data_path("watch_later.json")
def _load_watch_later() -> list[dict[str, Any]]:
try:
with open(_watch_later_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [
d for d in data
if isinstance(d, dict) and isinstance(d.get("id"), str)
][:WATCH_LATER_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_watch_later(items: list[dict[str, Any]]) -> None:
"""Atomic, no-lock save. Caller MUST hold the lock (see _add_to / _remove_from)."""
path = _watch_later_path()
try:
_atomic_write_json(path, items[:WATCH_LATER_MAX])
except OSError as e:
_log(f"watch later save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _add_to_watch_later(item: dict[str, Any]) -> None:
yt_id = item.get("id")
if not yt_id or not _ID_RE.fullmatch(str(yt_id)):
return
path = _watch_later_path()
def _do() -> None:
items = _load_watch_later()
items = [i for i in items if i.get("id") != yt_id]
items.insert(0, item)
_save_watch_later(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"watch later add lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _remove_from_watch_later(yt_id: str) -> None:
path = _watch_later_path()
def _do() -> None:
items = _load_watch_later()
items = [i for i in items if i.get("id") != yt_id]
_save_watch_later(items)
try:
_with_lock(path, _do)
except OSError as e:
_log(f"watch later remove lock failed (non-fatal): {e}", xbmc.LOGWARNING)
def _pick_thumbnail(thumbs: Any) -> str:
"""Pick a thumbnail URL from whatever shape rustypipe / yt-dlp / our own
persisted Watch Later records hand us.
rustypipe `Player.details.thumbnail` is a single URL **string**, while
`VideoItem.thumbnail` is a `list[dict]`. The original implementation
crashed on the string form (`max(str, ...)` iterates chars and calls
`.get` on each — `AttributeError`). Watch Later persists details-shaped
items, so this is a live crash-on-render once any video is pinned.
Audit HIGH-3 (2nd pass)."""
if not thumbs:
return ""
if isinstance(thumbs, str):
return thumbs
if isinstance(thumbs, dict):
return thumbs.get("url", "") or ""
if isinstance(thumbs, list):
dict_thumbs = [t for t in thumbs if isinstance(t, dict)]
if not dict_thumbs:
return ""
best = max(
dict_thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)
)
return best.get("url", "") or ""
return ""
def _root_directory() -> None:
"""Top-level addon menu. Search lands first because that's how most TV-side
YouTube use actually works. Remote-control via JSON-RPC still works."""
items: list[tuple[str, str, str]] = [
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
]
wl = _load_watch_later()
if wl:
items.append(
(
f"Watch Later ({len(wl)})",
"DefaultPlaylist.png",
_plugin_url(action="watch_later"),
)
)
else:
items.append(
("Watch Later", "DefaultPlaylist.png", _plugin_url(action="watch_later"))
)
history = _load_search_history()
if history:
items.append(
(
"[I]Recent searches[/I]",
"DefaultAddonsRecentlyUpdated.png",
_plugin_url(action="recent"),
)
)
for label, icon, url in items:
li = xbmcgui.ListItem(label=label)
li.setArt({"icon": icon})
xbmcplugin.addDirectoryItem(_HANDLE, url, li, isFolder=True)
# Don't cache the root — it might gain/lose 'Recent searches' between visits.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _recent_directory() -> None:
"""Show stored recent searches as quick-pick items + a 'Clear history' tail."""
history = _load_search_history()
if not history:
xbmcgui.Dialog().notification(
"torttube",
"No recent searches yet.",
xbmcgui.NOTIFICATION_INFO,
2500,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
for q in history:
li = xbmcgui.ListItem(label=q)
li.setArt({"icon": "DefaultAddonsSearch.png"})
li.addContextMenuItems(
[
(
"Clear history",
f"Container.Update({_plugin_url(action='clear_history')})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="search", q=q), li, isFolder=True
)
# Tail entry: clear all history.
clear_li = xbmcgui.ListItem(label="[B]Clear search history[/B]")
clear_li.setArt({"icon": "DefaultAddonNone.png"})
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="clear_history"), clear_li, isFolder=True
)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _clear_history_action() -> None:
_clear_search_history()
xbmcgui.Dialog().notification(
"torttube", "Search history cleared", xbmcgui.NOTIFICATION_INFO, 2000
)
# succeeded=False so Kodi doesn't enter an empty directory before the
# Container.Update lands — avoids the half-tick flicker the audit caught.
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
xbmc.executebuiltin("Container.Update(plugin://plugin.video.torttube/,replace)")
def _watch_later_directory() -> None:
"""Browse the user's curated Watch Later list."""
items = _load_watch_later()
if not items:
xbmcgui.Dialog().notification(
"torttube",
"Watch Later is empty. Right-click any video to add.",
xbmcgui.NOTIFICATION_INFO,
3500,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
_add_video_items(items, in_watch_later=True)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _wl_add_action() -> None:
"""RunPlugin handler: add a video to Watch Later.
Called from the context-menu Container action, so we don't have full
item metadata in the URL. We do a sidecar `resolve` to fetch metadata
fresh — slower than caching the item from the listing, but reliable
and works across navigation paths.
"""
params = dict(parse_qsl(_QS.lstrip("?")))
try:
yt_id = _validate_id(params.get("id"))
except ValueError as e:
xbmcgui.Dialog().notification("torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 3000)
return
# Try to get rich metadata via a fast rustypipe resolve. If anything fails,
# save the ID alone so we still remember the user's pin.
item: dict[str, Any] = {"id": yt_id}
try:
resp = _call_sidecar({"op": "resolve", "id": yt_id}, timeout_s=15)
if resp.get("ok"):
details = resp.get("details") or {}
item = {
"id": yt_id,
"name": details.get("name") or details.get("title") or yt_id,
"channel": {
"id": details.get("channel_id"),
"name": details.get("channel_name"),
},
"duration": details.get("duration"),
"thumbnail": details.get("thumbnail"),
}
except Exception as e:
_log(f"watch-later metadata fetch failed (saving id only): {e}", xbmc.LOGWARNING)
_add_to_watch_later(item)
xbmcgui.Dialog().notification(
"torttube",
f"Added to Watch Later: {item.get('name') or yt_id}",
xbmcgui.NOTIFICATION_INFO,
2500,
)
def _wl_remove_action() -> None:
"""RunPlugin handler: remove a video from Watch Later."""
params = dict(parse_qsl(_QS.lstrip("?")))
try:
yt_id = _validate_id(params.get("id"))
except ValueError as e:
xbmcgui.Dialog().notification("torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 3000)
return
_remove_from_watch_later(yt_id)
xbmcgui.Dialog().notification(
"torttube", "Removed from Watch Later", xbmcgui.NOTIFICATION_INFO, 2000
)
# Refresh ONLY if we're currently inside the Watch Later listing. If the
# user invoked remove from somewhere else (future refactor, stale
# context-menu state), refreshing the wrong container is jarring.
# Audit HIGH-2 (2nd pass).
container_path = xbmc.getInfoLabel("Container.FolderPath") or ""
if "action=watch_later" in container_path:
xbmc.executebuiltin("Container.Refresh")
def _add_video_items(items: list[dict[str, Any]], *, in_watch_later: bool = False) -> None:
"""Add VideoItem dicts to the current plugin directory, formatted for Kodi.
Each item gets a play-action plugin URL, channel + duration + view-count
metadata in the label, thumbnail art, video InfoLabels for skin support,
and context-menu entries: 'Go to channel' and either 'Add to Watch Later'
or 'Remove from Watch Later' depending on `in_watch_later`.
"""
xbmcplugin.setContent(_HANDLE, "videos")
for item in items:
yt_id = item.get("id") or ""
if not yt_id:
continue
name = item.get("name") or "(no title)"
channel = item.get("channel") or {}
if isinstance(channel, dict):
channel_name = channel.get("name") or ""
channel_id = channel.get("id") or ""
else:
channel_name = ""
channel_id = ""
duration = item.get("duration")
views = item.get("view_count")
# Label: "Title · Channel · Duration · ViewCount"
meta_bits = [b for b in (channel_name, _format_duration(duration), _format_views(views)) if b]
label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name
li = xbmcgui.ListItem(label=label)
li.setProperty("IsPlayable", "true")
thumb_url = _pick_thumbnail(item.get("thumbnail"))
if thumb_url:
li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})
info: dict[str, Any] = {"title": name, "mediatype": "video"}
if duration:
info["duration"] = int(duration)
if channel_name:
info["studio"] = channel_name
if item.get("description"):
info["plot"] = item["description"]
try:
li.setInfo("video", info)
except Exception:
pass
# Context menu: jump to channel listing + Watch Later add/remove.
ctx: list[tuple[str, str]] = []
if channel_id:
ctx.append(
(
f"Go to {channel_name or 'channel'}",
f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
)
)
if in_watch_later:
ctx.append(
(
"Remove from Watch Later",
f"RunPlugin({_plugin_url(action='wl_remove', id=yt_id)})",
)
)
else:
ctx.append(
(
"Add to Watch Later",
f"RunPlugin({_plugin_url(action='wl_add', id=yt_id)})",
)
)
if ctx:
li.addContextMenuItems(ctx)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
)
def _search_directory(query: str | None = None) -> None:
"""Hit sidecar `search`, list results as playable items.
`query` arg: if provided (via `?action=search&q=...`), skip the keyboard
prompt — used by JSON-RPC clients (phones, share-to-TV) and tests.
"""
if not query:
query = xbmcgui.Dialog().input("Search YouTube")
if not query:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
resp = _call_sidecar({"op": "search", "query": query, "limit": 30}, timeout_s=25)
except Exception as e:
_log(f"search failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
_log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube",
f"search: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
_log(f"search '{query}'{len(items)} items")
_record_search(query)
_add_video_items(items)
# cacheToDisc=False so the user's next search isn't shadowed by the
# previous query's cached result-set.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _playlist_directory(playlist_id: str) -> None:
"""List a playlist's videos."""
try:
resp = _call_sidecar(
{"op": "playlist", "id": playlist_id, "limit": 100}, timeout_s=25
)
except Exception as e:
_log(f"playlist failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"playlist failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"playlist: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
pl = resp.get("playlist") or {}
_log(f"playlist {pl.get('name') or playlist_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _channel_directory(channel_id: str) -> None:
"""List a channel's recent videos."""
try:
resp = _call_sidecar(
{"op": "channel_videos", "id": channel_id, "limit": 50}, timeout_s=25
)
except Exception as e:
_log(f"channel_videos failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"channel failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"channel: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
ch = resp.get("channel") or {}
_log(f"channel {ch.get('name') or channel_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _play_by_url_prompt() -> None:
"""Manual URL/ID entry for tap-to-play from the Kodi UI."""
s = xbmcgui.Dialog().input("Paste YouTube URL or ID")
if not s:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
yt_id = _extract_id(s)
except ValueError as e:
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
# Open via Player.Open-equivalent: run the plugin URL through Kodi
# rather than calling _play() in directory context.
xbmc.executebuiltin(f"PlayMedia({_plugin_url(action='play', id=yt_id)})")
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
def main() -> None:
params = dict(parse_qsl(_QS.lstrip("?")))
action = params.get("action")
if action == "play":
try:
yt_id = _extract_id(params.get("url") or params.get("id") or "")
except ValueError as e:
_log(str(e), xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_play(yt_id)
elif action == "search":
_search_directory(query=params.get("q"))
elif action == "channel":
_channel_directory(params.get("id") or "")
elif action == "playlist":
_playlist_directory(params.get("id") or "")
elif action == "play_by_url":
_play_by_url_prompt()
elif action == "recent":
_recent_directory()
elif action == "clear_history":
_clear_history_action()
elif action == "watch_later":
_watch_later_directory()
elif action == "wl_add":
_wl_add_action()
elif action == "wl_remove":
_wl_remove_action()
else:
_root_directory()
if __name__ == "__main__":
main()