Server now checks that transcript .jsonl files exist before creating terminals, preventing dead sessions from --resume errors. Frontend shows error banner in modal when resume fails. Fixed race condition where init() would overwrite FAB terminal selection after page refresh by guarding with pendingSwitchTarget flag.
809 lines
30 KiB
TypeScript
809 lines
30 KiB
TypeScript
import { spawn, type IPty } from '@skitee3000/bun-pty'
|
|
import { existsSync } from 'fs'
|
|
import { join } from 'path'
|
|
import { homedir } from 'os'
|
|
import { PORT_TERMINAL, WORKING_DIR, SHELL, SHELL_ARGS, DEFAULT_SESSION_ID, MAX_BUFFER_LINES, MAX_TERMINALS } from '../config'
|
|
import { sessionState, type SessionStatePatch } from './session-state'
|
|
|
|
/**
 * Root `projects` directory for each known agent, keyed by agent name
 * (mirrored from transcript-debug.ts).
 */
const AGENT_TRANSCRIPT_DIRS: Record<string, string> = {
  'ejecutor': join(WORKING_DIR, '.claude-ejecutor', 'projects'),
  'nucleo000': join(WORKING_DIR, '.claude-nucleo000', 'projects'),
  'claude': join(homedir(), '.claude', 'projects')
}

/** Name of the per-project sub-directory looked up inside an agent's `projects` directory. */
const PROJECT_HASH = 'C--Users-jodar-agent-ui'
|
|
|
|
/**
 * Locate the transcript directory of this project for the given agent.
 *
 * @param agent Agent name used as a key into AGENT_TRANSCRIPT_DIRS.
 * @returns The `<agent projects dir>/<PROJECT_HASH>` path when both the agent's
 *          base directory and the project directory exist on disk, otherwise `null`.
 */
function getTranscriptProjectDir(agent: string): string | null {
  const agentRoot = AGENT_TRANSCRIPT_DIRS[agent]

  if (agentRoot && existsSync(agentRoot)) {
    const projectDir = join(agentRoot, PROJECT_HASH)
    return existsSync(projectDir) ? projectDir : null
  }

  return null
}
|
|
|
|
/**
 * Check whether a transcript file `<sessionId>.jsonl` exists for the given agent.
 *
 * `sessionId` originates from an HTTP request body, so it is treated as untrusted:
 * anything that is not a bare file name (empty, non-string, or containing a path
 * separator / NUL byte) is rejected up front so the lookup can never leave the
 * agent's project directory.
 *
 * @param agent     Agent name (key of AGENT_TRANSCRIPT_DIRS).
 * @param sessionId Transcript session identifier, without the `.jsonl` extension.
 * @returns `true` only when the transcript file exists inside the project directory.
 */
function transcriptSessionExists(agent: string, sessionId: string): boolean {
  // Reject path traversal attempts such as "../../etc/passwd" before touching the disk.
  if (typeof sessionId !== 'string' || sessionId.length === 0 || /[\\/\0]/.test(sessionId)) {
    return false
  }

  const projectDir = getTranscriptProjectDir(agent)
  if (!projectDir) return false

  return existsSync(join(projectDir, `${sessionId}.jsonl`))
}
|
|
|
|
/** A PTY-backed terminal session together with its replay buffer and attached clients. */
interface TerminalSession {
  /** Key under which the session is stored in the `sessions` map. */
  id: string

  /** Pseudo-terminal process backing this session. */
  pty: IPty

  /** Raw output chunks received from the PTY, oldest first. */
  outputBuffer: string[]

  /** Maximum number of chunks kept in `outputBuffer`. */
  maxBufferSize: number

  /** WebSocket connections currently attached to this session. */
  clients: Set<any>

  /** Moment the session object was created. */
  createdAt: Date
}
|
|
|
|
/** Tracking record for an agent that was launched inside a dedicated terminal session. */
interface AgentTerminalState {
  /** Identifier of the agent. */
  agentId: string

  /** ID of the terminal session the agent command was written to. */
  sessionId: string

  /** Command line that was sent to the PTY to start the agent. */
  command: string

  /** When the agent was started, or `null` if no start time is recorded. */
  startedAt: Date | null

  /** Whether the agent is currently considered running. */
  isAgentRunning: boolean
}
|
|
|
|
/** Launch state of each agent, keyed by agent id. */
export const agentSessions: Map<string, AgentTerminalState> = new Map()

/** Shell command used to start each known agent, keyed by agent id. */
const AGENT_COMMANDS: Record<string, string> = {
  claude: 'claude',
  ejecutor: 'ejecutor',
  nucleo000: 'nucleo000'
}

/** Active terminal sessions by ID (persistent across reconnections). */
const sessions: Map<string, TerminalSession> = new Map()

/** WebSocket → session ID (only for PTY-connected clients). */
const wsToSession: Map<any, string> = new Map()

/** Broadcast-only clients (no PTY, they just receive state updates). */
const broadcastClients: Set<any> = new Set()
|
|
|
|
// ── Global terminal registry ──

/**
 * Metadata describing a transcript-debug terminal, kept so that every client
 * can see the terminal and connect to it.
 */
interface TerminalRegistryEntry {
  /** PTY session ID on this server. */
  ephemeralSessionId: string

  /** Claude transcript session being resumed (or '__new__'). */
  transcriptSessionId: string

  /** ejecutor | nucleo000 | claude */
  agent: string

  /** First user message or short description. */
  label: string

  /** Full command that was run. */
  command: string

  /** ISO timestamp of creation. */
  createdAt: string
}

/** Registered terminals, keyed by `ephemeralSessionId`. */
const terminalRegistry: Map<string, TerminalRegistryEntry> = new Map()
|
|
|
|
/**
 * Build a serialisable view of the terminal registry.
 *
 * Each registry entry is copied and enriched with live information taken from the
 * matching PTY session: whether it still exists (`alive`), how many clients are
 * attached (`clients`) and how many output chunks are buffered (`bufferSize`).
 * Entries without a live session report `alive: false` and zero counts.
 */
function getRegistrySnapshot() {
  return Array.from(terminalRegistry.values(), entry => {
    const live = sessions.get(entry.ephemeralSessionId)
    const alive = live !== undefined
    const clients = live ? live.clients.size : 0
    const bufferSize = live ? live.outputBuffer.length : 0

    return { ...entry, alive, clients, bufferSize }
  })
}
|
|
|
|
/**
 * Send a message to ALL clients: broadcast-only sockets first, then every socket
 * attached to a PTY session.
 *
 * A socket whose `send` throws is skipped silently.
 *
 * @param message Already-serialised payload to send.
 * @returns Number of sockets for which `send` completed without throwing.
 */
function broadcastToAll(message: string): number {
  let delivered = 0

  const deliver = (socket: any): void => {
    try {
      socket.send(message)
      delivered++
    } catch {
      /* skip */
    }
  }

  broadcastClients.forEach(socket => deliver(socket))
  for (const session of sessions.values()) {
    session.clients.forEach(socket => deliver(socket))
  }

  return delivered
}
|
|
|
|
/**
 * Broadcast a session state patch to ALL clients and log how many received it.
 *
 * @param patch Patch object; it is sent as its JSON serialisation.
 */
function broadcastSessionStatePatch(patch: SessionStatePatch) {
  const recipients = broadcastToAll(JSON.stringify(patch))
  console.log(`[Terminal] State patch: ${patch.event} (${patch.agent}) → ${recipients} clients`)
}
|
|
|
|
/**
 * Push the current terminal registry to ALL clients as a
 * `terminal-registry-change` message and log the delivery count.
 */
function broadcastRegistryChange() {
  const payload = {
    type: 'terminal-registry-change',
    registry: getRegistrySnapshot(),
    timestamp: Date.now()
  }
  const recipients = broadcastToAll(JSON.stringify(payload))

  console.log(`[Terminal] Registry broadcast → ${recipients} clients (${terminalRegistry.size} entries)`)
}
|
|
|
|
/**
 * Return the terminal session registered under `sessionId`, spawning a new
 * PTY-backed session when none exists yet.
 *
 * A new session runs SHELL with SHELL_ARGS in WORKING_DIR (80x24, xterm-256color).
 * Its output is appended to a bounded replay buffer and forwarded to every attached
 * client as `{ type: 'output', data }`. When the PTY exits, attached clients get an
 * `exit` message and the session's bookkeeping is torn down.
 *
 * @param sessionId Session key; defaults to DEFAULT_SESSION_ID.
 * @returns The existing or newly created session.
 */
function getOrCreateSession(sessionId: string = DEFAULT_SESSION_ID): TerminalSession {
  const existing = sessions.get(sessionId)
  if (existing) return existing

  console.log(`[Terminal] Creating new session: ${sessionId}`)
  const pty = spawn(SHELL, SHELL_ARGS, {
    name: 'xterm-256color',
    cols: 80,
    rows: 24,
    cwd: WORKING_DIR
  })

  const session: TerminalSession = {
    id: sessionId,
    pty,
    outputBuffer: [],
    maxBufferSize: MAX_BUFFER_LINES,
    clients: new Set(),
    createdAt: new Date()
  }

  // Capture output to buffer and send to clients
  pty.onData((data: string) => {
    session.outputBuffer.push(data)
    if (session.outputBuffer.length > session.maxBufferSize) {
      // Drop the oldest chunk so the buffer never exceeds its cap
      session.outputBuffer.shift()
    }

    if (session.clients.size === 0) return

    // Serialise once per chunk rather than once per client
    const frame = JSON.stringify({ type: 'output', data })
    for (const ws of session.clients) {
      try {
        ws.send(frame)
      } catch {
        // Client disconnected
      }
    }
  })

  // Handle PTY exit
  pty.onExit(({ exitCode }) => {
    console.log(`[Terminal] Session ${sessionId} exited with code ${exitCode}`)

    const frame = JSON.stringify({
      type: 'exit',
      data: `\r\n\x1b[33mSession ended (code ${exitCode})\x1b[0m\r\n`
    })
    for (const ws of session.clients) {
      try {
        ws.send(frame)
      } catch { /* ignore */ }
    }

    // If this ID has meanwhile been re-used by a NEWER session (e.g. a forced
    // restart whose old PTY exits late), this exit event is stale: touching the
    // shared maps would delete the new session and mark its agent as stopped.
    const current = sessions.get(sessionId)
    if (current !== undefined && current !== session) {
      console.log(`[Terminal] Ignoring stale exit for session ${sessionId} (replaced by a newer session)`)
      return
    }

    sessions.delete(sessionId)

    // Auto-remove from terminal registry
    if (terminalRegistry.delete(sessionId)) {
      broadcastRegistryChange()
    }

    // Mark agent as not running if this is an agent session
    if (sessionId.startsWith('agent-')) {
      const agentId = sessionId.replace('agent-', '')
      const state = agentSessions.get(agentId)
      if (state) {
        state.isAgentRunning = false
        console.log(`[Terminal] Agent ${agentId} marked as stopped (exit code ${exitCode})`)
      }
    }
  })

  sessions.set(sessionId, session)
  console.log(`[Terminal] Session ${sessionId} created, PID: ${pty.pid}`)

  return session
}
|
|
|
|
/**
 * Kill an existing session's PTY process and forget the session.
 *
 * Attached clients are first told about it with a `session-restart` message.
 * The session is removed from `sessions` (and from the terminal registry, with a
 * registry broadcast) even when killing the PTY throws; that error is only logged.
 *
 * @param sessionId ID of the session to kill.
 * @returns `false` when no session with that ID exists, `true` otherwise.
 */
export function killSession(sessionId: string): boolean {
  const target = sessions.get(sessionId)
  if (target === undefined) {
    return false
  }

  console.log(`[Terminal] Killing session: ${sessionId} (PID: ${target.pty.pid})`)

  // Notify clients before killing
  const notice = JSON.stringify({ type: 'session-restart', sessionId })
  target.clients.forEach(ws => {
    try {
      ws.send(notice)
    } catch { /* ignore */ }
  })

  try {
    target.pty.kill()
  } catch (e) {
    console.error(`[Terminal] Error killing PTY for ${sessionId}:`, e)
  }

  sessions.delete(sessionId)

  // Auto-remove from terminal registry
  const wasRegistered = terminalRegistry.has(sessionId)
  if (wasRegistered) {
    terminalRegistry.delete(sessionId)
    broadcastRegistryChange()
  }

  return true
}
|
|
|
|
/**
 * Start an agent command in its dedicated session (`agent-<agentId>`).
 *
 * The command comes from AGENT_COMMANDS; an unknown agent id falls back to using
 * the id itself as the command. The command is written to the session's PTY
 * followed by a carriage return, and the resulting state is stored in
 * `agentSessions`.
 *
 * @param agentId Agent to start.
 * @param force   When true and the session already exists, it is killed first and
 *                a 300 ms pause is taken before a fresh session is created.
 * @returns The state recorded for the agent.
 */
export async function startAgentInSession(agentId: string, force = false): Promise<AgentTerminalState> {
  const sessionId = `agent-${agentId}`
  const command = AGENT_COMMANDS[agentId] || agentId

  // If force restart, kill existing session first
  const mustRestart = force && sessions.has(sessionId)
  if (mustRestart) {
    killSession(sessionId)
    await new Promise(resolve => setTimeout(resolve, 300))
  }

  // Write the agent command to the PTY
  const { pty } = getOrCreateSession(sessionId)
  pty.write(command + '\r')

  const state: AgentTerminalState = {
    agentId,
    sessionId,
    command,
    startedAt: new Date(),
    isAgentRunning: true
  }
  agentSessions.set(agentId, state)

  console.log(`[Terminal] Agent ${agentId} started in session ${sessionId} with command: ${command}`)
  return state
}
|
|
|
|
/**
 * Start the Bun HTTP + WebSocket server on PORT_TERMINAL.
 *
 * HTTP side: health/session listings, broadcast relay endpoints
 * (`/claude-status`, `/claude-hook`, `/claude-permission`, `/transcript-update`),
 * agent control (`/start-agent`, `/stop-agent`, `/kill-session`), the terminal
 * registry (`/create-terminal`, `/terminal-registry`, `/register-terminal`,
 * `/update-terminal`, `/unregister-terminal`), session state and approval tracking.
 *
 * WebSocket side: a connection WITHOUT a `session` query parameter is a
 * broadcast-only client (receives state updates, its messages are ignored); a
 * connection WITH one is attached to that PTY session, which is created on demand.
 *
 * @returns The server object returned by `Bun.serve`.
 */
export function startTerminalServer() {
  // Turn anything thrown into a printable message for JSON error bodies
  // (a non-Error throw has no `.message`, which used to yield an empty error).
  const errorMessage = (e: unknown): string => (e instanceof Error ? e.message : String(e))

  const server = Bun.serve({
    port: PORT_TERMINAL,

    async fetch(req, server) {
      const url = new URL(req.url)

      const corsHeaders = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type'
      }

      // CORS preflight
      if (req.method === 'OPTIONS') {
        return new Response(null, { headers: corsHeaders })
      }

      // Health check with session info
      if (url.pathname === '/health') {
        const sessionsInfo = Array.from(sessions.entries()).map(([id, s]) => ({
          id,
          clients: s.clients.size,
          pid: s.pty.pid,
          bufferSize: s.outputBuffer.length,
          createdAt: s.createdAt.toISOString()
        }))
        return Response.json({
          status: 'ok',
          sessions: sessionsInfo,
          cwd: WORKING_DIR
        }, { headers: corsHeaders })
      }

      // List active sessions (CORS headers included, like every other JSON endpoint)
      if (url.pathname === '/sessions') {
        const list = Array.from(sessions.keys())
        return Response.json({ sessions: list }, { headers: corsHeaders })
      }

      // Claude status broadcast endpoint
      if (url.pathname === '/claude-status' && req.method === 'POST') {
        try {
          const body = await req.json() as { status: ClaudeStatus, tool?: string, agent?: string }
          broadcastClaudeStatus(body.status, body.tool, body.agent)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Claude hook broadcast endpoint (rich data from stdin)
      if (url.pathname === '/claude-hook' && req.method === 'POST') {
        try {
          const body = await req.json()
          broadcastClaudeHook(body)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Claude permission request broadcast endpoint
      if (url.pathname === '/claude-permission' && req.method === 'POST') {
        try {
          const body = await req.json()
          broadcastPermissionRequest(body)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Agent sessions info
      if (url.pathname === '/agent-sessions' && req.method === 'GET') {
        const result: Record<string, any> = {}
        for (const [id, state] of agentSessions) {
          const session = sessions.get(state.sessionId)
          result[id] = {
            ...state,
            pid: session?.pty.pid ?? null,
            bufferSize: session?.outputBuffer.length ?? 0,
            clientCount: session?.clients.size ?? 0,
            sessionExists: !!session
          }
        }
        return Response.json(result, { headers: corsHeaders })
      }

      // Start agent in session
      if (url.pathname === '/start-agent' && req.method === 'POST') {
        try {
          const body = await req.json() as { agentId: string; force?: boolean }
          if (!body.agentId) {
            return Response.json({ error: 'agentId required' }, { status: 400, headers: corsHeaders })
          }
          const state = await startAgentInSession(body.agentId, body.force)
          return Response.json({ success: true, state }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Stop agent session
      if (url.pathname === '/stop-agent' && req.method === 'POST') {
        try {
          const body = await req.json() as { agentId: string }
          if (!body.agentId) {
            return Response.json({ error: 'agentId required' }, { status: 400, headers: corsHeaders })
          }
          const sessionId = `agent-${body.agentId}`
          const killed = killSession(sessionId)
          if (killed) {
            const state = agentSessions.get(body.agentId)
            if (state) state.isAgentRunning = false
          }
          return Response.json({ success: true, killed }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Kill a specific session by ID (used for ephemeral sessions)
      if (url.pathname === '/kill-session' && req.method === 'POST') {
        try {
          const body = await req.json() as { sessionId: string }
          if (!body.sessionId) {
            return Response.json({ error: 'sessionId required' }, { status: 400, headers: corsHeaders })
          }
          const killed = killSession(body.sessionId)
          return Response.json({ success: true, killed }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // Transcript update broadcast endpoint
      if (url.pathname === '/transcript-update' && req.method === 'POST') {
        try {
          const body = await req.json()
          broadcastTranscriptUpdate(body as Record<string, unknown>)
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // ── Terminal Registry endpoints ──

      // Create a new terminal (server-first flow: PTY + registry + broadcast)
      if (url.pathname === '/create-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as {
            agent: string
            transcriptSessionId?: string
            label?: string
            command: string
          }

          if (!body.command) {
            return Response.json({ error: 'command required' }, { status: 400, headers: corsHeaders })
          }

          // Enforce terminal limit
          if (terminalRegistry.size >= MAX_TERMINALS) {
            console.error(`[Terminal] Cannot create terminal: limit reached (${terminalRegistry.size}/${MAX_TERMINALS})`)
            return Response.json(
              { error: `Terminal limit reached (max ${MAX_TERMINALS})` },
              { status: 429, headers: corsHeaders }
            )
          }

          // Validate transcript session exists before resuming
          const tsId = body.transcriptSessionId
          if (tsId && tsId !== '__new__' && body.agent) {
            if (!transcriptSessionExists(body.agent, tsId)) {
              console.warn(`[Terminal] Transcript session not found: ${tsId} (agent: ${body.agent})`)
              return Response.json(
                { error: `Transcript session "${tsId}" not found. It may have been deleted.` },
                { status: 404, headers: corsHeaders }
              )
            }
          }

          // Generate ephemeralSessionId server-side
          const ephemeralSessionId = `pty-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`

          // Create PTY session
          const session = getOrCreateSession(ephemeralSessionId)

          // Write command to PTY (with delay for shell init)
          const command = body.command
          setTimeout(() => {
            // The PTY may have exited or been killed during the delay; an exception
            // thrown inside a timer callback would otherwise go unhandled.
            if (sessions.get(ephemeralSessionId) !== session) {
              console.warn(`[Terminal] Session ${ephemeralSessionId} ended before its command could be written`)
              return
            }
            try {
              session.pty.write(command + '\r')
            } catch (e) {
              console.error(`[Terminal] Failed to write command to ${ephemeralSessionId}:`, e)
            }
          }, 300)

          // Register in terminal registry
          terminalRegistry.set(ephemeralSessionId, {
            ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '__new__',
            agent: body.agent || '',
            label: body.label || 'New session',
            command: body.command,
            createdAt: new Date().toISOString()
          })

          console.log(`[Terminal] Created terminal: ${ephemeralSessionId} → ${body.transcriptSessionId || '__new__'} (${body.agent})`)
          broadcastRegistryChange()

          return Response.json({ success: true, ephemeralSessionId }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 500, headers: corsHeaders })
        }
      }

      // List all registered terminals (global, for all clients)
      if (url.pathname === '/terminal-registry' && req.method === 'GET') {
        return Response.json({ registry: getRegistrySnapshot() }, { headers: corsHeaders })
      }

      // Register a new terminal
      if (url.pathname === '/register-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as TerminalRegistryEntry
          if (!body.ephemeralSessionId) {
            return Response.json({ error: 'ephemeralSessionId required' }, { status: 400, headers: corsHeaders })
          }
          terminalRegistry.set(body.ephemeralSessionId, {
            ephemeralSessionId: body.ephemeralSessionId,
            transcriptSessionId: body.transcriptSessionId || '',
            agent: body.agent || '',
            label: body.label || '',
            command: body.command || '',
            createdAt: body.createdAt || new Date().toISOString()
          })
          console.log(`[Terminal] Registered terminal: ${body.ephemeralSessionId} → ${body.transcriptSessionId} (${body.agent})`)
          broadcastRegistryChange()
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // Update a registered terminal (e.g. re-key transcriptSessionId, update label)
      if (url.pathname === '/update-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as Partial<TerminalRegistryEntry> & { ephemeralSessionId: string }
          const entry = terminalRegistry.get(body.ephemeralSessionId)
          if (!entry) {
            return Response.json({ error: 'Not found' }, { status: 404, headers: corsHeaders })
          }
          // Only fields present in the request are overwritten
          if (body.transcriptSessionId !== undefined) entry.transcriptSessionId = body.transcriptSessionId
          if (body.label !== undefined) entry.label = body.label
          if (body.agent !== undefined) entry.agent = body.agent
          if (body.command !== undefined) entry.command = body.command
          broadcastRegistryChange()
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // Unregister a terminal (does NOT kill the PTY — use /kill-session for that)
      if (url.pathname === '/unregister-terminal' && req.method === 'POST') {
        try {
          const body = await req.json() as { ephemeralSessionId: string }
          const deleted = terminalRegistry.delete(body.ephemeralSessionId)
          if (deleted) broadcastRegistryChange()
          return Response.json({ success: true, deleted }, { headers: corsHeaders })
        } catch (e) {
          return Response.json({ error: errorMessage(e) }, { status: 400, headers: corsHeaders })
        }
      }

      // ── Session State endpoints (centralized state) ──

      if (url.pathname === '/session-state' && req.method === 'GET') {
        return Response.json({ agents: sessionState.getSnapshot() }, { headers: corsHeaders })
      }

      if (url.pathname.startsWith('/session-state/') && req.method === 'GET') {
        const agent = url.pathname.replace('/session-state/', '')
        const state = sessionState.getAgentState(agent)
        if (!state) return Response.json({ error: 'Agent not found' }, { status: 404, headers: corsHeaders })
        return Response.json(state, { headers: corsHeaders })
      }

      // ── Approval tracking endpoints ──

      if (url.pathname === '/add-approval' && req.method === 'POST') {
        try {
          const body = await req.json() as { agent: string, approval: any }
          const patch = sessionState.addApproval(body.agent, body.approval)
          broadcastSessionStatePatch({
            type: 'session-state-patch',
            agent: body.agent,
            patch,
            event: 'approval-added',
            timestamp: Date.now(),
          })
          return Response.json({ success: true }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      if (url.pathname === '/resolve-approval' && req.method === 'POST') {
        try {
          const body = await req.json() as { requestId: string, decision: string }
          const result = sessionState.resolveApproval(body.requestId)
          if (result) {
            broadcastSessionStatePatch({
              type: 'session-state-patch',
              agent: result.agent,
              patch: result.patch,
              event: 'approval-resolved',
              timestamp: Date.now(),
            })
          }
          return Response.json({ success: true, resolved: !!result }, { headers: corsHeaders })
        } catch {
          return Response.json({ error: 'Invalid JSON' }, { status: 400, headers: corsHeaders })
        }
      }

      // Check if this is a WebSocket upgrade request
      const upgradeHeader = req.headers.get('upgrade')
      console.log(`[Terminal] Request: ${req.method} ${url.pathname}, Upgrade: ${upgradeHeader}`)

      if (upgradeHeader?.toLowerCase() === 'websocket') {
        // No `session` query parameter → broadcast-only client
        const sessionId = url.searchParams.get('session') || null
        const isBroadcast = !sessionId
        const success = server.upgrade(req, { data: { sessionId, broadcast: isBroadcast } })
        console.log(`[Terminal] WebSocket upgrade ${isBroadcast ? '(broadcast-only)' : `for session "${sessionId}"`}: ${success ? 'success' : 'failed'}`)
        if (success) {
          return undefined
        }
        return new Response('WebSocket upgrade failed', { status: 400 })
      }

      return new Response(
        'Terminal WebSocket Server - Persistent Sessions\n\nEndpoints:\n /health - Server status\n /sessions - List active sessions\n ws://...?session=<id> - Connect to session',
        { status: 200 }
      )
    },

    websocket: {
      open(ws) {
        const data = ws.data as any
        const isBroadcast = data?.broadcast === true

        if (isBroadcast) {
          // Broadcast-only client — no PTY, just receives state updates
          broadcastClients.add(ws)

          // Send session state snapshot (skipped when there is no agent state yet)
          const snapshot = sessionState.getSnapshot()
          if (Object.keys(snapshot).length > 0) {
            ws.send(JSON.stringify({
              type: 'session-state-snapshot',
              agents: snapshot,
            }))
          }

          // Send current terminal registry
          ws.send(JSON.stringify({
            type: 'terminal-registry-change',
            registry: getRegistrySnapshot(),
            timestamp: Date.now()
          }))

          console.log(`[Terminal] Broadcast client connected (${broadcastClients.size} broadcast clients)`)
          return
        }

        // PTY client — attach to the requested session, creating it if it does not exist
        const sessionId = data?.sessionId || DEFAULT_SESSION_ID
        console.log(`[Terminal] Client connecting to session: ${sessionId}`)

        try {
          const session = getOrCreateSession(sessionId)
          session.clients.add(ws)
          wsToSession.set(ws, sessionId)

          ws.send(JSON.stringify({
            type: 'connected',
            sessionId: session.id,
            isNew: session.outputBuffer.length === 0,
            hasHistory: session.outputBuffer.length > 0,
            bufferSize: session.outputBuffer.length
          }))

          console.log(`[Terminal] Client joined session ${sessionId} (${session.clients.size} clients)`)
        } catch (e) {
          console.error('[Terminal] Error:', e)
          ws.send(JSON.stringify({ type: 'error', message: errorMessage(e) }))
        }
      },

      message(ws, message) {
        // Broadcast-only clients don't have a PTY — ignore their messages
        if (broadcastClients.has(ws)) return

        try {
          // Frames may arrive as text or as a binary buffer; decode explicitly
          const raw = typeof message === 'string' ? message : message.toString()
          const msg = JSON.parse(raw)

          const sessionId = wsToSession.get(ws)
          if (!sessionId) return

          const session = sessions.get(sessionId)
          if (!session) return

          if (msg.type === 'input') {
            session.pty.write(msg.data)
          } else if (msg.type === 'resize' && msg.cols && msg.rows) {
            session.pty.resize(msg.cols, msg.rows)
            console.log(`[Terminal] Session ${sessionId} resized to ${msg.cols}x${msg.rows}`)
          } else if (msg.type === 'clear-buffer') {
            session.outputBuffer = []
            console.log(`[Terminal] Buffer cleared for session ${sessionId}`)
            ws.send(JSON.stringify({ type: 'buffer-cleared' }))
          } else if (msg.type === 'request-replay') {
            // Client requests fresh replay (used when terminal becomes visible)
            console.log(`[Terminal] Replay requested, buffer has ${session.outputBuffer.length} chunks`)

            if (session.outputBuffer.length > 0) {
              // If tailOnly specified, only send last N chunks (enough for a few screens)
              const tailOnly = msg.tailOnly === true
              // Only a positive integer is a usable chunk count; anything else
              // (missing, zero, negative, NaN) falls back to the default of 500.
              const requestedChunks = Number(msg.chunks)
              const tailChunks = Number.isInteger(requestedChunks) && requestedChunks > 0 ? requestedChunks : 500
              const sendTail = tailOnly && session.outputBuffer.length > tailChunks

              let data: string
              if (sendTail) {
                // Send only the tail - more efficient for large buffers
                data = session.outputBuffer.slice(-tailChunks).join('')
                console.log(`[Terminal] Replaying tail (${tailChunks}/${session.outputBuffer.length} chunks), ${data.length} bytes`)
              } else {
                data = session.outputBuffer.join('')
                console.log(`[Terminal] Replaying full buffer (${session.outputBuffer.length} chunks), ${data.length} bytes`)
              }

              ws.send(JSON.stringify({
                type: 'replay',
                data,
                isTail: sendTail
              }))
            } else {
              console.log('[Terminal] No buffer to replay')
            }
          }
        } catch (e) {
          console.error('[Terminal] Error:', e)
        }
      },

      close(ws) {
        // Check if this was a broadcast-only client
        if (broadcastClients.delete(ws)) {
          console.log(`[Terminal] Broadcast client disconnected (${broadcastClients.size} remaining)`)
          return
        }

        // PTY client cleanup (the session itself keeps running for reconnections)
        const sessionId = wsToSession.get(ws)
        if (sessionId) {
          const session = sessions.get(sessionId)
          if (session) {
            session.clients.delete(ws)
            console.log(`[Terminal] Client left session ${sessionId} (${session.clients.size} clients remaining)`)
          }
          wsToSession.delete(ws)
        }
      }
    }
  })

  console.log(`[Terminal] WebSocket running at ws://localhost:${PORT_TERMINAL}`)
  return server
}
|
|
|
|
/** Status values accepted by the Claude status broadcast. */
type ClaudeStatus =
  | 'idle'
  | 'thinking'
  | 'toolUse'
  | 'reading'
  | 'writing'
  | 'sessionStart'
  | 'sessionEnd'
  | 'permissionRequest'
  | 'interrupted'
  | 'error'
|
|
|
|
/**
 * Broadcast a Claude status change to ALL clients across ALL sessions.
 *
 * `sessionStart` / `sessionEnd` additionally flip `isAgentRunning` on the agent's
 * entry in `agentSessions`, when such an entry exists.
 *
 * Note: session state is updated via broadcastClaudeHook which has full payload context.
 * Direct /claude-status POSTs (from ejecutor's settings.local.json) are lightweight
 * and don't carry enough context to update full session state.
 *
 * @param status New status.
 * @param tool   Optional tool name, forwarded as-is.
 * @param agent  Agent name; defaults to 'claude' when missing or empty.
 */
export function broadcastClaudeStatus(status: ClaudeStatus, tool?: string, agent?: string) {
  const agentName = agent || 'claude'

  // Track agent running state
  switch (status) {
    case 'sessionStart': {
      const tracked = agentSessions.get(agentName)
      if (tracked) {
        tracked.isAgentRunning = true
        console.log(`[Terminal] Agent ${agentName} marked as running (sessionStart)`)
      }
      break
    }
    case 'sessionEnd': {
      const tracked = agentSessions.get(agentName)
      if (tracked) {
        tracked.isAgentRunning = false
        console.log(`[Terminal] Agent ${agentName} marked as stopped (sessionEnd)`)
      }
      break
    }
    default:
      break
  }

  const payload = {
    type: 'claude-status',
    status,
    tool,
    agent: agentName,
    timestamp: Date.now()
  }
  const clientCount = broadcastToAll(JSON.stringify(payload))

  console.log(`[Terminal] Claude status broadcast: ${status}${tool ? ` (${tool})` : ''} → ${clientCount} clients`)
}
|
|
|
|
/**
 * Broadcast full Claude hook data to ALL clients.
 *
 * Two messages go out, in this order: the session-state patch produced by
 * `sessionState.processHookEvent`, then the raw hook payload as a `claude-hook`
 * message (legacy format, kept for backward compatibility).
 *
 * @param data Hook payload; its fields are spread into the legacy message.
 */
export function broadcastClaudeHook(data: Record<string, unknown>) {
  // ── Update centralized session state and broadcast patch ──
  broadcastSessionStatePatch(sessionState.processHookEvent(data as any))

  // ── Legacy raw broadcast (dual temporal — kept for backward compatibility) ──
  const legacyPayload = {
    type: 'claude-hook',
    ...data,
    timestamp: Date.now()
  }
  const clientCount = broadcastToAll(JSON.stringify(legacyPayload))

  console.log(`[Terminal] Claude hook broadcast: ${data.hook_event_name || 'unknown'}${data.tool_name ? ` (${data.tool_name})` : ''} → ${clientCount} clients`)
}
|
|
|
|
/**
 * Broadcast a permission request to ALL clients as a `claude-permission` message.
 *
 * @param data Request payload; its fields are spread into the message.
 */
export function broadcastPermissionRequest(data: Record<string, unknown>) {
  const payload = {
    type: 'claude-permission',
    ...data,
    timestamp: Date.now()
  }
  const clientCount = broadcastToAll(JSON.stringify(payload))

  console.log(`[Terminal] Permission request broadcast: ${data.tool_name || 'unknown'} (${data.requestId}) → ${clientCount} clients`)
}
|
|
|
|
/**
 * Broadcast a transcript update to ALL clients as a `transcript-update` message.
 *
 * @param data Update payload; its fields are spread into the message.
 */
export function broadcastTranscriptUpdate(data: Record<string, unknown>) {
  const payload = {
    type: 'transcript-update',
    ...data,
    timestamp: Date.now()
  }
  const clientCount = broadcastToAll(JSON.stringify(payload))

  console.log(`[Terminal] Transcript update: ${data.hookEvent || 'fetch'} (${(data.messages as any[])?.length || 0} msgs) → ${clientCount} clients`)
}
|