Merge pull request #21 from josedario87/codex/add-sse-connection-to-whatsapp-router

Refactor SSE log streaming
This commit is contained in:
josedario87
2025-06-11 23:10:45 -06:00
committed by GitHub
7 changed files with 381 additions and 2 deletions

View File

@@ -12,6 +12,7 @@
"@google/genai": "^1.4.0", "@google/genai": "^1.4.0",
"@open-wa/wa-automate": "^4.76.0", "@open-wa/wa-automate": "^4.76.0",
"axios": "^1.5.0", "axios": "^1.5.0",
"cors": "^2.8.5",
"dotenv": "^16.5.0", "dotenv": "^16.5.0",
"express": "^4.18.2", "express": "^4.18.2",
"ffmpeg-static": "^5.2.0", "ffmpeg-static": "^5.2.0",

View File

@@ -12,6 +12,7 @@
"@google/genai": "^1.4.0", "@google/genai": "^1.4.0",
"@open-wa/wa-automate": "^4.76.0", "@open-wa/wa-automate": "^4.76.0",
"axios": "^1.5.0", "axios": "^1.5.0",
"cors": "^2.8.5",
"dotenv": "^16.5.0", "dotenv": "^16.5.0",
"express": "^4.18.2", "express": "^4.18.2",
"ffmpeg-static": "^5.2.0", "ffmpeg-static": "^5.2.0",

View File

@@ -7,7 +7,11 @@ export type Handler = string | ((conv: Conversation) => Promise<string>);
export const chatHandlers: Record<string, Handler> = { export const chatHandlers: Record<string, Handler> = {
'50498554225@c.us': process.env.CONVERSATION_AGENT_URL || 'http://conversation-layer-agent:8001', '50498554225@c.us': process.env.CONVERSATION_AGENT_URL || 'http://conversation-layer-agent:8001',
'120363401804322608@g.us' : process.env.PLANILLA_AGENT_URL ||'http://planilla-agent:8012' '120363401804322608@g.us' : process.env.PLANILLA_AGENT_URL ||'http://planilla-agent:8012',
'planilla-UI' : process.env.PLANILLA_AGENT_URL ||'http://planilla-agent:8012'
//map any conversation that follow this pattern, planilla
// Add other mappings like: // Add other mappings like:
// '50496210031@c.us': 'http://llm-agent:8000' // '50496210031@c.us': 'http://llm-agent:8000'
}; };

View File

@@ -1,7 +1,10 @@
import express from 'express'; import express from 'express';
import dotenv from 'dotenv'; import dotenv from 'dotenv';
import { registerConversationRoutes } from './routes/conversationActions'; import { registerConversationRoutes } from './routes/conversationActions';
import whatsappActionsRouter from './routes/whatsappActions'; // New import import { registerChatUIRoutes } from './routes/chatUI_Actions';
import whatsappActionsRouter from './routes/whatsappActions';
import { registerLogSse } from './sse/logSse';
import cors from 'cors';
import { import {
registerWebhookRoutes, registerWebhookRoutes,
clearWebhooks, clearWebhooks,
@@ -37,8 +40,23 @@ if (
} }
const app = express(); const app = express();
app.use(cors({
origin: 'http://localhost:5173',
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
credentials: true, // si usás cookies o headers personalizados
}));
app.use(express.json()); app.use(express.json());
registerLogSse(app);
const port = Number(process.env.PORT) || 3001; const port = Number(process.env.PORT) || 3001;
const agentUrl = process.env.LLM_AGENT_URL as string | undefined; const agentUrl = process.env.LLM_AGENT_URL as string | undefined;
const openWaUrl = process.env.OPEN_WA_URL as string | undefined; const openWaUrl = process.env.OPEN_WA_URL as string | undefined;
@@ -51,6 +69,7 @@ const config: WebhookConfig = {
registerConversationRoutes(app, openWaUrl); registerConversationRoutes(app, openWaUrl);
registerWebhookRoutes(app, config, openWaUrl, agentUrl); registerWebhookRoutes(app, config, openWaUrl, agentUrl);
registerChatUIRoutes(app, openWaUrl);
app.use('/whatsapp', whatsappActionsRouter); // New line app.use('/whatsapp', whatsappActionsRouter); // New line
// Register new whatsappActions routes // Register new whatsappActions routes

View File

@@ -0,0 +1,304 @@
import { Application } from 'express';
import express from 'express';
import { WhatsAppMessage, Conversation, Msg } from '../types';
import { addMessageToConversation } from '../store/conversation';
import { mapWhatsAppMessageToMsg } from '../messageProcessor';
import { getHandler } from '../chatHandlers';
import axios from 'axios';
interface UIMessage {
chatId: string;
id: string;
from: string;
to: string;
ts: number;
type: 'chat';
text: string;
mediaUrl: string | null;
mentions: string[] | null;
meta: {
ack: number;
hasReaction: boolean;
isQuoted: boolean;
};
}
const conversations = new Map<string, Conversation>();
export const processIncomingMessageForConversation = (
conversation: Conversation,
message: WhatsAppMessage
): void => {
const mappedMsg = mapWhatsAppMessageToMsg(message);
// Avoid duplicates if multiple webhook events deliver the same message
if (!conversation.messages.some((m) => m.id === mappedMsg.id)) {
conversation.messages.push(mappedMsg);
// if (conversation.messages.length > 20) {
// conversation.messages.shift(); // Keep only the last 20 messages
// }
}
// Add new participants if necessary
const sender = message.sender;
if (sender && !conversation.participants.some((p) => p.id === sender.id)) {
conversation.participants.push({
id: sender.id,
name: sender.pushname || sender.name || '',
isMe: sender.isMe,
// isAdmin property is not available here, it's part of group metadata
});
}
// The conversation object is modified directly
};
export function registerChatUIRoutes(app: Application, openWaUrl: string | undefined) {
app.post('/chatUI/sendMessage', async (req: express.Request, res: express.Response) => {
console.log(`[routes] POST /chatUI/sendMessage`, 'v2.0');
try {
if (!req.body || !req.body.data) {
throw new Error('Invalid request format');
}
const uiMessage: UIMessage = req.body.data;
if (!uiMessage.text || !uiMessage.chatId) {
throw new Error('Missing required fields: text and chatId');
}
// convertimos el mensaje de la UI a un mensaje de whatsapp
const message: WhatsAppMessage = {
id: uiMessage.id,
body: uiMessage.text,
text: uiMessage.text,
type: uiMessage.type,
from: uiMessage.from,
to: uiMessage.to,
chatId: uiMessage.chatId,
timestamp: uiMessage.ts * 1000, // Convertir a milisegundos
fromMe: true,
viewed: false,
t: uiMessage.ts * 1000,
notifyName: 'User',
author: null,
invis: false,
isNewMsg: true,
star: false,
kicNotified: false,
recvFresh: true,
isFromTemplate: false,
pollInvalidated: false,
isSentCagPollCreation: false,
latestEditMsgKey: null,
latestEditSenderTimestampMs: null,
mentionedJidList: uiMessage.mentions || [],
groupMentions: [],
isEventCanceled: false,
eventInvalidated: false,
isVcardOverMmsDocument: false,
labels: [],
hasReaction: uiMessage.meta.hasReaction,
ephemeralDuration: 0,
ephemeralSettingTimestamp: 0,
disappearingModeInitiator: '',
disappearingModeTrigger: '',
viewMode: '',
productHeaderImageRejected: false,
lastPlaybackProgress: 0,
isDynamicReplyButtonsMsg: false,
isCarouselCard: false,
parentMsgId: null,
callSilenceReason: null,
isVideoCall: false,
callDuration: null,
callParticipants: null,
isMdHistoryMsg: false,
stickerSentTs: 0,
isAvatar: false,
lastUpdateFromServerTs: 0,
invokedBotWid: null,
bizBotType: null,
botResponseTargetId: null,
botPluginType: null,
botPluginReferenceIndex: null,
botPluginSearchProvider: null,
botPluginSearchUrl: null,
botPluginSearchQuery: null,
botPluginMaybeParent: false,
botReelPluginThumbnailCdnUrl: null,
botMessageDisclaimerText: null,
botMsgBodyType: null,
reportingTokenInfo: null,
requiresDirectConnection: false,
bizContentPlaceholderType: null,
hostedBizEncStateMismatch: false,
senderOrRecipientAccountTypeHosted: false,
placeholderCreatedWhenAccountIsHosted: false,
device: 0,
local: true,
mId: uiMessage.id,
senderId: uiMessage.from,
content: uiMessage.text,
isGroupMsg: false,
isQuotedMsgAvailable: uiMessage.meta.isQuoted,
isMedia: !!uiMessage.mediaUrl,
mediaData: uiMessage.mediaUrl ? { url: uiMessage.mediaUrl } : {},
isOnline: false,
sender: {
id: uiMessage.from,
name: 'User',
shortName: 'User',
pushname: 'User',
type: 'user',
isBusiness: false,
isEnterprise: false,
isSmb: false,
isContactSyncCompleted: 1,
disappearingModeDuration: 0,
disappearingModeSettingTimestamp: 0,
textStatusLastUpdateTime: 0,
syncToAddressbook: false,
formattedName: 'User',
isMe: true,
isMyContact: false,
isPSA: false,
isUser: true,
isVerified: false,
isWAContact: true,
msgs: []
},
chat: {
id: uiMessage.chatId,
name: 'Chat',
isGroup: false,
participantsCount: 2,
formattedTitle: 'Chat',
pendingMsgs: false,
t: uiMessage.ts * 1000,
unreadCount: 0,
unreadDividerOffset: 0,
archive: false,
isReadOnly: false,
isLocked: false,
muteExpiration: 0,
isAutoMuted: false,
notSpam: true,
pin: 0,
ephemeralDuration: 0,
ephemeralSettingTimestamp: 0,
disappearingModeInitiator: '',
disappearingModeTrigger: '',
createdLocally: false,
unreadMentionsOfMe: [],
unreadMentionCount: 0,
hasUnreadMention: false,
archiveAtMentionViewedInDrawer: false,
hasChatBeenOpened: true,
tcToken: {},
tcTokenTimestamp: 0,
tcTokenSenderTimestamp: 0,
endOfHistoryTransferType: 0,
pendingInitialLoading: false,
unreadEditTimestampMs: 0,
celebrationAnimationLastPlayed: 0,
hasRequestedWelcomeMsg: false,
canSend: true,
groupMetadata: {},
isOnline: false,
contact: {
id: uiMessage.chatId,
name: 'User',
shortName: 'User',
pushname: 'User',
type: 'user',
isBusiness: false,
isEnterprise: false,
isSmb: false,
isContactSyncCompleted: 1,
disappearingModeDuration: 0,
disappearingModeSettingTimestamp: 0,
textStatusLastUpdateTime: 0,
syncToAddressbook: false,
formattedName: 'User',
isMe: true,
isMyContact: false,
isPSA: false,
isUser: true,
isVerified: false,
isWAContact: true,
msgs: []
},
msgs: []
}
};
// nosotros mismos tenemos que buscar la conversacion in la memoria, conversations. si no existe, crearla
let conversation = conversations.get(uiMessage.chatId);
if (!conversation) {
conversation = {
chatId: uiMessage.chatId,
title: 'Chat',
isGroup: false,
unreadCount: 0,
participants: [],
messages: [],
createdAt: Date.now()
}
conversations.set(uiMessage.chatId, conversation);
}
processIncomingMessageForConversation(conversation, message);
const handler = getHandler(uiMessage.chatId);
if (!handler) throw new Error('No handler configured');
let reply: string;
if (typeof handler === 'string') {
console.log(`🔗 Calling agent at ${handler} for conversation ${uiMessage.chatId}\n`);
const agentRes = await axios.post(handler, { conversation });
reply = agentRes.data.reply || agentRes.data;
} else {
reply = await handler(conversation);
}
const replyMsg: Msg = {
id: uiMessage.id,
from: uiMessage.to,
to: uiMessage.from,
ts: uiMessage.ts,
type: 'chat',
text: reply,
mediaUrl: undefined,
mentions: [],
meta: {
ack: 0,
hasReaction: false,
isQuoted: false
}
};
conversation.messages.push(replyMsg);
console.log('conversation', conversation);
// Devolver la conversación completa
/* it needs to be able to be parsed by the UI
const response = await axios.post(routerUrl, {data:messagePayload});
console.log('Message sent, raw response:', response.data); // Log raw response
const conversation = response.data;
*/
res.json(conversation);
} catch (err: any) {
console.error('Error processing UI message:', err.message);
res.status(400).json({ error: err.message });
}
});
}

View File

@@ -43,4 +43,5 @@ export function registerConversationRoutes(app: Application, openWaUrl: string |
console.log(`Conversation ${req.params.id} deleted: ${deleted}`); console.log(`Conversation ${req.params.id} deleted: ${deleted}`);
res.json({ success: deleted }); res.json({ success: deleted });
}); });
} }

View File

@@ -0,0 +1,49 @@
import { Express, Response } from 'express';
export function registerLogSse(app: Express) {
const clients: Response[] = [];
app.get('/logs/sse', (req, res) => {
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
res.flushHeaders();
res.write('event: connected\ndata: {}\n\n');
const keepAlive = setInterval(() => {
res.write(':\n\n');
}, 15000);
clients.push(res);
console.log('🟢 SSE log client connected (%d)', clients.length);
req.on('close', () => {
clearInterval(keepAlive);
clients.splice(clients.indexOf(res), 1);
console.log('🔌 SSE log client disconnected (%d)', clients.length);
});
});
const broadcast = (data: string) => {
const payload = `data: ${data}\n\n`;
clients.forEach((c) => c.write(payload));
};
const originalLog = console.log;
const originalError = console.error;
console.log = (...args: any[]) => {
originalLog(...args);
broadcast(args.join(' '));
};
console.error = (...args: any[]) => {
originalError(...args);
broadcast(args.join(' '));
};
}