perf: incremental transcript updates via WebSocket, eliminate polling

- Replace 1500ms polling with fs.watch recursive (reliable on Windows)
- Enrich WS broadcast with newContent delta + session metadata
- Client appends incrementally instead of 2 sequential HTTP requests
- Pre-initialize Tauri HTTP plugin at module load to avoid dynamic import overhead
- Per-file debounce timers (150ms) instead of single shared timer
- Size-based validation for safe incremental appends with HTTP fallback
This commit is contained in:
2026-02-24 11:01:54 -06:00
parent c46b1283d1
commit 5bd115e197
3 changed files with 240 additions and 96 deletions

View File

@@ -3,28 +3,34 @@
* Watches all agent project dirs (.claude-ejecutor/projects/, .claude-nucleo000/projects/)
* for JSONL file changes and broadcasts notifications via the sync server.
*
* Uses a polling approach on Windows (fs.watch is unreliable for appends)
* combined with fs.watch for immediate detection.
* Uses fs.watch with { recursive: true } which is reliable on Windows (ReadDirectoryChangesW),
* macOS (FSEvents) and Linux (inotify). Same strategy as git-handler.ts.
* File size tracking is used to filter out duplicate/spurious events.
*
* Broadcasts enriched events including new JSONL content and session metadata,
* so clients can update incrementally without additional HTTP requests.
*/
import { watch, existsSync, readdirSync, statSync, type FSWatcher } from 'fs'
import { join } from 'path'
import { watch, existsSync, readdirSync, statSync, readFileSync, openSync, readSync, closeSync, type FSWatcher } from 'fs'
import { join, basename } from 'path'
import { homedir } from 'os'
const AGENT_NAMES = ['ejecutor', 'nucleo000', 'claude']
const DEBOUNCE_MS = 25
const POLL_INTERVAL_MS = 1500
const DEBOUNCE_MS = 150 // Per-file debounce — balances responsiveness and batching
const MAX_WS_DELTA_BYTES = 256 * 1024 // Skip pushing content > 256KB via WS
// Per-agent state
interface AgentWatcherState {
projectDir: string
watcher: FSWatcher | null
fileSizeCache: Map<string, number>
debounceTimer: ReturnType<typeof setTimeout> | null
debounceTimers: Map<string, ReturnType<typeof setTimeout>>
}
const agentStates = new Map<string, AgentWatcherState>()
let pollTimer: ReturnType<typeof setInterval> | null = null
// Cache firstUserMessage per session — never changes after first extraction
const firstMessageCache = new Map<string, string>()
// Project hash matching the working dir
const PROJECT_HASH = 'C--Users-jodar-agent-ui'
@@ -45,28 +51,108 @@ function findProjectDir(workingDir: string, agent: string): string | null {
return dirs.length > 0 ? join(agentDir, dirs[0]) : null
}
function emitChange(agent: string, sessionId: string, filename: string, projectDir: string, broadcast: (message: string) => void) {
const state = agentStates.get(agent)
if (!state) return
function extractFirstUserMessage(filePath: string): string {
// Read only first 8KB — enough for session header + first user message
try {
const fd = openSync(filePath, 'r')
const buf = Buffer.alloc(8192)
const bytesRead = readSync(fd, buf, 0, 8192, 0)
closeSync(fd)
if (state.debounceTimer) clearTimeout(state.debounceTimer)
state.debounceTimer = setTimeout(() => {
const filePath = join(projectDir, filename)
const content = buf.toString('utf-8', 0, bytesRead)
const lines = content.split('\n')
for (const line of lines) {
if (!line.trim()) continue
try {
const obj = JSON.parse(line)
if (obj.type === 'user' && obj.message) {
const c = obj.message.content
if (typeof c === 'string') return c.slice(0, 120)
if (Array.isArray(c)) {
const textBlock = c.find((b: any) => b.type === 'text' && b.text?.trim())
if (textBlock) return textBlock.text.slice(0, 120)
}
}
} catch {}
}
} catch {}
return ''
}
function getFirstUserMessage(agent: string, sessionId: string, filePath: string): string {
const key = `${agent}:${sessionId}`
const cached = firstMessageCache.get(key)
if (cached) return cached
const msg = extractFirstUserMessage(filePath)
if (msg) firstMessageCache.set(key, msg)
return msg
}
function readNewBytes(filePath: string, offset: number, size: number): string {
const length = size - offset
if (length <= 0) return ''
try {
const fd = openSync(filePath, 'r')
const buf = Buffer.alloc(length)
readSync(fd, buf, 0, length, offset)
closeSync(fd)
return buf.toString('utf-8')
} catch {
return ''
}
}
function emitChange(agent: string, filename: string, state: AgentWatcherState, broadcast: (message: string) => void) {
const sessionId = filename.replace('.jsonl', '')
const filePath = join(state.projectDir, filename)
// Per-file debounce so rapid writes to the same file coalesce
const existing = state.debounceTimers.get(filename)
if (existing) clearTimeout(existing)
state.debounceTimers.set(filename, setTimeout(() => {
state.debounceTimers.delete(filename)
// Check actual size change to filter spurious events
let size = 0
let mtime = 0
try {
size = statSync(filePath).size
} catch {}
const stat = statSync(filePath)
size = stat.size
mtime = stat.mtimeMs
} catch {
return // File may have been deleted
}
console.log(`[TranscriptDebug:${agent}] Change: ${filename} (${size} bytes)`)
const prevSize = state.fileSizeCache.get(filename) || 0
if (size === prevSize) return // No actual change
// Read new bytes (only the delta) — skip if too large
const deltaSize = size - prevSize
let newContent = ''
if (deltaSize > 0 && deltaSize <= MAX_WS_DELTA_BYTES) {
newContent = readNewBytes(filePath, prevSize, size)
}
// Get or cache firstUserMessage
const firstUserMessage = getFirstUserMessage(agent, sessionId, filePath)
state.fileSizeCache.set(filename, size)
console.log(`[TranscriptDebug:${agent}] Change: ${filename} (${prevSize}${size} bytes, +${deltaSize})`)
broadcast(JSON.stringify({
type: 'transcript-debug-change',
sessionId,
agent,
filename,
size,
prevSize,
mtime,
firstUserMessage,
newContent, // empty string if delta was too large
timestamp: Date.now()
}))
}, DEBOUNCE_MS)
}, DEBOUNCE_MS))
}
export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (message: string) => void) {
@@ -80,7 +166,7 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess
projectDir,
watcher: null,
fileSizeCache: new Map(),
debounceTimer: null
debounceTimers: new Map()
}
// Initialize file size cache
@@ -93,15 +179,16 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess
}
} catch {}
// fs.watch for immediate detection
// fs.watch with recursive: true — reliable on Windows, macOS, and Linux
try {
state.watcher = watch(projectDir, { recursive: false }, (_, filename) => {
state.watcher = watch(projectDir, { recursive: true }, (_, filename) => {
if (!filename) return
if (!filename.endsWith('.jsonl')) return
const sessionId = filename.replace('.jsonl', '')
emitChange(agent, sessionId, filename, projectDir, broadcast)
// recursive may report subdirectory paths — extract just the filename
const base = basename(filename)
if (!base.endsWith('.jsonl')) return
emitChange(agent, base, state, broadcast)
})
console.log(`[TranscriptDebug] Watching ${agent}: ${projectDir}`)
console.log(`[TranscriptDebug] Watching ${agent}: ${projectDir} (recursive)`)
} catch (e: any) {
console.error(`[TranscriptDebug] Watch failed for ${agent}: ${e.message}`)
}
@@ -112,34 +199,7 @@ export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (mess
if (!anyWatched) {
console.log('[TranscriptDebug] No agent project directories found, skipping watcher')
return
}
// Shared polling fallback for all agents
pollTimer = setInterval(() => {
for (const [agent, state] of agentStates) {
try {
const files = readdirSync(state.projectDir).filter(f => f.endsWith('.jsonl'))
for (const f of files) {
try {
const size = statSync(join(state.projectDir, f)).size
const prevSize = state.fileSizeCache.get(f) || 0
if (size !== prevSize) {
state.fileSizeCache.set(f, size)
const sessionId = f.replace('.jsonl', '')
emitChange(agent, sessionId, f, state.projectDir, broadcast)
}
} catch {}
}
// Detect new files
for (const f of files) {
if (!state.fileSizeCache.has(f)) {
state.fileSizeCache.set(f, 0)
}
}
} catch {}
}
}, POLL_INTERVAL_MS)
}
export function cleanupTranscriptDebugWatcher() {
@@ -147,14 +207,9 @@ export function cleanupTranscriptDebugWatcher() {
if (state.watcher) {
state.watcher.close()
}
if (state.debounceTimer) {
clearTimeout(state.debounceTimer)
for (const [, timer] of state.debounceTimers) {
clearTimeout(timer)
}
}
agentStates.clear()
if (pollTimer) {
clearInterval(pollTimer)
pollTimer = null
}
}