Files
agent-ui/server/services/terminal.ts
josedario87 08e73a1eb6 refactor: remove dead notification systems and legacy broadcasts
- Delete claude-hooks.ts store (processHook never called, always empty)
- Delete HookNotifications.vue and NotificationLog.vue (orphan components)
- Delete claude-status.ts route and broadcastClaudeStatus (no consumers)
- Delete agent-bar.md legacy doc
- Remove legacy WS 'claude-hook' broadcast (frontend ignores it)
- Move isAgentRunning tracking into broadcastClaudeHook
2026-02-24 11:42:06 -06:00

737 lines
27 KiB
TypeScript

import { spawn, type IPty } from '@skitee3000/bun-pty'
import { existsSync } from 'fs'
import { join } from 'path'
import { homedir } from 'os'
import { PORT_TERMINAL, WORKING_DIR, SHELL, SHELL_ARGS, DEFAULT_SESSION_ID, MAX_BUFFER_LINES, MAX_TERMINALS } from '../config'
import { sessionState, type SessionStatePatch } from './session-state'
// Agent transcript directories (mirrored from transcript-debug.ts).
// Maps an agent name to the base "projects" directory under which that agent's
// transcripts are looked up. The two project-local agents live under WORKING_DIR;
// the stock `claude` agent lives under the current user's home directory.
const AGENT_TRANSCRIPT_DIRS: Record<string, string> = {
ejecutor: join(WORKING_DIR, '.claude-ejecutor', 'projects'),
nucleo000: join(WORKING_DIR, '.claude-nucleo000', 'projects'),
claude: join(homedir(), '.claude', 'projects')
}
// Name of the per-project sub-directory expected inside each base directory above.
// NOTE(review): this looks like an encoded absolute path of one specific checkout
// (machine/user specific) — confirm before running this server from another location.
const PROJECT_HASH = 'C--Users-jodar-agent-ui'
/**
 * Locate the transcript directory of this project for one agent.
 *
 * @param agent - Agent name, used as a key into AGENT_TRANSCRIPT_DIRS.
 * @returns `<agent base dir>/<PROJECT_HASH>` when both the agent's base
 *   directory and that project directory exist on disk; `null` otherwise
 *   (including when the agent has no configured base directory).
 */
function getTranscriptProjectDir(agent: string): string | null {
  const root = AGENT_TRANSCRIPT_DIRS[agent]
  if (root && existsSync(root)) {
    const candidate = join(root, PROJECT_HASH)
    return existsSync(candidate) ? candidate : null
  }
  return null
}
/**
 * Check whether a transcript file `<sessionId>.jsonl` exists in the agent's
 * transcript directory for this project.
 *
 * @param agent - Agent name, resolved through getTranscriptProjectDir.
 * @param sessionId - Transcript session id; must be a single file-name segment.
 * @returns `true` only when the project directory resolves and the file exists.
 *   Empty ids and ids containing a path separator or NUL byte always yield
 *   `false`.
 */
function transcriptSessionExists(agent: string, sessionId: string): boolean {
  // sessionId is interpolated into a file name and may come straight from an
  // HTTP request body. Refuse anything that could step outside the project
  // directory instead of probing arbitrary paths on disk.
  if (sessionId.length === 0 || /[\\/\0]/.test(sessionId)) return false
  const projectDir = getTranscriptProjectDir(agent)
  if (!projectDir) return false
  return existsSync(join(projectDir, `${sessionId}.jsonl`))
}
/**
 * One PTY-backed terminal session held in the module-level `sessions` map.
 * A session is kept independently of its WebSocket clients, so clients can
 * disconnect and later re-attach to the same PTY and its buffered output.
 */
interface TerminalSession {
// Session id; also the key of this session in the `sessions` map.
id: string
// The spawned pseudo-terminal process.
pty: IPty
// Raw output chunks received from the PTY, oldest first (used for replay).
outputBuffer: string[]
// Maximum number of chunks kept in outputBuffer; the oldest chunk is dropped beyond this.
maxBufferSize: number
// WebSocket connections currently attached to this session.
clients: Set<any>
// When the session (and its PTY) was created.
createdAt: Date
}
// Agent terminal state tracking
/**
 * Bookkeeping for an agent launched in its dedicated terminal session.
 * One entry per agent id is stored in `agentSessions`.
 */
interface AgentTerminalState {
// Agent identifier as supplied by the caller (e.g. via /start-agent).
agentId: string
// Id of the terminal session the agent runs in (`agent-<agentId>`).
sessionId: string
// Command line that was typed into the PTY to launch the agent.
command: string
// When the agent command was last written to the PTY.
startedAt: Date | null
// Whether the agent is currently believed to be running; updated on start,
// stop, PTY exit and SessionStart/SessionEnd hook events.
isAgentRunning: boolean
}
/** Launch state of each agent, keyed by agent id. */
export const agentSessions = new Map<string, AgentTerminalState>()

/** Shell command used to launch each known agent, keyed by agent id. */
const AGENT_COMMANDS: Record<string, string> = {
  claude: 'claude',
  ejecutor: 'ejecutor',
  nucleo000: 'nucleo000'
}

/** Live terminal sessions keyed by session id; entries survive client reconnects. */
const sessions = new Map<string, TerminalSession>()

/** Session id each PTY-attached WebSocket belongs to (broadcast-only sockets are not listed here). */
const wsToSession = new Map<any, string>()

/** WebSockets that have no PTY and only receive state broadcasts. */
const broadcastClients = new Set<any>()
// ── Global terminal registry ──
// Tracks metadata about transcript-debug terminals so all clients can see/connect to them.
// An entry describes a terminal; it does not own the PTY. Entries are removed
// automatically when the matching session exits or is killed, and can also be
// added, updated or removed through the registry HTTP endpoints.
interface TerminalRegistryEntry {
ephemeralSessionId: string // PTY session id on this server (also the registry key)
transcriptSessionId: string // Claude transcript session being resumed (or '__new__')
agent: string // Agent name: ejecutor | nucleo000 | claude
label: string // First user message or short description
command: string // Full command that was run in the PTY
createdAt: string // ISO-8601 timestamp of registration
}
const terminalRegistry = new Map<string, TerminalRegistryEntry>() // keyed by ephemeralSessionId
/**
 * Build a serializable view of the terminal registry.
 *
 * Each registry entry is copied and enriched with live data from the matching
 * PTY session: `alive` (a session with that id exists), `clients` (attached
 * sockets) and `bufferSize` (buffered output chunks). Entries without a live
 * session report `alive: false` and zero counts.
 */
function getRegistrySnapshot() {
  const snapshot = []
  for (const entry of terminalRegistry.values()) {
    const live = sessions.get(entry.ephemeralSessionId)
    snapshot.push({
      ...entry,
      alive: Boolean(live),
      clients: live?.clients.size ?? 0,
      bufferSize: live?.outputBuffer.length ?? 0
    })
  }
  return snapshot
}
/**
 * Send one already-serialized message to every connected socket: first the
 * broadcast-only clients, then the clients of every PTY session.
 *
 * A socket whose `send` throws is skipped; it is neither retried nor removed.
 *
 * @param message - Payload passed verbatim to each socket's `send`.
 * @returns Number of sockets for which `send` completed without throwing.
 */
function broadcastToAll(message: string): number {
  let delivered = 0
  const deliver = (client: any): void => {
    try {
      client.send(message)
      delivered += 1
    } catch {
      /* skip */
    }
  }
  broadcastClients.forEach(deliver)
  for (const session of sessions.values()) {
    session.clients.forEach(deliver)
  }
  return delivered
}
/**
 * Serialize a session-state patch as JSON, push it to every connected client,
 * and log the patch's event, agent and the number of recipients.
 */
function broadcastSessionStatePatch(patch: SessionStatePatch) {
  const delivered = broadcastToAll(JSON.stringify(patch))
  console.log(`[Terminal] State patch: ${patch.event} (${patch.agent}) → ${delivered} clients`)
}
/**
 * Push the current terminal registry to every connected client as a
 * `terminal-registry-change` message (full snapshot plus a millisecond
 * timestamp), then log how many clients received it.
 */
function broadcastRegistryChange() {
  const registry = getRegistrySnapshot()
  const payload = { type: 'terminal-registry-change', registry, timestamp: Date.now() }
  const delivered = broadcastToAll(JSON.stringify(payload))
  console.log(`[Terminal] Registry broadcast → ${delivered} clients (${terminalRegistry.size} entries)`)
}
/**
 * Return the session registered under `sessionId`, spawning a new PTY-backed
 * session (80x24, `xterm-256color`, cwd = WORKING_DIR) when none exists.
 *
 * A new session is wired so that:
 * - every PTY output chunk is appended to the session's bounded buffer and
 *   forwarded to all attached clients as `{ type: 'output', data }`;
 * - on PTY exit, attached clients receive an `exit` message, the session and
 *   its terminal-registry entry are removed, and for `agent-<id>` sessions the
 *   agent is marked as not running.
 *
 * @param sessionId - Session id; defaults to DEFAULT_SESSION_ID.
 * @returns The existing or newly created session.
 */
function getOrCreateSession(sessionId: string = DEFAULT_SESSION_ID): TerminalSession {
  const existing = sessions.get(sessionId)
  if (existing) return existing

  console.log(`[Terminal] Creating new session: ${sessionId}`)
  const pty = spawn(SHELL, SHELL_ARGS, {
    name: 'xterm-256color',
    cols: 80,
    rows: 24,
    cwd: WORKING_DIR
  })
  const session: TerminalSession = {
    id: sessionId,
    pty,
    outputBuffer: [],
    maxBufferSize: MAX_BUFFER_LINES,
    clients: new Set(),
    createdAt: new Date()
  }

  // Capture output to buffer and send to clients
  pty.onData((data: string) => {
    session.outputBuffer.push(data)
    if (session.outputBuffer.length > session.maxBufferSize) {
      session.outputBuffer.shift()
    }
    // Serialize once; every client receives the same frame.
    const frame = JSON.stringify({ type: 'output', data })
    for (const ws of session.clients) {
      try {
        ws.send(frame)
      } catch {
        // Client disconnected
      }
    }
  })

  // Handle PTY exit
  pty.onExit(({ exitCode }) => {
    console.log(`[Terminal] Session ${sessionId} exited with code ${exitCode}`)
    const frame = JSON.stringify({
      type: 'exit',
      data: `\r\n\x1b[33mSession ended (code ${exitCode})\x1b[0m\r\n`
    })
    for (const ws of session.clients) {
      try {
        ws.send(frame)
      } catch { /* ignore */ }
    }

    // After killSession() + re-create under the same id (e.g. a forced agent
    // restart), this exit event belongs to the OLD PTY. It must not tear down
    // the replacement session, its registry entry, or its agent state.
    const current = sessions.get(sessionId)
    if (current !== undefined && current !== session) return

    sessions.delete(sessionId)
    // Auto-remove from terminal registry
    if (terminalRegistry.has(sessionId)) {
      terminalRegistry.delete(sessionId)
      broadcastRegistryChange()
    }
    // Mark agent as not running if this is an agent session
    if (sessionId.startsWith('agent-')) {
      const agentId = sessionId.slice('agent-'.length)
      const state = agentSessions.get(agentId)
      if (state) {
        state.isAgentRunning = false
        console.log(`[Terminal] Agent ${agentId} marked as stopped (exit code ${exitCode})`)
      }
    }
  })

  sessions.set(sessionId, session)
  console.log(`[Terminal] Session ${sessionId} created, PID: ${pty.pid}`)
  return session
}
/**
 * Terminate a session's PTY and forget the session.
 *
 * Attached clients are first sent `{ type: 'session-restart', sessionId }`.
 * The PTY is then killed (a failure to kill is logged, not thrown), the
 * session is removed from `sessions`, and a matching terminal-registry entry,
 * if any, is removed and the change broadcast.
 *
 * @param sessionId - Id of the session to kill.
 * @returns `false` when no such session exists, `true` otherwise.
 */
export function killSession(sessionId: string): boolean {
  const target = sessions.get(sessionId)
  if (target === undefined) {
    return false
  }
  console.log(`[Terminal] Killing session: ${sessionId} (PID: ${target.pty.pid})`)

  // Tell attached clients before the PTY goes away
  const notice = JSON.stringify({ type: 'session-restart', sessionId })
  target.clients.forEach((client: any) => {
    try {
      client.send(notice)
    } catch { /* ignore */ }
  })

  try {
    target.pty.kill()
  } catch (e) {
    console.error(`[Terminal] Error killing PTY for ${sessionId}:`, e)
  }
  sessions.delete(sessionId)

  // Drop the registry entry (if any) and let clients know
  const wasRegistered = terminalRegistry.has(sessionId)
  if (wasRegistered) {
    terminalRegistry.delete(sessionId)
    broadcastRegistryChange()
  }
  return true
}
/**
 * Launch an agent inside its dedicated terminal session (`agent-<agentId>`).
 *
 * The launch command is the AGENT_COMMANDS entry for `agentId`, or `agentId`
 * itself when the agent has no entry. The command followed by a carriage
 * return is written to the session's PTY; the session is created on demand.
 * Note that when the session already exists and `force` is false, the command
 * is typed into the existing PTY again.
 *
 * @param agentId - Agent identifier.
 * @param force - When true and the session already exists, kill it first and
 *   wait 300 ms before creating a fresh one.
 * @returns The state recorded for the agent in `agentSessions`.
 */
export async function startAgentInSession(agentId: string, force = false): Promise<AgentTerminalState> {
  const sessionId = `agent-${agentId}`
  // Own-property lookup only: a plain index would also resolve inherited
  // Object.prototype members (e.g. agentId "toString" or "constructor") and
  // hand back a function instead of a command string.
  const mapped = Object.prototype.hasOwnProperty.call(AGENT_COMMANDS, agentId)
    ? AGENT_COMMANDS[agentId]
    : undefined
  const command = mapped || agentId

  // If force restart, kill existing session first
  if (force && sessions.has(sessionId)) {
    killSession(sessionId)
    await new Promise(r => setTimeout(r, 300))
  }

  const session = getOrCreateSession(sessionId)
  // Write the agent command to the PTY
  session.pty.write(command + '\r')

  const state: AgentTerminalState = {
    agentId,
    sessionId,
    command,
    startedAt: new Date(),
    isAgentRunning: true
  }
  agentSessions.set(agentId, state)
  console.log(`[Terminal] Agent ${agentId} started in session ${sessionId} with command: ${command}`)
  return state
}
/**
 * Start the terminal HTTP + WebSocket server on 0.0.0.0:PORT_TERMINAL.
 *
 * HTTP endpoints (all JSON responses carry permissive CORS headers):
 * - `GET  /health`, `/sessions`            — session diagnostics
 * - `POST /claude-hook`, `/transcript-update` — fan hook/transcript events out to clients
 * - `GET  /agent-sessions`, `POST /start-agent`, `/stop-agent`, `/kill-session`
 * - `POST /create-terminal`, `/register-terminal`, `/update-terminal`,
 *   `/unregister-terminal`, `GET /terminal-registry` — terminal registry
 * - `GET  /session-state`, `/session-state/<agent>` — centralized session state
 * - `POST /add-approval`, `/resolve-approval` — approval tracking
 *
 * WebSocket upgrades: with `?session=<id>` the socket is attached to that PTY
 * session (created on demand); without it the socket is broadcast-only and
 * just receives state/registry updates.
 *
 * @returns The Bun server instance.
 */
export function startTerminalServer() {
  const corsHeaders = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
    'Access-Control-Allow-Headers': 'Content-Type'
  }
  // JSON response with CORS headers; used by every JSON endpoint below.
  const json = (body: unknown, status = 200): Response =>
    Response.json(body, { status, headers: corsHeaders })
  // Best-effort message extraction for values caught from `catch`.
  const errorMessage = (e: unknown): string => (e instanceof Error ? e.message : String(e))

  const server = Bun.serve({
    hostname: '0.0.0.0',
    port: PORT_TERMINAL,

    async fetch(req, server) {
      const url = new URL(req.url)
      const path = url.pathname
      const isGet = req.method === 'GET'
      const isPost = req.method === 'POST'

      // CORS preflight
      if (req.method === 'OPTIONS') {
        return new Response(null, { headers: corsHeaders })
      }

      // Health check with session info
      if (path === '/health') {
        const sessionsInfo = Array.from(sessions.entries()).map(([id, s]) => ({
          id,
          clients: s.clients.size,
          pid: s.pty.pid,
          bufferSize: s.outputBuffer.length,
          createdAt: s.createdAt.toISOString()
        }))
        return json({ status: 'ok', sessions: sessionsInfo, cwd: WORKING_DIR })
      }

      // List active sessions (previously the only JSON endpoint sent without
      // CORS headers, which made it unreadable from browser clients)
      if (path === '/sessions') {
        return json({ sessions: Array.from(sessions.keys()) })
      }

      // Claude hook broadcast endpoint (rich data from stdin)
      if (path === '/claude-hook' && isPost) {
        try {
          const body = await req.json()
          broadcastClaudeHook(body as Record<string, unknown>)
          return json({ success: true })
        } catch {
          return json({ error: 'Invalid JSON' }, 400)
        }
      }

      // Agent sessions info
      if (path === '/agent-sessions' && isGet) {
        const result: Record<string, unknown> = {}
        for (const [id, state] of agentSessions) {
          const session = sessions.get(state.sessionId)
          result[id] = {
            ...state,
            pid: session?.pty.pid ?? null,
            bufferSize: session?.outputBuffer.length ?? 0,
            clientCount: session?.clients.size ?? 0,
            sessionExists: !!session
          }
        }
        return json(result)
      }

      // Start agent in session
      if (path === '/start-agent' && isPost) {
        try {
          const body = await req.json() as { agentId: string; force?: boolean }
          if (!body.agentId) {
            return json({ error: 'agentId required' }, 400)
          }
          const state = await startAgentInSession(body.agentId, body.force)
          return json({ success: true, state })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 500)
        }
      }

      // Stop agent session
      if (path === '/stop-agent' && isPost) {
        try {
          const body = await req.json() as { agentId: string }
          if (!body.agentId) {
            return json({ error: 'agentId required' }, 400)
          }
          const killed = killSession(`agent-${body.agentId}`)
          if (killed) {
            const state = agentSessions.get(body.agentId)
            if (state) state.isAgentRunning = false
          }
          return json({ success: true, killed })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 500)
        }
      }

      // Kill a specific session by ID (used for ephemeral sessions)
      if (path === '/kill-session' && isPost) {
        try {
          const body = await req.json() as { sessionId: string }
          if (!body.sessionId) {
            return json({ error: 'sessionId required' }, 400)
          }
          const killed = killSession(body.sessionId)
          return json({ success: true, killed })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 500)
        }
      }

      // Transcript update broadcast endpoint
      if (path === '/transcript-update' && isPost) {
        try {
          const body = await req.json()
          broadcastTranscriptUpdate(body as Record<string, unknown>)
          return json({ success: true })
        } catch {
          return json({ error: 'Invalid JSON' }, 400)
        }
      }

      // ── Terminal Registry endpoints ──

      // Create a new terminal (server-first flow: PTY + registry + broadcast)
      if (path === '/create-terminal' && isPost) {
        try {
          const body = await req.json() as {
            agent: string
            transcriptSessionId?: string
            label?: string
            command: string
          }
          if (!body.command) {
            return json({ error: 'command required' }, 400)
          }

          // Enforce terminal limit
          if (terminalRegistry.size >= MAX_TERMINALS) {
            console.error(`[Terminal] Cannot create terminal: limit reached (${terminalRegistry.size}/${MAX_TERMINALS})`)
            return json({ error: `Terminal limit reached (max ${MAX_TERMINALS})` }, 429)
          }

          // Validate transcript session exists before resuming
          const tsId = body.transcriptSessionId
          if (tsId && tsId !== '__new__' && body.agent) {
            if (!transcriptSessionExists(body.agent, tsId)) {
              console.warn(`[Terminal] Transcript session not found: ${tsId} (agent: ${body.agent})`)
              return json({ error: `Transcript session "${tsId}" not found. It may have been deleted.` }, 404)
            }
          }

          // Generate ephemeralSessionId server-side
          const ephemeralSessionId = `pty-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`

          // Create PTY session
          const session = getOrCreateSession(ephemeralSessionId)

          // Write command to PTY (with delay for shell init)
          setTimeout(() => {
            // The PTY may have exited or been killed during the delay; writing
            // to it then could throw inside this timer with nobody to catch it.
            if (sessions.get(ephemeralSessionId) !== session) return
            try {
              session.pty.write(body.command + '\r')
            } catch (e) {
              console.error(`[Terminal] Failed to write command to ${ephemeralSessionId}:`, e)
            }
          }, 300)

          // Register in terminal registry
          terminalRegistry.set(ephemeralSessionId, {
            ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '__new__',
            agent: body.agent || '',
            label: body.label || 'New session',
            command: body.command,
            createdAt: new Date().toISOString()
          })
          console.log(`[Terminal] Created terminal: ${ephemeralSessionId} → ${body.transcriptSessionId || '__new__'} (${body.agent})`)
          broadcastRegistryChange()
          return json({ success: true, ephemeralSessionId })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 500)
        }
      }

      // List all registered terminals (global, for all clients)
      if (path === '/terminal-registry' && isGet) {
        return json({ registry: getRegistrySnapshot() })
      }

      // Register a new terminal
      if (path === '/register-terminal' && isPost) {
        try {
          const body = await req.json() as TerminalRegistryEntry
          if (!body.ephemeralSessionId) {
            return json({ error: 'ephemeralSessionId required' }, 400)
          }
          terminalRegistry.set(body.ephemeralSessionId, {
            ephemeralSessionId: body.ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '',
            agent: body.agent || '',
            label: body.label || '',
            command: body.command || '',
            createdAt: body.createdAt || new Date().toISOString()
          })
          console.log(`[Terminal] Registered terminal: ${body.ephemeralSessionId} → ${body.transcriptSessionId} (${body.agent})`)
          broadcastRegistryChange()
          return json({ success: true })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 400)
        }
      }

      // Update a registered terminal (e.g. re-key transcriptSessionId, update label)
      if (path === '/update-terminal' && isPost) {
        try {
          const body = await req.json() as Partial<TerminalRegistryEntry> & { ephemeralSessionId: string }
          const entry = terminalRegistry.get(body.ephemeralSessionId)
          if (!entry) {
            return json({ error: 'Not found' }, 404)
          }
          if (body.transcriptSessionId !== undefined) entry.transcriptSessionId = body.transcriptSessionId
          if (body.label !== undefined) entry.label = body.label
          if (body.agent !== undefined) entry.agent = body.agent
          if (body.command !== undefined) entry.command = body.command
          broadcastRegistryChange()
          return json({ success: true })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 400)
        }
      }

      // Unregister a terminal (does NOT kill the PTY — use /kill-session for that)
      if (path === '/unregister-terminal' && isPost) {
        try {
          const body = await req.json() as { ephemeralSessionId: string }
          const deleted = terminalRegistry.delete(body.ephemeralSessionId)
          if (deleted) broadcastRegistryChange()
          return json({ success: true, deleted })
        } catch (e: unknown) {
          return json({ error: errorMessage(e) }, 400)
        }
      }

      // ── Session State endpoints (centralized state) ──
      if (path === '/session-state' && isGet) {
        return json({ agents: sessionState.getSnapshot() })
      }
      if (path.startsWith('/session-state/') && isGet) {
        const agent = path.slice('/session-state/'.length)
        const state = sessionState.getAgentState(agent)
        if (!state) return json({ error: 'Agent not found' }, 404)
        return json(state)
      }

      // ── Approval tracking endpoints ──
      if (path === '/add-approval' && isPost) {
        try {
          const body = await req.json() as { agent: string, approval: any }
          const patch = sessionState.addApproval(body.agent, body.approval)
          broadcastSessionStatePatch({
            type: 'session-state-patch',
            agent: body.agent,
            patch,
            event: 'approval-added',
            timestamp: Date.now(),
          })
          return json({ success: true })
        } catch {
          return json({ error: 'Invalid JSON' }, 400)
        }
      }
      if (path === '/resolve-approval' && isPost) {
        try {
          const body = await req.json() as { requestId: string, decision: string }
          const result = sessionState.resolveApproval(body.requestId)
          if (result) {
            broadcastSessionStatePatch({
              type: 'session-state-patch',
              agent: result.agent,
              patch: result.patch,
              event: 'approval-resolved',
              timestamp: Date.now(),
            })
          }
          return json({ success: true, resolved: !!result })
        } catch {
          return json({ error: 'Invalid JSON' }, 400)
        }
      }

      // Check if this is a WebSocket upgrade request
      const upgradeHeader = req.headers.get('upgrade')
      console.log(`[Terminal] Request: ${req.method} ${url.pathname}, Upgrade: ${upgradeHeader}`)
      if (upgradeHeader?.toLowerCase() === 'websocket') {
        // No ?session= parameter means a broadcast-only connection.
        const sessionId = url.searchParams.get('session') || null
        const isBroadcast = !sessionId
        const success = server.upgrade(req, { data: { sessionId, broadcast: isBroadcast } })
        console.log(`[Terminal] WebSocket upgrade ${isBroadcast ? '(broadcast-only)' : `for session "${sessionId}"`}: ${success ? 'success' : 'failed'}`)
        if (success) {
          return undefined
        }
        return new Response('WebSocket upgrade failed', { status: 400 })
      }

      return new Response(
        'Terminal WebSocket Server - Persistent Sessions\n\nEndpoints:\n /health - Server status\n /sessions - List active sessions\n ws://...?session=<id> - Connect to session',
        { status: 200 }
      )
    },

    websocket: {
      open(ws) {
        const data = ws.data as any
        const isBroadcast = data?.broadcast === true

        if (isBroadcast) {
          // Broadcast-only client — no PTY, just receives state updates
          broadcastClients.add(ws)
          // Send session state snapshot + terminal registry
          const snapshot = sessionState.getSnapshot()
          if (Object.keys(snapshot).length > 0) {
            ws.send(JSON.stringify({
              type: 'session-state-snapshot',
              agents: snapshot,
            }))
          }
          // Send current terminal registry
          ws.send(JSON.stringify({
            type: 'terminal-registry-change',
            registry: getRegistrySnapshot(),
            timestamp: Date.now()
          }))
          console.log(`[Terminal] Broadcast client connected (${broadcastClients.size} broadcast clients)`)
          return
        }

        // PTY client — attach to the session (created if it does not exist yet)
        const sessionId = data?.sessionId || DEFAULT_SESSION_ID
        console.log(`[Terminal] Client connecting to session: ${sessionId}`)
        try {
          const session = getOrCreateSession(sessionId)
          session.clients.add(ws)
          wsToSession.set(ws, sessionId)
          ws.send(JSON.stringify({
            type: 'connected',
            sessionId: session.id,
            isNew: session.outputBuffer.length === 0,
            hasHistory: session.outputBuffer.length > 0,
            bufferSize: session.outputBuffer.length
          }))
          console.log(`[Terminal] Client joined session ${sessionId} (${session.clients.size} clients)`)
        } catch (e: unknown) {
          console.error('[Terminal] Error:', e)
          ws.send(JSON.stringify({ type: 'error', message: errorMessage(e) }))
        }
      },

      message(ws, message) {
        // Broadcast-only clients don't have a PTY — ignore their messages
        if (broadcastClients.has(ws)) return
        try {
          const msg = JSON.parse(message as string)
          const sessionId = wsToSession.get(ws)
          if (!sessionId) return
          const session = sessions.get(sessionId)
          if (!session) return

          switch (msg.type) {
            case 'input':
              session.pty.write(msg.data)
              break

            case 'resize':
              if (msg.cols && msg.rows) {
                session.pty.resize(msg.cols, msg.rows)
                console.log(`[Terminal] Session ${sessionId} resized to ${msg.cols}x${msg.rows}`)
              }
              break

            case 'clear-buffer':
              session.outputBuffer = []
              console.log(`[Terminal] Buffer cleared for session ${sessionId}`)
              ws.send(JSON.stringify({ type: 'buffer-cleared' }))
              break

            case 'request-replay': {
              // Client requests fresh replay (used when terminal becomes visible)
              const buffered = session.outputBuffer
              console.log(`[Terminal] Replay requested, buffer has ${buffered.length} chunks`)
              if (buffered.length === 0) {
                console.log('[Terminal] No buffer to replay')
                break
              }
              // If tailOnly specified, only send last N chunks (enough for a few screens).
              // Anything but a positive integer falls back to the default of 500 chunks.
              const tailChunks = Number.isInteger(msg.chunks) && msg.chunks > 0 ? msg.chunks : 500
              const isTail = msg.tailOnly === true && buffered.length > tailChunks
              let data: string
              if (isTail) {
                // Send only the tail - more efficient for large buffers
                data = buffered.slice(-tailChunks).join('')
                console.log(`[Terminal] Replaying tail (${tailChunks}/${buffered.length} chunks), ${data.length} bytes`)
              } else {
                data = buffered.join('')
                console.log(`[Terminal] Replaying full buffer (${buffered.length} chunks), ${data.length} bytes`)
              }
              ws.send(JSON.stringify({ type: 'replay', data, isTail }))
              break
            }
          }
        } catch (e: unknown) {
          console.error('[Terminal] Error:', e)
        }
      },

      close(ws) {
        // Check if this was a broadcast-only client
        if (broadcastClients.delete(ws)) {
          console.log(`[Terminal] Broadcast client disconnected (${broadcastClients.size} remaining)`)
          return
        }
        // PTY client cleanup: detach from the session but keep the PTY alive
        const sessionId = wsToSession.get(ws)
        if (sessionId) {
          const session = sessions.get(sessionId)
          if (session) {
            session.clients.delete(ws)
            console.log(`[Terminal] Client left session ${sessionId} (${session.clients.size} clients remaining)`)
          }
          wsToSession.delete(ws)
        }
      }
    }
  })

  console.log(`[Terminal] WebSocket running at ws://localhost:${PORT_TERMINAL}`)
  return server
}
/**
 * Feed a Claude hook event into the centralized session state and broadcast
 * the resulting state patch to all clients.
 *
 * Additionally, `SessionStart` / `SessionEnd` events update `isAgentRunning`
 * on the tracked agent named by `data.agent_name` (defaulting to `'claude'`),
 * when such an agent is tracked in `agentSessions`.
 *
 * @param data - Raw hook payload; `hook_event_name` and `agent_name` are read here.
 */
export function broadcastClaudeHook(data: Record<string, unknown>) {
  broadcastSessionStatePatch(sessionState.processHookEvent(data as any))

  // Track agent running state in terminal sessions
  const hookEvent = data.hook_event_name as string
  const isStart = hookEvent === 'SessionStart'
  if (!isStart && hookEvent !== 'SessionEnd') return

  const tracked = agentSessions.get((data.agent_name as string) || 'claude')
  if (tracked) {
    tracked.isAgentRunning = isStart
  }
}
/**
 * Wrap a transcript payload in a `transcript-update` message and send it to
 * every connected client, then log a one-line summary.
 *
 * The message is `{ type: 'transcript-update', ...data, timestamp }`; because
 * `data` is spread after `type`, a `type` key inside `data` takes precedence,
 * while `timestamp` is always the current time.
 *
 * @param data - Transcript payload; `hookEvent` and `messages` are only read for the log line.
 */
export function broadcastTranscriptUpdate(data: Record<string, unknown>) {
  const envelope = { type: 'transcript-update', ...data, timestamp: Date.now() }
  const clientCount = broadcastToAll(JSON.stringify(envelope))

  const source = data.hookEvent || 'fetch'
  const messageCount = (data.messages as any[])?.length || 0
  console.log(`[Terminal] Transcript update: ${source} (${messageCount} msgs) → ${clientCount} clients`)
}