acpx_runner: translate raw ACP JSON-RPC to SDK event shape

The events returned from POST /sessions/{id}/turn were leaking the raw
acpx NDJSON wire — JSON-RPC envelopes like
  {jsonrpc, id, method:"session/update", params:{update:{sessionUpdate, content}}}
— but the spec at memory/spec-clawdforge-v0.2.md (and every SDK's
TurnEvent/turn_text consumer) expected the normalized
  [{type:"text", content:"..."}, {type:"tool_call", name, args}, ...]
shape.

Net effect was that *every* turn returned events=[{jsonrpc,...}] and
text(payload)/turn_text() filtered for type=="text" finding nothing.
crafting-table's patcher saw empty model output → extract_diff_json
returned None → status=drafted error="malformed_response" on every
patch attempt (8 of them, all failed identically).

Fix: new _translate_acp_to_sdk_events() walks the raw envelopes and:
- Merges adjacent agent_message_chunk text into a single text event
  (chunks are streaming-protocol artifacts — SDK callers want full text)
- Surfaces agent_thought_chunk as type:"thinking"
- Surfaces tool_call / tool_call_update as type:"tool_call" with
  name + args + result
- Drops usage_update / available_commands_update (not user-facing)

_extract_stop_reason still reads the RAW envelopes (it needs the result
shape that the translator drops), so it's unaffected.

Verified against an actual Opus session: 4-event raw stream of
'H' + 'ELLO' chunks merges to single {"type":"text","content":"HELLO"}.

Unblocks the autonomous patch loop (step 7) — model output now reaches
extract_diff_json properly.
This commit is contained in:
Kayos 2026-04-29 15:52:33 -07:00
parent fabc782c09
commit 015348c526

View file

@ -309,8 +309,14 @@ class AcpxManager:
stdout = stdout_b.decode("utf-8", "replace") stdout = stdout_b.decode("utf-8", "replace")
stderr = stderr_b.decode("utf-8", "replace") stderr = stderr_b.decode("utf-8", "replace")
events = _parse_ndjson(stdout) raw_events = _parse_ndjson(stdout)
stop_reason = _extract_stop_reason(events) stop_reason = _extract_stop_reason(raw_events)
# Translate raw ACP JSON-RPC envelopes into the SDK-facing
# `{type:"text"|"thinking"|"tool_call", content, ...}` shape
# that all 14 SDKs + crafting-table's patcher expect. The raw
# NDJSON wire (jsonrpc/id/method/params) is a leaky abstraction
# — SDKs never asked for it; the spec promised normalized events.
events = _translate_acp_to_sdk_events(raw_events)
if proc.returncode != ACPX_EXIT_OK: if proc.returncode != ACPX_EXIT_OK:
error_msg = _exit_code_to_error(proc.returncode) error_msg = _exit_code_to_error(proc.returncode)
@ -418,10 +424,13 @@ def _first_json_line(stdout: str) -> dict | None:
def _parse_ndjson(stdout: str) -> list[dict]: def _parse_ndjson(stdout: str) -> list[dict]:
"""Parse acpx --format json NDJSON output. """Parse acpx --format json NDJSON output into raw ACP JSON-RPC envelopes.
acpx emits raw ACP JSON-RPC messages, one per line. We collect every acpx emits raw ACP JSON-RPC messages, one per line. We collect every
parseable dict; bad lines are skipped silently. parseable dict; bad lines are skipped silently. The output is the
raw ACP wire see _translate_acp_to_sdk_events for the SDK-facing
`{type, content, ...}` shape that the spec at memory/spec-clawdforge-v0.2.md
promised.
""" """
out: list[dict] = [] out: list[dict] = []
for line in stdout.splitlines(): for line in stdout.splitlines():
@ -437,10 +446,90 @@ def _parse_ndjson(stdout: str) -> list[dict]:
return out return out
def _extract_stop_reason(events: list[dict]) -> str | None: def _translate_acp_to_sdk_events(raw_events: list[dict]) -> list[dict]:
"""Find the result.stopReason from the JSON-RPC response envelope. """Walk raw ACP JSON-RPC envelopes from acpx and emit the SDK-facing
`{type, content, name, args, result, ...}` event shape that
spec-clawdforge-v0.2.md promised and that all 14 SDK Session APIs
plus crafting-table's patcher consume.
ACP terminal envelope: `{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}` Adjacent agent_message_chunks are merged into a single text event
(chunks are streaming-protocol artifacts; SDK callers want the full
text). Tool calls / thinking / other update kinds get their own
events. Anything we don't recognize is dropped — the raw stream is
still available via the ACPX subprocess if a caller wants the
underlying wire (not currently exposed).
ACP message shapes we recognize (observed from `acpx claude prompt`):
- `session/update` with params.update.sessionUpdate ==
`agent_message_chunk` and content.type == `text` text content
- `session/update` with sessionUpdate == `agent_thought` (when
surfaced by claude) `{type:"thinking", content}`
- `session/update` with sessionUpdate == `tool_call` tool event
- JSON-RPC result envelope with stopReason consumed by
_extract_stop_reason; not emitted here.
"""
out: list[dict] = []
text_buf: list[str] = []
def flush_text() -> None:
if text_buf:
out.append({"type": "text", "content": "".join(text_buf)})
text_buf.clear()
for ev in raw_events:
# We only care about session/update notifications (params.update) and
# request envelopes that carry agent output. Result envelopes carry
# the stopReason which is handled separately.
method = ev.get("method")
params = ev.get("params") if isinstance(ev.get("params"), dict) else {}
update = params.get("update") if isinstance(params.get("update"), dict) else {}
update_kind = update.get("sessionUpdate") if isinstance(update, dict) else None
if method == "session/update" and update_kind == "agent_message_chunk":
content = update.get("content") if isinstance(update.get("content"), dict) else {}
if content.get("type") == "text":
txt = content.get("text") or ""
if isinstance(txt, str):
text_buf.append(txt)
continue
if method == "session/update" and update_kind == "agent_thought_chunk":
# Streaming thinking; emit as a single thinking event when the
# next non-thought update arrives. For now, flush any pending
# text first then emit the thought chunk content.
flush_text()
content = update.get("content") if isinstance(update.get("content"), dict) else {}
if content.get("type") == "text":
out.append({"type": "thinking", "content": content.get("text") or ""})
continue
if method == "session/update" and update_kind in ("tool_call", "tool_call_update"):
flush_text()
tc = update
out.append({
"type": "tool_call",
"name": tc.get("title") or tc.get("toolName") or "",
"args": tc.get("rawInput") or tc.get("input") or {},
"result": tc.get("rawOutput") or tc.get("output") or None,
"status": tc.get("status"),
})
continue
# usage_update, available_commands_update, etc. — not surfaced as SDK
# events. The /sessions endpoint can expose token usage via a future
# field on the turn response if callers ask for it.
flush_text()
return out
def _extract_stop_reason(events: list[dict]) -> str | None:
"""Find the result.stopReason from the raw ACP JSON-RPC response envelope.
Walks the RAW events (pre-translation) since the SDK-facing translated
events drop result envelopes. ACP terminal envelope:
``{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}``
""" """
for ev in reversed(events): for ev in reversed(events):
result = ev.get("result") result = ev.get("result")