- Replace 1500ms polling with fs.watch recursive (reliable on Windows) - Enrich WS broadcast with newContent delta + session metadata - Client appends incrementally instead of 2 sequential HTTP requests - Pre-initialize Tauri HTTP plugin at module load to avoid dynamic import overhead - Per-file debounce timers (150ms) instead of single shared timer - Size-based validation for safe incremental appends with HTTP fallback
216 lines
6.8 KiB
TypeScript
216 lines
6.8 KiB
TypeScript
/**
|
|
* Transcript Debug Handler
|
|
* Watches all agent project dirs (.claude-ejecutor/projects/, .claude-nucleo000/projects/)
|
|
* for JSONL file changes and broadcasts notifications via the sync server.
|
|
*
|
|
* Uses fs.watch with { recursive: true } which is reliable on Windows (ReadDirectoryChangesW),
|
|
* macOS (FSEvents) and Linux (inotify). Same strategy as git-handler.ts.
|
|
* File size tracking is used to filter out duplicate/spurious events.
|
|
*
|
|
* Broadcasts enriched events including new JSONL content and session metadata,
|
|
* so clients can update incrementally without additional HTTP requests.
|
|
*/
|
|
|
|
import { watch, existsSync, readdirSync, statSync, readFileSync, openSync, readSync, closeSync, type FSWatcher } from 'fs'
|
|
import { join, basename } from 'path'
|
|
import { homedir } from 'os'
|
|
|
|
const AGENT_NAMES = ['ejecutor', 'nucleo000', 'claude']
|
|
const DEBOUNCE_MS = 150 // Per-file debounce — balances responsiveness and batching
|
|
const MAX_WS_DELTA_BYTES = 256 * 1024 // Skip pushing content > 256KB via WS
|
|
|
|
// Per-agent state
|
|
interface AgentWatcherState {
|
|
projectDir: string
|
|
watcher: FSWatcher | null
|
|
fileSizeCache: Map<string, number>
|
|
debounceTimers: Map<string, ReturnType<typeof setTimeout>>
|
|
}
|
|
|
|
const agentStates = new Map<string, AgentWatcherState>()
|
|
|
|
// Cache firstUserMessage per session — never changes after first extraction
|
|
const firstMessageCache = new Map<string, string>()
|
|
|
|
// Project hash matching the working dir
|
|
const PROJECT_HASH = 'C--Users-jodar-agent-ui'
|
|
|
|
function findProjectDir(workingDir: string, agent: string): string | null {
|
|
const agentDir = agent === 'claude'
|
|
? join(homedir(), '.claude', 'projects')
|
|
: join(workingDir, `.claude-${agent}`, 'projects')
|
|
if (!existsSync(agentDir)) return null
|
|
|
|
// For claude global dir, go straight to the project hash
|
|
if (agent === 'claude') {
|
|
const exact = join(agentDir, PROJECT_HASH)
|
|
return existsSync(exact) ? exact : null
|
|
}
|
|
|
|
const dirs = readdirSync(agentDir)
|
|
return dirs.length > 0 ? join(agentDir, dirs[0]) : null
|
|
}
|
|
|
|
function extractFirstUserMessage(filePath: string): string {
|
|
// Read only first 8KB — enough for session header + first user message
|
|
try {
|
|
const fd = openSync(filePath, 'r')
|
|
const buf = Buffer.alloc(8192)
|
|
const bytesRead = readSync(fd, buf, 0, 8192, 0)
|
|
closeSync(fd)
|
|
|
|
const content = buf.toString('utf-8', 0, bytesRead)
|
|
const lines = content.split('\n')
|
|
for (const line of lines) {
|
|
if (!line.trim()) continue
|
|
try {
|
|
const obj = JSON.parse(line)
|
|
if (obj.type === 'user' && obj.message) {
|
|
const c = obj.message.content
|
|
if (typeof c === 'string') return c.slice(0, 120)
|
|
if (Array.isArray(c)) {
|
|
const textBlock = c.find((b: any) => b.type === 'text' && b.text?.trim())
|
|
if (textBlock) return textBlock.text.slice(0, 120)
|
|
}
|
|
}
|
|
} catch {}
|
|
}
|
|
} catch {}
|
|
return ''
|
|
}
|
|
|
|
function getFirstUserMessage(agent: string, sessionId: string, filePath: string): string {
|
|
const key = `${agent}:${sessionId}`
|
|
const cached = firstMessageCache.get(key)
|
|
if (cached) return cached
|
|
const msg = extractFirstUserMessage(filePath)
|
|
if (msg) firstMessageCache.set(key, msg)
|
|
return msg
|
|
}
|
|
|
|
function readNewBytes(filePath: string, offset: number, size: number): string {
|
|
const length = size - offset
|
|
if (length <= 0) return ''
|
|
try {
|
|
const fd = openSync(filePath, 'r')
|
|
const buf = Buffer.alloc(length)
|
|
readSync(fd, buf, 0, length, offset)
|
|
closeSync(fd)
|
|
return buf.toString('utf-8')
|
|
} catch {
|
|
return ''
|
|
}
|
|
}
|
|
|
|
function emitChange(agent: string, filename: string, state: AgentWatcherState, broadcast: (message: string) => void) {
|
|
const sessionId = filename.replace('.jsonl', '')
|
|
const filePath = join(state.projectDir, filename)
|
|
|
|
// Per-file debounce so rapid writes to the same file coalesce
|
|
const existing = state.debounceTimers.get(filename)
|
|
if (existing) clearTimeout(existing)
|
|
|
|
state.debounceTimers.set(filename, setTimeout(() => {
|
|
state.debounceTimers.delete(filename)
|
|
|
|
// Check actual size change to filter spurious events
|
|
let size = 0
|
|
let mtime = 0
|
|
try {
|
|
const stat = statSync(filePath)
|
|
size = stat.size
|
|
mtime = stat.mtimeMs
|
|
} catch {
|
|
return // File may have been deleted
|
|
}
|
|
|
|
const prevSize = state.fileSizeCache.get(filename) || 0
|
|
if (size === prevSize) return // No actual change
|
|
|
|
// Read new bytes (only the delta) — skip if too large
|
|
const deltaSize = size - prevSize
|
|
let newContent = ''
|
|
if (deltaSize > 0 && deltaSize <= MAX_WS_DELTA_BYTES) {
|
|
newContent = readNewBytes(filePath, prevSize, size)
|
|
}
|
|
|
|
// Get or cache firstUserMessage
|
|
const firstUserMessage = getFirstUserMessage(agent, sessionId, filePath)
|
|
|
|
state.fileSizeCache.set(filename, size)
|
|
|
|
console.log(`[TranscriptDebug:${agent}] Change: ${filename} (${prevSize} → ${size} bytes, +${deltaSize})`)
|
|
broadcast(JSON.stringify({
|
|
type: 'transcript-debug-change',
|
|
sessionId,
|
|
agent,
|
|
filename,
|
|
size,
|
|
prevSize,
|
|
mtime,
|
|
firstUserMessage,
|
|
newContent, // empty string if delta was too large
|
|
timestamp: Date.now()
|
|
}))
|
|
}, DEBOUNCE_MS))
|
|
}
|
|
|
|
export function setupTranscriptDebugWatcher(workingDir: string, broadcast: (message: string) => void) {
|
|
let anyWatched = false
|
|
|
|
for (const agent of AGENT_NAMES) {
|
|
const projectDir = findProjectDir(workingDir, agent)
|
|
if (!projectDir) continue
|
|
|
|
const state: AgentWatcherState = {
|
|
projectDir,
|
|
watcher: null,
|
|
fileSizeCache: new Map(),
|
|
debounceTimers: new Map()
|
|
}
|
|
|
|
// Initialize file size cache
|
|
try {
|
|
const files = readdirSync(projectDir).filter(f => f.endsWith('.jsonl'))
|
|
for (const f of files) {
|
|
try {
|
|
state.fileSizeCache.set(f, statSync(join(projectDir, f)).size)
|
|
} catch {}
|
|
}
|
|
} catch {}
|
|
|
|
// fs.watch with recursive: true — reliable on Windows, macOS, and Linux
|
|
try {
|
|
state.watcher = watch(projectDir, { recursive: true }, (_, filename) => {
|
|
if (!filename) return
|
|
// recursive may report subdirectory paths — extract just the filename
|
|
const base = basename(filename)
|
|
if (!base.endsWith('.jsonl')) return
|
|
emitChange(agent, base, state, broadcast)
|
|
})
|
|
console.log(`[TranscriptDebug] Watching ${agent}: ${projectDir} (recursive)`)
|
|
} catch (e: any) {
|
|
console.error(`[TranscriptDebug] Watch failed for ${agent}: ${e.message}`)
|
|
}
|
|
|
|
agentStates.set(agent, state)
|
|
anyWatched = true
|
|
}
|
|
|
|
if (!anyWatched) {
|
|
console.log('[TranscriptDebug] No agent project directories found, skipping watcher')
|
|
}
|
|
}
|
|
|
|
export function cleanupTranscriptDebugWatcher() {
|
|
for (const [, state] of agentStates) {
|
|
if (state.watcher) {
|
|
state.watcher.close()
|
|
}
|
|
for (const [, timer] of state.debounceTimers) {
|
|
clearTimeout(timer)
|
|
}
|
|
}
|
|
agentStates.clear()
|
|
}
|