diff --git a/clawdforge/acpx_runner.py b/clawdforge/acpx_runner.py index 5f26d4e..b211d52 100644 --- a/clawdforge/acpx_runner.py +++ b/clawdforge/acpx_runner.py @@ -309,8 +309,14 @@ class AcpxManager: stdout = stdout_b.decode("utf-8", "replace") stderr = stderr_b.decode("utf-8", "replace") - events = _parse_ndjson(stdout) - stop_reason = _extract_stop_reason(events) + raw_events = _parse_ndjson(stdout) + stop_reason = _extract_stop_reason(raw_events) + # Translate raw ACP JSON-RPC envelopes into the SDK-facing + # `{type:"text"|"thinking"|"tool_call", content, ...}` shape + # that all 14 SDKs + crafting-table's patcher expect. The raw + # NDJSON wire (jsonrpc/id/method/params) is a leaky abstraction + # — SDKs never asked for it; the spec promised normalized events. + events = _translate_acp_to_sdk_events(raw_events) if proc.returncode != ACPX_EXIT_OK: error_msg = _exit_code_to_error(proc.returncode) @@ -418,10 +424,13 @@ def _first_json_line(stdout: str) -> dict | None: def _parse_ndjson(stdout: str) -> list[dict]: - """Parse acpx --format json NDJSON output. + """Parse acpx --format json NDJSON output into raw ACP JSON-RPC envelopes. acpx emits raw ACP JSON-RPC messages, one per line. We collect every - parseable dict; bad lines are skipped silently. + parseable dict; bad lines are skipped silently. The output is the + raw ACP wire — see _translate_acp_to_sdk_events for the SDK-facing + `{type, content, ...}` shape that the spec at memory/spec-clawdforge-v0.2.md + promised. """ out: list[dict] = [] for line in stdout.splitlines(): @@ -437,10 +446,90 @@ def _parse_ndjson(stdout: str) -> list[dict]: return out -def _extract_stop_reason(events: list[dict]) -> str | None: - """Find the result.stopReason from the JSON-RPC response envelope. +def _translate_acp_to_sdk_events(raw_events: list[dict]) -> list[dict]: + """Walk raw ACP JSON-RPC envelopes from acpx and emit the SDK-facing + `{type, content, name, args, result, ...}` event shape that + spec-clawdforge-v0.2.md promised and that all 14 SDK Session APIs + plus crafting-table's patcher consume. - ACP terminal envelope: `{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}` + Adjacent agent_message_chunks are merged into a single text event + (chunks are streaming-protocol artifacts; SDK callers want the full + text). Tool calls / thinking / other update kinds get their own + events. Anything we don't recognize is dropped — the raw stream is + still available via the ACPX subprocess if a caller wants the + underlying wire (not currently exposed). + + ACP message shapes we recognize (observed from `acpx claude prompt`): + + - `session/update` with params.update.sessionUpdate == + `agent_message_chunk` and content.type == `text` → text content + - `session/update` with sessionUpdate == `agent_thought` (when + surfaced by claude) → `{type:"thinking", content}` + - `session/update` with sessionUpdate == `tool_call` → tool event + - JSON-RPC result envelope with stopReason → consumed by + _extract_stop_reason; not emitted here. + """ + out: list[dict] = [] + text_buf: list[str] = [] + + def flush_text() -> None: + if text_buf: + out.append({"type": "text", "content": "".join(text_buf)}) + text_buf.clear() + + for ev in raw_events: + # We only care about session/update notifications (params.update) and + # request envelopes that carry agent output. Result envelopes carry + # the stopReason which is handled separately. + method = ev.get("method") + params = ev.get("params") if isinstance(ev.get("params"), dict) else {} + update = params.get("update") if isinstance(params.get("update"), dict) else {} + update_kind = update.get("sessionUpdate") if isinstance(update, dict) else None + + if method == "session/update" and update_kind == "agent_message_chunk": + content = update.get("content") if isinstance(update.get("content"), dict) else {} + if content.get("type") == "text": + txt = content.get("text") or "" + if isinstance(txt, str): + text_buf.append(txt) + continue + + if method == "session/update" and update_kind == "agent_thought_chunk": + # Streaming thinking; emit as a single thinking event when the + # next non-thought update arrives. For now, flush any pending + # text first then emit the thought chunk content. + flush_text() + content = update.get("content") if isinstance(update.get("content"), dict) else {} + if content.get("type") == "text": + out.append({"type": "thinking", "content": content.get("text") or ""}) + continue + + if method == "session/update" and update_kind in ("tool_call", "tool_call_update"): + flush_text() + tc = update + out.append({ + "type": "tool_call", + "name": tc.get("title") or tc.get("toolName") or "", + "args": tc.get("rawInput") or tc.get("input") or {}, + "result": tc.get("rawOutput") or tc.get("output") or None, + "status": tc.get("status"), + }) + continue + + # usage_update, available_commands_update, etc. — not surfaced as SDK + # events. The /sessions endpoint can expose token usage via a future + # field on the turn response if callers ask for it. + + flush_text() + return out + + +def _extract_stop_reason(events: list[dict]) -> str | None: + """Find the result.stopReason from the raw ACP JSON-RPC response envelope. + + Walks the RAW events (pre-translation) since the SDK-facing translated + events drop result envelopes. ACP terminal envelope: + ``{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}`` """ for ev in reversed(events): result = ev.get("result")