clients/go: v0.2 multi-turn Session API
- Session struct with idempotent Close(ctx) (atomic.Bool short-circuit)
- Client.NewSession(ctx, opts) / ListSessions(ctx) / GetSession(ctx, id)
- TurnResult.Text() helper concatenates text events
- Per-session sync.Mutex serializes concurrent Turn calls
- clawdforge_session_test.go: 9 tests
- README "Multi-turn / Sessions (v0.2)" section
v0.1 Run path unchanged.
Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
This commit is contained in:
parent
940861f70a
commit
41a522a469
3 changed files with 885 additions and 0 deletions
|
|
@ -70,6 +70,119 @@ func main() {
|
|||
}
|
||||
```
|
||||
|
||||
## Multi-turn / Sessions (v0.2)
|
||||
|
||||
For workflows that need context across turns ("build this with me step by
|
||||
step", "now look at the auth flow", etc.) v0.2 adds a `Session` handle that
|
||||
wraps the server's `/sessions/*` endpoints. Single-turn `Client.Run` is
|
||||
unchanged — Sessions are purely additive.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
cf "gitea.sulkta.com/Sulkta-Coop/clawdforge/clients/go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
client := cf.New("http://192.168.0.5:8800", os.Getenv("CLAWDFORGE_TOKEN"))
|
||||
ctx := context.Background()
|
||||
|
||||
s, err := client.NewSession(ctx, &cf.SessionOptions{Agent: "claude"})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer s.Close(ctx) // idempotent — safe in defer / cleanup paths
|
||||
|
||||
r1, err := s.Turn(ctx, "Read README.md and summarize")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println(r1.Text()) // concat all "text" events
|
||||
|
||||
r2, err := s.Turn(ctx, "Now look at the auth flow", cf.TurnOption{
|
||||
Files: []string{"ff_xyz"},
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
_ = r2
|
||||
|
||||
// List sessions visible to the calling token:
|
||||
list, _ := client.ListSessions(ctx)
|
||||
fmt.Println("open sessions:", len(list))
|
||||
|
||||
// Inspect server-side state (turn count, timestamps, closed-at):
|
||||
state, _ := client.GetSession(ctx, s.ID())
|
||||
fmt.Println("turns so far:", state.TurnCount)
|
||||
}
|
||||
```
|
||||
|
||||
### Lifecycle
|
||||
|
||||
| Method | Endpoint | Notes |
|
||||
|---|---|---|
|
||||
| `client.NewSession(ctx, opts)` | `POST /sessions` | `opts` may be `nil` (defaults to agent="claude", no meta). |
|
||||
| `s.Turn(ctx, prompt, opts...)` | `POST /sessions/{id}/turn` | Returns `*TurnResult` with the full event batch + stop reason + timing. |
|
||||
| `s.Close(ctx)` | `DELETE /sessions/{id}` | **Idempotent** — second and subsequent calls short-circuit via an atomic flag with no network round-trip. Safe to `defer`. |
|
||||
| `client.ListSessions(ctx)` | `GET /sessions` | Per-token isolation enforced server-side. |
|
||||
| `client.GetSession(ctx, id)` | `GET /sessions/{id}` | Cross-token access surfaces as `*APIError` with `StatusCode==404` (no existence leak). |
|
||||
|
||||
### TurnResult.Text()
|
||||
|
||||
Each turn returns a batch of structured events: `thinking`, `tool_call`,
|
||||
`text`. `Text()` is a convenience that concatenates the `Content` of every
|
||||
`text` event in order — the user-visible reply. Use the `Events` slice
|
||||
directly for tool-call introspection, thinking traces, etc.
|
||||
|
||||
```go
|
||||
res, _ := s.Turn(ctx, "...")
|
||||
fmt.Println(res.Text()) // user-visible text only
|
||||
for _, ev := range res.Events { // full structured stream
|
||||
if ev.Type == "tool_call" {
|
||||
log.Printf("tool: %s args=%v", ev.Name, ev.Args)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Concurrent Turn calls
|
||||
|
||||
Concurrent `s.Turn(ctx, ...)` calls on the **same** session are serialized
|
||||
by a `sync.Mutex` internal to the `Session` value — the server observes
|
||||
turns in the order the SDK dispatches them, never overlapping. The mutex is
|
||||
**per-session, not global**: different sessions on the same `Client` never
|
||||
block each other.
|
||||
|
||||
This matches what the agent layer wants (a session is a conversation, and
|
||||
two prompts to the same conversation should not race), without serializing
|
||||
unrelated work on the client.
|
||||
|
||||
### TurnOption fields
|
||||
|
||||
```go
|
||||
type TurnOption struct {
|
||||
Files []string // file_token values from UploadFile / UploadReader
|
||||
TimeoutMs int // 0 = use server default; rounded UP to whole seconds on the wire
|
||||
}
|
||||
```
|
||||
|
||||
`TimeoutMs` is exposed for symmetry with `TurnResult.DurationMs`. The server
|
||||
takes whole seconds (`timeout_secs`); a sub-second value rounds **up** so
|
||||
e.g. `TimeoutMs: 500` becomes `timeout_secs=1`, never `0` (which would mean
|
||||
"use default").
|
||||
|
||||
### Errors
|
||||
|
||||
The Session surface uses the same error families as v0.1 — no new types.
|
||||
Cross-token / unknown-id 404s are `*APIError` with `StatusCode==404`. Auth
|
||||
failures still go through `ErrAuth`. Transport / context-cancel still wraps
|
||||
into `*TransportError`.
|
||||
|
||||
## API surface
|
||||
|
||||
The SDK mirrors the FastAPI surface in `clawdforge/server.py` 1:1.
|
||||
|
|
|
|||
424
clients/go/clawdforge_session_test.go
Normal file
424
clients/go/clawdforge_session_test.go
Normal file
|
|
@ -0,0 +1,424 @@
|
|||
package clawdforge
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ---------- v0.2 Session tests ----------------------------------------------
|
||||
|
||||
// TestNewSessionAndClose exercises the create + close round-trip end to end:
|
||||
// POST /sessions returns a handle, defer Close hits DELETE /sessions/{id},
|
||||
// and the test asserts both endpoints actually got hit.
|
||||
func TestNewSessionAndClose(t *testing.T) {
|
||||
var (
|
||||
gotCreate atomic.Bool
|
||||
gotClose atomic.Bool
|
||||
)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch {
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions":
|
||||
gotCreate.Store(true)
|
||||
if got := r.Header.Get("Authorization"); got != "Bearer cf_test_token" {
|
||||
t.Errorf("Authorization = %q", got)
|
||||
}
|
||||
var body struct {
|
||||
Agent string `json:"agent"`
|
||||
Meta map[string]any `json:"meta"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
t.Fatalf("decode create body: %v", err)
|
||||
}
|
||||
if body.Agent != "claude" {
|
||||
t.Errorf("agent = %q, want claude", body.Agent)
|
||||
}
|
||||
_, _ = w.Write([]byte(`{"ok":true,"session_id":"sess_abc","agent":"claude","created_at":1700000000}`))
|
||||
case r.Method == http.MethodDelete && r.URL.Path == "/sessions/sess_abc":
|
||||
gotClose.Store(true)
|
||||
_, _ = w.Write([]byte(`{"ok":true}`))
|
||||
default:
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "cf_test_token")
|
||||
s, err := c.NewSession(context.Background(), &SessionOptions{Agent: "claude"})
|
||||
if err != nil {
|
||||
t.Fatalf("NewSession: %v", err)
|
||||
}
|
||||
if s.ID() != "sess_abc" || s.Agent() != "claude" || s.CreatedAt() != 1700000000 {
|
||||
t.Errorf("session getters wrong: id=%q agent=%q createdAt=%d", s.ID(), s.Agent(), s.CreatedAt())
|
||||
}
|
||||
|
||||
if err := s.Close(context.Background()); err != nil {
|
||||
t.Fatalf("Close: %v", err)
|
||||
}
|
||||
if !gotCreate.Load() {
|
||||
t.Error("POST /sessions never hit")
|
||||
}
|
||||
if !gotClose.Load() {
|
||||
t.Error("DELETE /sessions/sess_abc never hit")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionTurn round-trips a turn through a mocked /turn endpoint and
|
||||
// verifies request body shape + response decoding (events, indices, timing).
|
||||
func TestSessionTurn(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch {
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions":
|
||||
_, _ = w.Write([]byte(`{"ok":true,"session_id":"sess_t","agent":"claude","created_at":1}`))
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions/sess_t/turn":
|
||||
var body turnRequestBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
t.Fatalf("decode turn body: %v", err)
|
||||
}
|
||||
if body.Prompt != "summarize README" {
|
||||
t.Errorf("prompt = %q", body.Prompt)
|
||||
}
|
||||
if len(body.Files) != 1 || body.Files[0] != "ff_xyz" {
|
||||
t.Errorf("files = %v", body.Files)
|
||||
}
|
||||
// 1500ms → 2 secs (round up)
|
||||
if body.TimeoutSecs != 2 {
|
||||
t.Errorf("timeout_secs = %d, want 2 (1500ms rounds up)", body.TimeoutSecs)
|
||||
}
|
||||
_, _ = w.Write([]byte(`{
|
||||
"ok": true,
|
||||
"session_id": "sess_t",
|
||||
"turn_index": 1,
|
||||
"events": [
|
||||
{"type":"thinking","content":"reading..."},
|
||||
{"type":"tool_call","name":"Read","args":{"path":"README.md"},"result":{"len":42}},
|
||||
{"type":"text","content":"Hello "},
|
||||
{"type":"text","content":"world"}
|
||||
],
|
||||
"stop_reason": "end_turn",
|
||||
"duration_ms": 1234
|
||||
}`))
|
||||
default:
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
s, err := c.NewSession(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSession: %v", err)
|
||||
}
|
||||
res, err := s.Turn(context.Background(), "summarize README", TurnOption{
|
||||
Files: []string{"ff_xyz"},
|
||||
TimeoutMs: 1500,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Turn: %v", err)
|
||||
}
|
||||
if !res.Ok || res.TurnIndex != 1 || res.StopReason != "end_turn" || res.DurationMs != 1234 {
|
||||
t.Errorf("got %+v", res)
|
||||
}
|
||||
if len(res.Events) != 4 {
|
||||
t.Fatalf("events len = %d, want 4", len(res.Events))
|
||||
}
|
||||
if res.Events[1].Type != "tool_call" || res.Events[1].Name != "Read" {
|
||||
t.Errorf("tool_call event = %+v", res.Events[1])
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionCloseIdempotent verifies the second Close short-circuits via
|
||||
// the atomic flag and never hits the network.
|
||||
func TestSessionCloseIdempotent(t *testing.T) {
|
||||
var closeHits atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch {
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions":
|
||||
_, _ = w.Write([]byte(`{"ok":true,"session_id":"sess_idem","agent":"claude","created_at":1}`))
|
||||
case r.Method == http.MethodDelete && r.URL.Path == "/sessions/sess_idem":
|
||||
closeHits.Add(1)
|
||||
_, _ = w.Write([]byte(`{"ok":true}`))
|
||||
default:
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
s, err := c.NewSession(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSession: %v", err)
|
||||
}
|
||||
if err := s.Close(context.Background()); err != nil {
|
||||
t.Fatalf("Close 1: %v", err)
|
||||
}
|
||||
if err := s.Close(context.Background()); err != nil {
|
||||
t.Fatalf("Close 2: %v", err)
|
||||
}
|
||||
if err := s.Close(context.Background()); err != nil {
|
||||
t.Fatalf("Close 3: %v", err)
|
||||
}
|
||||
if got := closeHits.Load(); got != 1 {
|
||||
t.Errorf("DELETE hit %d times, want exactly 1 (subsequent calls must short-circuit)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionConcurrentTurns dispatches two Turn calls on the same Session
|
||||
// from two goroutines and verifies the server never sees them overlap —
|
||||
// the per-session mutex must serialize them. The mock asserts no overlap by
|
||||
// counting in-flight turns; if the lock leaked, count would exceed 1.
|
||||
func TestSessionConcurrentTurns(t *testing.T) {
|
||||
var (
|
||||
inflight atomic.Int32
|
||||
maxSeen atomic.Int32
|
||||
hitCount atomic.Int32
|
||||
)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch {
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions":
|
||||
_, _ = w.Write([]byte(`{"ok":true,"session_id":"sess_conc","agent":"claude","created_at":1}`))
|
||||
case r.Method == http.MethodPost && r.URL.Path == "/sessions/sess_conc/turn":
|
||||
cur := inflight.Add(1)
|
||||
defer inflight.Add(-1)
|
||||
// Track high-water mark of concurrent turns on this session.
|
||||
for {
|
||||
m := maxSeen.Load()
|
||||
if cur <= m || maxSeen.CompareAndSwap(m, cur) {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Sleep so a leaky lock would let goroutine 2 enter while
|
||||
// goroutine 1 is still in-flight.
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
hitCount.Add(1)
|
||||
_, _ = w.Write([]byte(`{"ok":true,"session_id":"sess_conc","turn_index":` +
|
||||
strconvItoa(int(hitCount.Load())) +
|
||||
`,"events":[],"stop_reason":"end_turn","duration_ms":1}`))
|
||||
default:
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
s, err := c.NewSession(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewSession: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
errs := make([]error, 2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, errs[0] = s.Turn(context.Background(), "first")
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, errs[1] = s.Turn(context.Background(), "second")
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
for i, e := range errs {
|
||||
if e != nil {
|
||||
t.Errorf("turn %d: %v", i, e)
|
||||
}
|
||||
}
|
||||
if hitCount.Load() != 2 {
|
||||
t.Errorf("hitCount = %d, want 2", hitCount.Load())
|
||||
}
|
||||
if maxSeen.Load() > 1 {
|
||||
t.Errorf("max concurrent in-flight turns = %d, want 1 (per-session mutex must serialize)", maxSeen.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// TestListSessions verifies GET /sessions decodes into []SessionState.
|
||||
func TestListSessions(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet || r.URL.Path != "/sessions" {
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{
|
||||
"ok": true,
|
||||
"sessions": [
|
||||
{"session_id":"sess_a","agent":"claude","app_name":"app1","created_at":100,"last_turn_at":150,"turn_count":3,"closed_at":null},
|
||||
{"session_id":"sess_b","agent":"claude","app_name":"app1","created_at":200,"last_turn_at":null,"turn_count":0,"closed_at":250}
|
||||
],
|
||||
"count": 2
|
||||
}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
list, err := c.ListSessions(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("ListSessions: %v", err)
|
||||
}
|
||||
if len(list) != 2 {
|
||||
t.Fatalf("len = %d, want 2", len(list))
|
||||
}
|
||||
if list[0].SessionID != "sess_a" || list[0].TurnCount != 3 || list[0].ClosedAt != nil {
|
||||
t.Errorf("sess_a wrong: %+v", list[0])
|
||||
}
|
||||
if list[1].LastTurnAt != nil {
|
||||
t.Errorf("sess_b LastTurnAt should be nil, got %v", *list[1].LastTurnAt)
|
||||
}
|
||||
if list[1].ClosedAt == nil || *list[1].ClosedAt != 250 {
|
||||
t.Errorf("sess_b ClosedAt wrong: %+v", list[1].ClosedAt)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetSession verifies GET /sessions/{id} decodes into a *SessionState.
|
||||
func TestGetSession(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet || r.URL.Path != "/sessions/sess_get" {
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{
|
||||
"ok": true,
|
||||
"session_id": "sess_get",
|
||||
"agent": "claude",
|
||||
"created_at": 555,
|
||||
"last_turn_at": 600,
|
||||
"turn_count": 2,
|
||||
"closed_at": null
|
||||
}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
st, err := c.GetSession(context.Background(), "sess_get")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSession: %v", err)
|
||||
}
|
||||
if st.SessionID != "sess_get" || st.TurnCount != 2 {
|
||||
t.Errorf("got %+v", st)
|
||||
}
|
||||
if st.LastTurnAt == nil || *st.LastTurnAt != 600 {
|
||||
t.Errorf("LastTurnAt wrong: %+v", st.LastTurnAt)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionCrossTokenIs404 verifies cross-token (or unknown-id) access
|
||||
// surfaces as *APIError with StatusCode==404 — same shape as the v0.1 generic
|
||||
// 404 path. No new error types.
|
||||
func TestSessionCrossTokenIs404(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
_, _ = w.Write([]byte(`{"detail":"session not found"}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok_b")
|
||||
_, err := c.GetSession(context.Background(), "sess_belongs_to_a")
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
var apiErr *APIError
|
||||
if !errors.As(err, &apiErr) {
|
||||
t.Fatalf("err is not *APIError: %T %v", err, err)
|
||||
}
|
||||
if apiErr.StatusCode != 404 {
|
||||
t.Errorf("StatusCode = %d, want 404", apiErr.StatusCode)
|
||||
}
|
||||
if !strings.Contains(apiErr.Message, "not found") {
|
||||
t.Errorf("Message = %q, want it to mention 'not found'", apiErr.Message)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTurnResultText verifies the Text() helper concatenates only "text"
|
||||
// events and skips thinking / tool_call frames.
|
||||
func TestTurnResultText(t *testing.T) {
|
||||
r := &TurnResult{
|
||||
Events: []TurnEvent{
|
||||
{Type: "thinking", Content: "hmm "},
|
||||
{Type: "text", Content: "hello "},
|
||||
{Type: "tool_call", Name: "Read"},
|
||||
{Type: "text", Content: "world"},
|
||||
{Type: "text", Content: "!"},
|
||||
},
|
||||
}
|
||||
if got := r.Text(); got != "hello world!" {
|
||||
t.Errorf("Text() = %q, want %q", got, "hello world!")
|
||||
}
|
||||
// Empty / nil safety.
|
||||
if (&TurnResult{}).Text() != "" {
|
||||
t.Error("empty TurnResult.Text() should be empty string")
|
||||
}
|
||||
var nilR *TurnResult
|
||||
if nilR.Text() != "" {
|
||||
t.Error("nil TurnResult.Text() should be empty string")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunUnchanged is the v0.1 regression — adding the Session surface must
|
||||
// not perturb the byte-on-the-wire shape of POST /run or its result decoding.
|
||||
func TestRunUnchanged(t *testing.T) {
|
||||
var sawBody RunRequest
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/run" || r.Method != http.MethodPost {
|
||||
t.Fatalf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&sawBody); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"ok":true,"result":"plain","duration_ms":7,"stop_reason":"end_turn"}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := New(srv.URL, "tok")
|
||||
res, err := c.Run(context.Background(), RunRequest{Prompt: "hi", Model: "sonnet"})
|
||||
if err != nil {
|
||||
t.Fatalf("Run: %v", err)
|
||||
}
|
||||
if !res.OK || res.DurationMS != 7 {
|
||||
t.Errorf("got %+v", res)
|
||||
}
|
||||
if sawBody.Prompt != "hi" || sawBody.Model != "sonnet" {
|
||||
t.Errorf("server saw body %+v — v0.1 wire shape changed!", sawBody)
|
||||
}
|
||||
s, err := res.AsText()
|
||||
if err != nil || s != "plain" {
|
||||
t.Errorf("AsText = %q err=%v", s, err)
|
||||
}
|
||||
}
|
||||
|
||||
// strconvItoa is a tiny inline shim so the concurrent-turn test can build a
|
||||
// fresh JSON body per call without dragging strconv into the test file's
|
||||
// imports beyond what's needed elsewhere — keeps the test self-contained.
|
||||
func strconvItoa(n int) string {
|
||||
if n == 0 {
|
||||
return "0"
|
||||
}
|
||||
var buf [20]byte
|
||||
i := len(buf)
|
||||
neg := n < 0
|
||||
if neg {
|
||||
n = -n
|
||||
}
|
||||
for n > 0 {
|
||||
i--
|
||||
buf[i] = byte('0' + n%10)
|
||||
n /= 10
|
||||
}
|
||||
if neg {
|
||||
i--
|
||||
buf[i] = '-'
|
||||
}
|
||||
return string(buf[i:])
|
||||
}
|
||||
348
clients/go/session.go
Normal file
348
clients/go/session.go
Normal file
|
|
@ -0,0 +1,348 @@
|
|||
package clawdforge
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ---------- v0.2: multi-turn Session API ------------------------------------
|
||||
//
|
||||
// The Session surface is purely additive — v0.1 callers (Client.Run,
|
||||
// Client.UploadFile, etc.) keep their byte-identical behavior. The session
|
||||
// methods wrap the server's /sessions/* endpoints introduced in v0.2.
|
||||
|
||||
// SessionOptions configures Client.NewSession. Both fields are optional; the
|
||||
// zero value yields agent="claude" with no metadata.
|
||||
type SessionOptions struct {
|
||||
// Agent is the ACPX agent to drive (default "claude" server-side when
|
||||
// blank). Mirrors the server's CreateSessionRequest.agent field.
|
||||
Agent string
|
||||
// Meta is an arbitrary JSON-serializable map persisted on the server's
|
||||
// session ledger. Useful for app-side correlation.
|
||||
Meta map[string]any
|
||||
}
|
||||
|
||||
// TurnOption is the optional argument for Session.Turn. Multiple options
|
||||
// passed are merged left-to-right (last non-zero field wins per field).
|
||||
type TurnOption struct {
|
||||
// Files is a slice of file_token values previously returned by
|
||||
// Client.UploadFile / Client.UploadReader. Resolved server-side.
|
||||
Files []string
|
||||
// TimeoutMs is the per-turn timeout in milliseconds. Zero means use
|
||||
// the server's default. Note the server field is `timeout_secs`; the
|
||||
// SDK exposes ms here for symmetry with TurnResult.DurationMs and
|
||||
// converts on the wire (rounded up).
|
||||
TimeoutMs int
|
||||
}
|
||||
|
||||
// TurnEvent is one structured event emitted during a turn — typed text,
|
||||
// thinking, or a tool-call record. Fields not present on a given event type
|
||||
// are zero-valued. Mirrors the server's event shape from acpx_runner.
|
||||
type TurnEvent struct {
|
||||
Type string `json:"type"`
|
||||
Content string `json:"content,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Args map[string]any `json:"args,omitempty"`
|
||||
Result any `json:"result,omitempty"`
|
||||
}
|
||||
|
||||
// TurnResult is the parsed response from POST /sessions/{id}/turn on success.
|
||||
//
|
||||
// Use Text() to concatenate just the "text" events into a single string,
|
||||
// dropping thinking/tool_call frames.
|
||||
type TurnResult struct {
|
||||
Ok bool `json:"ok"`
|
||||
SessionID string `json:"session_id"`
|
||||
TurnIndex int `json:"turn_index"`
|
||||
Events []TurnEvent `json:"events"`
|
||||
StopReason string `json:"stop_reason"`
|
||||
DurationMs int `json:"duration_ms"`
|
||||
}
|
||||
|
||||
// Text concatenates the Content of every event whose Type == "text",
|
||||
// in order. Non-text events (thinking, tool_call) are skipped. Use this
|
||||
// when you want the model's user-facing reply only.
|
||||
func (r *TurnResult) Text() string {
|
||||
if r == nil || len(r.Events) == 0 {
|
||||
return ""
|
||||
}
|
||||
var b strings.Builder
|
||||
for _, ev := range r.Events {
|
||||
if ev.Type == "text" {
|
||||
b.WriteString(ev.Content)
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// SessionState is the parsed response from GET /sessions/{id}. Mirrors the
|
||||
// server's session-state shape; LastTurnAt and ClosedAt are pointers because
|
||||
// they're nullable until the first turn / first close.
|
||||
type SessionState struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Agent string `json:"agent"`
|
||||
AppName string `json:"app_name,omitempty"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
LastTurnAt *int64 `json:"last_turn_at"`
|
||||
TurnCount int `json:"turn_count"`
|
||||
ClosedAt *int64 `json:"closed_at"`
|
||||
}
|
||||
|
||||
// Session is a handle to a multi-turn session on the server. Construct via
|
||||
// Client.NewSession. Methods are safe for concurrent use; Turn calls on the
|
||||
// same session are serialized via an internal mutex so the server sees them
|
||||
// in order.
|
||||
//
|
||||
// Always Close the session when you're done — Close is idempotent (a second
|
||||
// call short-circuits without a network round-trip via an atomic flag).
|
||||
//
|
||||
// s, err := client.NewSession(ctx, nil)
|
||||
// if err != nil { ... }
|
||||
// defer s.Close(ctx)
|
||||
type Session struct {
|
||||
client *Client
|
||||
sessionID string
|
||||
agent string
|
||||
createdAt int64
|
||||
|
||||
// closed short-circuits the second Close call without hitting the
|
||||
// network. The server is itself idempotent, but this saves the round
|
||||
// trip and makes Close safe to call from any number of defers.
|
||||
closed atomic.Bool
|
||||
|
||||
// turnMu serializes concurrent Turn calls on the same session so the
|
||||
// server observes them in caller-determined order. Per-session, NOT
|
||||
// global — different sessions on the same Client never block each
|
||||
// other.
|
||||
turnMu sync.Mutex
|
||||
}
|
||||
|
||||
// ID returns the server-assigned session id.
|
||||
func (s *Session) ID() string { return s.sessionID }
|
||||
|
||||
// Agent returns the agent name the session was created against
|
||||
// (default "claude").
|
||||
func (s *Session) Agent() string { return s.agent }
|
||||
|
||||
// CreatedAt returns the unix timestamp the server recorded at create time.
|
||||
func (s *Session) CreatedAt() int64 { return s.createdAt }
|
||||
|
||||
// ---------- Client session methods ------------------------------------------
|
||||
|
||||
// createSessionResponse is the wire shape of POST /sessions on success.
|
||||
type createSessionResponse struct {
|
||||
OK bool `json:"ok"`
|
||||
SessionID string `json:"session_id"`
|
||||
Agent string `json:"agent"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
Cwd string `json:"cwd,omitempty"`
|
||||
}
|
||||
|
||||
// NewSession issues POST /sessions and returns a *Session handle for
|
||||
// follow-up Turn / Close / state calls.
|
||||
//
|
||||
// opts may be nil for the all-default case (agent="claude", no meta).
|
||||
//
|
||||
// Errors mirror the rest of the SDK: 401/403 → ErrAuth, transport failures
|
||||
// → *TransportError, other non-2xx → *APIError.
|
||||
func (c *Client) NewSession(ctx context.Context, opts *SessionOptions) (*Session, error) {
|
||||
body := struct {
|
||||
Agent string `json:"agent,omitempty"`
|
||||
Meta map[string]any `json:"meta,omitempty"`
|
||||
}{}
|
||||
if opts != nil {
|
||||
body.Agent = opts.Agent
|
||||
body.Meta = opts.Meta
|
||||
}
|
||||
buf, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("clawdforge: marshal SessionOptions: %w", err)
|
||||
}
|
||||
req, err := c.newRequest(ctx, http.MethodPost, "/sessions", bytes.NewReader(buf), "application/json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out createSessionResponse
|
||||
if err := c.do(req, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if out.SessionID == "" {
|
||||
return nil, errors.New("clawdforge: NewSession: server returned empty session_id")
|
||||
}
|
||||
return &Session{
|
||||
client: c,
|
||||
sessionID: out.SessionID,
|
||||
agent: out.Agent,
|
||||
createdAt: out.CreatedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetSession issues GET /sessions/{id} and returns the server's view of the
|
||||
// session — turn count, timestamps, closed state, etc. A 404 on a session
|
||||
// that exists under a different token (cross-token access) surfaces as
|
||||
// *APIError with StatusCode==404, matching the server's
|
||||
// no-existence-leak design.
|
||||
func (c *Client) GetSession(ctx context.Context, sessionID string) (*SessionState, error) {
|
||||
if sessionID == "" {
|
||||
return nil, errors.New("clawdforge: GetSession: sessionID is required")
|
||||
}
|
||||
req, err := c.newRequest(ctx, http.MethodGet, "/sessions/"+url.PathEscape(sessionID), nil, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out SessionState
|
||||
if err := c.do(req, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// listSessionsResponse is the wire shape of GET /sessions.
|
||||
type listSessionsResponse struct {
|
||||
OK bool `json:"ok"`
|
||||
Sessions []SessionState `json:"sessions"`
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
// ListSessions issues GET /sessions and returns every session visible to the
|
||||
// calling token (per-app isolation is enforced server-side). The result
|
||||
// includes closed-but-not-yet-hard-deleted sessions by default.
|
||||
func (c *Client) ListSessions(ctx context.Context) ([]SessionState, error) {
|
||||
req, err := c.newRequest(ctx, http.MethodGet, "/sessions", nil, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out listSessionsResponse
|
||||
if err := c.do(req, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Sessions, nil
|
||||
}
|
||||
|
||||
// ---------- Session methods -------------------------------------------------
|
||||
|
||||
// turnRequestBody is the wire shape of POST /sessions/{id}/turn.
|
||||
type turnRequestBody struct {
|
||||
Prompt string `json:"prompt"`
|
||||
Files []string `json:"files,omitempty"`
|
||||
TimeoutSecs int `json:"timeout_secs,omitempty"`
|
||||
}
|
||||
|
||||
// Turn sends a prompt to the session and returns the structured result —
|
||||
// the full event batch, stop reason, and timing. Multiple TurnOption values
|
||||
// are merged; later values override earlier ones per-field when non-zero.
|
||||
//
|
||||
// Concurrent Turn calls on the same Session are serialized by an internal
|
||||
// mutex so the server observes them in caller-determined order. Different
|
||||
// sessions on the same Client never block each other.
|
||||
func (s *Session) Turn(ctx context.Context, prompt string, opts ...TurnOption) (*TurnResult, error) {
|
||||
if s == nil {
|
||||
return nil, errors.New("clawdforge: Turn called on nil *Session")
|
||||
}
|
||||
if prompt == "" {
|
||||
return nil, errors.New("clawdforge: Turn: prompt is required")
|
||||
}
|
||||
if s.closed.Load() {
|
||||
return nil, errors.New("clawdforge: Turn called on closed session")
|
||||
}
|
||||
|
||||
// Merge options left-to-right; non-zero fields override.
|
||||
body := turnRequestBody{Prompt: prompt}
|
||||
for _, o := range opts {
|
||||
if len(o.Files) > 0 {
|
||||
body.Files = o.Files
|
||||
}
|
||||
if o.TimeoutMs > 0 {
|
||||
// Server takes seconds; round UP so a sub-second SDK timeout
|
||||
// doesn't degrade to 0 (= "use default") on the wire.
|
||||
secs := o.TimeoutMs / 1000
|
||||
if o.TimeoutMs%1000 != 0 {
|
||||
secs++
|
||||
}
|
||||
body.TimeoutSecs = secs
|
||||
}
|
||||
}
|
||||
|
||||
buf, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("clawdforge: marshal Turn body: %w", err)
|
||||
}
|
||||
|
||||
// Serialize concurrent Turns on the same session. Held for the entire
|
||||
// HTTP round-trip so the server sees ordered prompt arrivals — the
|
||||
// real ordering constraint lives at the model/agent layer, but the
|
||||
// SDK can at least guarantee the request ordering it dispatches.
|
||||
s.turnMu.Lock()
|
||||
defer s.turnMu.Unlock()
|
||||
|
||||
req, err := s.client.newRequest(
|
||||
ctx,
|
||||
http.MethodPost,
|
||||
"/sessions/"+url.PathEscape(s.sessionID)+"/turn",
|
||||
bytes.NewReader(buf),
|
||||
"application/json",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out TurnResult
|
||||
if err := s.client.do(req, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// closeResponse is the wire shape of DELETE /sessions/{id}.
|
||||
type closeResponse struct {
|
||||
OK bool `json:"ok"`
|
||||
AlreadyClosed bool `json:"already_closed,omitempty"`
|
||||
}
|
||||
|
||||
// Close issues DELETE /sessions/{id} to soft-close the session server-side.
|
||||
//
|
||||
// Close is idempotent — the second and subsequent calls short-circuit via
|
||||
// an atomic flag without a network round-trip. The server's close endpoint
|
||||
// is itself idempotent (returns {ok:true, already_closed:true} on a second
|
||||
// hit), but the local short-circuit saves the request round-trip in the
|
||||
// common defer-Close pattern.
|
||||
//
|
||||
// Safe to call from defer / cleanup paths regardless of prior state.
|
||||
func (s *Session) Close(ctx context.Context) error {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
// CompareAndSwap so only one caller wins the network call when many
|
||||
// defers race. Subsequent callers see the flag already set and return
|
||||
// nil without contacting the server.
|
||||
if !s.closed.CompareAndSwap(false, true) {
|
||||
return nil
|
||||
}
|
||||
req, err := s.client.newRequest(
|
||||
ctx,
|
||||
http.MethodDelete,
|
||||
"/sessions/"+url.PathEscape(s.sessionID),
|
||||
nil,
|
||||
"",
|
||||
)
|
||||
if err != nil {
|
||||
// Roll the flag back so the caller can retry — Close failing on
|
||||
// network setup shouldn't strand the session as "locally closed
|
||||
// but actually open server-side" with no way to retry.
|
||||
s.closed.Store(false)
|
||||
return err
|
||||
}
|
||||
var out closeResponse
|
||||
if err := s.client.do(req, &out); err != nil {
|
||||
s.closed.Store(false)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue