From 5bd115e19769e73b6de1ac6c20c8032db5adb69d Mon Sep 17 00:00:00 2001 From: josedario87 Date: Tue, 24 Feb 2026 11:01:54 -0600 Subject: [PATCH] perf: incremental transcript updates via WebSocket, eliminate polling - Replace 1500ms polling with fs.watch recursive (reliable on Windows) - Enrich WS broadcast with newContent delta + session metadata - Client appends incrementally instead of 2 sequential HTTP requests - Pre-initialize Tauri HTTP plugin at module load to avoid dynamic import overhead - Per-file debounce timers (150ms) instead of single shared timer - Size-based validation for safe incremental appends with HTTP fallback --- .../transcript-debug/useTranscriptDebug.ts | 131 +++++++++---- frontend/src/lib/tauri.ts | 32 +++- .../handlers/transcript-debug-handler.ts | 173 ++++++++++++------ 3 files changed, 240 insertions(+), 96 deletions(-) diff --git a/frontend/src/composables/transcript-debug/useTranscriptDebug.ts b/frontend/src/composables/transcript-debug/useTranscriptDebug.ts index e96826e..80ede1c 100644 --- a/frontend/src/composables/transcript-debug/useTranscriptDebug.ts +++ b/frontend/src/composables/transcript-debug/useTranscriptDebug.ts @@ -33,6 +33,7 @@ export function useTranscriptDebug() { const sessions = ref([]) const selectedSessionId = ref(null) const rawContent = ref('') + let knownByteSize = 0 // Track byte size for incremental WS updates const conversation = ref(null) const loading = ref(false) const transitioning = ref(false) @@ -336,7 +337,7 @@ export function useTranscriptDebug() { try { const msg = JSON.parse(event.data) if (msg.type === 'transcript-debug-change') { - handleRealtimeChange(msg.sessionId) + handleRealtimeChange(msg) } else if (msg.type === 'transcript-debug-done') { handleRealtimeDone(msg.sessionId) } @@ -368,13 +369,77 @@ export function useTranscriptDebug() { isRealtime.value = false } - async function handleRealtimeChange(changedSessionId: string) { - // Refresh session list (new sessions or size changes) - await fetchSessions() + // Update session list from enriched WS event (no HTTP request needed) + function updateSessionFromEvent(msg: { + sessionId: string; agent: string; filename: string + size: number; mtime: number; firstUserMessage: string + }) { + // Only update if this event matches the selected agent + if (msg.agent !== selectedAgent.value) return - // Terminal registry is now updated via WS broadcast (no polling needed) + const idx = sessions.value.findIndex(s => s.id === msg.sessionId) + if (idx >= 0) { + // Update existing session metadata + sessions.value[idx] = { + ...sessions.value[idx], + size: msg.size, + mtime: msg.mtime, + mtimeISO: new Date(msg.mtime).toISOString(), + firstUserMessage: msg.firstUserMessage || sessions.value[idx].firstUserMessage + } + } else { + // New session appeared — add to list + sessions.value.unshift({ + id: msg.sessionId, + filename: msg.filename, + size: msg.size, + mtime: msg.mtime, + mtimeISO: new Date(msg.mtime).toISOString(), + firstUserMessage: msg.firstUserMessage || '' + }) + } + // Re-sort by mtime (newest first) + sessions.value.sort((a, b) => b.mtime - a.mtime) + } - // New session just appeared — lock onto it, re-key from __new__ + // Apply optimistic message/processing state to a parsed conversation + function applyOptimisticState(parsed: ParsedConversation) { + if (optimisticMessage.value) { + const optimisticText = optimisticMessage.value.content + const found = parsed.messages.some( + m => m.kind === 'user' && (m as ParsedUserMessage).content.includes(optimisticText) + ) + if (found) { + optimisticMessage.value = null + } else { + parsed.messages.push(optimisticMessage.value) + } + } + + if (optimisticProcessing.value) { + const agentState = sessionStore.agents[selectedAgent.value] + if (agentState && ['idle', 'sessionStart', 'sessionEnd'].includes(agentState.status)) { + optimisticProcessing.value = false + } + } + + conversation.value = parsed + } + + async function handleRealtimeChange(msg: { + sessionId: string; agent: string; filename: string + size: number; prevSize: number; mtime: number + firstUserMessage: string; newContent: string; timestamp: number + }) { + const changedSessionId = msg.sessionId + + // 1. Update session list from WS data (no HTTP!) + updateSessionFromEvent(msg) + console.log(`[TranscriptDebug] WS update: ${changedSessionId.slice(0, 8)}... ` + + `(+${msg.size - msg.prevSize} bytes, incremental=${msg.newContent ? 'yes' : 'no'}, ` + + `aligned=${msg.prevSize === knownByteSize})`) + + // 2. New session just appeared — lock onto it, re-key from __new__ if (awaitingNewSession.value) { awaitingNewSession.value = false @@ -396,27 +461,45 @@ export function useTranscriptDebug() { } selectedSessionId.value = changedSessionId - await fetchSessionContent(changedSessionId) + + // Use WS content if available (prevSize=0 for new files) + if (msg.newContent) { + rawContent.value = msg.newContent + knownByteSize = msg.size + conversation.value = parseJsonl(rawContent.value, changedSessionId) + } else { + await fetchSessionContent(changedSessionId) + } // Auto-send queued initial prompt if (pendingPrompt.value) { const prompt = pendingPrompt.value pendingPrompt.value = null - // Small delay to let terminal fully settle setTimeout(() => sendPrompt(prompt), 300) } return } - // If the changed session is the one we're viewing, re-fetch it + // 3. If the changed session is the one we're viewing, update incrementally if (selectedSessionId.value && selectedSessionId.value === changedSessionId) { - await reloadCurrentSession() + if (msg.newContent && msg.prevSize === knownByteSize) { + // Incremental append — no HTTP request! + rawContent.value += msg.newContent + knownByteSize = msg.size + console.log(`[TranscriptDebug] ✓ Incremental update (0 HTTP) — knownByteSize=${knownByteSize}`) + const parsed = parseJsonl(rawContent.value, changedSessionId) + applyOptimisticState(parsed) + } else if (msg.size !== knownByteSize) { + console.log(`[TranscriptDebug] ✗ Fallback to HTTP — prevSize=${msg.prevSize} knownByteSize=${knownByteSize} size=${msg.size}`) + // Mismatch or delta too large — fallback to full HTTP reload + await reloadCurrentSession() + } } else if (!selectedSessionId.value && sessions.value.length > 0) { // No session selected yet — auto-select the newest await selectSession(sessions.value[0].id) } - // Update label for any terminal in the registry matching this session + // 4. Update terminal label if changed const entry = serverRegistry.value.find(e => e.transcriptSessionId === changedSessionId) if (entry) { const newLabel = getSessionLabel(changedSessionId) @@ -440,30 +523,9 @@ export function useTranscriptDebug() { const res = await apiFetch(`/api/transcript-debug/${selectedSessionId.value}/raw?agent=${selectedAgent.value}`) if (!res.ok) throw new Error(`HTTP ${res.status}`) rawContent.value = await res.text() + knownByteSize = new TextEncoder().encode(rawContent.value).length const parsed = parseJsonl(rawContent.value, selectedSessionId.value) - - // Check if the optimistic user message now exists in the real JSONL - if (optimisticMessage.value) { - const optimisticText = optimisticMessage.value.content - const found = parsed.messages.some( - m => m.kind === 'user' && (m as ParsedUserMessage).content.includes(optimisticText) - ) - if (found) { - optimisticMessage.value = null - } else { - parsed.messages.push(optimisticMessage.value) - } - } - - // Clear optimistic processing if server state is now idle - if (optimisticProcessing.value) { - const agentState = sessionStore.agents[selectedAgent.value] - if (agentState && ['idle', 'sessionStart', 'sessionEnd'].includes(agentState.status)) { - optimisticProcessing.value = false - } - } - - conversation.value = parsed + applyOptimisticState(parsed) } catch (e: any) { console.error('[TranscriptDebug] Reload failed:', e.message) } @@ -561,6 +623,7 @@ export function useTranscriptDebug() { const res = await apiFetch(`/api/transcript-debug/${sessionId}/raw?agent=${agent}`) if (!res.ok) throw new Error(`HTTP ${res.status}`) rawContent.value = await res.text() + knownByteSize = new TextEncoder().encode(rawContent.value).length conversation.value = parseJsonl(rawContent.value, sessionId) return true } catch (e: any) { diff --git a/frontend/src/lib/tauri.ts b/frontend/src/lib/tauri.ts index f06312c..bda054c 100644 --- a/frontend/src/lib/tauri.ts +++ b/frontend/src/lib/tauri.ts @@ -17,6 +17,9 @@ export function isMobileTauri(): boolean { // Server URL storage (in-memory, loaded from Tauri store on init) let _serverUrl = '' +// Cached Tauri HTTP fetch function (avoid dynamic import on every request) +let _tauriFetch: typeof fetch | null = null + export function getServerUrl(): string { return _serverUrl } @@ -55,10 +58,13 @@ export async function apiFetch(input: string | URL | Request, init?: RequestInit // Resolve URL const url = typeof input === 'string' ? resolveUrl(input) : input - // Use Tauri HTTP plugin for cross-origin requests + // Use Tauri HTTP plugin for cross-origin requests (cached after first load) try { - const { fetch: tauriFetch } = await import('@tauri-apps/plugin-http') - return tauriFetch(url, init) + if (!_tauriFetch) { + const { fetch: tauriFetch } = await import('@tauri-apps/plugin-http') + _tauriFetch = tauriFetch + } + return _tauriFetch(url, init) } catch (e) { // Fallback to native fetch if plugin fails console.warn('[Tauri] HTTP plugin failed, falling back to fetch:', e) @@ -66,6 +72,26 @@ export async function apiFetch(input: string | URL | Request, init?: RequestInit } } +/** + * Eagerly initialize the Tauri HTTP plugin to avoid dynamic import overhead + * on the first apiFetch call. Call this once at app startup. + */ +export async function initTauriFetch() { + if (!isTauri || _tauriFetch) return + try { + const { fetch: tauriFetch } = await import('@tauri-apps/plugin-http') + _tauriFetch = tauriFetch + console.log('[Tauri] HTTP plugin pre-initialized') + } catch (e) { + console.warn('[Tauri] Failed to pre-init HTTP plugin:', e) + } +} + +// Auto-init on module load in Tauri mode +if (isTauri) { + initTauriFetch() +} + // Dynamic plugin imports (only used behind isTauri checks) export async function getTauriStore() { const { LazyStore } = await import('@tauri-apps/plugin-store') diff --git a/server/services/handlers/transcript-debug-handler.ts b/server/services/handlers/transcript-debug-handler.ts index 36784eb..6a90af2 100644 --- a/server/services/handlers/transcript-debug-handler.ts +++ b/server/services/handlers/transcript-debug-handler.ts @@ -3,28 +3,34 @@ * Watches all agent project dirs (.claude-ejecutor/projects/, .claude-nucleo000/projects/) * for JSONL file changes and broadcasts notifications via the sync server. * - * Uses a polling approach on Windows (fs.watch is unreliable for appends) - * combined with fs.watch for immediate detection. + * Uses fs.watch with { recursive: true } which is reliable on Windows (ReadDirectoryChangesW), + * macOS (FSEvents) and Linux (inotify). Same strategy as git-handler.ts. + * File size tracking is used to filter out duplicate/spurious events. + * + * Broadcasts enriched events including new JSONL content and session metadata, + * so clients can update incrementally without additional HTTP requests. */ -import { watch, existsSync, readdirSync, statSync, type FSWatcher } from 'fs' -import { join } from 'path' +import { watch, existsSync, readdirSync, statSync, readFileSync, openSync, readSync, closeSync, type FSWatcher } from 'fs' +import { join, basename } from 'path' import { homedir } from 'os' const AGENT_NAMES = ['ejecutor', 'nucleo000', 'claude'] -const DEBOUNCE_MS = 25 -const POLL_INTERVAL_MS = 1500 +const DEBOUNCE_MS = 150 // Per-file debounce — balances responsiveness and batching +const MAX_WS_DELTA_BYTES = 256 * 1024 // Skip pushing content > 256KB via WS // Per-agent state interface AgentWatcherState { projectDir: string watcher: FSWatcher | null fileSizeCache: Map - debounceTimer: ReturnType | null + debounceTimers: Map> } const agentStates = new Map() -let pollTimer: ReturnType | null = null + +// Cache firstUserMessage per session — never changes after first extraction +const firstMessageCache = new Map() // Project hash matching the working dir const PROJECT_HASH = 'C--Users-jodar-agent-ui' @@ -45,28 +51,108 @@ function findProjectDir(workingDir: string, agent: string): string | null { return dirs.length > 0 ? join(agentDir, dirs[0]) : null } -function emitChange(agent: string, sessionId: string, filename: string, projectDir: string, broadcast: (message: string) => void) { - const state = agentStates.get(agent) - if (!state) return +function extractFirstUserMessage(filePath: string): string { + // Read only first 8KB — enough for session header + first user message + try { + const fd = openSync(filePath, 'r') + const buf = Buffer.alloc(8192) + const bytesRead = readSync(fd, buf, 0, 8192, 0) + closeSync(fd) - if (state.debounceTimer) clearTimeout(state.debounceTimer) - state.debounceTimer = setTimeout(() => { - const filePath = join(projectDir, filename) + const content = buf.toString('utf-8', 0, bytesRead) + const lines = content.split('\n') + for (const line of lines) { + if (!line.trim()) continue + try { + const obj = JSON.parse(line) + if (obj.type === 'user' && obj.message) { + const c = obj.message.content + if (typeof c === 'string') return c.slice(0, 120) + if (Array.isArray(c)) { + const textBlock = c.find((b: any) => b.type === 'text' && b.text?.trim()) + if (textBlock) return textBlock.text.slice(0, 120) + } + } + } catch {} + } + } catch {} + return '' +} + +function getFirstUserMessage(agent: string, sessionId: string, filePath: string): string { + const key = `${agent}:${sessionId}` + const cached = firstMessageCache.get(key) + if (cached) return cached + const msg = extractFirstUserMessage(filePath) + if (msg) firstMessageCache.set(key, msg) + return msg +} + +function readNewBytes(filePath: string, offset: number, size: number): string { + const length = size - offset + if (length <= 0) return '' + try { + const fd = openSync(filePath, 'r') + const buf = Buffer.alloc(length) + readSync(fd, buf, 0, length, offset) + closeSync(fd) + return buf.toString('utf-8') + } catch { + return '' + } +} + +function emitChange(agent: string, filename: string, state: AgentWatcherState, broadcast: (message: string) => void) { + const sessionId = filename.replace('.jsonl', '') + const filePath = join(state.projectDir, filename) + + // Per-file debounce so rapid writes to the same file coalesce + const existing = state.debounceTimers.get(filename) + if (existing) clearTimeout(existing) + + state.debounceTimers.set(filename, setTimeout(() => { + state.debounceTimers.delete(filename) + + // Check actual size change to filter spurious events let size = 0 + let mtime = 0 try { - size = statSync(filePath).size - } catch {} + const stat = statSync(filePath) + size = stat.size + mtime = stat.mtimeMs + } catch { + return // File may have been deleted + } - console.log(`[TranscriptDebug:${agent}] Change: ${filename} (${size} bytes)`) + const prevSize = state.fileSizeCache.get(filename) || 0 + if (size === prevSize) return // No actual change + + // Read new bytes (only the delta) — skip if too large + const deltaSize = size - prevSize + let newContent = '' + if (deltaSize > 0 && deltaSize <= MAX_WS_DELTA_BYTES) { + newContent = readNewBytes(filePath, prevSize, size) + } + + // Get or cache firstUserMessage + const firstUserMessage = getFirstUserMessage(agent, sessionId, filePath) + + state.fileSizeCache.set(filename, size) + + console.log(`[TranscriptDebug:${agent}] Change: ${filename} (${prevSize} → ${size} bytes, +${deltaSize})`) broadcast(JSON.stringify({ type: 'transcript-debug-change', sessionId, agent, filename, size, + prevSize, + mtime, + firstUserMessage, + newContent, // empty string if delta was too large timestamp: Date.now() })) - }, DEBOUNCE_MS) + }, DEBOUNCE_MS)) } export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (message: string) => void) { @@ -80,7 +166,7 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess projectDir, watcher: null, fileSizeCache: new Map(), - debounceTimer: null + debounceTimers: new Map() } // Initialize file size cache @@ -93,15 +179,16 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess } } catch {} - // fs.watch for immediate detection + // fs.watch with recursive: true — reliable on Windows, macOS, and Linux try { - state.watcher = watch(projectDir, { recursive: false }, (_, filename) => { + state.watcher = watch(projectDir, { recursive: true }, (_, filename) => { if (!filename) return - if (!filename.endsWith('.jsonl')) return - const sessionId = filename.replace('.jsonl', '') - emitChange(agent, sessionId, filename, projectDir, broadcast) + // recursive may report subdirectory paths — extract just the filename + const base = basename(filename) + if (!base.endsWith('.jsonl')) return + emitChange(agent, base, state, broadcast) }) - console.log(`[TranscriptDebug] Watching ${agent}: ${projectDir}`) + console.log(`[TranscriptDebug] Watching ${agent}: ${projectDir} (recursive)`) } catch (e: any) { console.error(`[TranscriptDebug] Watch failed for ${agent}: ${e.message}`) } @@ -112,34 +199,7 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess if (!anyWatched) { console.log('[TranscriptDebug] No agent project directories found, skipping watcher') - return } - - // Shared polling fallback for all agents - pollTimer = setInterval(() => { - for (const [agent, state] of agentStates) { - try { - const files = readdirSync(state.projectDir).filter(f => f.endsWith('.jsonl')) - for (const f of files) { - try { - const size = statSync(join(state.projectDir, f)).size - const prevSize = state.fileSizeCache.get(f) || 0 - if (size !== prevSize) { - state.fileSizeCache.set(f, size) - const sessionId = f.replace('.jsonl', '') - emitChange(agent, sessionId, f, state.projectDir, broadcast) - } - } catch {} - } - // Detect new files - for (const f of files) { - if (!state.fileSizeCache.has(f)) { - state.fileSizeCache.set(f, 0) - } - } - } catch {} - } - }, POLL_INTERVAL_MS) } export function cleanupTranscriptDebugWatcher() { @@ -147,14 +207,9 @@ export function cleanupTranscriptDebugWatcher() { if (state.watcher) { state.watcher.close() } - if (state.debounceTimer) { - clearTimeout(state.debounceTimer) + for (const [, timer] of state.debounceTimers) { + clearTimeout(timer) } } agentStates.clear() - - if (pollTimer) { - clearInterval(pollTimer) - pollTimer = null - } }