Sidecar Playlist op via rustypipe playlist(). Returns playlist metadata
block (id, name, channel, video_count) + items array. Verified live
against LTT's 'Consumer Advocacy' (PL8mG-RkN2uTzwoF72GqeqAJMI-N7scqtI):
returns the single video with full metadata.
Addon ?action=playlist&id=PL... lists items via _add_video_items reuse.
Verified via Files.GetDirectory JSON-RPC.
resources/settings.xml gains a 'dash_enabled' toggle (boolean, default
off). main.py checks ADDON.getSettingBool('dash_enabled') OR the
TORTTUBE_DASH env fallback before attempting the DASH path. Toggle via
Kodi Settings → Add-on settings → torttube, OR via
Addons.SetSettings JSON-RPC.
docs/upstream.md: filed a 'watching' entry for rustypipe PR #77
(Schmiddiii's late-May YouTube parsing fixes) with our independent
test data — player(), search(), and channel_videos() all still work
against current YouTube on 0.11.4, suggesting the PR fixes code paths
torttube doesn't yet exercise. Endorsement comment pending: gated on
creating a Sulkta-Coop codeberg account.
Observation from kodi.log: plugin.video.youtube successfully parsed a
DASH MPD with 26 streams via inputstream.adaptive on this same Pi —
proves DASH is solvable on our setup, just need to match the URL
pattern they use. M7 stabilization carrying forward.
Addon version 0.0.9.
755 lines
28 KiB
Python
755 lines
28 KiB
Python
# torttube — Kodi YouTube addon
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
"""
Entry point. Plugin URL shapes handled:

  plugin://plugin.video.torttube/                          (root menu: Search, Play by URL)
  plugin://plugin.video.torttube/?action=play&id=ID        (resolve + play a YouTube ID)
  plugin://plugin.video.torttube/?action=play&url=URL      (extract ID from URL, then play)
  plugin://plugin.video.torttube/?action=search[&q=QUERY]  (search; prompts when q is absent)
  plugin://plugin.video.torttube/?action=channel&id=ID     (list a channel's recent videos)
  plugin://plugin.video.torttube/?action=playlist&id=ID    (list a playlist's videos)
  plugin://plugin.video.torttube/?action=play_by_url       (prompt for a URL or ID, then play)

Remote-control / share-to-TV pattern (Kodi JSON-RPC):

  POST http://<rpi>:8080/jsonrpc
  Basic auth (kodi user)
  {"jsonrpc":"2.0","method":"Player.Open","params":{
    "item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}

That's how Android / phone / "send to TV" flows hand off — Kodi already
exposes the endpoint, we just need to register the plugin URL.
"""
|
|
|
|
import http.server
|
|
import json
|
|
import os
|
|
import re
|
|
import socketserver
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from typing import Any
|
|
from urllib.parse import parse_qsl, urlencode, urlparse
|
|
from xml.sax.saxutils import escape as xml_escape
|
|
|
|
import xbmc
|
|
import xbmcaddon
|
|
import xbmcgui
|
|
import xbmcplugin
|
|
|
|
# Handle on this addon's own metadata and settings.
ADDON = xbmcaddon.Addon()
# Install directory of the addon; the sidecar binary is looked up under bin/.
ADDON_PATH = ADDON.getAddonInfo("path")
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")

# Invocation arguments: argv[1] is used as the directory handle for every
# xbmcplugin call, argv[2] as the query string parsed in main(). Both fall
# back to inert defaults when the module is loaded without them.
_HANDLE = int(sys.argv[1]) if sys.argv[1:] else -1
_QS = sys.argv[2] if sys.argv[2:] else ""
|
|
|
|
|
|
def _log(msg, level=xbmc.LOGINFO):
    """Write *msg* to the Kodi log at *level*, prefixed with the addon tag."""
    tagged = f"[torttube] {msg}"
    xbmc.log(tagged, level)
|
|
|
|
|
|
def _extract_id(url_or_id: str) -> str:
|
|
"""Accept either a bare ID or any common YouTube URL form, return the ID."""
|
|
s = url_or_id.strip()
|
|
# Bare 11-char ID (YouTube's canonical length).
|
|
if re.fullmatch(r"[A-Za-z0-9_-]{11}", s):
|
|
return s
|
|
parsed = urlparse(s)
|
|
# https://youtu.be/<id>
|
|
if parsed.netloc.endswith("youtu.be"):
|
|
return parsed.path.lstrip("/").split("/")[0]
|
|
# https://www.youtube.com/watch?v=<id>
|
|
if "youtube.com" in parsed.netloc:
|
|
for k, v in parse_qsl(parsed.query):
|
|
if k == "v":
|
|
return v
|
|
# /shorts/<id>, /embed/<id>, /live/<id>
|
|
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
|
|
if m:
|
|
return m.group(2)
|
|
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
|
|
|
|
|
|
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
    """Invoke the sidecar with one JSON request and return its JSON response.

    The request is written to the sidecar's stdin as a single line. The
    addon's bin/ directory is prepended to PATH for the child process (per the
    original note: so the sidecar's `yt-dlp` shell-outs find the bundled
    zipapp, since LibreELEC has no system yt-dlp).

    Args:
        request: JSON-serialisable request object.
        timeout_s: seconds to wait for the sidecar before giving up.

    Returns:
        The first stdout line that parses as a JSON object.

    Raises:
        RuntimeError: sidecar missing, not executable, exited non-zero, or
            produced no JSON object on stdout.
        subprocess.TimeoutExpired: the sidecar did not finish in *timeout_s*.
    """
    if not os.path.isfile(SIDECAR_BIN):
        raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
    if not os.access(SIDECAR_BIN, os.X_OK):
        raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")

    env = os.environ.copy()
    addon_bin = os.path.join(ADDON_PATH, "bin")
    # Skip an empty/missing PATH so no trailing separator (which would mean
    # "current directory") ends up in the child's search path.
    env["PATH"] = os.pathsep.join(p for p in (addon_bin, env.get("PATH")) if p)

    proc = subprocess.run(
        [SIDECAR_BIN],
        input=(json.dumps(request) + "\n").encode("utf-8"),
        capture_output=True,
        timeout=timeout_s,
        env=env,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
        )

    # Stdout may carry stray log lines around the response; take the first
    # line that decodes to a JSON object and ignore anything else.
    first_line = ""
    for raw in proc.stdout.splitlines():
        text = raw.decode("utf-8", "replace").strip()
        if not text:
            continue
        first_line = first_line or text
        try:
            parsed = json.loads(text)
        except ValueError:
            continue
        if isinstance(parsed, dict):
            return parsed
    if first_line:
        raise RuntimeError(f"sidecar produced no JSON object response: {first_line[:200]!r}")
    raise RuntimeError("sidecar produced no response")
|
|
|
|
|
|
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
    """Build the playable ListItem handed to setResolvedUrl.

    The label falls back to "torttube" when *title* is empty. When the URL
    contains ".mpd" or ".m3u8" the item is routed through
    inputstream.adaptive with the matching manifest type; any other URL is
    left for Kodi's default player.
    """
    item = xbmcgui.ListItem(label=title or "torttube")
    item.setPath(stream_url)
    item.setProperty("IsPlayable", "true")
    # First matching marker wins, so ".mpd" takes precedence over ".m3u8".
    for marker, manifest_type in ((".mpd", "mpd"), (".m3u8", "hls")):
        if marker in stream_url:
            item.setProperty("inputstream", "inputstream.adaptive")
            item.setProperty("inputstream.adaptive.manifest_type", manifest_type)
            break
    return item
|
|
|
|
|
|
class _MpdHandler(http.server.BaseHTTPRequestHandler):
|
|
"""Serves the per-video MPD file. The bytes come from the closure-captured
|
|
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
|
|
to clean up early. One handler per HTTPServer instance."""
|
|
|
|
mpd_bytes: bytes = b""
|
|
|
|
def do_GET(self) -> None: # noqa: N802 — http.server convention
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/dash+xml")
|
|
self.send_header("Content-Length", str(len(self.mpd_bytes)))
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
self.wfile.write(self.mpd_bytes)
|
|
|
|
def log_message(self, *args: Any, **kwargs: Any) -> None:
|
|
# Silence the default request log — Kodi's log is verbose enough.
|
|
return
|
|
|
|
|
|
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
    """Start a loopback HTTP server that answers any GET with `mpd_bytes`.

    Returns `(url, server)`. The server runs on a daemon thread; the caller
    owns it and must call `server.shutdown()` once playback is over.
    """
    # A fresh handler subclass per call keeps this video's manifest separate
    # from any other server that may be running concurrently.
    handler_cls = type("_MpdHandlerInstance", (_MpdHandler,), dict(mpd_bytes=mpd_bytes))

    # Binding to port 0 lets the kernel choose a free port. The threading
    # server variant keeps simultaneous GETs from queueing behind each other.
    bind_address = ("127.0.0.1", 0)
    server = http.server.ThreadingHTTPServer(bind_address, handler_cls)
    worker = threading.Thread(target=server.serve_forever, daemon=True)
    worker.start()

    port = server.server_address[1]
    url = f"http://127.0.0.1:{port}/manifest.mpd"
    _log(f"MPD HTTP server up on {url}")
    return url, server
|
|
|
|
|
|
_MIME_CODEC_RE = re.compile(r'codecs="([^"]+)"')
|
|
|
|
|
|
def _codec_from_mime(stream: dict[str, Any]) -> str:
|
|
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
|
|
|
|
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
|
|
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
|
|
"""
|
|
mime = stream.get("mime", "")
|
|
m = _MIME_CODEC_RE.search(mime)
|
|
return m.group(1) if m else (stream.get("codec") or "")
|
|
|
|
|
|
def _build_dash_mpd(
|
|
details: dict[str, Any],
|
|
video_streams: list[dict[str, Any]],
|
|
audio_streams: list[dict[str, Any]],
|
|
) -> str | None:
|
|
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
|
|
|
|
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
|
|
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
|
|
Returns the MPD XML, or None if no compatible streams were found.
|
|
"""
|
|
duration_s = float(details.get("duration") or 0)
|
|
if duration_s <= 0:
|
|
return None
|
|
|
|
# Filter video: H.264 only (avc1.*), height <= 1080 (RPi 4 H.264 ceiling).
|
|
h264 = [
|
|
s
|
|
for s in video_streams
|
|
if "avc1" in (s.get("codec") or s.get("mime", ""))
|
|
and (s.get("height") or 0) <= 1080
|
|
and s.get("init_range")
|
|
and s.get("index_range")
|
|
and s.get("url")
|
|
]
|
|
if not h264:
|
|
return None
|
|
|
|
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
|
|
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
|
|
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
|
|
audio_pool = aac or opus
|
|
if not audio_pool:
|
|
return None
|
|
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
|
|
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
|
|
return None
|
|
|
|
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
|
|
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
|
|
|
|
parts = [
|
|
'<?xml version="1.0" encoding="UTF-8"?>',
|
|
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
|
|
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
|
|
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
|
|
" <Period>",
|
|
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
|
|
]
|
|
for v in h264:
|
|
codec = _codec_from_mime(v) or "avc1.4d401f"
|
|
ir = v["index_range"]
|
|
init = v["init_range"]
|
|
parts.append(
|
|
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
|
|
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
|
|
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
|
|
f' startWithSAP="1">'
|
|
)
|
|
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
|
|
parts.append(
|
|
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
|
|
)
|
|
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
|
|
parts.append(" </SegmentBase>")
|
|
parts.append(" </Representation>")
|
|
parts.append(" </AdaptationSet>")
|
|
|
|
# Audio adaptation set.
|
|
a = best_audio
|
|
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
|
|
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
|
|
a_ir = a["index_range"]
|
|
a_init = a["init_range"]
|
|
parts.append(
|
|
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
|
|
)
|
|
parts.append(
|
|
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
|
|
f' bandwidth="{a.get("bitrate", 0)}" audioSamplingRate="44100" startWithSAP="1">'
|
|
)
|
|
parts.append(
|
|
' <AudioChannelConfiguration'
|
|
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
|
|
f' value="{a.get("channels", 2)}"/>'
|
|
)
|
|
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
|
|
parts.append(
|
|
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
|
|
)
|
|
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
|
|
parts.append(" </SegmentBase>")
|
|
parts.append(" </Representation>")
|
|
parts.append(" </AdaptationSet>")
|
|
parts.append(" </Period>")
|
|
parts.append("</MPD>")
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
    """Resolve via the sidecar's `resolve_dash` op and build a DASH MPD.

    Returns `(mpd_bytes, resp)`. On any failure — sidecar error, malformed or
    not-ok response, no compatible streams, or an error while building the
    manifest — returns `(None, resp)` (with `resp` possibly empty) so the
    caller can fall back to the progressive path. Per the original note, the
    MPD is served over localhost HTTP because inputstream.adaptive's libcurl
    can't open file:// URLs.
    """
    try:
        resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
    except Exception as e:
        _log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
        return None, {}
    if not isinstance(resp, dict):
        _log("resolve_dash returned a non-object response (will fall back)", xbmc.LOGWARNING)
        return None, {}
    if not resp.get("ok"):
        return None, resp

    # The builder indexes into sidecar-provided data; a malformed stream entry
    # must lead to the progressive fallback, not abort the whole play request.
    try:
        mpd = _build_dash_mpd(
            resp.get("details") or {},
            resp.get("video_only_streams") or [],
            resp.get("audio_streams") or [],
        )
    except Exception as e:
        _log(f"DASH build failed (will fall back): {e}", xbmc.LOGWARNING)
        return None, resp
    if not mpd:
        _log("DASH build returned no compatible streams; falling back to progressive")
        return None, resp
    return mpd.encode("utf-8"), resp
|
|
|
|
|
|
def _play(yt_id: str) -> None:
    """Resolve *yt_id* and hand a playable item to Kodi.

    The DASH path (`_try_dash`) is attempted only when the `dash_enabled`
    addon setting is on or the environment has TORTTUBE_DASH=1. When it
    yields a manifest, the manifest is served from a local HTTP server that
    is shut down once `_attach_sponsorblock` returns. Otherwise the
    sidecar's `resolve_play` op supplies a single progressive stream URL.
    Every failure on the progressive path is logged, shown as a
    notification, and reported to Kodi as an unresolved item.
    """
    _log(f"play id={yt_id}")

    def give_up(log_msg: str, log_level: int, toast: str, icon: str) -> None:
        # Shared failure tail: log, notify the user, report "not resolved".
        _log(log_msg, log_level)
        xbmcgui.Dialog().notification("torttube", toast, icon, 5000)
        xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())

    try:
        dash_enabled = ADDON.getSettingBool("dash_enabled")
    except Exception:
        # Setting might not exist on older configs; treat as off.
        dash_enabled = False
    wants_dash = dash_enabled or os.environ.get("TORTTUBE_DASH") == "1"

    mpd_bytes: bytes | None = None
    dash_resp: dict[str, Any] = {}
    if wants_dash:
        mpd_bytes, dash_resp = _try_dash(yt_id)

    if mpd_bytes:
        title = (dash_resp.get("details") or {}).get("name")
        mpd_url, server = _start_mpd_server(mpd_bytes)
        _log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
        xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
        try:
            # Blocks while playback is active, which keeps the server alive.
            _attach_sponsorblock(yt_id)
        finally:
            # Tear the MPD server down once playback ends or aborts.
            try:
                server.shutdown()
                server.server_close()
                _log("MPD HTTP server shut down")
            except Exception as e:
                _log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
        return

    # Fallback: progressive single-URL via yt-dlp (360p).
    try:
        resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
    except Exception as e:
        give_up(
            f"sidecar resolve_play failed: {e}",
            xbmc.LOGERROR,
            f"resolve failed: {e}",
            xbmcgui.NOTIFICATION_ERROR,
        )
        return

    if not resp.get("ok"):
        kind = resp.get("kind", "unknown")
        err = resp.get("error", "unknown")
        give_up(
            f"sidecar returned not-ok: {kind}: {err}",
            xbmc.LOGWARNING,
            f"{kind}: {err}",
            xbmcgui.NOTIFICATION_WARNING,
        )
        return

    stream_url = resp.get("stream_url")
    if not stream_url:
        give_up(
            "no usable stream URL in sidecar response",
            xbmc.LOGERROR,
            "no stream URL",
            xbmcgui.NOTIFICATION_ERROR,
        )
        return

    _log("resolved via yt-dlp progressive fallback, playing")
    xbmcplugin.setResolvedUrl(
        _HANDLE, True, _resolved_listitem(stream_url, resp.get("title"))
    )
    _attach_sponsorblock(yt_id)
|
|
|
|
|
|
def _attach_sponsorblock(yt_id: str) -> None:
    """Fetch SponsorBlock segments, then block on the monitor loop.

    Always blocks until playback ends (or until the monitor's start-up wait
    expires if playback never starts), so the caller can use this as a
    'wait for playback to finish' signal — needed to keep the MPD HTTP
    server alive throughout playback.

    A failed segment fetch is non-fatal: it is logged and the monitor runs
    with no segments.
    """
    skip_segments: list[dict[str, Any]] = []
    try:
        sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
        if sb_resp.get("ok"):
            segs = sb_resp.get("segments") or []
            skip_segments = [s for s in segs if s.get("actionType") == "skip"]
        _log(f"sponsorblock: {len(skip_segments)} skip segments")
    except Exception as e:
        _log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)

    # Always run the watcher even with zero segments — it doubles as the
    # 'block until playback ends' signal that gates MPD-server shutdown.
    # It runs exactly once: a second fetch + monitor after playback has ended
    # would only stall the caller and could act on whatever plays next.
    SponsorBlockMonitor(skip_segments).run()
|
|
|
|
|
|
class SponsorBlockMonitor(xbmc.Monitor):
    """Watch the player position and jump over SponsorBlock skip segments.

    Segments look like {"segment": [start_s, end_s], "category": str,
    "UUID": str, "actionType": "skip"}. They are kept sorted by start time so
    a scan can stop at the first segment that lies ahead of the playhead.
    Each segment is skipped at most once, tracked by UUID, so rewinding into
    an already-skipped segment does not trigger it again. `run` returns when
    playback stops, when Kodi requests an abort, or when playback fails to
    start within MAX_WAIT_FOR_PLAYBACK_S.
    """

    # Seconds between playhead checks during playback.
    POLL_S = 0.5
    # Upper bound on how long run() waits for playback to begin.
    MAX_WAIT_FOR_PLAYBACK_S = 30.0

    def __init__(self, segments: list[dict[str, Any]]):
        super().__init__()

        def start_of(seg: dict[str, Any]) -> float:
            return float(seg.get("segment", [0, 0])[0])

        self.segments = sorted(segments, key=start_of)
        self.skipped: set[str] = set()

    def run(self) -> None:
        """Block until playback ends, skipping segments as they are entered."""
        player = xbmc.Player()
        if not self._await_playback(player):
            return

        while not self.abortRequested() and player.isPlaying():
            try:
                pos = float(player.getTime())
            except (RuntimeError, OSError):
                # getTime raises if the player went away between checks.
                return
            self._skip_if_inside_segment(player, pos)
            if self.waitForAbort(self.POLL_S):
                return

    def _await_playback(self, player) -> bool:
        """Wait for playback to begin; False on abort or timeout.

        setResolvedUrl is async, so Kodi needs a moment to demux and start
        the streams before isPlaying() turns true.
        """
        step = 0.25
        waited = 0.0
        while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
            if self.abortRequested():
                return False
            if player.isPlaying():
                return True
            if self.waitForAbort(step):
                return False
            waited += step
        _log("sponsorblock: timed out waiting for playback to start")
        return False

    def _skip_if_inside_segment(self, player, pos: float) -> None:
        """Seek past the first not-yet-skipped segment that contains *pos*."""
        for seg in self.segments:
            uuid = seg.get("UUID", "")
            if uuid in self.skipped:
                continue
            start = float(seg["segment"][0])
            end = float(seg["segment"][1])
            if pos < start:
                # Segments are sorted; nothing past here can match either.
                return
            if pos >= end:
                continue
            duration = end - start
            category = seg.get("category", "?")
            _log(
                f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
                f"({duration:.0f}s)"
            )
            player.seekTime(end)
            self.skipped.add(uuid)
            xbmcgui.Dialog().notification(
                "SponsorBlock",
                f"Skipped {category} ({duration:.0f}s)",
                xbmcgui.NOTIFICATION_INFO,
                2500,
            )
            return
|
|
|
|
|
|
def _plugin_url(**kwargs: Any) -> str:
|
|
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
|
|
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
|
|
|
|
|
|
def _format_duration(seconds: int | float | None) -> str:
|
|
"""Format seconds as `H:MM:SS` or `M:SS`."""
|
|
if not seconds:
|
|
return ""
|
|
s = int(seconds)
|
|
h, rem = divmod(s, 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h:
|
|
return f"{h}:{m:02d}:{s:02d}"
|
|
return f"{m}:{s:02d}"
|
|
|
|
|
|
def _format_views(views: int | None) -> str:
|
|
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'."""
|
|
if not views:
|
|
return ""
|
|
if views >= 1_000_000_000:
|
|
return f"{views / 1_000_000_000:.1f}B"
|
|
if views >= 1_000_000:
|
|
return f"{views / 1_000_000:.1f}M"
|
|
if views >= 1_000:
|
|
return f"{views / 1_000:.1f}K"
|
|
return str(views)
|
|
|
|
|
|
def _pick_thumbnail(thumbs: list[dict[str, Any]] | None) -> str:
|
|
"""Pick the largest thumbnail from rustypipe's thumbnail list."""
|
|
if not thumbs:
|
|
return ""
|
|
return max(thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)).get(
|
|
"url", ""
|
|
)
|
|
|
|
|
|
def _root_directory() -> None:
    """Render the addon's top-level menu: Search first, then Play by URL.

    Both entries are added as folders that point back at this plugin with
    the matching `action`.
    """
    entries = (
        ("Search", "DefaultAddonsSearch.png", "search"),
        ("Play by URL", "DefaultAddonNone.png", "play_by_url"),
    )
    for label, icon, action in entries:
        entry = xbmcgui.ListItem(label=label)
        entry.setArt({"icon": icon})
        xbmcplugin.addDirectoryItem(
            _HANDLE, _plugin_url(action=action), entry, isFolder=True
        )
    xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _add_video_items(items: list[dict[str, Any]]) -> None:
    """Add VideoItem dicts to the current plugin directory, formatted for Kodi.

    Each item gets a play-action plugin URL, channel + duration + view-count
    metadata in the label, thumbnail art, video InfoLabels for skin support,
    and a 'Go to channel' context menu entry when the channel id is known.
    Items without an id are skipped. The caller is responsible for calling
    `endOfDirectory` afterwards.
    """
    xbmcplugin.setContent(_HANDLE, "videos")
    for item in items:
        yt_id = item.get("id") or ""
        if not yt_id:
            continue
        name = item.get("name") or "(no title)"
        channel = item.get("channel") or {}
        if isinstance(channel, dict):
            channel_name = channel.get("name") or ""
            channel_id = channel.get("id") or ""
        else:
            channel_name = ""
            channel_id = ""
        duration = item.get("duration")
        views = item.get("view_count")

        # Label: "Title · Channel · Duration · ViewCount"
        meta_bits = [
            b for b in (channel_name, _format_duration(duration), _format_views(views)) if b
        ]
        label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name

        li = xbmcgui.ListItem(label=label)
        li.setProperty("IsPlayable", "true")
        thumb_url = _pick_thumbnail(item.get("thumbnail"))
        if thumb_url:
            li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})

        info: dict[str, Any] = {"title": name, "mediatype": "video"}
        if duration:
            info["duration"] = int(duration)
        if channel_name:
            info["studio"] = channel_name
        if item.get("description"):
            info["plot"] = item["description"]
        try:
            li.setInfo("video", info)
        except Exception as e:
            # InfoLabels are best-effort: the item is still listed without
            # them, but leave a trace instead of hiding the failure.
            _log(f"setInfo failed for {yt_id}: {e}", xbmc.LOGDEBUG)

        # Context menu: jump to channel listing if we have the id.
        if channel_id:
            li.addContextMenuItems(
                [
                    (
                        f"Go to {channel_name or 'channel'}",
                        f"Container.Update({_plugin_url(action='channel', id=channel_id)})",
                    )
                ]
            )

        xbmcplugin.addDirectoryItem(
            _HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
        )
|
|
|
|
|
|
def _search_directory(query: str | None = None) -> None:
    """Run the sidecar's `search` op and list the results as playable items.

    When *query* is supplied (via `?action=search&q=...`) the on-screen
    keyboard is skipped — used by JSON-RPC clients (phones, share-to-TV) and
    tests. Otherwise the user is prompted; an empty answer ends the
    directory as failed without searching.
    """

    def fail(toast: str, icon: str) -> None:
        # Common tail for both failure modes: notify, then end as failed.
        xbmcgui.Dialog().notification("torttube", toast, icon, 4000)
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)

    query = query or xbmcgui.Dialog().input("Search YouTube")
    if not query:
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    request = {"op": "search", "query": query, "limit": 30}
    try:
        resp = _call_sidecar(request, timeout_s=15)
    except Exception as e:
        _log(f"search failed: {e}", xbmc.LOGERROR)
        fail(f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR)
        return

    if not resp.get("ok"):
        _log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
        fail(f"search: {resp.get('error', 'unknown')}", xbmcgui.NOTIFICATION_WARNING)
        return

    results = resp.get("items") or []
    _log(f"search '{query}' → {len(results)} items")
    _add_video_items(results)
    xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _playlist_directory(playlist_id: str) -> None:
    """List a playlist's videos (up to 100) via the sidecar's `playlist` op.

    On a missing id, a sidecar failure, or a not-ok response the problem is
    logged, shown as a notification, and the directory is ended as failed.
    """
    if not playlist_id:
        # Nothing to look up; don't spawn the sidecar for an empty id.
        _log("playlist: missing id", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification(
            "torttube", "playlist: missing id", xbmcgui.NOTIFICATION_WARNING, 4000
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    try:
        resp = _call_sidecar(
            {"op": "playlist", "id": playlist_id, "limit": 100}, timeout_s=15
        )
    except Exception as e:
        _log(f"playlist failed: {e}", xbmc.LOGERROR)
        xbmcgui.Dialog().notification(
            "torttube", f"playlist failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return
    if not resp.get("ok"):
        # Logged as well as shown, matching the search listing.
        _log(f"playlist not-ok: {resp.get('error')}", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification(
            "torttube",
            f"playlist: {resp.get('error', 'unknown')}",
            xbmcgui.NOTIFICATION_WARNING,
            4000,
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    items = resp.get("items") or []
    pl = resp.get("playlist") or {}
    _log(f"playlist {pl.get('name') or playlist_id}: {len(items)} items")
    _add_video_items(items)
    xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _channel_directory(channel_id: str) -> None:
    """List a channel's recent videos (up to 50) via the sidecar's
    `channel_videos` op.

    On a missing id, a sidecar failure, or a not-ok response the problem is
    logged, shown as a notification, and the directory is ended as failed.
    """
    if not channel_id:
        # Nothing to look up; don't spawn the sidecar for an empty id.
        _log("channel: missing id", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification(
            "torttube", "channel: missing id", xbmcgui.NOTIFICATION_WARNING, 4000
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    try:
        resp = _call_sidecar(
            {"op": "channel_videos", "id": channel_id, "limit": 50}, timeout_s=15
        )
    except Exception as e:
        _log(f"channel_videos failed: {e}", xbmc.LOGERROR)
        xbmcgui.Dialog().notification(
            "torttube", f"channel failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return
    if not resp.get("ok"):
        # Logged as well as shown, matching the search listing.
        _log(f"channel not-ok: {resp.get('error')}", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification(
            "torttube",
            f"channel: {resp.get('error', 'unknown')}",
            xbmcgui.NOTIFICATION_WARNING,
            4000,
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    items = resp.get("items") or []
    ch = resp.get("channel") or {}
    _log(f"channel {ch.get('name') or channel_id}: {len(items)} items")
    _add_video_items(items)
    xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _play_by_url_prompt() -> None:
    """Ask for a YouTube URL or ID in the Kodi UI and start playing it.

    An empty entry or one with no extractable id ends the directory as
    failed (the latter after an error notification).
    """
    entered = xbmcgui.Dialog().input("Paste YouTube URL or ID")
    if not entered:
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    try:
        yt_id = _extract_id(entered)
    except ValueError as e:
        xbmcgui.Dialog().notification(
            "torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
        )
        xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
        return

    # Hand the play URL back to Kodi (Player.Open-equivalent) instead of
    # calling _play() from inside a directory listing.
    target = _plugin_url(action="play", id=yt_id)
    xbmc.executebuiltin(f"PlayMedia({target})")
    xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
|
|
|
|
|
|
def main() -> None:
    """Parse the plugin query string and dispatch on its `action` parameter.

    `play` resolves an id from `url` (preferred) or `id` and plays it; an
    unparseable value is reported and resolved as failed. `search`,
    `channel`, `playlist` and `play_by_url` open the matching listing or
    prompt. Anything else, including no action, shows the root menu.
    """
    params = dict(parse_qsl(_QS.lstrip("?")))
    action = params.get("action")

    if action == "play":
        try:
            yt_id = _extract_id(params.get("url") or params.get("id") or "")
        except ValueError as e:
            _log(str(e), xbmc.LOGERROR)
            xbmcgui.Dialog().notification(
                "torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
            )
            xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
            return
        _play(yt_id)
        return

    # Directory-style actions; unknown or missing actions fall back to root.
    routes = {
        "search": lambda: _search_directory(query=params.get("q")),
        "channel": lambda: _channel_directory(params.get("id") or ""),
        "playlist": lambda: _playlist_directory(params.get("id") or ""),
        "play_by_url": _play_by_url_prompt,
    }
    routes.get(action, _root_directory)()


if __name__ == "__main__":
    main()
|