Sidecar gains the 'search' op via rustypipe's query().search::<VideoItem,_>() — returns id, title, channel, duration, thumbnails, view_count. Default limit 25. Addon root directory is no longer a placeholder notification: - 'Search' entry → ?action=search → keyboard input → result list → tap a result to play (each result is a play-action plugin URL). - 'Play by URL' entry → ?action=play_by_url → keyboard input → PlayMedia. - ?action=search also accepts inline 'q=…' so JSON-RPC clients can drive search without going through the on-TV keyboard (useful for share-to-TV from phone + tests). - Result labels formatted as 'Title · Channel · Duration · Views', with thumbnail + Kodi InfoLabels for richer skin views. Verified via Files.GetDirectory JSON-RPC: 19 well-formatted LTT results returned for query 'linus tech tips'. Pending M4: channel browse, playlist browse, pagination, search history. Addon version 0.0.6.
661 lines
25 KiB
Python
661 lines
25 KiB
Python
# torttube — Kodi YouTube addon
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
"""
|
|
Entry point. Plugin URL shapes handled:
|
|
|
|
plugin://plugin.video.torttube/ (root: M3 will add browse UI)
|
|
plugin://plugin.video.torttube/?action=play&id=ID (resolve + play a YouTube ID)
|
|
plugin://plugin.video.torttube/?action=play&url=URL (extract ID from URL, then play)
|
|
|
|
Remote-control / share-to-TV pattern (Kodi JSON-RPC):
|
|
|
|
POST http://<rpi>:8080/jsonrpc
|
|
Basic auth (kodi user)
|
|
{"jsonrpc":"2.0","method":"Player.Open","params":{
|
|
"item":{"file":"plugin://plugin.video.torttube/?action=play&id=<id>"}}}
|
|
|
|
That's how Android / phone / "send to TV" flows hand off — Kodi already
|
|
exposes the endpoint, we just need to register the plugin URL.
|
|
"""
|
|
|
|
import http.server
|
|
import json
|
|
import os
|
|
import re
|
|
import socketserver
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from typing import Any
|
|
from urllib.parse import parse_qsl, urlencode, urlparse
|
|
from xml.sax.saxutils import escape as xml_escape
|
|
|
|
import xbmc
|
|
import xbmcaddon
|
|
import xbmcgui
|
|
import xbmcplugin
|
|
|
|
ADDON = xbmcaddon.Addon()
|
|
ADDON_PATH = ADDON.getAddonInfo("path")
|
|
SIDECAR_BIN = os.path.join(ADDON_PATH, "bin", "torttube-sidecar")
|
|
|
|
_HANDLE = int(sys.argv[1]) if len(sys.argv) > 1 else -1
|
|
_QS = sys.argv[2] if len(sys.argv) > 2 else ""
|
|
|
|
|
|
def _log(msg, level=xbmc.LOGINFO):
|
|
xbmc.log(f"[torttube] {msg}", level)
|
|
|
|
|
|
def _extract_id(url_or_id: str) -> str:
|
|
"""Accept either a bare ID or any common YouTube URL form, return the ID."""
|
|
s = url_or_id.strip()
|
|
# Bare 11-char ID (YouTube's canonical length).
|
|
if re.fullmatch(r"[A-Za-z0-9_-]{11}", s):
|
|
return s
|
|
parsed = urlparse(s)
|
|
# https://youtu.be/<id>
|
|
if parsed.netloc.endswith("youtu.be"):
|
|
return parsed.path.lstrip("/").split("/")[0]
|
|
# https://www.youtube.com/watch?v=<id>
|
|
if "youtube.com" in parsed.netloc:
|
|
for k, v in parse_qsl(parsed.query):
|
|
if k == "v":
|
|
return v
|
|
# /shorts/<id>, /embed/<id>, /live/<id>
|
|
m = re.match(r"^/(shorts|embed|live)/([A-Za-z0-9_-]{11})", parsed.path)
|
|
if m:
|
|
return m.group(2)
|
|
raise ValueError(f"could not extract YouTube id from {url_or_id!r}")
|
|
|
|
|
|
def _call_sidecar(request: dict, timeout_s: int = 30) -> dict:
|
|
"""Invoke the sidecar with one JSON request, parse one JSON response.
|
|
|
|
Puts the addon's bin/ on PATH so the sidecar's `yt-dlp` shell-outs find
|
|
the bundled zipapp (LibreELEC has no system yt-dlp).
|
|
"""
|
|
if not os.path.isfile(SIDECAR_BIN):
|
|
raise RuntimeError(f"sidecar binary missing at {SIDECAR_BIN}")
|
|
if not os.access(SIDECAR_BIN, os.X_OK):
|
|
raise RuntimeError(f"sidecar binary not executable at {SIDECAR_BIN}")
|
|
|
|
env = os.environ.copy()
|
|
addon_bin = os.path.join(ADDON_PATH, "bin")
|
|
env["PATH"] = addon_bin + os.pathsep + env.get("PATH", "")
|
|
|
|
proc = subprocess.run(
|
|
[SIDECAR_BIN],
|
|
input=(json.dumps(request) + "\n").encode("utf-8"),
|
|
capture_output=True,
|
|
timeout=timeout_s,
|
|
env=env,
|
|
)
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(
|
|
f"sidecar exited {proc.returncode}: {proc.stderr.decode('utf-8', 'replace')[:500]}"
|
|
)
|
|
# Stdout may have multiple lines if the sidecar logged something; take the first
|
|
# non-empty line as the response.
|
|
for line in proc.stdout.splitlines():
|
|
line = line.strip()
|
|
if line:
|
|
return json.loads(line.decode("utf-8") if isinstance(line, bytes) else line)
|
|
raise RuntimeError("sidecar produced no response")
|
|
|
|
|
|
def _resolved_listitem(stream_url: str, title: str | None) -> xbmcgui.ListItem:
|
|
li = xbmcgui.ListItem(label=title or "torttube")
|
|
li.setPath(stream_url)
|
|
li.setProperty("IsPlayable", "true")
|
|
# Tell inputstream.adaptive to handle DASH/HLS based on URL path/suffix.
|
|
if ".mpd" in stream_url:
|
|
li.setProperty("inputstream", "inputstream.adaptive")
|
|
li.setProperty("inputstream.adaptive.manifest_type", "mpd")
|
|
elif ".m3u8" in stream_url:
|
|
li.setProperty("inputstream", "inputstream.adaptive")
|
|
li.setProperty("inputstream.adaptive.manifest_type", "hls")
|
|
return li
|
|
|
|
|
|
class _MpdHandler(http.server.BaseHTTPRequestHandler):
|
|
"""Serves the per-video MPD file. The bytes come from the closure-captured
|
|
`_MPD_BYTES` so the server can outlive the temp file if the addon decides
|
|
to clean up early. One handler per HTTPServer instance."""
|
|
|
|
mpd_bytes: bytes = b""
|
|
|
|
def do_GET(self) -> None: # noqa: N802 — http.server convention
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/dash+xml")
|
|
self.send_header("Content-Length", str(len(self.mpd_bytes)))
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
self.wfile.write(self.mpd_bytes)
|
|
|
|
def log_message(self, *args: Any, **kwargs: Any) -> None:
|
|
# Silence the default request log — Kodi's log is verbose enough.
|
|
return
|
|
|
|
|
|
def _start_mpd_server(mpd_bytes: bytes) -> tuple[str, http.server.HTTPServer]:
|
|
"""Spin up a one-shot localhost HTTP server that serves `mpd_bytes` at any
|
|
path. Returns (url, server). Caller is responsible for `server.shutdown()`
|
|
once playback ends."""
|
|
|
|
# Per-server handler subclass so the closure captures the bytes for THIS
|
|
# video without leaking state to a concurrent invocation.
|
|
handler_cls = type(
|
|
"_MpdHandlerInstance",
|
|
(_MpdHandler,),
|
|
{"mpd_bytes": mpd_bytes},
|
|
)
|
|
# Port 0 → kernel picks free port. ThreadingHTTPServer so multiple GETs
|
|
# (the manifest + range requests, if any) don't serialize.
|
|
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler_cls)
|
|
threading.Thread(target=server.serve_forever, daemon=True).start()
|
|
port = server.server_address[1]
|
|
url = f"http://127.0.0.1:{port}/manifest.mpd"
|
|
_log(f"MPD HTTP server up on {url}")
|
|
return url, server
|
|
|
|
|
|
_MIME_CODEC_RE = re.compile(r'codecs="([^"]+)"')
|
|
|
|
|
|
def _codec_from_mime(stream: dict[str, Any]) -> str:
|
|
"""Extract the full DASH codec string (avc1.4d4015 etc) from the mime field.
|
|
|
|
rustypipe gives `codec: "avc1"` (short form) but DASH MPDs need the full
|
|
profile/level identifier from `mime: 'video/mp4; codecs="avc1.4d4015"'`.
|
|
"""
|
|
mime = stream.get("mime", "")
|
|
m = _MIME_CODEC_RE.search(mime)
|
|
return m.group(1) if m else (stream.get("codec") or "")
|
|
|
|
|
|
def _build_dash_mpd(
|
|
details: dict[str, Any],
|
|
video_streams: list[dict[str, Any]],
|
|
audio_streams: list[dict[str, Any]],
|
|
) -> str | None:
|
|
"""Build a static MPEG-DASH on-demand manifest from rustypipe's stream data.
|
|
|
|
Picks H.264 (avc1) video streams up to 1080p — guaranteed to play on the
|
|
RPi 4's hardware H.264 decoder. Picks the best AAC (mp4a) audio stream.
|
|
Returns the MPD XML, or None if no compatible streams were found.
|
|
"""
|
|
duration_s = float(details.get("duration") or 0)
|
|
if duration_s <= 0:
|
|
return None
|
|
|
|
# Filter video: H.264 only (avc1.*), height <= 1080 (RPi 4 H.264 ceiling).
|
|
h264 = [
|
|
s
|
|
for s in video_streams
|
|
if "avc1" in (s.get("codec") or s.get("mime", ""))
|
|
and (s.get("height") or 0) <= 1080
|
|
and s.get("init_range")
|
|
and s.get("index_range")
|
|
and s.get("url")
|
|
]
|
|
if not h264:
|
|
return None
|
|
|
|
# Filter audio: AAC preferred, Opus fallback. Pick highest-bitrate of preferred codec.
|
|
aac = [s for s in audio_streams if "mp4a" in (s.get("codec") or s.get("mime", ""))]
|
|
opus = [s for s in audio_streams if "opus" in (s.get("codec") or s.get("mime", ""))]
|
|
audio_pool = aac or opus
|
|
if not audio_pool:
|
|
return None
|
|
best_audio = max(audio_pool, key=lambda s: s.get("bitrate") or 0)
|
|
if not (best_audio.get("init_range") and best_audio.get("index_range") and best_audio.get("url")):
|
|
return None
|
|
|
|
# Sort video high → low quality so inputstream.adaptive's default-best picks the top.
|
|
h264.sort(key=lambda s: (s.get("height") or 0, s.get("bitrate") or 0), reverse=True)
|
|
|
|
parts = [
|
|
'<?xml version="1.0" encoding="UTF-8"?>',
|
|
'<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="static"',
|
|
f' mediaPresentationDuration="PT{duration_s:.3f}S" minBufferTime="PT1.5S"',
|
|
' profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">',
|
|
" <Period>",
|
|
' <AdaptationSet mimeType="video/mp4" startWithSAP="1" segmentAlignment="true">',
|
|
]
|
|
for v in h264:
|
|
codec = _codec_from_mime(v) or "avc1.4d401f"
|
|
ir = v["index_range"]
|
|
init = v["init_range"]
|
|
parts.append(
|
|
f' <Representation id="{v["itag"]}" mimeType="video/mp4" codecs="{codec}"'
|
|
f' width="{v.get("width", 0)}" height="{v.get("height", 0)}"'
|
|
f' bandwidth="{v.get("bitrate", 0)}" frameRate="{int(v.get("fps") or 30)}"'
|
|
f' startWithSAP="1">'
|
|
)
|
|
parts.append(f" <BaseURL>{xml_escape(v['url'])}</BaseURL>")
|
|
parts.append(
|
|
f' <SegmentBase indexRange="{ir["start"]}-{ir["end"]}" indexRangeExact="true">'
|
|
)
|
|
parts.append(f' <Initialization range="{init["start"]}-{init["end"]}"/>')
|
|
parts.append(" </SegmentBase>")
|
|
parts.append(" </Representation>")
|
|
parts.append(" </AdaptationSet>")
|
|
|
|
# Audio adaptation set.
|
|
a = best_audio
|
|
a_codec = _codec_from_mime(a) or ("mp4a.40.2" if "mp4a" in (a.get("codec") or "") else "opus")
|
|
a_mime = "audio/mp4" if "mp4a" in a_codec else "audio/webm"
|
|
a_ir = a["index_range"]
|
|
a_init = a["init_range"]
|
|
parts.append(
|
|
f' <AdaptationSet mimeType="{a_mime}" startWithSAP="1" segmentAlignment="true">'
|
|
)
|
|
parts.append(
|
|
f' <Representation id="{a["itag"]}" mimeType="{a_mime}" codecs="{a_codec}"'
|
|
f' bandwidth="{a.get("bitrate", 0)}" audioSamplingRate="44100" startWithSAP="1">'
|
|
)
|
|
parts.append(
|
|
' <AudioChannelConfiguration'
|
|
' schemeIdUri="urn:mpeg:dash:23003:3:audio_channel_configuration:2011"'
|
|
f' value="{a.get("channels", 2)}"/>'
|
|
)
|
|
parts.append(f" <BaseURL>{xml_escape(a['url'])}</BaseURL>")
|
|
parts.append(
|
|
f' <SegmentBase indexRange="{a_ir["start"]}-{a_ir["end"]}" indexRangeExact="true">'
|
|
)
|
|
parts.append(f' <Initialization range="{a_init["start"]}-{a_init["end"]}"/>')
|
|
parts.append(" </SegmentBase>")
|
|
parts.append(" </Representation>")
|
|
parts.append(" </AdaptationSet>")
|
|
parts.append(" </Period>")
|
|
parts.append("</MPD>")
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _try_dash(yt_id: str) -> tuple[bytes | None, dict[str, Any]]:
|
|
"""Resolve via rustypipe + build DASH MPD. Returns (mpd_bytes, resp).
|
|
|
|
On any failure returns (None, resp) so the caller can fall back to
|
|
the progressive path. We serve the MPD over localhost HTTP because
|
|
inputstream.adaptive's libcurl can't open file:// URLs.
|
|
"""
|
|
try:
|
|
resp = _call_sidecar({"op": "resolve_dash", "id": yt_id}, timeout_s=30)
|
|
except Exception as e:
|
|
_log(f"resolve_dash failed (will fall back): {e}", xbmc.LOGWARNING)
|
|
return None, {}
|
|
if not resp.get("ok"):
|
|
return None, resp
|
|
|
|
mpd = _build_dash_mpd(
|
|
resp.get("details") or {},
|
|
resp.get("video_only_streams") or [],
|
|
resp.get("audio_streams") or [],
|
|
)
|
|
if not mpd:
|
|
_log("DASH build returned no compatible streams; falling back to progressive")
|
|
return None, resp
|
|
return mpd.encode("utf-8"), resp
|
|
|
|
|
|
def _play(yt_id: str) -> None:
|
|
"""Resolve via DASH (rustypipe, up to 1080p H.264) with progressive
|
|
yt-dlp fallback (360p).
|
|
|
|
DASH path is gated behind TORTTUBE_DASH=1 while the manifest-serving
|
|
HTTP server + inputstream.adaptive integration is being stabilized
|
|
(file:// URLs don't work, and rapid retries via a port-0 HTTP server
|
|
can trigger Kodi's 'two concurrent busydialogs' fatal). Default OFF
|
|
until that's solid — progressive yt-dlp path is reliable.
|
|
"""
|
|
_log(f"play id={yt_id}")
|
|
|
|
mpd_bytes: bytes | None = None
|
|
dash_resp: dict[str, Any] = {}
|
|
if os.environ.get("TORTTUBE_DASH") == "1":
|
|
mpd_bytes, dash_resp = _try_dash(yt_id)
|
|
if mpd_bytes:
|
|
details = dash_resp.get("details") or {}
|
|
title = details.get("name")
|
|
mpd_url, server = _start_mpd_server(mpd_bytes)
|
|
_log(f"resolved via rustypipe DASH, serving manifest at {mpd_url}")
|
|
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(mpd_url, title))
|
|
try:
|
|
_attach_sponsorblock(yt_id)
|
|
finally:
|
|
# Shut down the MPD server cleanly once playback ends or aborts.
|
|
# _attach_sponsorblock blocks while playback is active.
|
|
try:
|
|
server.shutdown()
|
|
server.server_close()
|
|
_log("MPD HTTP server shut down")
|
|
except Exception as e:
|
|
_log(f"MPD server shutdown error: {e}", xbmc.LOGWARNING)
|
|
return
|
|
|
|
# Fallback: progressive single-URL via yt-dlp (360p).
|
|
try:
|
|
resp = _call_sidecar({"op": "resolve_play", "id": yt_id}, timeout_s=45)
|
|
except Exception as e:
|
|
_log(f"sidecar resolve_play failed: {e}", xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", f"resolve failed: {e}", xbmcgui.NOTIFICATION_ERROR, 5000
|
|
)
|
|
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
|
|
return
|
|
|
|
if not resp.get("ok"):
|
|
kind = resp.get("kind", "unknown")
|
|
err = resp.get("error", "unknown")
|
|
_log(f"sidecar returned not-ok: {kind}: {err}", xbmc.LOGWARNING)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", f"{kind}: {err}", xbmcgui.NOTIFICATION_WARNING, 5000
|
|
)
|
|
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
|
|
return
|
|
|
|
stream_url = resp.get("stream_url")
|
|
title = resp.get("title")
|
|
if not stream_url:
|
|
_log("no usable stream URL in sidecar response", xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", "no stream URL", xbmcgui.NOTIFICATION_ERROR, 5000
|
|
)
|
|
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
|
|
return
|
|
|
|
_log(f"resolved via yt-dlp progressive fallback, playing")
|
|
xbmcplugin.setResolvedUrl(_HANDLE, True, _resolved_listitem(stream_url, title))
|
|
_attach_sponsorblock(yt_id)
|
|
|
|
|
|
def _attach_sponsorblock(yt_id: str) -> None:
|
|
"""Fetch SponsorBlock segments and block on the monitor loop. Always blocks
|
|
until playback ends (or 30s if playback never starts) so the caller can
|
|
use this as a 'wait for playback to finish' signal — needed to keep the
|
|
MPD HTTP server alive throughout playback.
|
|
|
|
Non-fatal on segment fetch error.
|
|
"""
|
|
skip_segments: list[dict[str, Any]] = []
|
|
try:
|
|
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
|
|
if sb_resp.get("ok"):
|
|
segs = sb_resp.get("segments") or []
|
|
skip_segments = [s for s in segs if s.get("actionType") == "skip"]
|
|
_log(f"sponsorblock: {len(skip_segments)} skip segments")
|
|
except Exception as e:
|
|
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
|
|
|
|
# Always run the watcher even with zero segments — it doubles as the
|
|
# 'block until playback ends' signal that gates MPD-server shutdown.
|
|
SponsorBlockMonitor(skip_segments).run()
|
|
|
|
# SponsorBlock: fetch segments, then block on a monitor that seeks past
|
|
# each skip segment as the playhead enters it. Best-effort — failure to
|
|
# fetch segments is non-fatal, playback proceeds without skipping.
|
|
try:
|
|
sb_resp = _call_sidecar({"op": "sponsorblock", "id": yt_id}, timeout_s=8)
|
|
except Exception as e:
|
|
_log(f"sponsorblock fetch failed (non-fatal): {e}", xbmc.LOGWARNING)
|
|
return
|
|
|
|
if not sb_resp.get("ok"):
|
|
return
|
|
segments = sb_resp.get("segments") or []
|
|
skip_segments = [s for s in segments if s.get("actionType") == "skip"]
|
|
if not skip_segments:
|
|
_log("sponsorblock: no skip segments for this video")
|
|
return
|
|
|
|
_log(f"sponsorblock: monitoring {len(skip_segments)} skip segments")
|
|
SponsorBlockMonitor(skip_segments).run()
|
|
|
|
|
|
class SponsorBlockMonitor(xbmc.Monitor):
|
|
"""Polls Kodi's player position and seeks past each SponsorBlock skip
|
|
segment exactly once. Exits on playback stop or Kodi shutdown.
|
|
|
|
Each segment has shape {"segment": [start_s, end_s], "category": str,
|
|
"UUID": str, "actionType": "skip"}. We sort by start time so we can
|
|
short-circuit the scan; we also dedupe via the UUID set so the same
|
|
segment doesn't trigger twice if the user manually rewinds into it.
|
|
"""
|
|
|
|
POLL_S = 0.5
|
|
MAX_WAIT_FOR_PLAYBACK_S = 30.0
|
|
|
|
def __init__(self, segments: list[dict[str, Any]]):
|
|
super().__init__()
|
|
self.segments = sorted(
|
|
segments, key=lambda s: float(s.get("segment", [0, 0])[0])
|
|
)
|
|
self.skipped: set[str] = set()
|
|
|
|
def run(self) -> None:
|
|
player = xbmc.Player()
|
|
# Wait for playback to actually begin — setResolvedUrl is async,
|
|
# Kodi takes a beat to demux + start the streams.
|
|
waited = 0.0
|
|
while waited < self.MAX_WAIT_FOR_PLAYBACK_S:
|
|
if self.abortRequested():
|
|
return
|
|
if player.isPlaying():
|
|
break
|
|
if self.waitForAbort(0.25):
|
|
return
|
|
waited += 0.25
|
|
else:
|
|
_log("sponsorblock: timed out waiting for playback to start")
|
|
return
|
|
|
|
while not self.abortRequested() and player.isPlaying():
|
|
try:
|
|
pos = float(player.getTime())
|
|
except (RuntimeError, OSError):
|
|
# getTime raises if the player went away between checks.
|
|
return
|
|
for seg in self.segments:
|
|
uuid = seg.get("UUID", "")
|
|
if uuid in self.skipped:
|
|
continue
|
|
start, end = float(seg["segment"][0]), float(seg["segment"][1])
|
|
if pos < start:
|
|
# Segments are sorted; nothing past here can match either.
|
|
break
|
|
if pos < end:
|
|
duration = end - start
|
|
category = seg.get("category", "?")
|
|
_log(
|
|
f"sponsorblock skip: {category} {start:.1f}-{end:.1f} "
|
|
f"({duration:.0f}s)"
|
|
)
|
|
player.seekTime(end)
|
|
self.skipped.add(uuid)
|
|
xbmcgui.Dialog().notification(
|
|
"SponsorBlock",
|
|
f"Skipped {category} ({duration:.0f}s)",
|
|
xbmcgui.NOTIFICATION_INFO,
|
|
2500,
|
|
)
|
|
break
|
|
if self.waitForAbort(self.POLL_S):
|
|
return
|
|
|
|
|
|
def _plugin_url(**kwargs: Any) -> str:
|
|
"""Build a `plugin://plugin.video.torttube/?...` URL for nav/play targets."""
|
|
return f"plugin://plugin.video.torttube/?{urlencode(kwargs)}"
|
|
|
|
|
|
def _format_duration(seconds: int | float | None) -> str:
|
|
"""Format seconds as `H:MM:SS` or `M:SS`."""
|
|
if not seconds:
|
|
return ""
|
|
s = int(seconds)
|
|
h, rem = divmod(s, 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h:
|
|
return f"{h}:{m:02d}:{s:02d}"
|
|
return f"{m}:{s:02d}"
|
|
|
|
|
|
def _format_views(views: int | None) -> str:
|
|
"""Format view count compactly: 1234 → '1.2K', 1234567 → '1.2M'."""
|
|
if not views:
|
|
return ""
|
|
if views >= 1_000_000_000:
|
|
return f"{views / 1_000_000_000:.1f}B"
|
|
if views >= 1_000_000:
|
|
return f"{views / 1_000_000:.1f}M"
|
|
if views >= 1_000:
|
|
return f"{views / 1_000:.1f}K"
|
|
return str(views)
|
|
|
|
|
|
def _pick_thumbnail(thumbs: list[dict[str, Any]] | None) -> str:
|
|
"""Pick the largest thumbnail from rustypipe's thumbnail list."""
|
|
if not thumbs:
|
|
return ""
|
|
return max(thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0)).get(
|
|
"url", ""
|
|
)
|
|
|
|
|
|
def _root_directory() -> None:
|
|
"""Top-level addon menu. Search lands first because that's how most TV-side
|
|
YouTube use actually works. Remote-control via JSON-RPC still works."""
|
|
items = [
|
|
("Search", "DefaultAddonsSearch.png", _plugin_url(action="search")),
|
|
("Play by URL", "DefaultAddonNone.png", _plugin_url(action="play_by_url")),
|
|
]
|
|
for label, icon, url in items:
|
|
li = xbmcgui.ListItem(label=label)
|
|
li.setArt({"icon": icon})
|
|
xbmcplugin.addDirectoryItem(_HANDLE, url, li, isFolder=True)
|
|
xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _search_directory(query: str | None = None) -> None:
|
|
"""Hit sidecar `search`, list results as playable items.
|
|
|
|
`query` arg: if provided (via `?action=search&q=...`), skip the keyboard
|
|
prompt — used by JSON-RPC clients (phones, share-to-TV) and tests.
|
|
"""
|
|
if not query:
|
|
query = xbmcgui.Dialog().input("Search YouTube")
|
|
if not query:
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
|
|
return
|
|
|
|
try:
|
|
resp = _call_sidecar({"op": "search", "query": query, "limit": 30}, timeout_s=15)
|
|
except Exception as e:
|
|
_log(f"search failed: {e}", xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", f"search failed: {e}", xbmcgui.NOTIFICATION_ERROR, 4000
|
|
)
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
|
|
return
|
|
|
|
if not resp.get("ok"):
|
|
_log(f"search not-ok: {resp.get('error')}", xbmc.LOGWARNING)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube",
|
|
f"search: {resp.get('error', 'unknown')}",
|
|
xbmcgui.NOTIFICATION_WARNING,
|
|
4000,
|
|
)
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
|
|
return
|
|
|
|
items = resp.get("items") or []
|
|
_log(f"search '{query}' → {len(items)} items")
|
|
|
|
xbmcplugin.setContent(_HANDLE, "videos")
|
|
for item in items:
|
|
yt_id = item.get("id") or ""
|
|
if not yt_id:
|
|
continue
|
|
name = item.get("name") or "(no title)"
|
|
channel = item.get("channel") or {}
|
|
channel_name = channel.get("name") if isinstance(channel, dict) else ""
|
|
duration = item.get("duration")
|
|
views = item.get("view_count")
|
|
|
|
# Label: "Title · Channel · Duration · ViewCount"
|
|
meta_bits = [b for b in (channel_name, _format_duration(duration), _format_views(views)) if b]
|
|
label = f"{name} · {' · '.join(meta_bits)}" if meta_bits else name
|
|
|
|
li = xbmcgui.ListItem(label=label)
|
|
li.setProperty("IsPlayable", "true")
|
|
thumb_url = _pick_thumbnail(item.get("thumbnail"))
|
|
if thumb_url:
|
|
li.setArt({"thumb": thumb_url, "poster": thumb_url, "fanart": thumb_url})
|
|
|
|
# Kodi InfoLabels (videoinfo dict) — surfaces in skins/views.
|
|
info: dict[str, Any] = {"title": name, "mediatype": "video"}
|
|
if duration:
|
|
info["duration"] = int(duration)
|
|
if channel_name:
|
|
info["studio"] = channel_name
|
|
if item.get("description"):
|
|
info["plot"] = item["description"]
|
|
try:
|
|
li.setInfo("video", info)
|
|
except Exception:
|
|
pass
|
|
|
|
xbmcplugin.addDirectoryItem(
|
|
_HANDLE, _plugin_url(action="play", id=yt_id), li, isFolder=False
|
|
)
|
|
xbmcplugin.endOfDirectory(_HANDLE)
|
|
|
|
|
|
def _play_by_url_prompt() -> None:
|
|
"""Manual URL/ID entry for tap-to-play from the Kodi UI."""
|
|
s = xbmcgui.Dialog().input("Paste YouTube URL or ID")
|
|
if not s:
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
|
|
return
|
|
try:
|
|
yt_id = _extract_id(s)
|
|
except ValueError as e:
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
|
|
)
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=False)
|
|
return
|
|
# Open via Player.Open-equivalent: run the plugin URL through Kodi
|
|
# rather than calling _play() in directory context.
|
|
xbmc.executebuiltin(f"PlayMedia({_plugin_url(action='play', id=yt_id)})")
|
|
xbmcplugin.endOfDirectory(_HANDLE, succeeded=True)
|
|
|
|
|
|
def main() -> None:
|
|
params = dict(parse_qsl(_QS.lstrip("?")))
|
|
action = params.get("action")
|
|
|
|
if action == "play":
|
|
try:
|
|
yt_id = _extract_id(params.get("url") or params.get("id") or "")
|
|
except ValueError as e:
|
|
_log(str(e), xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(
|
|
"torttube", str(e), xbmcgui.NOTIFICATION_ERROR, 4000
|
|
)
|
|
xbmcplugin.setResolvedUrl(_HANDLE, False, xbmcgui.ListItem())
|
|
return
|
|
_play(yt_id)
|
|
elif action == "search":
|
|
_search_directory(query=params.get("q"))
|
|
elif action == "play_by_url":
|
|
_play_by_url_prompt()
|
|
else:
|
|
_root_directory()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|