import { spawn, type IPty } from '@skitee3000/bun-pty'
import { existsSync } from 'fs'
import { join } from 'path'
import { homedir } from 'os'
import {
  PORT_TERMINAL,
  WORKING_DIR,
  SHELL,
  SHELL_ARGS,
  DEFAULT_SESSION_ID,
  MAX_BUFFER_LINES,
  MAX_TERMINALS
} from '../config'
import { sessionState, type PtyStatePatch } from './session-state'

// Agent transcript directories (mirrored from transcript-debug.ts)
const AGENT_TRANSCRIPT_DIRS: Record<string, string> = {
  ejecutor: join(WORKING_DIR, '.claude-ejecutor', 'projects'),
  nucleo000: join(WORKING_DIR, '.claude-nucleo000', 'projects'),
  claude: join(homedir(), '.claude', 'projects')
}

// Name of the per-project sub-directory looked up inside each transcript base directory.
const PROJECT_HASH = 'C--Users-jodar-agent-ui'

/**
 * Resolves the transcript directory of this project for the given agent.
 *
 * @param agent Key into AGENT_TRANSCRIPT_DIRS.
 * @returns The directory path, or null when the agent is unknown or the base
 *          directory / project sub-directory does not exist on disk.
 */
function getTranscriptProjectDir(agent: string): string | null {
  const baseDir = AGENT_TRANSCRIPT_DIRS[agent]
  if (!baseDir || !existsSync(baseDir)) return null
  const exact = join(baseDir, PROJECT_HASH)
  return existsSync(exact) ? exact : null
}

/**
 * Tells whether `<sessionId>.jsonl` exists in the agent's transcript project directory.
 */
function transcriptSessionExists(agent: string, sessionId: string): boolean {
  const projectDir = getTranscriptProjectDir(agent)
  if (!projectDir) return false
  return existsSync(join(projectDir, `${sessionId}.jsonl`))
}

/** Extracts a printable message from a caught value of unknown type. */
function errorMessage(e: unknown): string {
  return e instanceof Error ? e.message : String(e)
}

/**
 * Minimal view of a connected WebSocket: the only capability the bookkeeping
 * in this module relies on is sending a text frame.
 */
interface ClientSocket {
  send(message: string): unknown
}

interface TerminalSession {
  id: string
  pty: IPty
  outputBuffer: string[] // PTY output chunks, oldest first
  maxBufferSize: number // maximum number of chunks kept in outputBuffer
  clients: Set<ClientSocket>
  createdAt: Date
}

// Agent terminal state tracking
interface AgentTerminalState {
  agentId: string
  sessionId: string
  command: string
  startedAt: Date | null
  isAgentRunning: boolean
}

export const agentSessions = new Map<string, AgentTerminalState>()

const AGENT_COMMANDS: Record<string, string> = {
  'claude': 'claude',
  'ejecutor': 'ejecutor',
  'nucleo000': 'nucleo000'
}

// Store active terminal sessions by ID (persistent across reconnections)
const sessions = new Map<string, TerminalSession>()

// Map WebSocket to sessionId (only for PTY-connected clients)
const wsToSession = new Map<ClientSocket, string>()

// Broadcast-only clients (no PTY, just receive state updates)
const broadcastClients = new Set<ClientSocket>()

// ── Global terminal registry ──
// Tracks metadata about transcript-debug terminals so all clients can see/connect to them
interface TerminalRegistryEntry {
  ephemeralSessionId: string // PTY session ID on this server
  transcriptSessionId: string // Claude transcript session being resumed (or '__new__')
  agent: string // ejecutor | nucleo000 | claude
  label: string // First user message or short description
  command: string // Full command that was run
  createdAt: string // ISO timestamp
}

const terminalRegistry = new Map<string, TerminalRegistryEntry>() // keyed by ephemeralSessionId

/**
 * Returns every registry entry enriched with live PTY information
 * (`alive`, connected `clients`, `bufferSize`); entries whose PTY session is
 * gone report `alive: false` and zero counts.
 */
function getRegistrySnapshot() {
  return Array.from(terminalRegistry.values()).map(entry => {
    const ptySession = sessions.get(entry.ephemeralSessionId)
    return {
      ...entry,
      alive: !!ptySession,
      clients: ptySession?.clients.size ?? 0,
      bufferSize: ptySession?.outputBuffer.length ?? 0
    }
  })
}

/**
 * Sends a message to ALL clients: broadcast-only + PTY-connected.
 * Send failures are skipped on purpose (best-effort delivery).
 *
 * @returns Number of sockets the message was successfully handed to.
 */
function broadcastToAll(message: string): number {
  let count = 0
  for (const ws of broadcastClients) {
    try { ws.send(message); count++ } catch { /* skip */ }
  }
  for (const [, session] of sessions) {
    for (const ws of session.clients) {
      try { ws.send(message); count++ } catch { /* skip */ }
    }
  }
  return count
}

/** Serializes a PTY state patch and broadcasts it to every client. */
function broadcastPtyStatePatch(patch: PtyStatePatch) {
  broadcastToAll(JSON.stringify(patch))
}

/** Broadcasts the current registry snapshot as a 'terminal-registry-change' message. */
function broadcastRegistryChange() {
  const message = JSON.stringify({
    type: 'terminal-registry-change',
    registry: getRegistrySnapshot(),
    timestamp: Date.now()
  })
  const count = broadcastToAll(message)
  console.log(`[Terminal] Registry broadcast → ${count} clients (${terminalRegistry.size} entries)`)
}

/**
 * Builds the shell line that exports AGENT_UI_PTY_SESSION inside a new PTY.
 *
 * The id is single-quoted and embedded quotes are escaped so that an id
 * containing `"`, `$` or backticks is stored literally instead of breaking or
 * expanding the assignment.
 */
function buildSessionEnvCommand(sessionId: string): string {
  if (process.platform === 'win32') {
    // PowerShell syntax: a single quote inside a single-quoted string is written as ''
    return `$env:AGENT_UI_PTY_SESSION='${sessionId.replace(/'/g, "''")}'\r`
  }
  // POSIX shells: close the quote, emit an escaped quote, reopen ('\'')
  return `export AGENT_UI_PTY_SESSION='${sessionId.replace(/'/g, `'\\''`)}'\n`
}

/**
 * Returns the session registered under `sessionId`, spawning a new PTY (and
 * registering the session) when none exists.
 *
 * The new PTY's output is appended to the session buffer (capped at
 * `maxBufferSize` chunks) and forwarded to every connected client; on exit the
 * clients are notified and the session is removed from the shared maps.
 */
function getOrCreateSession(sessionId: string = DEFAULT_SESSION_ID): TerminalSession {
  const existing = sessions.get(sessionId)
  if (existing) return existing

  console.log(`[Terminal] Creating new session: ${sessionId}`)
  const pty = spawn(SHELL, SHELL_ARGS, {
    name: 'xterm-256color',
    cols: 80,
    rows: 24,
    cwd: WORKING_DIR,
  })

  // Inject AGENT_UI_PTY_SESSION env var into the shell session
  // (bun-pty FFI doesn't support env in spawn options)
  pty.write(buildSessionEnvCommand(sessionId))

  const session: TerminalSession = {
    id: sessionId,
    pty,
    outputBuffer: [],
    maxBufferSize: MAX_BUFFER_LINES,
    clients: new Set(),
    createdAt: new Date()
  }

  // Capture output to buffer and send to clients
  pty.onData((data: string) => {
    session.outputBuffer.push(data)
    if (session.outputBuffer.length > session.maxBufferSize) {
      session.outputBuffer.shift()
    }
    for (const ws of session.clients) {
      try {
        ws.send(JSON.stringify({ type: 'output', data }))
      } catch {
        // Client disconnected
      }
    }
  })

  // Handle PTY exit
  pty.onExit(({ exitCode }) => {
    console.log(`[Terminal] Session ${sessionId} exited with code ${exitCode}`)
    for (const ws of session.clients) {
      try {
        ws.send(JSON.stringify({
          type: 'exit',
          data: `\r\n\x1b[33mSession ended (code ${exitCode})\x1b[0m\r\n`
        }))
      } catch { /* ignore */ }
    }

    // A forced restart may already have registered a NEW session under this
    // id. In that case this callback belongs to the replaced PTY and must not
    // remove the new session or flag the restarted agent as stopped.
    const current = sessions.get(sessionId)
    if (current !== undefined && current !== session) return

    sessions.delete(sessionId)

    // Auto-remove from terminal registry
    if (terminalRegistry.has(sessionId)) {
      terminalRegistry.delete(sessionId)
      broadcastRegistryChange()
    }

    // Mark agent as not running if this is an agent session
    if (sessionId.startsWith('agent-')) {
      const agentId = sessionId.replace('agent-', '')
      const state = agentSessions.get(agentId)
      if (state) {
        state.isAgentRunning = false
        console.log(`[Terminal] Agent ${agentId} marked as stopped (exit code ${exitCode})`)
      }
    }
  })

  sessions.set(sessionId, session)
  console.log(`[Terminal] Session ${sessionId} created, PID: ${pty.pid}`)
  return session
}

/**
 * Kills an existing session's PTY process.
 *
 * Connected clients receive a 'session-restart' message first; the session is
 * then removed from the session map and from the terminal registry.
 *
 * @returns false when no session with that id exists, true otherwise
 *          (even if killing the PTY itself reported an error).
 */
export function killSession(sessionId: string): boolean {
  const session = sessions.get(sessionId)
  if (!session) return false

  console.log(`[Terminal] Killing session: ${sessionId} (PID: ${session.pty.pid})`)

  // Notify clients before killing
  for (const ws of session.clients) {
    try {
      ws.send(JSON.stringify({ type: 'session-restart', sessionId }))
    } catch { /* ignore */ }
  }

  try {
    session.pty.kill()
  } catch (e) {
    console.error(`[Terminal] Error killing PTY for ${sessionId}:`, e)
  }

  sessions.delete(sessionId)

  // Auto-remove from terminal registry
  if (terminalRegistry.has(sessionId)) {
    terminalRegistry.delete(sessionId)
    broadcastRegistryChange()
  }

  return true
}

/**
 * Starts an agent command in its dedicated session (`agent-<agentId>`).
 *
 * @param agentId Agent key; unknown ids are used verbatim as the command.
 * @param force   When true and the session exists, it is killed first and a
 *                fresh PTY is created after a 300 ms pause.
 * @returns The state record stored in `agentSessions` for this agent.
 */
export async function startAgentInSession(agentId: string, force = false): Promise<AgentTerminalState> {
  const sessionId = `agent-${agentId}`
  const command = AGENT_COMMANDS[agentId] || agentId

  // If force restart, kill existing session first
  if (force && sessions.has(sessionId)) {
    killSession(sessionId)
    await new Promise(r => setTimeout(r, 300))
  }

  const session = getOrCreateSession(sessionId)

  // Write the agent command to the PTY
  session.pty.write(command + '\r')

  const state: AgentTerminalState = {
    agentId,
    sessionId,
    command,
    startedAt: new Date(),
    isAgentRunning: true
  }
  agentSessions.set(agentId, state)

  console.log(`[Terminal] Agent ${agentId} started in session ${sessionId} with command: ${command}`)
  return state
}

/**
 * Starts the combined HTTP + WebSocket terminal server on PORT_TERMINAL.
 *
 * HTTP endpoints cover health/session listing, agent start/stop, the terminal
 * registry, centralized session state, approval tracking and the hook /
 * transcript broadcast entry points. A WebSocket upgrade with `?session=<id>`
 * attaches the client to that PTY session; an upgrade without it registers a
 * broadcast-only client.
 *
 * NOTE(review): the server listens on 0.0.0.0 and answers with
 * `Access-Control-Allow-Origin: *`, while /create-terminal and /start-agent
 * write caller-supplied commands into a shell. Confirm that this port is only
 * reachable from trusted clients.
 *
 * @returns The Bun server instance.
 */
export function startTerminalServer() {
  const server = Bun.serve({
    hostname: '0.0.0.0',
    port: PORT_TERMINAL,

    async fetch(req, server) {
      const url = new URL(req.url)

      const corsHeaders = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type'
      }

      // CORS preflight
      if (req.method === 'OPTIONS') {
        return new Response(null, { headers: corsHeaders })
      }

      // Health check with session info
      if (url.pathname === '/health') {
        const sessionsInfo = Array.from(sessions.entries()).map(([id, s]) => ({
          id,
          clients: s.clients.size,
          pid: s.pty.pid,
          bufferSize: s.outputBuffer.length,
          createdAt: s.createdAt.toISOString()
        }))
        return Response.json({
          status: 'ok',
          sessions: sessionsInfo,
          broadcastClients: broadcastClients.size,
          cwd: WORKING_DIR
        }, { headers: corsHeaders })
      }

      // List active sessions
      if (url.pathname === '/sessions') {
        const list = Array.from(sessions.keys())
        // Same CORS headers as every other JSON endpoint so browser clients can read it.
        return Response.json({ sessions: list }, { headers: corsHeaders })
      }

      // Claude hook broadcast endpoint (rich data from stdin)
      if (url.pathname === '/claude-hook' && req.method === 'POST') {
        try {
          const body = await req.json()
          broadcastClaudeHook(body)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Agent sessions info
      if (url.pathname === '/agent-sessions' && req.method === 'GET') {
        const result: Record<string, unknown> = {}
        for (const [id, state] of agentSessions) {
          const session = sessions.get(state.sessionId)
          result[id] = {
            ...state,
            pid: session?.pty.pid ?? null,
            bufferSize: session?.outputBuffer.length ?? 0,
            clientCount: session?.clients.size ?? 0,
            sessionExists: !!session
          }
        }
        return Response.json(result, { headers: corsHeaders })
      }

      // Start agent in session
      if (url.pathname === '/start-agent' && req.method === 'POST') {
        try {
          const body = await req.json() as { agentId: string; force?: boolean }
          if (!body.agentId) {
            return Response.json({ error: 'agentId required' }, { status: 400, headers: corsHeaders })
          }
          const state = await startAgentInSession(body.agentId, body.force)
          return Response.json({ success: true, state }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Stop agent session
      if (url.pathname === '/stop-agent' && req.method === 'POST') {
        try {
          const body = await req.json() as { agentId: string }
          if (!body.agentId) {
            return Response.json({ error: 'agentId required' }, { status: 400, headers: corsHeaders })
          }
          const sessionId = `agent-${body.agentId}`
          const killed = killSession(sessionId)
          if (killed) {
            const state = agentSessions.get(body.agentId)
            if (state) state.isAgentRunning = false
          }
          return Response.json({ success: true, killed }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Kill a specific session by ID (used for ephemeral sessions)
      if (url.pathname === '/kill-session' && req.method === 'POST') {
        try {
          const body = await req.json() as { sessionId: string }
          if (!body.sessionId) {
            return Response.json({ error: 'sessionId required' }, { status: 400, headers: corsHeaders })
          }
          const killed = killSession(body.sessionId)
          return Response.json({ success: true, killed }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Transcript update broadcast endpoint
      if (url.pathname === '/transcript-update' && req.method === 'POST') {
        try {
          const body = await req.json()
          broadcastTranscriptUpdate(body as Record<string, unknown>)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // ── Terminal Registry endpoints ──

      // Create a new terminal (server-first flow: PTY + registry + broadcast)
      if (url.pathname === '/create-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as {
            agent: string
            transcriptSessionId?: string
            label?: string
            command: string
          }
          if (!body.command) {
            return Response.json({ error: 'command required' }, { status: 400, headers: corsHeaders })
          }

          // Enforce terminal limit
          if (terminalRegistry.size >= MAX_TERMINALS) {
            console.error(`[Terminal] Cannot create terminal: limit reached (${terminalRegistry.size}/${MAX_TERMINALS})`)
            return Response.json(
              { error: `Terminal limit reached (max ${MAX_TERMINALS})` },
              { status: 429, headers: corsHeaders }
            )
          }

          // Validate transcript session exists before resuming
          const tsId = body.transcriptSessionId
          if (tsId && tsId !== '__new__' && body.agent) {
            if (!transcriptSessionExists(body.agent, tsId)) {
              console.warn(`[Terminal] Transcript session not found: ${tsId} (agent: ${body.agent})`)
              return Response.json(
                { error: `Transcript session "${tsId}" not found. It may have been deleted.` },
                { status: 404, headers: corsHeaders }
              )
            }
          }

          // Generate ephemeralSessionId server-side
          const ephemeralSessionId = `pty-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`

          // Create PTY session
          const session = getOrCreateSession(ephemeralSessionId)

          // Write command to PTY (with delay for shell init)
          setTimeout(() => {
            // The terminal may have exited or been killed during the delay;
            // an exception thrown from this timer would not be caught by the
            // surrounding try/catch.
            if (sessions.get(ephemeralSessionId) !== session) return
            try {
              session.pty.write(body.command + '\r')
            } catch (e) {
              console.error(`[Terminal] Error writing command to ${ephemeralSessionId}:`, e)
            }
          }, 300)

          // Register in terminal registry
          terminalRegistry.set(ephemeralSessionId, {
            ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '__new__',
            agent: body.agent || '',
            label: body.label || 'New session',
            command: body.command,
            createdAt: new Date().toISOString()
          })

          console.log(`[Terminal] Created terminal: ${ephemeralSessionId} → ${body.transcriptSessionId || '__new__'} (${body.agent})`)
          broadcastRegistryChange()

          return Response.json({ success: true, ephemeralSessionId }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // List all registered terminals (global, for all clients)
      if (url.pathname === '/terminal-registry' && req.method === 'GET') {
        return Response.json({ registry: getRegistrySnapshot() }, { headers: corsHeaders })
      }

      // Register a new terminal
      if (url.pathname === '/register-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as TerminalRegistryEntry
          if (!body.ephemeralSessionId) {
            return Response.json({ error: 'ephemeralSessionId required' }, { status: 400, headers: corsHeaders })
          }
          terminalRegistry.set(body.ephemeralSessionId, {
            ephemeralSessionId: body.ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '',
            agent: body.agent || '',
            label: body.label || '',
            command: body.command || '',
            createdAt: body.createdAt || new Date().toISOString()
          })
          console.log(`[Terminal] Registered terminal: ${body.ephemeralSessionId} → ${body.transcriptSessionId} (${body.agent})`)
          broadcastRegistryChange()
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // Update a registered terminal (e.g. re-key transcriptSessionId, update label)
      if (url.pathname === '/update-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as Partial<TerminalRegistryEntry> & { ephemeralSessionId: string }
          const entry = terminalRegistry.get(body.ephemeralSessionId)
          if (!entry) {
            return Response.json({ error: 'Not found' }, { status: 404, headers: corsHeaders })
          }
          if (body.transcriptSessionId !== undefined) entry.transcriptSessionId = body.transcriptSessionId
          if (body.label !== undefined) entry.label = body.label
          if (body.agent !== undefined) entry.agent = body.agent
          if (body.command !== undefined) entry.command = body.command
          broadcastRegistryChange()
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // Unregister a terminal (does NOT kill the PTY — use /kill-session for that)
      if (url.pathname === '/unregister-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as { ephemeralSessionId: string }
          const deleted = terminalRegistry.delete(body.ephemeralSessionId)
          if (deleted) broadcastRegistryChange()
          return Response.json({ success: true, deleted }, { headers: corsHeaders })
        } catch (e: unknown) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // ── Session State endpoints (centralized state) ──

      if (url.pathname === '/session-state' && req.method === 'GET') {
        return Response.json({
          ptySessions: sessionState.getSnapshot(),
        }, { headers: corsHeaders })
      }

      if (url.pathname.startsWith('/session-state/') && req.method === 'GET') {
        const ptyId = url.pathname.replace('/session-state/', '')
        const state = sessionState.getPtyState(ptyId)
        if (!state) return Response.json({ error: 'PTY session not found' }, { status: 404, headers: corsHeaders })
        return Response.json(state, { headers: corsHeaders })
      }

      // ── Approval tracking endpoints ──

      if (url.pathname === '/add-approval' && req.method === 'POST') {
        try {
          const body = await req.json() as { agent: string, approval: any, ptySessionId?: string }
          const ptyId = body.ptySessionId
          if (!ptyId) {
            console.warn('[Terminal] /add-approval called without ptySessionId')
            return Response.json({ error: 'ptySessionId required' }, { status: 400, headers: corsHeaders })
          }
          const patch = sessionState.addApproval(ptyId, body.agent, body.approval)
          broadcastPtyStatePatch({
            type: 'pty-state-patch',
            ptySessionId: ptyId,
            agent: body.agent,
            patch,
            event: 'approval-added',
            timestamp: Date.now(),
          })
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      if (url.pathname === '/resolve-approval' && req.method === 'POST') {
        try {
          const body = await req.json() as { requestId: string, decision: string }
          const result = sessionState.resolveApproval(body.requestId)
          if (result) {
            broadcastPtyStatePatch({
              type: 'pty-state-patch',
              ptySessionId: result.ptySessionId,
              agent: result.agent,
              patch: result.patch,
              event: 'approval-resolved',
              timestamp: Date.now(),
            })
          }
          return Response.json({ success: true, resolved: !!result }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Check if this is a WebSocket upgrade request
      const upgradeHeader = req.headers.get('upgrade')
      console.log(`[Terminal] Request: ${req.method} ${url.pathname}, Upgrade: ${upgradeHeader}`)

      if (upgradeHeader?.toLowerCase() === 'websocket') {
        // No ?session= parameter means a broadcast-only connection.
        const sessionId = url.searchParams.get('session') || null
        const isBroadcast = !sessionId
        const success = server.upgrade(req, {
          data: { sessionId, broadcast: isBroadcast }
        })
        console.log(`[Terminal] WebSocket upgrade ${isBroadcast ? '(broadcast-only)' : `for session "${sessionId}"`}: ${success ? 'success' : 'failed'}`)
        if (success) {
          return undefined
        }
        return new Response('WebSocket upgrade failed', { status: 400 })
      }

      return new Response(
        'Terminal WebSocket Server - Persistent Sessions\n\nEndpoints:\n /health - Server status\n /sessions - List active sessions\n ws://...?session= - Connect to session',
        { status: 200 }
      )
    },

    websocket: {
      open(ws) {
        const data = ws.data as any
        const isBroadcast = data?.broadcast === true

        if (isBroadcast) {
          // Broadcast-only client — no PTY, just receives state updates
          broadcastClients.add(ws)

          // Send session state snapshot + terminal registry
          const snapshot = sessionState.getSnapshot()
          if (Object.keys(snapshot).length > 0) {
            ws.send(JSON.stringify({
              type: 'session-state-snapshot',
              ptySessions: snapshot,
            }))
          }

          // Send current terminal registry
          ws.send(JSON.stringify({
            type: 'terminal-registry-change',
            registry: getRegistrySnapshot(),
            timestamp: Date.now()
          }))

          console.log(`[Terminal] Broadcast client connected (${broadcastClients.size} broadcast clients)`)
          return
        }

        // PTY client — attach to the requested session (created on demand)
        const sessionId = data?.sessionId || DEFAULT_SESSION_ID
        console.log(`[Terminal] Client connecting to session: ${sessionId}`)

        try {
          const session = getOrCreateSession(sessionId)
          session.clients.add(ws)
          wsToSession.set(ws, sessionId)

          ws.send(JSON.stringify({
            type: 'connected',
            sessionId: session.id,
            isNew: session.outputBuffer.length === 0,
            hasHistory: session.outputBuffer.length > 0,
            bufferSize: session.outputBuffer.length
          }))

          console.log(`[Terminal] Client joined session ${sessionId} (${session.clients.size} clients)`)
        } catch (e: unknown) {
          console.error('[Terminal] Error:', e)
          ws.send(JSON.stringify({ type: 'error', message: errorMessage(e) }))
        }
      },

      message(ws, message) {
        // Broadcast-only clients don't have a PTY — ignore their messages
        if (broadcastClients.has(ws)) return

        try {
          const msg = JSON.parse(typeof message === 'string' ? message : message.toString())
          const sessionId = wsToSession.get(ws)
          if (!sessionId) return

          const session = sessions.get(sessionId)
          if (!session) return

          if (msg.type === 'input') {
            session.pty.write(msg.data)
          } else if (msg.type === 'resize' && msg.cols && msg.rows) {
            session.pty.resize(msg.cols, msg.rows)
            console.log(`[Terminal] Session ${sessionId} resized to ${msg.cols}x${msg.rows}`)
          } else if (msg.type === 'clear-buffer') {
            session.outputBuffer = []
            console.log(`[Terminal] Buffer cleared for session ${sessionId}`)
            ws.send(JSON.stringify({ type: 'buffer-cleared' }))
          } else if (msg.type === 'request-replay') {
            // Client requests fresh replay (used when terminal becomes visible)
            console.log(`[Terminal] Replay requested, buffer has ${session.outputBuffer.length} chunks`)
            if (session.outputBuffer.length > 0) {
              // If tailOnly specified, only send last N chunks (enough for a few screens).
              // Anything but a positive integer falls back to the default so that a
              // malformed `chunks` value cannot produce a wrong slice.
              const tailChunks: number =
                Number.isInteger(msg.chunks) && msg.chunks > 0 ? msg.chunks : 500 // Default ~500 chunks for tail
              const sendTail = msg.tailOnly === true && session.outputBuffer.length > tailChunks

              let data: string
              if (sendTail) {
                // Send only the tail - more efficient for large buffers
                data = session.outputBuffer.slice(-tailChunks).join('')
                console.log(`[Terminal] Replaying tail (${tailChunks}/${session.outputBuffer.length} chunks), ${data.length} bytes`)
              } else {
                data = session.outputBuffer.join('')
                console.log(`[Terminal] Replaying full buffer (${session.outputBuffer.length} chunks), ${data.length} bytes`)
              }

              ws.send(JSON.stringify({ type: 'replay', data, isTail: sendTail }))
            } else {
              console.log('[Terminal] No buffer to replay')
            }
          }
        } catch (e: unknown) {
          console.error('[Terminal] Error:', e)
        }
      },

      close(ws) {
        // Check if this was a broadcast-only client
        if (broadcastClients.delete(ws)) {
          console.log(`[Terminal] Broadcast client disconnected (${broadcastClients.size} remaining)`)
          return
        }

        // PTY client cleanup (the PTY itself keeps running for later reconnections)
        const sessionId = wsToSession.get(ws)
        if (sessionId) {
          const session = sessions.get(sessionId)
          if (session) {
            session.clients.delete(ws)
            console.log(`[Terminal] Client left session ${sessionId} (${session.clients.size} clients remaining)`)
          }
          wsToSession.delete(ws)
        }
      }
    }
  })

  console.log(`[Terminal] WebSocket running at ws://localhost:${PORT_TERMINAL}`)
  return server
}

/**
 * Reverse lookup: finds the ephemeralSessionId (PTY) for a given transcriptSessionId.
 *
 * If no exact match exists and `agentName` is given, the first '__new__'
 * registry entry of that agent is bound to the transcript id (first hook on a
 * new session); this mutates the entry and broadcasts the registry change.
 *
 * @returns The matching ephemeralSessionId, or undefined when nothing matches.
 */
function findPtySessionId(transcriptSessionId: string, agentName?: string): string | undefined {
  for (const [eid, entry] of terminalRegistry) {
    if (entry.transcriptSessionId === transcriptSessionId) {
      return eid
    }
  }

  // Fallback: auto-bind a '__new__' entry from the same agent
  if (agentName) {
    for (const [eid, entry] of terminalRegistry) {
      if (entry.transcriptSessionId === '__new__' && entry.agent === agentName) {
        entry.transcriptSessionId = transcriptSessionId
        console.log(`[Terminal] Auto-bound PTY ${eid} to transcript ${transcriptSessionId}`)
        broadcastRegistryChange()
        return eid
      }
    }
  }

  return undefined
}

/**
 * Processes a hook event and broadcasts the resulting PTY state patch to ALL clients.
 *
 * Side effects: adds `pty_session_id` to `data` when a PTY could be resolved,
 * and updates the agent's `isAgentRunning` flag on SessionStart / SessionEnd.
 *
 * @param data Raw hook payload; the fields read here are `pty_session`,
 *             `session_id`, `agent_name` (default 'claude') and `hook_event_name`.
 */
export function broadcastClaudeHook(data: Record<string, unknown>) {
  // Resolve PTY session ID BEFORE processing so it gets tagged on the hook history entry
  const ptySession = data.pty_session as string
  const transcriptSid = data.session_id as string
  const agentName = (data.agent_name as string) || 'claude'
  const resolvedPtyId = ptySession || (transcriptSid ? findPtySessionId(transcriptSid, agentName) : undefined)

  // Inject ptySessionId into payload so processHookEvent can write to the correct PTY state
  if (resolvedPtyId) {
    data.pty_session_id = resolvedPtyId
  }

  const ptyPatch = sessionState.processHookEvent(data as any)
  if (ptyPatch) {
    broadcastPtyStatePatch(ptyPatch)
  }

  // Track agent running state in terminal sessions
  const event = data.hook_event_name as string
  if (event === 'SessionStart' || event === 'SessionEnd') {
    const state = agentSessions.get(agentName)
    if (state) {
      state.isAgentRunning = event === 'SessionStart'
    }
  }
}

/**
 * Broadcasts a transcript update to ALL clients as a 'transcript-update'
 * message carrying the payload fields plus a server timestamp.
 */
export function broadcastTranscriptUpdate(data: Record<string, unknown>) {
  const message = JSON.stringify({
    type: 'transcript-update',
    ...data,
    timestamp: Date.now()
  })

  const clientCount = broadcastToAll(message)
  console.log(`[Terminal] Transcript update: ${data.hookEvent || 'fetch'} (${(data.messages as any[])?.length || 0} msgs) → ${clientCount} clients`)
}