From e135868cb7c9bab41b998491b00611705203874f Mon Sep 17 00:00:00 2001 From: josedario87 <71241187+josedario87@users.noreply.github.com> Date: Thu, 5 Jun 2025 10:01:28 -0600 Subject: [PATCH] Update handlers to receive conversation objects --- conversation-layer-agent/src/index.ts | 17 +- whatsapp-router/src/chatHandlers.ts | 4 +- whatsapp-router/src/helloAgent.ts | 4 +- whatsapp-router/src/index.ts | 168 +++----------------- whatsapp-router/src/routes/conversations.ts | 38 +++++ whatsapp-router/src/store/conversation.ts | 67 ++++++++ whatsapp-router/src/types.ts | 8 + whatsapp-router/src/webhook.ts | 159 ++++++++++++++++++ 8 files changed, 308 insertions(+), 157 deletions(-) create mode 100644 whatsapp-router/src/routes/conversations.ts create mode 100644 whatsapp-router/src/store/conversation.ts create mode 100644 whatsapp-router/src/webhook.ts diff --git a/conversation-layer-agent/src/index.ts b/conversation-layer-agent/src/index.ts index f602cdc..b791493 100644 --- a/conversation-layer-agent/src/index.ts +++ b/conversation-layer-agent/src/index.ts @@ -6,6 +6,14 @@ import dotenv from 'dotenv'; dotenv.config(); +interface Conversation { + chatId: string; + messages: { text: string }[]; + createdAt: number; + updatedAt: number; + messageCount: number; +} + const PORT = Number(process.env.PORT) || 8001; const API_KEY = process.env.GEMINI_API_KEY || ''; console.log(`Using Gemini API key: ${API_KEY}`); @@ -34,8 +42,9 @@ const app = express(); app.use(express.json()); app.post('/', async (req, res) => { - const message = req.body?.message as string | undefined; - if (!message) return res.status(400).json({ error: 'Missing message' }); + const conversation = req.body?.conversation as Conversation | undefined; + if (!conversation) return res.status(400).json({ error: 'Missing conversation' }); + const message = conversation.messages[conversation.messages.length - 1]?.text || ''; if (!genAI) { return res.json({ reply: repoInfo }); @@ -66,8 +75,8 @@ app.get('/', (req, res) => { res.send(`

Conversation Layer Agent

This service answers questions about the repository.

-

Send a POST request to / with a JSON body containing {"message": "your question"}

-

Example: {"message": "What is this repository about?"}

+

Send a POST request to / with a JSON body containing {"conversation": {...}}

+

Example: {"conversation": {"chatId": "123@c.us", "messages": [{"text": "hello"}]}}

It will respond with a JSON object containing {"reply": "the answer"}

Repository info: ${repoInfo}

diff --git a/whatsapp-router/src/chatHandlers.ts b/whatsapp-router/src/chatHandlers.ts index 988f9a2..2e6a28d 100644 --- a/whatsapp-router/src/chatHandlers.ts +++ b/whatsapp-router/src/chatHandlers.ts @@ -1,9 +1,9 @@ import { helloWorldAgent } from './helloAgent'; -import { WhatsAppMessage } from './types'; +import { Conversation } from './types'; import dotenv from 'dotenv'; dotenv.config(); -export type Handler = string | ((msg: WhatsAppMessage | string) => Promise); +export type Handler = string | ((conv: Conversation) => Promise); export const chatHandlers: Record = { '50498554225@c.us': process.env.CONVERSATION_AGENT_URL || 'http://conversation-layer-agent:8001', diff --git a/whatsapp-router/src/helloAgent.ts b/whatsapp-router/src/helloAgent.ts index 5d0f2a0..e5774e0 100644 --- a/whatsapp-router/src/helloAgent.ts +++ b/whatsapp-router/src/helloAgent.ts @@ -1,3 +1,5 @@ -export async function helloWorldAgent(): Promise { +import { Conversation } from './types'; + +export async function helloWorldAgent(_conv: Conversation): Promise { return 'hello world'; } diff --git a/whatsapp-router/src/index.ts b/whatsapp-router/src/index.ts index 5810f56..8305672 100644 --- a/whatsapp-router/src/index.ts +++ b/whatsapp-router/src/index.ts @@ -1,10 +1,15 @@ import express from 'express'; -import axios from 'axios'; -import { WhatsAppMessage } from './types'; -import { getHandler, Handler } from './chatHandlers'; import dotenv from 'dotenv'; -dotenv.config(); +import { registerConversationRoutes } from './routes/conversations'; +import { + registerWebhookRoutes, + clearWebhooks, + registerWebhook, + waitForGateway, + WebhookConfig, +} from './webhook'; +dotenv.config(); if (process.env.NODE_ENV === 'development') { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; @@ -17,166 +22,29 @@ if ( throw new Error('NODE_TLS_REJECT_UNAUTHORIZED está activado en producción. Abortando.'); } - const app = express(); +app.use(express.json()); const port = Number(process.env.PORT) || 3001; const agentUrl = process.env.LLM_AGENT_URL as string | undefined; const openWaUrl = process.env.OPEN_WA_URL as string | undefined; -const config = { +const config: WebhookConfig = { API_URL: openWaUrl || '', MAX_ATTEMPTS: parseInt(process.env.MAX_ATTEMPTS || '100', 10), - RETRY_MS: parseInt(process.env.RETRY_MS || '2000', 10) + RETRY_MS: parseInt(process.env.RETRY_MS || '2000', 10), }; -function log(level: keyof Console | 'info' | 'warn' | 'error' | 'debug', ...args: unknown[]) { - const logger = (console as any)[level] as ((...args: unknown[]) => void) | undefined; - if (logger) logger(...args); - else console.log(...args); -} - -async function waitForGateway() { - for (let i = 1; i <= config.MAX_ATTEMPTS; i++) { - try { - await axios.get(`${config.API_URL}/api-docs/`); - log('info', '🟢 nucleo-whatsapp ready'); - return; - } catch(e) { - log('warn', `Gateway not responding`,' connecting to: ', `${config.API_URL}/api-docs/`, ` (attempt ${i}/${config.MAX_ATTEMPTS})…`, e); - await new Promise(r => setTimeout(r, config.RETRY_MS)); - } - } - throw new Error('nucleo-whatsapp did not respond in time'); -} - -async function clearWebhooks() { - try { - const { data } = await axios.post(`${config.API_URL}/listWebhooks`); - const hooks = data?.response || []; - if (!hooks.length) { - log('info', 'No existing webhooks to remove'); - return; - } - - log('info', `Removing ${hooks.length} webhooks…`); - const results = await Promise.allSettled( - hooks.map((h: any) => axios.post(`${config.API_URL}/removeWebhook`, { args: { webhookId: h.id } })) - ); - - results.forEach((r: PromiseSettledResult, i: number) => { - const id = hooks[i].id; - if (r.status === 'fulfilled' && r.value?.data?.response === true) { - log('debug', `✔️ Removed webhook ${id}`); - } else { - log('warn', `⚠️ Failed to remove webhook ${id}`); - } - }); - - const ok = results.filter((r: PromiseSettledResult) => r.status === 'fulfilled' && (r as PromiseFulfilledResult).value?.data?.response === true).length; - log('info', `Cleanup OK (${ok}/${hooks.length} removed)`); - } catch (e: any) { - log('error', 'Failed cleaning webhooks:', e.response?.data || e.message); - } -} - -async function registerWebhook() { - const url = process.env.WEBHOOK_URL || `http://whatsapp-router:${port}/webhook`; - const eventConfig = { - onAck: false, - onAddedToGroup: true, - onAnyMessage: false, - onBattery: true, - onBroadcast: true, - onButton: true, - onCallState: false, - onChatDeleted: true, - onChatOpened: true, - onChatState: true, - onContactAdded: true, - onGlobalParticipantsChanged: true, - onGroupApprovalRequest: true, - onGroupChange: true, - onIncomingCall: false, - onLabel: true, - onLogout: true, - onMessage: true, - onMessageDeleted: true, - onNewProduct: true, - onOrder: true, - onPlugged: false, - onPollVote: true, - onReaction: true, - onRemovedFromGroup: false, - onStateChanged: false, - onStory: false, - }; - - const events = Object.entries(eventConfig) - .filter(([_, enabled]) => enabled) - .map(([event]) => event); - - const { data } = await axios.post(`${config.API_URL}/registerWebhook`, { - args: { url, events } - }); - - log('info', '✔️ Webhook registered:', data); -} - -app.use(express.json()); - -app.post('/webhook', async (req: express.Request, res: express.Response) => { - let message: WhatsAppMessage | undefined; - let text: string | undefined; - let from: string | undefined; - - try { - if (req.body && req.body.data) { - // New webhook format from nucleo-whatsapp - message = req.body.data as WhatsAppMessage; - text = (req.body.data.body as string) || req.body.data.text; - from = req.body.data.from; - } else { - throw new Error('Invalid webhook format'); - } - }catch{} - - - if (message) { - const tipo = typeof message === 'string' ? 'texto' : 'objeto'; - const origen = from || (message?.chatId ?? 'desconocido'); - log('info', `📩 Mensaje recibido (${message?.text}) de ${origen}`); - } - - try { - if (!message) return res.sendStatus(200); - if (!openWaUrl) throw new Error('Service URLs not configured'); - const chatId = (message && message.chatId) || from; - const handler = getHandler(chatId, agentUrl); - if (!handler) throw new Error('No handler configured'); - let reply: string; - if (typeof handler === 'string') { - const agentRes = await axios.post(handler, {message: message.text}); - reply = agentRes.data.reply || agentRes.data; - } else { - reply = await handler(message); - } - await axios.post(`${openWaUrl}/sendText`, { args: { to: from, content: reply } }); - } catch (err: any) { - console.error('Error processing message', err.message); - } - - res.sendStatus(200); -}); - +registerConversationRoutes(app, openWaUrl); +registerWebhookRoutes(app, config, openWaUrl, agentUrl); app.listen(port, async () => { console.log(`WhatsApp router listening on ${port}`); try { - await waitForGateway(); - await clearWebhooks(); - await registerWebhook(); + await waitForGateway(config); + await clearWebhooks(config); + await registerWebhook(config, port); } catch (err: any) { - log('error', 'Webhook setup failed:', err.message); + console.error('Webhook setup failed:', err.message); } }); diff --git a/whatsapp-router/src/routes/conversations.ts b/whatsapp-router/src/routes/conversations.ts new file mode 100644 index 0000000..143fe7e --- /dev/null +++ b/whatsapp-router/src/routes/conversations.ts @@ -0,0 +1,38 @@ +import { Application } from 'express'; +import { + deleteConversation, + getConversation, + listConversations, + buildConversation, +} from '../store/conversation'; + +export function registerConversationRoutes(app: Application, openWaUrl: string | undefined) { + app.get('/conversations', (req, res) => { + res.json({ conversations: listConversations() }); + }); + + app.get('/conversations/:id', async (req, res) => { + if (!openWaUrl) return res.status(500).json({ error: 'Service URLs not configured' }); + try { + const conv = await getConversation(req.params.id, openWaUrl); + res.json(conv); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + + app.post('/conversations/:id/update', async (req, res) => { + if (!openWaUrl) return res.status(500).json({ error: 'Service URLs not configured' }); + try { + const conv = await buildConversation(req.params.id, openWaUrl); + res.json(conv); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + + app.delete('/conversations/:id', (req, res) => { + const deleted = deleteConversation(req.params.id); + res.json({ success: deleted }); + }); +} diff --git a/whatsapp-router/src/store/conversation.ts b/whatsapp-router/src/store/conversation.ts new file mode 100644 index 0000000..a93cdd9 --- /dev/null +++ b/whatsapp-router/src/store/conversation.ts @@ -0,0 +1,67 @@ +import axios from 'axios'; +import { WhatsAppMessage, Conversation } from '../types'; + +const conversations = new Map(); + +async function loadMessages( + chatId: string, + openWaUrl: string +): Promise { + const { data } = await axios.post(`${openWaUrl}/loadAndGetAllMessagesInChat`, { + args: { + chatId, + includeMe: true, + includeNotifications: true, + }, + }); + const msgs: WhatsAppMessage[] = data?.response || data || []; + return msgs; +} + +export async function getConversation( + chatId: string, + openWaUrl: string +): Promise { + let conv = conversations.get(chatId); + if (!conv) { + conv = await buildConversation(chatId, openWaUrl); + } + return conv; +} + +export function listConversations(): Conversation[] { + return Array.from(conversations.values()); +} + +export async function buildConversation( + chatId: string, + openWaUrl: string +): Promise { + const messages = await loadMessages(chatId, openWaUrl); + const now = Date.now(); + const conv: Conversation = { + chatId, + messages, + createdAt: conversations.get(chatId)?.createdAt || now, + updatedAt: now, + messageCount: messages.length, + }; + conversations.set(chatId, conv); + return conv; +} + +export function deleteConversation(chatId: string): boolean { + return conversations.delete(chatId); +} + +export async function addMessageToConversation( + chatId: string, + msg: WhatsAppMessage, + openWaUrl: string +): Promise { + const conv = await getConversation(chatId, openWaUrl); + conv.messages.push(msg); + conv.messageCount = conv.messages.length; + conv.updatedAt = Date.now(); + return conv; +} diff --git a/whatsapp-router/src/types.ts b/whatsapp-router/src/types.ts index 8c76c3d..5152240 100644 --- a/whatsapp-router/src/types.ts +++ b/whatsapp-router/src/types.ts @@ -192,3 +192,11 @@ export interface WhatsAppMessage { mediaData: Record; text: string; } + +export interface Conversation { + chatId: string; + messages: WhatsAppMessage[]; + createdAt: number; + updatedAt: number; + messageCount: number; +} diff --git a/whatsapp-router/src/webhook.ts b/whatsapp-router/src/webhook.ts new file mode 100644 index 0000000..1a1e6c2 --- /dev/null +++ b/whatsapp-router/src/webhook.ts @@ -0,0 +1,159 @@ +import express, { Application } from 'express'; +import axios from 'axios'; +import { getHandler } from './chatHandlers'; +import { addMessageToConversation } from './store/conversation'; +import { WhatsAppMessage, Conversation } from './types'; + +export interface WebhookConfig { + API_URL: string; + MAX_ATTEMPTS: number; + RETRY_MS: number; +} + +export function registerWebhookRoutes( + app: Application, + config: WebhookConfig, + openWaUrl: string | undefined, + agentUrl: string | undefined +) { + app.post('/webhook', async (req: express.Request, res: express.Response) => { + let message: WhatsAppMessage | undefined; + let from: string | undefined; + + try { + if (req.body && req.body.data) { + message = req.body.data as WhatsAppMessage; + from = req.body.data.from; + } else { + throw new Error('Invalid webhook format'); + } + } catch {} + + if (message) { + const origen = from || message.chatId || 'desconocido'; + console.log(`📩 Mensaje recibido (${message.text}) de ${origen}`); + } + + try { + if (!message) return res.sendStatus(200); + if (!openWaUrl) throw new Error('Service URLs not configured'); + const chatId = message.chatId || from; + let conv: Conversation | undefined; + if (chatId) { + try { + conv = await addMessageToConversation(chatId, message, openWaUrl); + } catch (err: any) { + console.warn('Failed updating conversation:', err.message); + } + } + if (!conv) throw new Error('Conversation unavailable'); + const handler = getHandler(chatId, agentUrl); + if (!handler) throw new Error('No handler configured'); + let reply: string; + if (typeof handler === 'string') { + const agentRes = await axios.post(handler, { conversation: conv }); + reply = agentRes.data.reply || agentRes.data; + } else { + reply = await handler(conv); + } + await axios.post(`${openWaUrl}/sendText`, { args: { to: from, content: reply } }); + } catch (err: any) { + console.error('Error processing message', err.message); + } + + res.sendStatus(200); + }); +} + +export async function waitForGateway(config: WebhookConfig) { + for (let i = 1; i <= config.MAX_ATTEMPTS; i++) { + try { + await axios.get(`${config.API_URL}/api-docs/`); + console.log('🟢 nucleo-whatsapp ready'); + return; + } catch (e) { + console.warn( + 'Gateway not responding', + `connecting to: ${config.API_URL}/api-docs/ (attempt ${i}/${config.MAX_ATTEMPTS})…`, + e + ); + await new Promise((r) => setTimeout(r, config.RETRY_MS)); + } + } + throw new Error('nucleo-whatsapp did not respond in time'); +} + +export async function clearWebhooks(config: WebhookConfig) { + try { + const { data } = await axios.post(`${config.API_URL}/listWebhooks`); + const hooks = data?.response || []; + if (!hooks.length) { + console.log('No existing webhooks to remove'); + return; + } + + console.log(`Removing ${hooks.length} webhooks…`); + const results = await Promise.allSettled( + hooks.map((h: any) => axios.post(`${config.API_URL}/removeWebhook`, { args: { webhookId: h.id } })) + ); + + results.forEach((r: PromiseSettledResult, i: number) => { + const id = hooks[i].id; + if (r.status === 'fulfilled' && (r as PromiseFulfilledResult).value?.data?.response === true) { + console.log(`✔️ Removed webhook ${id}`); + } else { + console.warn(`⚠️ Failed to remove webhook ${id}`); + } + }); + + const ok = results.filter( + (r: PromiseSettledResult) => r.status === 'fulfilled' && (r as PromiseFulfilledResult).value?.data?.response === true + ).length; + console.log(`Cleanup OK (${ok}/${hooks.length} removed)`); + } catch (e: any) { + console.error('Failed cleaning webhooks:', e.response?.data || e.message); + } +} + +export async function registerWebhook(config: WebhookConfig, port: number) { + const url = process.env.WEBHOOK_URL || `http://whatsapp-router:${port}/webhook`; + const eventConfig = { + onAck: false, + onAddedToGroup: true, + onAnyMessage: false, + onBattery: true, + onBroadcast: true, + onButton: true, + onCallState: false, + onChatDeleted: true, + onChatOpened: true, + onChatState: true, + onContactAdded: true, + onGlobalParticipantsChanged: true, + onGroupApprovalRequest: true, + onGroupChange: true, + onIncomingCall: false, + onLabel: true, + onLogout: true, + onMessage: true, + onMessageDeleted: true, + onNewProduct: true, + onOrder: true, + onPlugged: false, + onPollVote: true, + onReaction: true, + onRemovedFromGroup: false, + onStateChanged: false, + onStory: false, + } as const; + + const events = Object.entries(eventConfig) + .filter(([_, enabled]) => enabled) + .map(([event]) => event); + + const { data } = await axios.post(`${config.API_URL}/registerWebhook`, { + args: { url, events }, + }); + + console.log('✔️ Webhook registered:', data); +}