torttube/addon/plugin.video.torttube/main.py
Kayos f1c7264e75 M4 polish — search history + cacheToDisc + 0-is-falsy fixes
Search history (M4 ergonomics):
- _record_search() stores the query into search_history.json under
  Kodi's addon_data dir. Dedupe case-insensitively, keep newest first,
  cap at SEARCH_HISTORY_MAX=12.
- _recent_directory() (action=recent) shows persisted queries as
  quick-pick items, each re-running the same search on click. Each
  item gets a 'Clear history' context menu entry. Tail entry clears
  everything.
- Root menu adds a 'Recent searches' entry only when history is
  non-empty.
- _clear_history_action wires the clear path + a notification.

cacheToDisc=False on the root menu and on search results (audit MED-3):
prevents an empty Search button getting stuck cached after a no-input
cancel, and prevents the previous query's result-set shadowing the
next search.

_format_duration / _format_views fixed to treat 0 as a real value and
'' only for None/negative (audit MED-4) — brand-new 0-view uploads
and livestream pre-rolls now render correctly.

Verified end-to-end via Files.GetDirectory:
  - Two searches ('funny cats', 'python tutorial') record into history
  - history.json contains ['funny cats', 'python tutorial'] (newest
    first because 'funny cats' was searched second)
  - Recent dir lists both + Clear tail entry
  - Root dir gains 'Recent searches' entry once history is populated

No playback was tested (Leia was on the TV). Addon v0.0.14.
2026-05-23 12:30:44 -07:00

1009 lines
38 KiB
Python

# torttube — Kodi YouTube addon
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Entry point. Plugin URL shapes handled:
plugin://plugin.video.torttube/ (root: M3 will add browse UI)
plugin://plugin.video.torttube/?action=play&id=ID (resolve + play a YouTube ID)
plugin://plugin.video.torttube/?action=play&url=URL (extract ID from URL, then play)
Remote-control / share-to-TV pattern (Kodi JSON-RPC):
POST http://<rpi>:8080/jsonrpc
Basic auth (kodi user)
{"jsonrpc":"2.0","method":"Player.Open","params":{
"item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}
That's how Android / phone / "send to TV" flows hand off — Kodi already
exposes the endpoint, we just need to register the plugin URL.
"""
import http.server
import json
import os
import re
import socket
import subprocess
import sys
import threading
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse
from xml.sax.saxutils import escape as xml_escape
import xbmc
import xbmcaddon
import xbmcgui
import xbmcplugin
ADDON = xbmcaddon.Addon()
ADDON_PATH = ADDON.getAddonInfo("path")
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")
_HANDLE = int(sys.argv[1]) if len(sys.argv) > 1 else -1
_QS = sys.argv[2] if len(sys.argv) > 2 else ""
def _log(msg, level=xbmc.LOGINFO):
xbmc.log(f"[torttube] {msg}", level)
_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")
def _validate_id(candidate: str | None) -> str:
"""Return the candidate iff it matches YouTube's 11-char id shape; raise otherwise.
Every branch of _extract_id MUST pipe its return value through this so a
bogus `v=` query, an empty youtu.be path, or a malformed share link doesn't
leak into _call_sidecar -> yt-dlp / rustypipe / log lines."""
if candidate and _ID_RE.fullmatch(candidate):
return candidate
raise ValueError(f"not a valid YouTube id: {candidate!r}")
def _extract_id(url_or_id: str) -> str:
"""Accept either a bare ID or any common YouTube URL form, return the ID.
Every accepted form is validated through _validate_id so we never hand
untrusted text to the sidecar."""
s = url_or_id.strip()
# Bare 11-char ID (YouTube's canonical length).
if _ID_RE.fullmatch(s):
return s
parsed = urlparse(s)
# https://youtu.be/<id> (strict suffix match — avoid `myyoutu.be` etc.)
if parsed.netloc == "youtu.be" or parsed.netloc.endswith(".youtu.be"):
return _validate_id(parsed.path.lstrip("/").split("/")[0])
# https://www.youtube.com/watch?v=<id> (strict suffix match)
if parsed.netloc == "youtube.com" or parsed.netloc.endswith(".youtube.com"):
for k, v in parse_qsl(parsed.query):
if k == "v":
return _validate_id(v)
# /shorts/<id>, /embed/<id>, /live/<id>
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
if m:
return _validate_id(m.group(2))
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
"""Invoke the sidecar with one JSON request, parse one JSON response.
Puts the addon's bin/ on PATH so the sidecar's `yt-dlp` shell-outs find
the bundled zipapp (LibreELEC has no system yt-dlp).
"""
if not os.path.isfile(SIDECAR_BIN):
raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
if not os.access(SIDECAR_BIN, os.X_OK):
raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")
env = os.environ.copy()
addon_bin = os.path.join(ADDON_PATH, "bin")
env["PATH"] = addon_bin + os.pathsep + env.get("PATH", "")
proc = subprocess.run(
[SIDECAR_BIN],
input=(json.dumps(request) + "\n").encode("utf-8"),
capture_output=True,
timeout=timeout_s,
env=env,
)
if proc.returncode != 0:
raise RuntimeError(
f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
)
# The sidecar writes its JSON response as the LAST line of stdout (after any
# log lines it might have written via println!/eprintln/tracing if the route
# config ever changes). Pick the last non-empty line and decode robustly.
last_line: bytes | None = None
for line in proc.stdout.splitlines():
line = line.strip()
if line:
last_line = line
if last_line is None:
raise RuntimeError("sidecar produced no response")
try:
return json.loads(last_line.decode("utf-8", errors="replace"))
except json.JSONDecodeError as e:
raise RuntimeError(
f"sidecar stdout was not JSON: {e}; line={last_line[:200]!r}"
) from e
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
li = xbmcgui.ListItem(label=title or "torttube")
li.setPath(stream_url)
li.setProperty("IsPlayable", "true")
# Tell inputstream.adaptive to handle DASH/HLS based on URL path/suffix.
# Check the URL *path* extension, not a substring of the whole URL —
# progressive yt-dlp URLs have base64 blobs in the query string that
# can accidentally contain ".mpd" or ".m3u8".
parsed = urlparse(stream_url)
if parsed.path.endswith(".mpd"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "mpd")
# googlevideo rejects segment GETs that don't carry an Origin/Referer
# from www.youtube.com — 403 Forbidden otherwise. Same Mozilla UA
# rustypipe / yt-dlp use when minting the URL.
ytdl_ua = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
seg_headers = (
f"User-Agent={ytdl_ua}"
"&Origin=https://www.youtube.com"
"&Referer=https://www.youtube.com/"
)
li.setProperty("inputstream.adaptive.stream_headers", seg_headers)
li.setProperty("inputstream.adaptive.manifest_headers", seg_headers)
elif parsed.path.endswith(".m3u8"):
li.setProperty("inputstream", "inputstream.adaptive")
li.setProperty("inputstream.adaptive.manifest_type", "hls")
return li
class _MpdHandler(http.server.BaseHTTPRequestHandler):
"""Serves the per-video MPD file. The bytes come from the closure-captured
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
to clean up early. One handler per HTTPServer instance."""
mpd_bytes: bytes = b""
def do_GET(self) -> None: # noqa: N802 — http.server convention
self.send_response(200)
self.send_header("Content-Type", "application/dash+xml")
self.send_header("Content-Length", str(len(self.mpd_bytes)))
self.end_headers()
self.wfile.write(self.mpd_bytes)
def log_message(self, *args: Any, **kwargs: Any) -> None:
# Silence the default request log — Kodi's log is verbose enough.
return
def _has_setting(setting_id: str) -> bool:
"""Best-effort check that a setting exists in resources/settings.xml.
Returns False if the lookup throws (older builds, missing schema)."""
try:
ADDON.getSetting(setting_id)
return True
except Exception:
return False
def _lan_ip() -> str:
"""Detect this host's LAN IP by opening a UDP socket toward an external
address (no packets actually sent — just lets the kernel pick the source IP).
plugin.video.youtube uses this same trick because inputstream.adaptive's
libcurl in Kodi 20 has trouble fetching from `127.0.0.1` reliably."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
finally:
s.close()
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
"""Spin up a one-shot HTTP server that serves `mpd_bytes` at any path.
Binds to the LAN IP so inputstream.adaptive can fetch via the same code
path it uses for real network URLs. Returns (url, server) — caller is
responsible for `server.shutdown()` once playback ends."""
handler_cls = type(
"_MpdHandlerInstance",
(_MpdHandler,),
{"mpd_bytes": mpd_bytes},
)
lan_ip = _lan_ip()
# Bind to the LAN IP (not 0.0.0.0) so the MPD is reachable only on the
# interface inputstream.adaptive uses to connect, not exposed to every
# device on the LAN. The MPD embeds signed googlevideo segment URLs that
# we don't want walkable from a guest phone or IoT device.
try:
server = http.server.ThreadingHTTPServer((lan_ip, 0), handler_cls)
except OSError:
# LAN-IP bind can fail in rare network states (interface down,
# IP not yet assigned). Fall back to loopback — inputstream.adaptive
# on the same host can still reach it.
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler_cls)
lan_ip = "127.0.0.1"
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]
url = f"http://{lan_ip}:{port}/manifest.mpd"
_log(f"MPD HTTP server up on {url}")
return url, server
_MIME_CODEC_RE = re.compile(r'codecs="([^"]+)"')
def _codec_from_mime(stream: dict[str, Any]) -> str:
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
"""
mime = stream.get("mime", "")
m = _MIME_CODEC_RE.search(mime)
return m.group(1) if m else (stream.get("codec") or "")
def _build_dash_mpd(
details: dict[str, Any],
video_streams: list[dict[str, Any]],
audio_streams: list[dict[str, Any]],
) -> str | None:
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
Returns the MPD XML, or None if no compatible streams were found.
"""
duration_s = float(details.get("duration") or 0)
if duration_s <= 0:
return None
# Filter video: H.264 only (avc1.*), 720p <= height <= 1080p.
# Floor at 720p so inputstream.adaptive's conservative-start chooser doesn't
# land us on a low-quality rep first. Ceiling at 1080p because that's the
# RPi 4's H.264 hardware-decode sweet spot.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and 720 <= (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
# Fallback: drop the 720p floor if the video has no HD streams at all.
h264 = [
s
for s in video_streams
if "avc1" in (s.get("codec") or s.get("mime", ""))
and (s.get("height") or 0) <= 1080
and s.get("init_range")
and s.get("index_range")
and s.get("url")
]
if not h264:
return None
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
audio_pool = aac or opus
if not audio_pool:
return None
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
return None
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
parts = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
" <Period>",
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
]
for v in h264:
codec = _codec_from_mime(v) or "avc1.4d401f"
ir = v["index_range"]
init = v["init_range"]
parts.append(
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
f' startWithSAP="1">'
)
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
# Audio adaptation set.
a = best_audio
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
a_ir = a["index_range"]
a_init = a["init_range"]
parts.append(
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
)
# NOTE: NOT setting audioSamplingRate here on purpose. rustypipe doesn't
# expose the sample rate, and hard-coding 44100 caused a ~9% playback-rate
# mismatch (= growing audio-vs-video desync) for content at 48000 Hz.
# inputstream.adaptive reads the actual rate from the audio init segment's
# mdhd box when this attribute is omitted, which is correct for any source.
parts.append(
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
f' bandwidth="{a.get("bitrate", 0)}" startWithSAP="1">'
)
parts.append(
' <AudioChannelConfiguration'
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
f' value="{a.get("channels", 2)}"/>'
)
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
parts.append(
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
)
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
parts.append(" </SegmentBase>")
parts.append(" </Representation>")
parts.append(" </AdaptationSet>")
parts.append(" </Period>")
parts.append("</MPD>")
return "\n".join(parts)
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
"""Resolve via rustypipe + build DASH MPD. Returns (mpd_bytes, resp).
On any failure returns (None, resp) so the caller can fall back to
the progressive path. We serve the MPD over localhost HTTP because
inputstream.adaptive's libcurl can't open file:// URLs.
"""
try:
resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
except Exception as e:
_log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
return None, {}
if not resp.get("ok"):
return None, resp
mpd = _build_dash_mpd(
resp.get("details") or {},
resp.get("video_only_streams") or [],
resp.get("audio_streams") or [],
)
if not mpd:
_log("DASH build returned no compatible streams; falling back to progressive")
return None, resp
return mpd.encode("utf-8"), resp
def _delegate_to_pv_youtube(yt_id: str) -> bool:
"""Hand playback off to plugin.video.youtube via its play URL. They have
the proper SegmentTimeline-aware MPD construction (sidx-parsed) that
unlocks HD without the audio-sync drift our naive MPD has. Returns True
if delegation succeeded (Kodi will chain-resolve)."""
if not _pv_youtube_installed():
return False
target = f"plugin://plugin.video.youtube/play/?video_id={yt_id}"
_log(f"delegating playback to plugin.video.youtube: {target}")
li = xbmcgui.ListItem(label=yt_id)
li.setPath(target)
li.setProperty("IsPlayable", "true")
xbmcplugin.setResolvedUrl(_HANDLE, True, li)
return True
def _pv_youtube_installed() -> bool:
"""Check whether plugin.video.youtube is installed + enabled. We don't
enable it ourselves — if the user removed it, we fall back to our own
paths. Log any probe failure so silent degradation to 360p is observable."""
try:
return bool(xbmc.getCondVisibility("System.HasAddon(plugin.video.youtube)"))
except Exception as e:
_log(f"pv.youtube probe failed: {e}", xbmc.LOGWARNING)
return False
def _play(yt_id: str) -> None:
"""Resolve playback in order of preference:
1. plugin.video.youtube delegation — HD via their proven DASH MPD with
sidx-parsed SegmentTimeline. Default, when available.
2. Our DASH path — only if `dash_enabled` setting / dash.on marker / env
are set (WIP, partial — see M7 milestone).
3. yt-dlp progressive — last-resort 360p, always works.
SponsorBlock attaches regardless of which path we took.
"""
_log(f"play id={yt_id}")
# Tier 0: delegate to plugin.video.youtube if installed. Our SponsorBlock
# monitor still attaches because xbmc.Player() works regardless of which
# addon initiated playback. If pv.youtube ALSO has SB enabled the user
# might see double skips; rare in practice (pv.youtube's SB is off by
# default and theirs is more conservative).
use_pv_youtube = True
try:
use_pv_youtube = ADDON.getSettingBool("prefer_pv_youtube")
except Exception:
pass
if use_pv_youtube and _delegate_to_pv_youtube(yt_id):
try:
_attach_sponsorblock(yt_id)
except Exception as e:
# Don't let a SB monitor bug pop a 'Plugin error' dialog on the TV
# after a successful delegate-to-pv.youtube hand-off.
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
return
mpd_bytes: bytes | None = None
dash_resp: dict[str, Any] = {}
# DASH path: read setting first; fall back to env-var; OR honor a magic file
# at /storage/.kodi/userdata/addon_data/plugin.video.torttube/dash.on as a
# last-ditch trigger that doesn't depend on Kodi's settings cache.
dash_enabled = False
try:
dash_enabled = ADDON.getSettingBool("dash_enabled")
except Exception:
pass
env_dash = os.environ.get("TORTTUBE_DASH") == "1"
addon_data = ""
try:
import xbmcvfs
addon_data = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
pass
file_dash = bool(addon_data) and os.path.isfile(os.path.join(addon_data, "dash.on"))
_log(f"play id={yt_id} dash_enabled={dash_enabled} env_dash={env_dash} file_dash={file_dash}")
if dash_enabled or env_dash or file_dash:
mpd_bytes, dash_resp = _try_dash(yt_id)
_log(f"_try_dash returned mpd_bytes={'<%d bytes>' % len(mpd_bytes) if mpd_bytes else None}")
if mpd_bytes:
details = dash_resp.get("details") or {}
title = details.get("name")
mpd_url, server = _start_mpd_server(mpd_bytes)
_log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
try:
_attach_sponsorblock(yt_id)
finally:
# Shut down the MPD server cleanly once playback ends or aborts.
# _attach_sponsorblock blocks while playback is active.
try:
server.shutdown()
server.server_close()
_log("MPD HTTP server shut down")
except Exception as e:
_log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
return
# Fallback: progressive single-URL via yt-dlp (360p).
try:
resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
except Exception as e:
_log(f"sidecar resolve_play failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"resolve failed: {e}", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
if not resp.get("ok"):
kind = resp.get("kind", "unknown")
err = resp.get("error", "unknown")
_log(f"sidecar returned not-ok: {kind}: {err}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube", f"{kind}: {err}", xbmcgui.NOTIFICATION_WARNING, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
stream_url = resp.get("stream_url")
title = resp.get("title")
if not stream_url:
_log("no usable stream URL in sidecar response", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", "no stream URL", xbmcgui.NOTIFICATION_ERROR, 5000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_log(f"resolved via yt-dlp progressive fallback, playing")
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(stream_url, title))
try:
_attach_sponsorblock(yt_id)
except Exception as e:
_log(f"sponsorblock attach error (non-fatal): {e}", xbmc.LOGWARNING)
def _attach_sponsorblock(yt_id: str) -> None:
"""Fetch SponsorBlock segments and block on the monitor loop. Always blocks
until playback ends (or 30s if playback never starts) so the caller can
use this as a 'wait for playback to finish' signal — needed to keep the
MPD HTTP server alive throughout playback.
Non-fatal on segment fetch error.
"""
skip_segments: list[dict[str, Any]] = []
try:
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
if sb_resp.get("ok"):
segs = sb_resp.get("segments") or []
skip_segments = [s for s in segs if s.get("actionType") == "skip"]
_log(f"sponsorblock: {len(skip_segments)} skip segments")
except Exception as e:
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
# Always run the watcher even with zero segments — it doubles as the
# 'block until playback ends' signal that gates MPD-server shutdown.
SponsorBlockMonitor(skip_segments).run()
class SponsorBlockMonitor(xbmc.Monitor):
"""Polls Kodi's player position and seeks past each SponsorBlock skip
segment exactly once. Exits on playback stop or Kodi shutdown.
Each segment has shape {"segment": [start_s, end_s], "category": str,
"UUID": str, "actionType": "skip"}. We sort by start time so we can
short-circuit the scan; we also dedupe via the UUID set so the same
segment doesn't trigger twice if the user manually rewinds into it.
"""
POLL_S = 0.5
MAX_WAIT_FOR_PLAYBACK_S = 30.0
def __init__(self, segments: list[dict[str, Any]]):
super().__init__()
self.segments = sorted(
segments, key=lambda s: float(s.get("segment", [0, 0])[0])
)
self.skipped: set[str] = set()
def run(self) -> None:
player = xbmc.Player()
# Wait for playback to actually begin — setResolvedUrl is async,
# Kodi takes a beat to demux + start the streams.
waited = 0.0
while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
if self.abortRequested():
return
if player.isPlaying():
break
if self.waitForAbort(0.25):
return
waited += 0.25
else:
_log("sponsorblock: timed out waiting for playback to start")
return
while not self.abortRequested() and player.isPlaying():
try:
pos = float(player.getTime())
except Exception:
# getTime raises various exception types when the player goes
# away mid-poll (Kodi shutdown, plugin reload, etc). Wider catch
# so an exception path doesn't escape into _play's finally and
# leak the MPD HTTP server.
return
for seg in self.segments:
uuid = seg.get("UUID", "")
if uuid in self.skipped:
continue
start, end = float(seg["segment"][0]), float(seg["segment"][1])
if pos < start:
# Segments are sorted; nothing past here can match either.
break
if pos < end:
duration = end - start
category = seg.get("category", "?")
_log(
f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
f"({duration:.0f}s)"
)
player.seekTime(end)
self.skipped.add(uuid)
xbmcgui.Dialog().notification(
"SponsorBlock",
f"Skipped {category} ({duration:.0f}s)",
xbmcgui.NOTIFICATION_INFO,
2500,
)
break
if self.waitForAbort(self.POLL_S):
return
def _plugin_url(**kwargs: Any) -> str:
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
def _format_duration(seconds: int | float | None) -> str:
"""Format seconds as `H:MM:SS` or `M:SS`. None / negative → empty;
0 is a legitimate duration (livestream pre-roll) so we render '0:00'."""
if seconds is None or seconds < 0:
return ""
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def _format_views(views: int | None) -> str:
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'. None →
empty; 0 is rendered as '0' (brand-new upload)."""
if views is None or views < 0:
return ""
if views >= 1_000_000_000:
return f"{views / 1_000_000_000:.1f}B"
if views >= 1_000_000:
return f"{views / 1_000_000:.1f}M"
if views >= 1_000:
return f"{views / 1_000:.1f}K"
return str(views)
# Search history persistence — stored as a plain JSON list of recent queries.
# Newest first, deduplicated, capped at SEARCH_HISTORY_MAX.
SEARCH_HISTORY_MAX = 12
def _search_history_path() -> str:
try:
import xbmcvfs
base = xbmcvfs.translatePath("special://profile/addon_data/plugin.video.torttube/")
except Exception:
base = "/storage/.kodi/userdata/addon_data/plugin.video.torttube/"
return os.path.join(base, "search_history.json")
def _load_search_history() -> list[str]:
try:
with open(_search_history_path(), "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return [s for s in data if isinstance(s, str)][:SEARCH_HISTORY_MAX]
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
return []
def _save_search_history(items: list[str]) -> None:
path = _search_history_path()
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(items[:SEARCH_HISTORY_MAX], f, ensure_ascii=False)
except OSError as e:
_log(f"search history save failed (non-fatal): {e}", xbmc.LOGWARNING)
def _record_search(query: str) -> None:
q = query.strip()
if not q:
return
history = _load_search_history()
# Dedupe case-insensitively, keep newest at the front.
history = [h for h in history if h.lower() != q.lower()]
history.insert(0, q)
_save_search_history(history)
def _clear_search_history() -> None:
try:
os.remove(_search_history_path())
except (FileNotFoundError, OSError):
pass
def _pick_thumbnail(thumbs: list[dict[str, Any]] | None) -> str:
"""Pick the largest thumbnail from rustypipe's thumbnail list."""
if not thumbs:
return ""
return max(thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)).get(
"url", ""
)
def _root_directory() -> None:
"""Top-level addon menu. Search lands first because that's how most TV-side
YouTube use actually works. Remote-control via JSON-RPC still works."""
items: list[tuple[str, str, str]] = [
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
]
history = _load_search_history()
if history:
items.append(
(
"[I]Recent searches[/I]",
"DefaultAddonsRecentlyUpdated.png",
_plugin_url(action="recent"),
)
)
for label, icon, url in items:
li = xbmcgui.ListItem(label=label)
li.setArt({"icon": icon})
xbmcplugin.addDirectoryItem(_HANDLE, url, li, isFolder=True)
# Don't cache the root — it might gain/lose 'Recent searches' between visits.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _recent_directory() -> None:
"""Show stored recent searches as quick-pick items + a 'Clear history' tail."""
history = _load_search_history()
if not history:
xbmcgui.Dialog().notification(
"torttube",
"No recent searches yet.",
xbmcgui.NOTIFICATION_INFO,
2500,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
for q in history:
li = xbmcgui.ListItem(label=q)
li.setArt({"icon": "DefaultAddonsSearch.png"})
li.addContextMenuItems(
[
(
"Clear history",
f"Container.Update({_plugin_url(action='clear_history')})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="search", q=q), li, isFolder=True
)
# Tail entry: clear all history.
clear_li = xbmcgui.ListItem(label="[B]Clear search history[/B]")
clear_li.setArt({"icon": "DefaultAddonNone.png"})
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="clear_history"), clear_li, isFolder=True
)
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _clear_history_action() -> None:
_clear_search_history()
xbmcgui.Dialog().notification(
"torttube", "Search history cleared", xbmcgui.NOTIFICATION_INFO, 2000
)
# Bounce back to root.
xbmc.executebuiltin("Container.Update(plugin://plugin.video.torttube/,replace)")
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
def _add_video_items(items: list[dict[str, Any]]) -> None:
"""Add VideoItem dicts to the current plugin directory, formatted for Kodi.
Each item gets a play-action plugin URL, channel + duration + view-count
metadata in the label, thumbnail art, video InfoLabels for skin support,
and a 'Go to channel' context menu entry when the channel id is known.
"""
xbmcplugin.setContent(_HANDLE, "videos")
for item in items:
yt_id = item.get("id") or ""
if not yt_id:
continue
name = item.get("name") or "(no title)"
channel = item.get("channel") or {}
if isinstance(channel, dict):
channel_name = channel.get("name") or ""
channel_id = channel.get("id") or ""
else:
channel_name = ""
channel_id = ""
duration = item.get("duration")
views = item.get("view_count")
# Label: "Title · Channel · Duration · ViewCount"
meta_bits = [b for b in (channel_name, _format_duration(duration), _format_views(views)) if b]
label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name
li = xbmcgui.ListItem(label=label)
li.setProperty("IsPlayable", "true")
thumb_url = _pick_thumbnail(item.get("thumbnail"))
if thumb_url:
li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})
info: dict[str, Any] = {"title": name, "mediatype": "video"}
if duration:
info["duration"] = int(duration)
if channel_name:
info["studio"] = channel_name
if item.get("description"):
info["plot"] = item["description"]
try:
li.setInfo("video", info)
except Exception:
pass
# Context menu: jump to channel listing if we have the id.
if channel_id:
li.addContextMenuItems(
[
(
f"Go to {channel_name or 'channel'}",
f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
)
]
)
xbmcplugin.addDirectoryItem(
_HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
)
def _search_directory(query: str | None = None) -> None:
"""Hit sidecar `search`, list results as playable items.
`query` arg: if provided (via `?action=search&q=...`), skip the keyboard
prompt — used by JSON-RPC clients (phones, share-to-TV) and tests.
"""
if not query:
query = xbmcgui.Dialog().input("Search YouTube")
if not query:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
resp = _call_sidecar({"op": "search", "query": query, "limit": 30}, timeout_s=25)
except Exception as e:
_log(f"search failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
_log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification(
"torttube",
f"search: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
_log(f"search '{query}'{len(items)} items")
_record_search(query)
_add_video_items(items)
# cacheToDisc=False so the user's next search isn't shadowed by the
# previous query's cached result-set.
xbmcplugin.endOfDirectory(_HANDLE, cacheToDisc=False)
def _playlist_directory(playlist_id: str) -> None:
"""List a playlist's videos."""
try:
resp = _call_sidecar(
{"op": "playlist", "id": playlist_id, "limit": 100}, timeout_s=25
)
except Exception as e:
_log(f"playlist failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"playlist failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"playlist: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
pl = resp.get("playlist") or {}
_log(f"playlist {pl.get('name') or playlist_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _channel_directory(channel_id: str) -> None:
"""List a channel's recent videos."""
try:
resp = _call_sidecar(
{"op": "channel_videos", "id": channel_id, "limit": 50}, timeout_s=25
)
except Exception as e:
_log(f"channel_videos failed: {e}", xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", f"channel failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
if not resp.get("ok"):
xbmcgui.Dialog().notification(
"torttube",
f"channel: {resp.get('error', 'unknown')}",
xbmcgui.NOTIFICATION_WARNING,
4000,
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
items = resp.get("items") or []
ch = resp.get("channel") or {}
_log(f"channel {ch.get('name') or channel_id}: {len(items)} items")
_add_video_items(items)
xbmcplugin.endOfDirectory(_HANDLE)
def _play_by_url_prompt() -> None:
"""Manual URL/ID entry for tap-to-play from the Kodi UI."""
s = xbmcgui.Dialog().input("Paste YouTube URL or ID")
if not s:
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
try:
yt_id = _extract_id(s)
except ValueError as e:
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
return
# Open via Player.Open-equivalent: run the plugin URL through Kodi
# rather than calling _play() in directory context.
xbmc.executebuiltin(f"PlayMedia({_plugin_url(action='play', id=yt_id)})")
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
def main() -> None:
params = dict(parse_qsl(_QS.lstrip("?")))
action = params.get("action")
if action == "play":
try:
yt_id = _extract_id(params.get("url") or params.get("id") or "")
except ValueError as e:
_log(str(e), xbmc.LOGERROR)
xbmcgui.Dialog().notification(
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
)
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
return
_play(yt_id)
elif action == "search":
_search_directory(query=params.get("q"))
elif action == "channel":
_channel_directory(params.get("id") or "")
elif action == "playlist":
_playlist_directory(params.get("id") or "")
elif action == "play_by_url":
_play_by_url_prompt()
elif action == "recent":
_recent_directory()
elif action == "clear_history":
_clear_history_action()
else:
_root_directory()
if __name__ == "__main__":
main()