"""Kokoro-82M FastAPI server, sibling to f5_server. Same /synthesize contract as F5 so skald can route between engines just by which URL it points at. The semantic difference: Kokoro voices are NAMED (af_heart, af_bella, am_michael, etc.) — there's no reference audio. We repurpose the `ref_audio_path` field to carry the voice name; if it starts with '/' we treat as F5-style path and error. Render-and-stitch: The naive "feed the whole chapter to Kokoro" path produces audio that runs paragraphs together — no breath between scenes, no beat on a hard line break. So this server splits the input on paragraph and scene boundaries, renders each chunk, and concatenates with explicit silence inserts between chunks. Control tags the splitter recognizes (case-insensitive): [pause:1.5s] — silence of N seconds at this point [pause:500ms] — silence of N milliseconds at this point [breath] — short breath beat (~400ms) [scene] — major scene break (~1500ms) Implicit breaks the splitter inserts: Blank line between paragraphs → 700ms A line of just `---` → 1500ms (scene break) Sentence-internal pacing (commas, periods, em-dashes, ellipses) is left to Kokoro's own phonemizer — it handles that well. License: Apache 2.0 (code + model weights). Clean stack for the sleep-quality narrator use case. """ import logging import re import time import uuid from pathlib import Path import numpy as np import soundfile as sf import torch from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from kokoro import KPipeline log = logging.getLogger("kokoro-server") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") # ─── pipeline state ────────────────────────────────────────────── DEVICE = "cuda" if torch.cuda.is_available() else "cpu" DEFAULT_LANG = "a" DEFAULT_VOICE = "af_heart" AUDIO_ROOT = Path("/audio") SAMPLE_RATE = 24000 # Default silence durations for implicit breaks. Tags override. PARAGRAPH_GAP_S = 0.7 SCENE_GAP_S = 1.5 BREATH_GAP_S = 0.4 _pipelines: dict[str, KPipeline] = {} def _get_pipeline(lang_code: str) -> KPipeline: if lang_code not in _pipelines: log.info("loading kokoro pipeline lang=%s device=%s", lang_code, DEVICE) _pipelines[lang_code] = KPipeline(lang_code=lang_code, device=DEVICE) log.info("kokoro pipeline loaded lang=%s", lang_code) return _pipelines[lang_code] # ─── split + render pipeline ───────────────────────────────────── # A "node" is one of three kinds; the renderer walks the list, # calls Kokoro on each text node with its (possibly per-segment) # voice, and emits zeros for each silence node. class Node: __slots__ = ("kind", "value", "voice") def __init__(self, kind: str, value, voice: str | None = None): # kind ∈ {"text", "silence"}; value is str for text and # float seconds for silence. voice override is only used # on text nodes from [voice:X]...[/voice] blocks; outside # those blocks the request's default voice is used. self.kind = kind self.value = value self.voice = voice # Voice-block delimiters are parsed at a higher level than other # tags so dialogue can contain its own [breath]/[pause] beats. _VOICE_OPEN_RE = re.compile(r"\[voice:([A-Za-z0-9_-]+)\]") _VOICE_CLOSE = "[/voice]" _TAG_RE = re.compile( r"\[(pause:(?P[0-9]+(?:\.[0-9]+)?)(?Ps|ms)?|breath|scene)\]", re.IGNORECASE, ) def _parse_tag(match: re.Match) -> float: body = match.group(0).lower().strip("[]") if body == "breath": return BREATH_GAP_S if body == "scene": return SCENE_GAP_S dur = float(match.group("dur")) unit = (match.group("unit") or "s").lower() return dur / 1000.0 if unit == "ms" else dur def _expand_inline(text: str, voice: str | None) -> list[Node]: """Expand inline [breath]/[pause]/[scene] tags inside a chunk of text that already has a single voice attribution. Voice blocks themselves are handled one level up in split_to_nodes.""" out: list[Node] = [] text = text.strip() if not text: return out cursor = 0 for m in _TAG_RE.finditer(text): pre = text[cursor : m.start()].strip() if pre: out.append(Node("text", pre, voice)) out.append(Node("silence", _parse_tag(m))) cursor = m.end() tail = text[cursor:].strip() if tail: out.append(Node("text", tail, voice)) return out def split_to_nodes(text: str) -> list[Node]: """Walk the source text and split it into text+silence nodes. Order of operations: 1. Split on `---` lines (scene breaks). 2. Within each scene, split on blank lines (paragraph breaks). 3. Within each paragraph, split on [voice:X]...[/voice] blocks so each dialogue line carries its own voice attribution. 4. Within each (paragraph, voice-region) chunk, expand inline [breath]/[pause:Xs]/[scene] tags. """ nodes: list[Node] = [] scenes = re.split(r"(?m)^\s*---\s*$", text) for s_idx, scene in enumerate(scenes): if s_idx > 0: nodes.append(Node("silence", SCENE_GAP_S)) paragraphs = re.split(r"\n\s*\n", scene) first_para = True for para in paragraphs: para = para.strip() if not para: continue if not first_para: nodes.append(Node("silence", PARAGRAPH_GAP_S)) first_para = False nodes.extend(_split_paragraph_voices(para)) return nodes def _split_paragraph_voices(para: str) -> list[Node]: """Split a single paragraph on [voice:X]...[/voice] blocks. Outside those blocks the voice is None (request default). Unmatched/orphan [/voice] markers are silently stripped. """ out: list[Node] = [] cursor = 0 while cursor < len(para): m = _VOICE_OPEN_RE.search(para, cursor) if not m: out.extend(_expand_inline(para[cursor:], None)) break # Text BEFORE the voice block uses default voice. out.extend(_expand_inline(para[cursor : m.start()], None)) voice = m.group(1) body_start = m.end() close_idx = para.find(_VOICE_CLOSE, body_start) if close_idx < 0: # Unclosed voice block; treat rest of paragraph as that # voice. Defensive — should be rare. out.extend(_expand_inline(para[body_start:], voice)) break out.extend(_expand_inline(para[body_start:close_idx], voice)) cursor = close_idx + len(_VOICE_CLOSE) return out def _silence_samples(seconds: float) -> np.ndarray: n = int(round(seconds * SAMPLE_RATE)) return np.zeros(n, dtype=np.float32) # ─── FastAPI app ───────────────────────────────────────────────── class SynthesizeRequest(BaseModel): gen_text: str = Field(min_length=1) ref_audio_path: str = DEFAULT_VOICE ref_text: str | None = None output_filename: str | None = None speed: float = Field(default=1.0, ge=0.3, le=2.0) lang_code: str = DEFAULT_LANG class SynthesizeResponse(BaseModel): ok: bool output_path: str sample_rate_hz: int duration_seconds: float elapsed_ms: int chars_in: int engine: str voice: str text_nodes: int silence_nodes: int # Every distinct Kokoro voice id that actually got rendered. # Single-element when no [voice:X] tags were in the input; # multiple when multi-voice dialogue was attributed. voices_used: list[str] app = FastAPI(title="kokoro-server", version="0.2.0") @app.on_event("startup") def _startup() -> None: _get_pipeline(DEFAULT_LANG) @app.get("/healthz") def healthz() -> dict: # Shape matches f5_server's so the same Rust HealthResponse # struct deserializes both: model/vocoder/loaded fields are # required by skald-core::narrate::HealthResponse. return { "ok": True, "device": DEVICE, "model": "kokoro-82m", "vocoder": "kokoro-internal", "loaded": bool(_pipelines), "engine": "kokoro-82m", "default_voice": DEFAULT_VOICE, "default_lang": DEFAULT_LANG, "loaded_langs": list(_pipelines.keys()), "version": "0.2.0", } @app.post("/synthesize", response_model=SynthesizeResponse) def synthesize(req: SynthesizeRequest) -> SynthesizeResponse: if req.ref_audio_path.startswith("/"): raise HTTPException( 400, "ref_audio_path looks like a filesystem path; Kokoro takes a voice " "name like 'af_heart' or 'am_michael'. Did you mean to route to the " "f5-tts engine?", ) voice = req.ref_audio_path output_filename = req.output_filename or f"{uuid.uuid4().hex}.wav" if "/" in output_filename or ".." in output_filename: raise HTTPException(400, "output_filename must be a bare name, no path parts") output_path = AUDIO_ROOT / output_filename output_path.parent.mkdir(parents=True, exist_ok=True) pipeline = _get_pipeline(req.lang_code) # Split the text into a node list. Empty nodes get filtered. nodes = [n for n in split_to_nodes(req.gen_text) if n.kind == "silence" or n.value] text_count = sum(1 for n in nodes if n.kind == "text") silence_count = sum(1 for n in nodes if n.kind == "silence") if text_count == 0: raise HTTPException(400, "gen_text expanded to zero text nodes") started = time.monotonic() pieces: list[np.ndarray] = [] voices_used: set[str] = set() for node in nodes: if node.kind == "silence": pieces.append(_silence_samples(node.value)) continue # text: hand to Kokoro. The node's voice override (set by # [voice:X]...[/voice] blocks) wins; otherwise the request's # default narrator voice. seg_voice = node.voice or voice voices_used.add(seg_voice) chunk_audio: list[np.ndarray] = [] for _, _, audio in pipeline(node.value, voice=seg_voice, speed=req.speed): arr = audio.cpu().numpy() if hasattr(audio, "cpu") else np.asarray(audio) chunk_audio.append(arr.astype(np.float32)) if chunk_audio: pieces.append(np.concatenate(chunk_audio)) elapsed_ms = int((time.monotonic() - started) * 1000) if not pieces: raise HTTPException(500, "kokoro returned no audio") full_audio = np.concatenate(pieces) sf.write(str(output_path), full_audio, SAMPLE_RATE, subtype="PCM_16") duration_s = float(len(full_audio)) / float(SAMPLE_RATE) log.info( "synthesized chars=%d voice=%s text_nodes=%d silence_nodes=%d " "voices_used=%s -> %s (dur=%.2fs, elapsed=%dms)", len(req.gen_text), voice, text_count, silence_count, sorted(voices_used), output_path, duration_s, elapsed_ms, ) return SynthesizeResponse( ok=True, output_path=str(output_path), sample_rate_hz=SAMPLE_RATE, duration_seconds=duration_s, elapsed_ms=elapsed_ms, chars_in=len(req.gen_text), engine="kokoro-82m", voice=voice, text_nodes=text_count, silence_nodes=silence_count, voices_used=sorted(voices_used), )