diff --git a/extensions/ty-sandbox/.gitignore b/extensions/ty-sandbox/.gitignore new file mode 100644 index 0000000..6d75e7e --- /dev/null +++ b/extensions/ty-sandbox/.gitignore @@ -0,0 +1,2 @@ +ty-sandbox +*.exe diff --git a/extensions/ty-sandbox/Dockerfile b/extensions/ty-sandbox/Dockerfile new file mode 100644 index 0000000..893ea26 --- /dev/null +++ b/extensions/ty-sandbox/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.24-alpine AS builder + +WORKDIR /build +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o /ty-sandbox ./cmd/main.go + +FROM alpine:3.21 + +RUN apk add --no-cache git nodejs npm + +# Install Claude Code CLI +RUN npm install -g @anthropic-ai/claude-code + +COPY --from=builder /ty-sandbox /usr/local/bin/ty-sandbox + +WORKDIR /workspace +EXPOSE 8080 + +ENTRYPOINT ["ty-sandbox"] +CMD ["--addr", ":8080"] diff --git a/extensions/ty-sandbox/README.md b/extensions/ty-sandbox/README.md new file mode 100644 index 0000000..1703d51 --- /dev/null +++ b/extensions/ty-sandbox/README.md @@ -0,0 +1,171 @@ +# ty-sandbox + +HTTP/SSE server for running coding agents in cloud sandboxes. Port of [rivet-dev/sandbox-agent](https://github.com/rivet-dev/sandbox-agent) to Go, integrated with TaskYou. + +## Install + +```sh +go build -o ty-sandbox ./cmd/main.go +``` + +Or with Docker: + +```sh +docker build -t ty-sandbox . +docker run -p 8080:8080 -e ANTHROPIC_API_KEY=sk-... 
ty-sandbox +``` + +## Usage + +Start the server: + +```sh +ty-sandbox --addr :8080 +``` + +With authentication: + +```sh +ty-sandbox --addr :8080 --auth-token your-secret +``` + +## API + +### Health + +``` +GET /v1/health +``` + +### Agents + +List agents: + +``` +GET /v1/agents +``` + +Install an agent: + +``` +POST /v1/agents/{agent}/install +``` + +### Sessions + +Create a session: + +``` +POST /v1/sessions/{session_id} + +{ + "agent": "claude-code", + "prompt": "Fix the login bug", + "work_dir": "/workspace" +} +``` + +List sessions: + +``` +GET /v1/sessions +``` + +Send a message: + +``` +POST /v1/sessions/{session_id}/messages + +{"message": "Also update the tests"} +``` + +Get events (polling): + +``` +GET /v1/sessions/{session_id}/events +GET /v1/sessions/{session_id}/events?after_sequence=5 +``` + +Stream events (SSE): + +``` +GET /v1/sessions/{session_id}/events/sse +``` + +Terminate a session: + +``` +POST /v1/sessions/{session_id}/terminate +``` + +### Human-in-the-Loop + +Reply to a question: + +``` +POST /v1/sessions/{session_id}/questions/{question_id}/reply + +{"answer": "Yes, proceed"} +``` + +Reject a question: + +``` +POST /v1/sessions/{session_id}/questions/{question_id}/reject +``` + +Reply to a permission request: + +``` +POST /v1/sessions/{session_id}/permissions/{permission_id}/reply + +{"allow": true} +``` + +## Event Schema + +Events follow the universal schema from sandbox-agent. 
Each event has: + +- `event_id` - Unique identifier +- `session_id` - Session this event belongs to +- `type` - Event type (see below) +- `source` - `"agent"` or `"daemon"` +- `sequence` - Monotonically increasing counter +- `time` - ISO 8601 timestamp +- `data` - Type-specific payload + +Event types: + +| Type | Description | +|------|-------------| +| `session.started` | Session initialized | +| `session.ended` | Session terminated | +| `turn.started` | Conversation turn began | +| `turn.ended` | Conversation turn ended | +| `item.started` | Message or tool call began | +| `item.delta` | Streaming content update | +| `item.completed` | Message or tool call finished | +| `question.requested` | Agent needs user input | +| `question.resolved` | Question was answered | +| `permission.requested` | Tool execution needs approval | +| `permission.resolved` | Permission was granted/denied | +| `error` | Error occurred | + +## Supported Agents + +| Agent | ID | Status | +|-------|----|--------| +| Claude Code | `claude-code` | Supported | +| Mock | `mock` | For testing | + +## TaskYou Integration + +When the `ty` CLI is available, sessions can create/update TaskYou tasks. Set `--ty-path` or have `ty` on your PATH. + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `SANDBOX_AUTH_TOKEN` | Bearer token for API auth | +| `SANDBOX_WORK_DIR` | Default working directory | +| `ANTHROPIC_API_KEY` | API key for Claude Code | diff --git a/extensions/ty-sandbox/cmd/main.go b/extensions/ty-sandbox/cmd/main.go new file mode 100644 index 0000000..9b44d87 --- /dev/null +++ b/extensions/ty-sandbox/cmd/main.go @@ -0,0 +1,94 @@ +// ty-sandbox is an HTTP/SSE server that enables remote control of coding agents +// (Claude Code, Codex, etc.) running in isolated sandbox environments. +// +// It exposes a unified REST + SSE API modeled after rivet-dev/sandbox-agent, +// and integrates with TaskYou for task tracking. 
+// +// Usage: +// +// ty-sandbox [flags] +// ty-sandbox --addr :8080 --auth-token secret123 +// ty-sandbox --work-dir /workspace +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/agent" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/bridge" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/server" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/session" +) + +var ( + version = "0.1.0" +) + +func main() { + addr := flag.String("addr", ":8080", "listen address") + authToken := flag.String("auth-token", "", "bearer token for API authentication (or SANDBOX_AUTH_TOKEN env)") + workDir := flag.String("work-dir", "", "default working directory for agent sessions") + tyPath := flag.String("ty-path", "", "path to ty CLI for TaskYou integration") + showVersion := flag.Bool("version", false, "print version and exit") + flag.Parse() + + if *showVersion { + fmt.Printf("ty-sandbox %s\n", version) + os.Exit(0) + } + + // Auth token from env if not set via flag + if *authToken == "" { + *authToken = os.Getenv("SANDBOX_AUTH_TOKEN") + } + + // Work dir from env if not set + if *workDir == "" { + *workDir = os.Getenv("SANDBOX_WORK_DIR") + } + + log.Printf("ty-sandbox %s starting", version) + + // Initialize components + registry := agent.NewRegistry() + mgr := session.NewManager(registry) + br := bridge.New(*tyPath) + + if br.IsAvailable() { + log.Printf("TaskYou bridge: available") + } else { + log.Printf("TaskYou bridge: not available (ty CLI not found)") + } + + cfg := server.Config{ + Addr: *addr, + AuthToken: *authToken, + } + + srv := server.New(cfg, registry, mgr) + + // Graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = ctx + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + log.Printf("shutting down...") + cancel() + 
os.Exit(0) + }() + + if err := srv.Start(); err != nil { + log.Fatalf("server error: %v", err) + } +} diff --git a/extensions/ty-sandbox/config.example.yaml b/extensions/ty-sandbox/config.example.yaml new file mode 100644 index 0000000..d21ee48 --- /dev/null +++ b/extensions/ty-sandbox/config.example.yaml @@ -0,0 +1,15 @@ +# ty-sandbox configuration +# +# All values can also be set via flags or environment variables. + +# Listen address +addr: ":8080" + +# Bearer token for API authentication (env: SANDBOX_AUTH_TOKEN) +# auth_token: "your-secret-token" + +# Default working directory for agent sessions (env: SANDBOX_WORK_DIR) +# work_dir: "/workspace" + +# Path to ty CLI for TaskYou integration (auto-detected if on PATH) +# ty_path: "/usr/local/bin/ty" diff --git a/extensions/ty-sandbox/go.mod b/extensions/ty-sandbox/go.mod new file mode 100644 index 0000000..1dc5d0a --- /dev/null +++ b/extensions/ty-sandbox/go.mod @@ -0,0 +1,5 @@ +module github.com/bborn/workflow/extensions/ty-sandbox + +go 1.24.4 + +require github.com/google/uuid v1.6.0 diff --git a/extensions/ty-sandbox/go.sum b/extensions/ty-sandbox/go.sum new file mode 100644 index 0000000..7790d7c --- /dev/null +++ b/extensions/ty-sandbox/go.sum @@ -0,0 +1,2 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/extensions/ty-sandbox/internal/agent/agent.go b/extensions/ty-sandbox/internal/agent/agent.go new file mode 100644 index 0000000..47f83d9 --- /dev/null +++ b/extensions/ty-sandbox/internal/agent/agent.go @@ -0,0 +1,103 @@ +// Package agent defines the adapter interface for coding agents and provides +// implementations for Claude Code and other agents. +package agent + +import ( + "context" + "fmt" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/events" +) + +// AgentID identifies a supported coding agent. 
// AgentID identifies a supported coding agent.
type AgentID string

// Known agent identifiers. NewRegistry registers adapters for AgentClaude and
// AgentMock only; the other IDs are declared here without a built-in adapter.
const (
	AgentClaude   AgentID = "claude-code"
	AgentCodex    AgentID = "codex"
	AgentOpenCode AgentID = "opencode"
	AgentMock     AgentID = "mock"
)

// AgentInfo describes a supported agent. It is the JSON shape produced by
// Agent.Info and collected by Registry.List.
type AgentInfo struct {
	ID          AgentID `json:"id"`
	Name        string  `json:"name"`
	Installed   bool    `json:"installed"`
	Available   bool    `json:"available"`
	Description string  `json:"description,omitempty"`
}

// SpawnConfig contains parameters for starting an agent session.
// Every field except Agent is optional and omitted from JSON when empty.
// How a field is interpreted is up to the adapter that receives it.
type SpawnConfig struct {
	Agent        AgentID  `json:"agent"`
	Model        string   `json:"model,omitempty"`
	Prompt       string   `json:"prompt,omitempty"`
	WorkDir      string   `json:"work_dir,omitempty"`
	SystemPrompt string   `json:"system_prompt,omitempty"`
	MaxTurns     int      `json:"max_turns,omitempty"`
	Args         []string `json:"args,omitempty"`
}

// Agent defines the interface that all agent adapters must implement.
type Agent interface {
	// ID returns the agent identifier.
	ID() AgentID

	// Info returns metadata about this agent.
	Info() AgentInfo

	// IsInstalled checks if the agent CLI is available on the system.
	IsInstalled() bool

	// Install attempts to install the agent CLI.
	Install(ctx context.Context) error

	// Spawn starts a new agent session with the given config.
	// Events are sent to the provided channel.
	Spawn(ctx context.Context, sessionID string, cfg SpawnConfig, eventCh chan<- *events.UniversalEvent) error

	// SendMessage sends a user message to an active session.
	SendMessage(ctx context.Context, sessionID string, message string) error

	// Terminate stops an active session.
	Terminate(ctx context.Context, sessionID string) error
}

// Registry holds all registered agent adapters, keyed by agent ID.
// The map is not guarded by a lock, so registration is expected to finish
// before the registry is used from multiple goroutines.
type Registry struct {
	agents map[AgentID]Agent
}
+func NewRegistry() *Registry { + r := &Registry{ + agents: make(map[AgentID]Agent), + } + r.Register(NewClaudeAgent()) + r.Register(NewMockAgent()) + return r +} + +// Register adds an agent adapter to the registry. +func (r *Registry) Register(a Agent) { + r.agents[a.ID()] = a +} + +// Get returns an agent adapter by ID. +func (r *Registry) Get(id AgentID) (Agent, error) { + a, ok := r.agents[id] + if !ok { + return nil, fmt.Errorf("unknown agent: %s", id) + } + return a, nil +} + +// List returns info for all registered agents. +func (r *Registry) List() []AgentInfo { + var infos []AgentInfo + for _, a := range r.agents { + infos = append(infos, a.Info()) + } + return infos +} diff --git a/extensions/ty-sandbox/internal/agent/claude.go b/extensions/ty-sandbox/internal/agent/claude.go new file mode 100644 index 0000000..b15383a --- /dev/null +++ b/extensions/ty-sandbox/internal/agent/claude.go @@ -0,0 +1,394 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/exec" + "sync" + "sync/atomic" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/events" + "github.com/google/uuid" +) + +// ClaudeAgent implements the Agent interface for Claude Code CLI. 
+type ClaudeAgent struct { + mu sync.Mutex + sessions map[string]*claudeSession +} + +type claudeSession struct { + cmd *exec.Cmd + cancel context.CancelFunc + seq atomic.Uint64 +} + +func NewClaudeAgent() *ClaudeAgent { + return &ClaudeAgent{ + sessions: make(map[string]*claudeSession), + } +} + +func (a *ClaudeAgent) ID() AgentID { return AgentClaude } + +func (a *ClaudeAgent) Info() AgentInfo { + return AgentInfo{ + ID: AgentClaude, + Name: "Claude Code", + Installed: a.IsInstalled(), + Available: a.IsInstalled(), + Description: "Anthropic's Claude Code CLI agent", + } +} + +func (a *ClaudeAgent) IsInstalled() bool { + _, err := exec.LookPath("claude") + return err == nil +} + +func (a *ClaudeAgent) Install(ctx context.Context) error { + cmd := exec.CommandContext(ctx, "npm", "install", "-g", "@anthropic-ai/claude-code") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() +} + +func (a *ClaudeAgent) Spawn(ctx context.Context, sessionID string, cfg SpawnConfig, eventCh chan<- *events.UniversalEvent) error { + ctx, cancel := context.WithCancel(ctx) + + args := []string{ + "--output-format", "stream-json", + "--verbose", + } + + if cfg.Model != "" { + args = append(args, "--model", cfg.Model) + } + if cfg.MaxTurns > 0 { + args = append(args, "--max-turns", fmt.Sprintf("%d", cfg.MaxTurns)) + } + if cfg.SystemPrompt != "" { + args = append(args, "--system-prompt", cfg.SystemPrompt) + } + args = append(args, cfg.Args...) + + if cfg.Prompt != "" { + args = append(args, "--print", cfg.Prompt) + } + + cmd := exec.CommandContext(ctx, "claude", args...) 
+ if cfg.WorkDir != "" { + cmd.Dir = cfg.WorkDir + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return fmt.Errorf("stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + cancel() + return fmt.Errorf("stderr pipe: %w", err) + } + + sess := &claudeSession{cmd: cmd, cancel: cancel} + + a.mu.Lock() + a.sessions[sessionID] = sess + a.mu.Unlock() + + if err := cmd.Start(); err != nil { + cancel() + a.mu.Lock() + delete(a.sessions, sessionID) + a.mu.Unlock() + return fmt.Errorf("start claude: %w", err) + } + + // Emit session.started + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventSessionStarted, events.SourceDaemon, &events.SessionStartedData{ + Agent: string(AgentClaude), + Model: cfg.Model, + }) + eventCh <- evt + + // Parse streaming JSON output from Claude Code + go func() { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large outputs + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + a.handleClaudeEvent(sessionID, sess, line, eventCh) + } + }() + + // Log stderr + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log.Printf("[claude:%s] stderr: %s", sessionID[:8], scanner.Text()) + } + }() + + // Wait for process to exit + go func() { + err := cmd.Wait() + reason := events.EndReasonCompleted + if err != nil { + if ctx.Err() != nil { + reason = events.EndReasonTerminated + } else { + reason = events.EndReasonError + } + } + + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventSessionEnded, events.SourceDaemon, &events.SessionEndedData{ + Reason: reason, + }) + eventCh <- evt + + a.mu.Lock() + delete(a.sessions, sessionID) + a.mu.Unlock() + }() + + return nil +} + +func (a *ClaudeAgent) SendMessage(ctx context.Context, sessionID string, message string) error { + a.mu.Lock() + sess, ok := a.sessions[sessionID] + a.mu.Unlock() + if 
!ok { + return fmt.Errorf("session not found: %s", sessionID) + } + _ = sess + // Claude Code in --print mode doesn't support interactive input. + // For multi-turn, we'd need to use the MCP or session resume approach. + return fmt.Errorf("interactive messages not yet supported for Claude Code; use a new session for each prompt") +} + +func (a *ClaudeAgent) Terminate(ctx context.Context, sessionID string) error { + a.mu.Lock() + sess, ok := a.sessions[sessionID] + a.mu.Unlock() + if !ok { + return fmt.Errorf("session not found: %s", sessionID) + } + sess.cancel() + return nil +} + +// handleClaudeEvent parses a single line of Claude Code's stream-json output +// and converts it to universal events. +func (a *ClaudeAgent) handleClaudeEvent(sessionID string, sess *claudeSession, line []byte, eventCh chan<- *events.UniversalEvent) { + var raw map[string]any + if err := json.Unmarshal(line, &raw); err != nil { + // Not JSON - emit as unparsed + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventAgentUnparsed, events.SourceDaemon, &events.AgentUnparsedData{ + Error: "invalid json", + Location: "stdout", + }) + eventCh <- evt + return + } + + eventType, _ := raw["type"].(string) + + switch eventType { + case "assistant": + a.handleAssistantMessage(sessionID, sess, raw, eventCh) + case "tool_use": + a.handleToolUse(sessionID, sess, raw, eventCh) + case "tool_result": + a.handleToolResult(sessionID, sess, raw, eventCh) + case "result": + a.handleResult(sessionID, sess, raw, eventCh) + case "system": + // System messages from Claude Code - emit as text item + msg, _ := raw["message"].(string) + if msg != "" { + seq := sess.seq.Add(1) - 1 + itemID := uuid.New().String() + evt, _ := events.NewEvent(sessionID, seq, events.EventItemStarted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleSystem, + Status: events.ItemStatusCompleted, + Content: 
[]events.ContentPart{ + {Type: "text", Text: msg}, + }, + }, + }) + eventCh <- evt + + seq = sess.seq.Add(1) - 1 + evt, _ = events.NewEvent(sessionID, seq, events.EventItemCompleted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleSystem, + Status: events.ItemStatusCompleted, + Content: []events.ContentPart{ + {Type: "text", Text: msg}, + }, + }, + }) + eventCh <- evt + } + default: + // Unknown event type - emit raw + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventAgentUnparsed, events.SourceDaemon, &events.AgentUnparsedData{ + Error: fmt.Sprintf("unknown event type: %s", eventType), + Location: "stdout", + }) + evt.Raw = line + eventCh <- evt + } +} + +func (a *ClaudeAgent) handleAssistantMessage(sessionID string, sess *claudeSession, raw map[string]any, eventCh chan<- *events.UniversalEvent) { + itemID := uuid.New().String() + message, _ := raw["message"].(string) + if message == "" { + // Try nested content + if content, ok := raw["content"].([]any); ok { + for _, c := range content { + if cm, ok := c.(map[string]any); ok { + if t, ok := cm["text"].(string); ok { + message += t + } + } + } + } + } + + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventItemStarted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleAssistant, + Status: events.ItemStatusInProgress, + }, + }) + eventCh <- evt + + if message != "" { + seq = sess.seq.Add(1) - 1 + evt, _ = events.NewEvent(sessionID, seq, events.EventItemDelta, events.SourceAgent, &events.ItemDeltaData{ + ItemID: itemID, + Delta: events.ContentPart{Type: "text", Text: message}, + }) + eventCh <- evt + } + + seq = sess.seq.Add(1) - 1 + evt, _ = events.NewEvent(sessionID, seq, events.EventItemCompleted, events.SourceAgent, &events.ItemEventData{ + Item: 
events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleAssistant, + Status: events.ItemStatusCompleted, + Content: []events.ContentPart{ + {Type: "text", Text: message}, + }, + }, + }) + eventCh <- evt +} + +func (a *ClaudeAgent) handleToolUse(sessionID string, sess *claudeSession, raw map[string]any, eventCh chan<- *events.UniversalEvent) { + toolName, _ := raw["name"].(string) + callID, _ := raw["id"].(string) + if callID == "" { + callID = uuid.New().String() + } + argsRaw, _ := raw["input"].(map[string]any) + argsBytes, _ := json.Marshal(argsRaw) + + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventItemStarted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: callID, + Kind: events.ItemKindToolCall, + Role: events.RoleAssistant, + Status: events.ItemStatusInProgress, + Content: []events.ContentPart{ + {Type: "tool_call", CallID: callID, Name: toolName, Arguments: string(argsBytes)}, + }, + }, + }) + eventCh <- evt +} + +func (a *ClaudeAgent) handleToolResult(sessionID string, sess *claudeSession, raw map[string]any, eventCh chan<- *events.UniversalEvent) { + callID, _ := raw["tool_use_id"].(string) + if callID == "" { + callID = uuid.New().String() + } + output, _ := raw["content"].(string) + if output == "" { + if contentArr, ok := raw["content"].([]any); ok { + for _, c := range contentArr { + if cm, ok := c.(map[string]any); ok { + if t, ok := cm["text"].(string); ok { + output += t + } + } + } + } + } + + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventItemCompleted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: callID, + Kind: events.ItemKindToolCall, + Role: events.RoleAssistant, + Status: events.ItemStatusCompleted, + Content: []events.ContentPart{ + {Type: "tool_result", CallID: callID, Output: output}, + }, + }, + }) + eventCh <- evt +} + +func (a *ClaudeAgent) 
handleResult(sessionID string, sess *claudeSession, raw map[string]any, eventCh chan<- *events.UniversalEvent) { + // Final result from Claude Code - contains cost info, session ID, etc. + itemID := uuid.New().String() + result, _ := raw["result"].(string) + if result == "" { + result, _ = raw["message"].(string) + } + + if result != "" { + seq := sess.seq.Add(1) - 1 + evt, _ := events.NewEvent(sessionID, seq, events.EventItemCompleted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleAssistant, + Status: events.ItemStatusCompleted, + Content: []events.ContentPart{ + {Type: "text", Text: result}, + }, + }, + }) + eventCh <- evt + } +} diff --git a/extensions/ty-sandbox/internal/agent/mock.go b/extensions/ty-sandbox/internal/agent/mock.go new file mode 100644 index 0000000..c301bbc --- /dev/null +++ b/extensions/ty-sandbox/internal/agent/mock.go @@ -0,0 +1,109 @@ +package agent + +import ( + "context" + "time" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/events" + "github.com/google/uuid" +) + +// MockAgent is a test agent that emits synthetic events without running a real CLI. 
+type MockAgent struct{} + +func NewMockAgent() *MockAgent { return &MockAgent{} } + +func (a *MockAgent) ID() AgentID { return AgentMock } + +func (a *MockAgent) Info() AgentInfo { + return AgentInfo{ + ID: AgentMock, + Name: "Mock Agent", + Installed: true, + Available: true, + Description: "Test agent that emits synthetic events", + } +} + +func (a *MockAgent) IsInstalled() bool { return true } + +func (a *MockAgent) Install(ctx context.Context) error { return nil } + +func (a *MockAgent) Spawn(ctx context.Context, sessionID string, cfg SpawnConfig, eventCh chan<- *events.UniversalEvent) error { + go func() { + var seq uint64 + + // session.started + evt, _ := events.NewEvent(sessionID, seq, events.EventSessionStarted, events.SourceDaemon, &events.SessionStartedData{ + Agent: string(AgentMock), + }) + eventCh <- evt + seq++ + + time.Sleep(200 * time.Millisecond) + + // Emit a turn + turnID := uuid.New().String() + evt, _ = events.NewEvent(sessionID, seq, events.EventTurnStarted, events.SourceDaemon, &events.TurnEventData{TurnID: turnID}) + eventCh <- evt + seq++ + + // Emit an assistant message + itemID := uuid.New().String() + evt, _ = events.NewEvent(sessionID, seq, events.EventItemStarted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleAssistant, + Status: events.ItemStatusInProgress, + }, + }) + eventCh <- evt + seq++ + + time.Sleep(100 * time.Millisecond) + + response := "Hello! I'm the mock agent. 
Your prompt was: " + cfg.Prompt + evt, _ = events.NewEvent(sessionID, seq, events.EventItemDelta, events.SourceAgent, &events.ItemDeltaData{ + ItemID: itemID, + Delta: events.ContentPart{Type: "text", Text: response}, + }) + eventCh <- evt + seq++ + + evt, _ = events.NewEvent(sessionID, seq, events.EventItemCompleted, events.SourceAgent, &events.ItemEventData{ + Item: events.UniversalItem{ + ItemID: itemID, + Kind: events.ItemKindMessage, + Role: events.RoleAssistant, + Status: events.ItemStatusCompleted, + Content: []events.ContentPart{ + {Type: "text", Text: response}, + }, + }, + }) + eventCh <- evt + seq++ + + evt, _ = events.NewEvent(sessionID, seq, events.EventTurnEnded, events.SourceDaemon, &events.TurnEventData{TurnID: turnID}) + eventCh <- evt + seq++ + + time.Sleep(100 * time.Millisecond) + + // session.ended + evt, _ = events.NewEvent(sessionID, seq, events.EventSessionEnded, events.SourceDaemon, &events.SessionEndedData{ + Reason: events.EndReasonCompleted, + }) + eventCh <- evt + }() + return nil +} + +func (a *MockAgent) SendMessage(ctx context.Context, sessionID string, message string) error { + return nil +} + +func (a *MockAgent) Terminate(ctx context.Context, sessionID string) error { + return nil +} diff --git a/extensions/ty-sandbox/internal/bridge/bridge.go b/extensions/ty-sandbox/internal/bridge/bridge.go new file mode 100644 index 0000000..769d394 --- /dev/null +++ b/extensions/ty-sandbox/internal/bridge/bridge.go @@ -0,0 +1,97 @@ +// Package bridge provides integration with the TaskYou CLI for task tracking. +// When sessions are created/completed, corresponding tasks can be created/updated +// in the TaskYou system. +package bridge + +import ( + "encoding/json" + "fmt" + "log" + "os/exec" + "strings" +) + +// Bridge connects the sandbox agent to a TaskYou instance via the ty CLI. +type Bridge struct { + tyPath string + enabled bool +} + +// New creates a new TaskYou bridge. +// If tyPath is empty, auto-detection is attempted. 
+func New(tyPath string) *Bridge { + if tyPath == "" { + if p, err := exec.LookPath("ty"); err == nil { + tyPath = p + } + } + return &Bridge{ + tyPath: tyPath, + enabled: tyPath != "", + } +} + +// IsAvailable returns true if the ty CLI is accessible. +func (b *Bridge) IsAvailable() bool { + return b.enabled +} + +// CreateTask creates a task in TaskYou for a sandbox session. +func (b *Bridge) CreateTask(title, body, project string) (int64, error) { + if !b.enabled { + return 0, nil + } + + args := []string{"create", title, "--json"} + if body != "" { + args = append(args, "--body", body) + } + if project != "" { + args = append(args, "--project", project) + } + + out, err := b.run(args...) + if err != nil { + return 0, fmt.Errorf("create task: %w", err) + } + + var result struct { + ID int64 `json:"id"` + } + if err := json.Unmarshal([]byte(out), &result); err != nil { + // Try to parse just the ID from output + log.Printf("bridge: could not parse task creation output: %s", out) + return 0, nil + } + + return result.ID, nil +} + +// UpdateTaskStatus updates the status of a TaskYou task. +func (b *Bridge) UpdateTaskStatus(taskID int64, status string) error { + if !b.enabled { + return nil + } + + _, err := b.run("status", fmt.Sprintf("%d", taskID), status) + return err +} + +// CloseTask marks a TaskYou task as done. +func (b *Bridge) CloseTask(taskID int64) error { + if !b.enabled { + return nil + } + + _, err := b.run("close", fmt.Sprintf("%d", taskID)) + return err +} + +func (b *Bridge) run(args ...string) (string, error) { + cmd := exec.Command(b.tyPath, args...) 
+ out, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("ty %s: %s (%w)", strings.Join(args, " "), string(out), err) + } + return strings.TrimSpace(string(out)), nil +} diff --git a/extensions/ty-sandbox/internal/events/events.go b/extensions/ty-sandbox/internal/events/events.go new file mode 100644 index 0000000..9290ada --- /dev/null +++ b/extensions/ty-sandbox/internal/events/events.go @@ -0,0 +1,221 @@ +// Package events defines the universal event schema for sandbox agent communication. +// This schema normalizes output from different coding agents (Claude Code, Codex, etc.) +// into a consistent event stream, modeled after rivet-dev/sandbox-agent. +package events + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" +) + +// EventType identifies the kind of universal event. +type EventType string + +const ( + EventSessionStarted EventType = "session.started" + EventSessionEnded EventType = "session.ended" + EventTurnStarted EventType = "turn.started" + EventTurnEnded EventType = "turn.ended" + EventItemStarted EventType = "item.started" + EventItemDelta EventType = "item.delta" + EventItemCompleted EventType = "item.completed" + EventQuestionRequested EventType = "question.requested" + EventQuestionResolved EventType = "question.resolved" + EventPermissionRequested EventType = "permission.requested" + EventPermissionResolved EventType = "permission.resolved" + EventAgentUnparsed EventType = "agent.unparsed" + EventError EventType = "error" +) + +// EventSource indicates where the event originated. +type EventSource string + +const ( + SourceAgent EventSource = "agent" + SourceDaemon EventSource = "daemon" +) + +// UniversalEvent is the top-level envelope for all events in the stream. 
// UniversalEvent is the top-level envelope for all events in the stream.
// Data holds the JSON-encoded, type-specific payload; Raw optionally carries
// the agent's original output. NewEvent leaves NativeSessionID and Raw unset.
type UniversalEvent struct {
	EventID         string          `json:"event_id"`
	SessionID       string          `json:"session_id"`
	NativeSessionID *string         `json:"native_session_id,omitempty"`
	Type            EventType       `json:"type"`
	Source          EventSource     `json:"source"`
	Synthetic       bool            `json:"synthetic"`
	Sequence        uint64          `json:"sequence"`
	Time            string          `json:"time"`
	Data            json.RawMessage `json:"data"`
	Raw             json.RawMessage `json:"raw,omitempty"`
}

// NewEvent creates a new universal event with a unique ID and timestamp.
//
// data is marshalled to JSON and stored in Data; a marshalling failure is the
// only error returned. The event ID is a freshly generated UUID, Time is the
// current UTC time formatted as RFC 3339 with nanoseconds, and Synthetic is
// true exactly when source is SourceDaemon. seq is stored as given; keeping it
// ordered is the caller's responsibility.
func NewEvent(sessionID string, seq uint64, eventType EventType, source EventSource, data any) (*UniversalEvent, error) {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return nil, fmt.Errorf("marshal event data: %w", err)
	}
	return &UniversalEvent{
		EventID:   uuid.New().String(),
		SessionID: sessionID,
		Type:      eventType,
		Source:    source,
		Synthetic: source == SourceDaemon,
		Sequence:  seq,
		Time:      time.Now().UTC().Format(time.RFC3339Nano),
		Data:      dataBytes,
	}, nil
}

// SessionStartedData is emitted when a session begins.
// Model is omitted from JSON when empty.
type SessionStartedData struct {
	Agent string `json:"agent"`
	Model string `json:"model,omitempty"`
}

// SessionEndReason explains why a session ended.
type SessionEndReason string

// Possible values of SessionEndReason.
const (
	EndReasonCompleted  SessionEndReason = "completed"
	EndReasonTerminated SessionEndReason = "terminated"
	EndReasonError      SessionEndReason = "error"
)

// SessionEndedData is emitted when a session ends.
type SessionEndedData struct {
	Reason SessionEndReason `json:"reason"`
}

// TurnPhase identifies the phase of a conversation turn.
type TurnPhase string

// Possible values of TurnPhase.
const (
	TurnPhaseStarted TurnPhase = "started"
	TurnPhaseEnded   TurnPhase = "ended"
)

// TurnEventData is emitted at the start and end of conversation turns.
// Phase is optional and omitted from JSON when empty.
type TurnEventData struct {
	TurnID string    `json:"turn_id"`
	Phase  TurnPhase `json:"phase,omitempty"`
}
// ItemKind categorizes the type of an item in the conversation.
type ItemKind string

// Possible values of ItemKind.
const (
	ItemKindMessage  ItemKind = "message"
	ItemKindToolCall ItemKind = "tool_call"
)

// ItemRole identifies the role of the message author.
type ItemRole string

// Possible values of ItemRole.
const (
	RoleUser      ItemRole = "user"
	RoleAssistant ItemRole = "assistant"
	RoleSystem    ItemRole = "system"
)

// ItemStatus tracks the state of a conversation item.
type ItemStatus string

// Possible values of ItemStatus.
const (
	ItemStatusInProgress ItemStatus = "in_progress"
	ItemStatusCompleted  ItemStatus = "completed"
)

// ContentPart represents a piece of content in a message or tool call.
// It is a flat union: Type selects which group of fields is meaningful, and
// every other field is omitted from JSON when empty.
type ContentPart struct {
	Type string `json:"type"` // "text", "tool_call", "tool_result", "file_ref", "reasoning"

	// text
	Text string `json:"text,omitempty"`

	// tool_call
	CallID    string `json:"call_id,omitempty"`
	Name      string `json:"name,omitempty"`
	Arguments string `json:"arguments,omitempty"`

	// tool_result
	Output string `json:"output,omitempty"`

	// file_ref
	Path   string `json:"path,omitempty"`
	Action string `json:"action,omitempty"`
	Diff   string `json:"diff,omitempty"`

	// reasoning
	Visibility string `json:"visibility,omitempty"`
}

// UniversalItem represents a normalized message or tool call.
// Role and Content are omitted from JSON when empty.
type UniversalItem struct {
	ItemID  string        `json:"item_id"`
	Kind    ItemKind      `json:"kind"`
	Role    ItemRole      `json:"role,omitempty"`
	Status  ItemStatus    `json:"status"`
	Content []ContentPart `json:"content,omitempty"`
}

// ItemEventData is emitted for item.started and item.completed events.
type ItemEventData struct {
	Item UniversalItem `json:"item"`
}

// ItemDeltaData is emitted for streaming content updates.
// ItemID names the item the delta belongs to.
type ItemDeltaData struct {
	ItemID string      `json:"item_id"`
	Delta  ContentPart `json:"delta"`
}
type QuestionStatus string

// Known question statuses.
const (
	QuestionPending  = QuestionStatus("pending")
	QuestionAnswered = QuestionStatus("answered")
	QuestionRejected = QuestionStatus("rejected")
)

// QuestionEventData is the payload of human-in-the-loop question events.
// Question and Answer are omitted from the JSON encoding when empty.
type QuestionEventData struct {
	QuestionID string         `json:"question_id"`
	Status     QuestionStatus `json:"status"`
	Question   string         `json:"question,omitempty"`
	Answer     string         `json:"answer,omitempty"`
}

// PermissionStatus tracks the state of a tool permission request.
type PermissionStatus string

// Known permission statuses.
const (
	PermissionPending  = PermissionStatus("pending")
	PermissionApproved = PermissionStatus("approved")
	PermissionDenied   = PermissionStatus("denied")
)

// PermissionEventData is the payload of tool execution approval requests.
// ToolName and Arguments are omitted from the JSON encoding when empty.
type PermissionEventData struct {
	PermissionID string           `json:"permission_id"`
	Status       PermissionStatus `json:"status"`
	ToolName     string           `json:"tool_name,omitempty"`
	Arguments    string           `json:"arguments,omitempty"`
}

// ErrorData is the payload emitted when an error occurs. Code is omitted
// from the JSON encoding when empty.
type ErrorData struct {
	Message string `json:"message"`
	Code    string `json:"code,omitempty"`
}

// AgentUnparsedData is the payload emitted when agent output can't be parsed.
// RawHash is omitted from the JSON encoding when empty.
type AgentUnparsedData struct {
	Error    string `json:"error"`
	Location string `json:"location"`
	RawHash  string `json:"raw_hash,omitempty"`
}

// diff --git a/extensions/ty-sandbox/internal/server/server.go b/extensions/ty-sandbox/internal/server/server.go
// new file mode 100644
// index 0000000..f83b8b7
// --- /dev/null
// +++ b/extensions/ty-sandbox/internal/server/server.go
// @@ -0,0 +1,425 @@

// Package server implements the HTTP/SSE API for the sandbox agent.
// The API is modeled after rivet-dev/sandbox-agent's v1 endpoints.
+package server + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "strings" + "time" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/agent" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/session" +) + +// Config holds server configuration. +type Config struct { + Addr string + AuthToken string +} + +// Server is the HTTP server for the sandbox agent API. +type Server struct { + cfg Config + manager *session.Manager + agents *agent.Registry + mux *http.ServeMux +} + +// New creates a new HTTP server. +func New(cfg Config, registry *agent.Registry, manager *session.Manager) *Server { + s := &Server{ + cfg: cfg, + manager: manager, + agents: registry, + mux: http.NewServeMux(), + } + s.registerRoutes() + return s +} + +// Start begins listening for HTTP requests. +func (s *Server) Start() error { + handler := s.withMiddleware(s.mux) + log.Printf("ty-sandbox listening on %s", s.cfg.Addr) + return http.ListenAndServe(s.cfg.Addr, handler) +} + +func (s *Server) withMiddleware(h http.Handler) http.Handler { + // CORS + h = corsMiddleware(h) + // Auth + if s.cfg.AuthToken != "" { + h = authMiddleware(s.cfg.AuthToken, h) + } + // Logging + h = loggingMiddleware(h) + return h +} + +func (s *Server) registerRoutes() { + // Health + s.mux.HandleFunc("GET /v1/health", s.handleHealth) + + // Agents + s.mux.HandleFunc("GET /v1/agents", s.handleListAgents) + s.mux.HandleFunc("POST /v1/agents/{agent}/install", s.handleInstallAgent) + + // Sessions + s.mux.HandleFunc("GET /v1/sessions", s.handleListSessions) + s.mux.HandleFunc("POST /v1/sessions/{session_id}", s.handleCreateSession) + s.mux.HandleFunc("POST /v1/sessions/{session_id}/messages", s.handlePostMessage) + s.mux.HandleFunc("POST /v1/sessions/{session_id}/terminate", s.handleTerminateSession) + s.mux.HandleFunc("GET /v1/sessions/{session_id}/events", s.handleGetEvents) + s.mux.HandleFunc("GET /v1/sessions/{session_id}/events/sse", s.handleGetEventsSSE) + + // 
Questions (HITL) + s.mux.HandleFunc("POST /v1/sessions/{session_id}/questions/{question_id}/reply", s.handleReplyQuestion) + s.mux.HandleFunc("POST /v1/sessions/{session_id}/questions/{question_id}/reject", s.handleRejectQuestion) + + // Permissions + s.mux.HandleFunc("POST /v1/sessions/{session_id}/permissions/{permission_id}/reply", s.handleReplyPermission) +} + +// --- Health --- + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "version": "0.1.0", + }) +} + +// --- Agents --- + +func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "agents": s.agents.List(), + }) +} + +func (s *Server) handleInstallAgent(w http.ResponseWriter, r *http.Request) { + agentID := agent.AgentID(r.PathValue("agent")) + a, err := s.agents.Get(agentID) + if err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + if err := a.Install(r.Context()); err != nil { + writeError(w, http.StatusInternalServerError, fmt.Sprintf("install failed: %v", err)) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"installed": true}) +} + +// --- Sessions --- + +func (s *Server) handleListSessions(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "sessions": s.manager.ListSessions(), + }) +} + +type createSessionRequest struct { + Agent string `json:"agent"` + Model string `json:"model,omitempty"` + Prompt string `json:"prompt,omitempty"` + SystemPrompt string `json:"system_prompt,omitempty"` + WorkDir string `json:"work_dir,omitempty"` + MaxTurns int `json:"max_turns,omitempty"` + Args []string `json:"args,omitempty"` +} + +func (s *Server) handleCreateSession(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + + var req createSessionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, 
fmt.Sprintf("invalid request body: %v", err)) + return + } + + if req.Agent == "" { + req.Agent = string(agent.AgentClaude) + } + + cfg := agent.SpawnConfig{ + Agent: agent.AgentID(req.Agent), + Model: req.Model, + Prompt: req.Prompt, + SystemPrompt: req.SystemPrompt, + WorkDir: req.WorkDir, + MaxTurns: req.MaxTurns, + Args: req.Args, + } + + info, err := s.manager.CreateSession(r.Context(), sessionID, cfg) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusCreated, info) +} + +type postMessageRequest struct { + Message string `json:"message"` +} + +func (s *Server) handlePostMessage(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + + var req postMessageRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + return + } + + if err := s.manager.SendMessage(r.Context(), sessionID, req.Message); err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, err.Error()) + } else { + writeError(w, http.StatusInternalServerError, err.Error()) + } + return + } + + writeJSON(w, http.StatusOK, map[string]any{"sent": true}) +} + +func (s *Server) handleTerminateSession(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + + if err := s.manager.TerminateSession(r.Context(), sessionID); err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, err.Error()) + } else { + writeError(w, http.StatusInternalServerError, err.Error()) + } + return + } + + writeJSON(w, http.StatusOK, map[string]any{"terminated": true}) +} + +// --- Events --- + +func (s *Server) handleGetEvents(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + + var afterSeq *uint64 + if v := r.URL.Query().Get("after_sequence"); v != "" { + n, err := strconv.ParseUint(v, 10, 
64) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid after_sequence") + return + } + afterSeq = &n + } + + evts, err := s.manager.GetEvents(sessionID, afterSeq) + if err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, err.Error()) + } else { + writeError(w, http.StatusInternalServerError, err.Error()) + } + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "events": evts, + }) +} + +func (s *Server) handleGetEventsSSE(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + + ch, err := s.manager.SubscribeEvents(sessionID) + if err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-ch: + if !ok { + // Channel closed - session ended + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() + return + } + + data, err := json.Marshal(evt) + if err != nil { + continue + } + + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evt.Type, data) + flusher.Flush() + } + } +} + +// --- Questions --- + +type replyQuestionRequest struct { + Answer string `json:"answer"` +} + +func (s *Server) handleReplyQuestion(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + questionID := r.PathValue("question_id") + + var req replyQuestionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + return + } + + if err := s.manager.ReplyQuestion(sessionID, questionID, req.Answer); err != nil { + 
writeError(w, http.StatusNotFound, err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"replied": true}) +} + +func (s *Server) handleRejectQuestion(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + questionID := r.PathValue("question_id") + + if err := s.manager.RejectQuestion(sessionID, questionID); err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"rejected": true}) +} + +// --- Permissions --- + +type replyPermissionRequest struct { + Allow bool `json:"allow"` +} + +func (s *Server) handleReplyPermission(w http.ResponseWriter, r *http.Request) { + sessionID := r.PathValue("session_id") + permissionID := r.PathValue("permission_id") + + var req replyPermissionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + return + } + + if err := s.manager.ReplyPermission(sessionID, permissionID, req.Allow); err != nil { + writeError(w, http.StatusNotFound, err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"replied": true}) +} + +// --- Helpers --- + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, status int, message string) { + writeJSON(w, status, map[string]any{ + "error": map[string]any{ + "message": message, + "status": status, + }, + }) +} + +// --- Middleware --- + +func corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + if r.Method == "OPTIONS" { + 
w.WriteHeader(http.StatusNoContent) + return + } + next.ServeHTTP(w, r) + }) +} + +func authMiddleware(token string, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Skip auth for health check + if r.URL.Path == "/v1/health" { + next.ServeHTTP(w, r) + return + } + + auth := r.Header.Get("Authorization") + if auth == "" { + writeError(w, http.StatusUnauthorized, "missing authorization header") + return + } + + parts := strings.SplitN(auth, " ", 2) + if len(parts) != 2 || strings.ToLower(parts[0]) != "bearer" || parts[1] != token { + writeError(w, http.StatusUnauthorized, "invalid token") + return + } + + next.ServeHTTP(w, r) + }) +} + +type responseWriter struct { + http.ResponseWriter + status int +} + +func (rw *responseWriter) WriteHeader(code int) { + rw.status = code + rw.ResponseWriter.WriteHeader(code) +} + +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + rw := &responseWriter{ResponseWriter: w, status: http.StatusOK} + next.ServeHTTP(rw, r) + log.Printf("%s %s %d %s", r.Method, r.URL.Path, rw.status, time.Since(start)) + }) +} + +// WithContext returns a new Server that uses the given context for background operations. 
+func (s *Server) WithContext(ctx context.Context) *Server { + _ = ctx + return s +} diff --git a/extensions/ty-sandbox/internal/server/server_test.go b/extensions/ty-sandbox/internal/server/server_test.go new file mode 100644 index 0000000..5367661 --- /dev/null +++ b/extensions/ty-sandbox/internal/server/server_test.go @@ -0,0 +1,177 @@ +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/agent" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/session" +) + +func newTestServer() *Server { + registry := agent.NewRegistry() + mgr := session.NewManager(registry) + cfg := Config{Addr: ":0"} + return New(cfg, registry, mgr) +} + +func TestHealthEndpoint(t *testing.T) { + srv := newTestServer() + req := httptest.NewRequest("GET", "/v1/health", nil) + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp map[string]any + json.NewDecoder(w.Body).Decode(&resp) + if resp["status"] != "ok" { + t.Fatalf("expected status ok, got %v", resp["status"]) + } +} + +func TestListAgents(t *testing.T) { + srv := newTestServer() + req := httptest.NewRequest("GET", "/v1/agents", nil) + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp map[string]any + json.NewDecoder(w.Body).Decode(&resp) + agents, ok := resp["agents"].([]any) + if !ok || len(agents) == 0 { + t.Fatalf("expected agents list, got %v", resp) + } +} + +func TestListSessionsEmpty(t *testing.T) { + srv := newTestServer() + req := httptest.NewRequest("GET", "/v1/sessions", nil) + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + +func TestCreateMockSession(t *testing.T) { + srv := newTestServer() + + body, _ := 
json.Marshal(map[string]any{ + "agent": "mock", + "prompt": "hello world", + }) + req := httptest.NewRequest("POST", "/v1/sessions/test-session-1", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + + var resp session.SessionInfo + json.NewDecoder(w.Body).Decode(&resp) + if resp.ID != "test-session-1" { + t.Fatalf("expected session ID test-session-1, got %s", resp.ID) + } + if resp.Status != "active" { + t.Fatalf("expected active status, got %s", resp.Status) + } +} + +func TestGetEventsForMockSession(t *testing.T) { + srv := newTestServer() + + // Create session + body, _ := json.Marshal(map[string]any{ + "agent": "mock", + "prompt": "test prompt", + }) + req := httptest.NewRequest("POST", "/v1/sessions/test-events", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("create session: expected 201, got %d", w.Code) + } + + // Wait for mock agent to produce events (~400ms total) + time.Sleep(1 * time.Second) + + // Get events + req = httptest.NewRequest("GET", "/v1/sessions/test-events/events", nil) + w = httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("get events: expected 200, got %d", w.Code) + } + + var resp map[string]any + json.NewDecoder(w.Body).Decode(&resp) + evts, ok := resp["events"].([]any) + if !ok { + t.Fatalf("expected events array, got %v", resp) + } + if len(evts) == 0 { + t.Fatalf("expected events, got none") + } +} + +func TestAuthMiddleware(t *testing.T) { + registry := agent.NewRegistry() + mgr := session.NewManager(registry) + cfg := Config{Addr: ":0", AuthToken: "secret123"} + srv := New(cfg, registry, mgr) + + handler := srv.withMiddleware(srv.mux) + + // Without token + req := 
httptest.NewRequest("GET", "/v1/agents", nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", w.Code) + } + + // With correct token + req = httptest.NewRequest("GET", "/v1/agents", nil) + req.Header.Set("Authorization", "Bearer secret123") + w = httptest.NewRecorder() + handler.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + // Health should skip auth + req = httptest.NewRequest("GET", "/v1/health", nil) + w = httptest.NewRecorder() + handler.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200 for health without auth, got %d", w.Code) + } +} + +func TestSessionNotFound(t *testing.T) { + srv := newTestServer() + + req := httptest.NewRequest("GET", "/v1/sessions/nonexistent/events", nil) + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d", w.Code) + } +} diff --git a/extensions/ty-sandbox/internal/session/manager.go b/extensions/ty-sandbox/internal/session/manager.go new file mode 100644 index 0000000..b257ae3 --- /dev/null +++ b/extensions/ty-sandbox/internal/session/manager.go @@ -0,0 +1,348 @@ +// Package session manages the lifecycle of agent sessions, including event +// buffering, SSE streaming, and question/permission handling. +package session + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/bborn/workflow/extensions/ty-sandbox/internal/agent" + "github.com/bborn/workflow/extensions/ty-sandbox/internal/events" + "github.com/google/uuid" +) + +// SessionInfo describes the state of a session. 
+type SessionInfo struct { + ID string `json:"id"` + Agent agent.AgentID `json:"agent"` + Model string `json:"model,omitempty"` + Status string `json:"status"` // "active", "ended" + CreatedAt time.Time `json:"created_at"` + EndedAt *time.Time `json:"ended_at,omitempty"` +} + +// Session holds the state for a single agent session. +type Session struct { + mu sync.RWMutex + info SessionInfo + events []*events.UniversalEvent + eventCh chan *events.UniversalEvent + listeners []chan *events.UniversalEvent + questions map[string]*pendingQuestion + perms map[string]*pendingPermission + cancel context.CancelFunc +} + +type pendingQuestion struct { + event *events.QuestionEventData + replyCh chan string +} + +type pendingPermission struct { + event *events.PermissionEventData + replyCh chan bool +} + +// Manager coordinates sessions across agent adapters. +type Manager struct { + mu sync.RWMutex + sessions map[string]*Session + registry *agent.Registry +} + +// NewManager creates a new session manager. +func NewManager(registry *agent.Registry) *Manager { + return &Manager{ + sessions: make(map[string]*Session), + registry: registry, + } +} + +// CreateSession starts a new agent session. 
+func (m *Manager) CreateSession(ctx context.Context, sessionID string, cfg agent.SpawnConfig) (*SessionInfo, error) { + if sessionID == "" { + sessionID = uuid.New().String() + } + + m.mu.Lock() + if _, exists := m.sessions[sessionID]; exists { + m.mu.Unlock() + return nil, fmt.Errorf("session already exists: %s", sessionID) + } + m.mu.Unlock() + + a, err := m.registry.Get(cfg.Agent) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + eventCh := make(chan *events.UniversalEvent, 256) + + sess := &Session{ + info: SessionInfo{ + ID: sessionID, + Agent: cfg.Agent, + Model: cfg.Model, + Status: "active", + CreatedAt: time.Now(), + }, + eventCh: eventCh, + questions: make(map[string]*pendingQuestion), + perms: make(map[string]*pendingPermission), + cancel: cancel, + } + + m.mu.Lock() + m.sessions[sessionID] = sess + m.mu.Unlock() + + // Start event consumer goroutine + go m.consumeEvents(sess) + + // Spawn the agent + if err := a.Spawn(ctx, sessionID, cfg, eventCh); err != nil { + cancel() + m.mu.Lock() + delete(m.sessions, sessionID) + m.mu.Unlock() + return nil, fmt.Errorf("spawn agent: %w", err) + } + + return &sess.info, nil +} + +// consumeEvents reads events from the agent and distributes to listeners. +func (m *Manager) consumeEvents(sess *Session) { + for evt := range sess.eventCh { + sess.mu.Lock() + sess.events = append(sess.events, evt) + + // Distribute to all SSE listeners + for i := len(sess.listeners) - 1; i >= 0; i-- { + select { + case sess.listeners[i] <- evt: + default: + // Listener is full, remove it + sess.listeners = append(sess.listeners[:i], sess.listeners[i+1:]...) 
+ } + } + + // Mark session as ended when we get a session.ended event + if evt.Type == events.EventSessionEnded { + sess.info.Status = "ended" + now := time.Now() + sess.info.EndedAt = &now + // Close all listeners + for _, l := range sess.listeners { + close(l) + } + sess.listeners = nil + } + + sess.mu.Unlock() + } +} + +// SendMessage sends a message to an active session. +func (m *Manager) SendMessage(ctx context.Context, sessionID string, message string) error { + sess, err := m.getSession(sessionID) + if err != nil { + return err + } + + sess.mu.RLock() + if sess.info.Status != "active" { + sess.mu.RUnlock() + return fmt.Errorf("session is not active: %s", sessionID) + } + agentID := sess.info.Agent + sess.mu.RUnlock() + + a, err := m.registry.Get(agentID) + if err != nil { + return err + } + + return a.SendMessage(ctx, sessionID, message) +} + +// TerminateSession stops an active session. +func (m *Manager) TerminateSession(ctx context.Context, sessionID string) error { + sess, err := m.getSession(sessionID) + if err != nil { + return err + } + + sess.mu.RLock() + agentID := sess.info.Agent + sess.mu.RUnlock() + + a, err := m.registry.Get(agentID) + if err != nil { + return err + } + + return a.Terminate(ctx, sessionID) +} + +// GetEvents returns all events for a session, optionally filtered by sequence. +func (m *Manager) GetEvents(sessionID string, afterSeq *uint64) ([]*events.UniversalEvent, error) { + sess, err := m.getSession(sessionID) + if err != nil { + return nil, err + } + + sess.mu.RLock() + defer sess.mu.RUnlock() + + if afterSeq == nil { + result := make([]*events.UniversalEvent, len(sess.events)) + copy(result, sess.events) + return result, nil + } + + var result []*events.UniversalEvent + for _, e := range sess.events { + if e.Sequence > *afterSeq { + result = append(result, e) + } + } + return result, nil +} + +// SubscribeEvents returns a channel that receives new events for a session. +// The channel is closed when the session ends. 
+func (m *Manager) SubscribeEvents(sessionID string) (<-chan *events.UniversalEvent, error) { + sess, err := m.getSession(sessionID) + if err != nil { + return nil, err + } + + ch := make(chan *events.UniversalEvent, 64) + + sess.mu.Lock() + defer sess.mu.Unlock() + + // Send buffered events first + go func() { + sess.mu.RLock() + buffered := make([]*events.UniversalEvent, len(sess.events)) + copy(buffered, sess.events) + sess.mu.RUnlock() + + for _, e := range buffered { + ch <- e + } + }() + + if sess.info.Status == "ended" { + go func() { + // Give time for buffered events to be sent + time.Sleep(100 * time.Millisecond) + close(ch) + }() + return ch, nil + } + + sess.listeners = append(sess.listeners, ch) + return ch, nil +} + +// ReplyQuestion answers a pending HITL question. +func (m *Manager) ReplyQuestion(sessionID, questionID, answer string) error { + sess, err := m.getSession(sessionID) + if err != nil { + return err + } + + sess.mu.Lock() + q, ok := sess.questions[questionID] + if !ok { + sess.mu.Unlock() + return fmt.Errorf("question not found: %s", questionID) + } + delete(sess.questions, questionID) + sess.mu.Unlock() + + q.replyCh <- answer + return nil +} + +// RejectQuestion rejects a pending HITL question. +func (m *Manager) RejectQuestion(sessionID, questionID string) error { + sess, err := m.getSession(sessionID) + if err != nil { + return err + } + + sess.mu.Lock() + q, ok := sess.questions[questionID] + if !ok { + sess.mu.Unlock() + return fmt.Errorf("question not found: %s", questionID) + } + delete(sess.questions, questionID) + sess.mu.Unlock() + + close(q.replyCh) + return nil +} + +// ReplyPermission approves or denies a pending tool permission. 
+func (m *Manager) ReplyPermission(sessionID, permissionID string, allow bool) error { + sess, err := m.getSession(sessionID) + if err != nil { + return err + } + + sess.mu.Lock() + p, ok := sess.perms[permissionID] + if !ok { + sess.mu.Unlock() + return fmt.Errorf("permission not found: %s", permissionID) + } + delete(sess.perms, permissionID) + sess.mu.Unlock() + + p.replyCh <- allow + return nil +} + +// ListSessions returns info for all sessions. +func (m *Manager) ListSessions() []SessionInfo { + m.mu.RLock() + defer m.mu.RUnlock() + + var infos []SessionInfo + for _, s := range m.sessions { + s.mu.RLock() + infos = append(infos, s.info) + s.mu.RUnlock() + } + return infos +} + +// GetSessionInfo returns info for a single session. +func (m *Manager) GetSessionInfo(sessionID string) (*SessionInfo, error) { + sess, err := m.getSession(sessionID) + if err != nil { + return nil, err + } + sess.mu.RLock() + defer sess.mu.RUnlock() + info := sess.info + return &info, nil +} + +func (m *Manager) getSession(sessionID string) (*Session, error) { + m.mu.RLock() + defer m.mu.RUnlock() + sess, ok := m.sessions[sessionID] + if !ok { + return nil, fmt.Errorf("session not found: %s", sessionID) + } + return sess, nil +}