Merge branch 'main' of https://github.com/josedario87/conversation-layer
All checks were successful
Deploy conversation layer / deploy (push) Successful in 22s

This commit is contained in:
2025-06-05 14:05:59 -06:00
8 changed files with 320 additions and 157 deletions

View File

@@ -6,6 +6,14 @@ import dotenv from 'dotenv';
dotenv.config(); dotenv.config();
interface Conversation {
chatId: string;
messages: { text: string }[];
createdAt: number;
updatedAt: number;
messageCount: number;
}
const PORT = Number(process.env.PORT) || 8001; const PORT = Number(process.env.PORT) || 8001;
const API_KEY = process.env.GEMINI_API_KEY || ''; const API_KEY = process.env.GEMINI_API_KEY || '';
console.log(`Using Gemini API key: ${API_KEY}`); console.log(`Using Gemini API key: ${API_KEY}`);
@@ -34,8 +42,9 @@ const app = express();
app.use(express.json()); app.use(express.json());
app.post('/', async (req, res) => { app.post('/', async (req, res) => {
const message = req.body?.message as string | undefined; const conversation = req.body?.conversation as Conversation | undefined;
if (!message) return res.status(400).json({ error: 'Missing message' }); if (!conversation) return res.status(400).json({ error: 'Missing conversation' });
const message = conversation.messages[conversation.messages.length - 1]?.text || '';
if (!genAI) { if (!genAI) {
return res.json({ reply: repoInfo }); return res.json({ reply: repoInfo });
@@ -67,8 +76,8 @@ app.get('/', (req, res) => {
res.send(` res.send(`
<h1>Conversation Layer Agent</h1> <h1>Conversation Layer Agent</h1>
<p>This service answers questions about the repository.</p> <p>This service answers questions about the repository.</p>
<p>Send a POST request to / with a JSON body containing {"message": "your question"}</p> <p>Send a POST request to / with a JSON body containing {"conversation": {...}}</p>
<p>Example: {"message": "What is this repository about?"}</p> <p>Example: {"conversation": {"chatId": "123@c.us", "messages": [{"text": "hello"}]}}</p>
<p>It will respond with a JSON object containing {"reply": "the answer"}</p> <p>It will respond with a JSON object containing {"reply": "the answer"}</p>
<p>Repository info: ${repoInfo}</p> <p>Repository info: ${repoInfo}</p>

View File

@@ -1,9 +1,9 @@
import { helloWorldAgent } from './helloAgent'; import { helloWorldAgent } from './helloAgent';
import { WhatsAppMessage } from './types'; import { Conversation } from './types';
import dotenv from 'dotenv'; import dotenv from 'dotenv';
dotenv.config(); dotenv.config();
export type Handler = string | ((msg: WhatsAppMessage | string) => Promise<string>); export type Handler = string | ((conv: Conversation) => Promise<string>);
export const chatHandlers: Record<string, Handler> = { export const chatHandlers: Record<string, Handler> = {
'50498554225@c.us': process.env.CONVERSATION_AGENT_URL || 'http://conversation-layer-agent:8001', '50498554225@c.us': process.env.CONVERSATION_AGENT_URL || 'http://conversation-layer-agent:8001',

View File

@@ -1,3 +1,5 @@
export async function helloWorldAgent(): Promise<string> { import { Conversation } from './types';
export async function helloWorldAgent(_conv: Conversation): Promise<string> {
return 'hello world'; return 'hello world';
} }

View File

@@ -1,10 +1,15 @@
import express from 'express'; import express from 'express';
import axios from 'axios';
import { WhatsAppMessage } from './types';
import { getHandler, Handler } from './chatHandlers';
import dotenv from 'dotenv'; import dotenv from 'dotenv';
dotenv.config(); import { registerConversationRoutes } from './routes/conversations';
import {
registerWebhookRoutes,
clearWebhooks,
registerWebhook,
waitForGateway,
WebhookConfig,
} from './webhook';
dotenv.config();
if (process.env.NODE_ENV === 'development') { if (process.env.NODE_ENV === 'development') {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
@@ -17,166 +22,29 @@ if (
throw new Error('NODE_TLS_REJECT_UNAUTHORIZED está activado en producción. Abortando.'); throw new Error('NODE_TLS_REJECT_UNAUTHORIZED está activado en producción. Abortando.');
} }
const app = express(); const app = express();
app.use(express.json());
const port = Number(process.env.PORT) || 3001; const port = Number(process.env.PORT) || 3001;
const agentUrl = process.env.LLM_AGENT_URL as string | undefined; const agentUrl = process.env.LLM_AGENT_URL as string | undefined;
const openWaUrl = process.env.OPEN_WA_URL as string | undefined; const openWaUrl = process.env.OPEN_WA_URL as string | undefined;
const config = { const config: WebhookConfig = {
API_URL: openWaUrl || '', API_URL: openWaUrl || '',
MAX_ATTEMPTS: parseInt(process.env.MAX_ATTEMPTS || '100', 10), MAX_ATTEMPTS: parseInt(process.env.MAX_ATTEMPTS || '100', 10),
RETRY_MS: parseInt(process.env.RETRY_MS || '2000', 10) RETRY_MS: parseInt(process.env.RETRY_MS || '2000', 10),
}; };
function log(level: keyof Console | 'info' | 'warn' | 'error' | 'debug', ...args: unknown[]) { registerConversationRoutes(app, openWaUrl);
const logger = (console as any)[level] as ((...args: unknown[]) => void) | undefined; registerWebhookRoutes(app, config, openWaUrl, agentUrl);
if (logger) logger(...args);
else console.log(...args);
}
async function waitForGateway() {
for (let i = 1; i <= config.MAX_ATTEMPTS; i++) {
try {
await axios.get(`${config.API_URL}/api-docs/`);
log('info', '🟢 nucleo-whatsapp ready');
return;
} catch(e) {
log('warn', `Gateway not responding`,' connecting to: ', `${config.API_URL}/api-docs/`, ` (attempt ${i}/${config.MAX_ATTEMPTS})…`, e);
await new Promise(r => setTimeout(r, config.RETRY_MS));
}
}
throw new Error('nucleo-whatsapp did not respond in time');
}
async function clearWebhooks() {
try {
const { data } = await axios.post(`${config.API_URL}/listWebhooks`);
const hooks = data?.response || [];
if (!hooks.length) {
log('info', 'No existing webhooks to remove');
return;
}
log('info', `Removing ${hooks.length} webhooks…`);
const results = await Promise.allSettled(
hooks.map((h: any) => axios.post(`${config.API_URL}/removeWebhook`, { args: { webhookId: h.id } }))
);
results.forEach((r: PromiseSettledResult<any>, i: number) => {
const id = hooks[i].id;
if (r.status === 'fulfilled' && r.value?.data?.response === true) {
log('debug', `✔️ Removed webhook ${id}`);
} else {
log('warn', `⚠️ Failed to remove webhook ${id}`);
}
});
const ok = results.filter((r: PromiseSettledResult<any>) => r.status === 'fulfilled' && (r as PromiseFulfilledResult<any>).value?.data?.response === true).length;
log('info', `Cleanup OK (${ok}/${hooks.length} removed)`);
} catch (e: any) {
log('error', 'Failed cleaning webhooks:', e.response?.data || e.message);
}
}
async function registerWebhook() {
const url = process.env.WEBHOOK_URL || `http://whatsapp-router:${port}/webhook`;
const eventConfig = {
onAck: false,
onAddedToGroup: true,
onAnyMessage: false,
onBattery: true,
onBroadcast: true,
onButton: true,
onCallState: false,
onChatDeleted: true,
onChatOpened: true,
onChatState: true,
onContactAdded: true,
onGlobalParticipantsChanged: true,
onGroupApprovalRequest: true,
onGroupChange: true,
onIncomingCall: false,
onLabel: true,
onLogout: true,
onMessage: true,
onMessageDeleted: true,
onNewProduct: true,
onOrder: true,
onPlugged: false,
onPollVote: true,
onReaction: true,
onRemovedFromGroup: false,
onStateChanged: false,
onStory: false,
};
const events = Object.entries(eventConfig)
.filter(([_, enabled]) => enabled)
.map(([event]) => event);
const { data } = await axios.post(`${config.API_URL}/registerWebhook`, {
args: { url, events }
});
log('info', '✔️ Webhook registered:', data);
}
app.use(express.json());
app.post('/webhook', async (req: express.Request, res: express.Response) => {
let message: WhatsAppMessage | undefined;
let text: string | undefined;
let from: string | undefined;
try {
if (req.body && req.body.data) {
// New webhook format from nucleo-whatsapp
message = req.body.data as WhatsAppMessage;
text = (req.body.data.body as string) || req.body.data.text;
from = req.body.data.from;
} else {
throw new Error('Invalid webhook format');
}
}catch{}
if (message) {
const tipo = typeof message === 'string' ? 'texto' : 'objeto';
const origen = from || (message?.chatId ?? 'desconocido');
log('info', `📩 Mensaje recibido (${message?.text}) de ${origen}`);
}
try {
if (!message) return res.sendStatus(200);
if (!openWaUrl) throw new Error('Service URLs not configured');
const chatId = (message && message.chatId) || from;
const handler = getHandler(chatId, agentUrl);
if (!handler) throw new Error('No handler configured');
let reply: string;
if (typeof handler === 'string') {
const agentRes = await axios.post(handler, {message: message.text});
reply = agentRes.data.reply || agentRes.data;
} else {
reply = await handler(message);
}
await axios.post(`${openWaUrl}/sendText`, { args: { to: from, content: reply } });
} catch (err: any) {
console.error('Error processing message', err.message);
}
res.sendStatus(200);
});
app.listen(port, async () => { app.listen(port, async () => {
console.log(`WhatsApp router listening on ${port}`); console.log(`WhatsApp router listening on ${port}`);
try { try {
await waitForGateway(); await waitForGateway(config);
await clearWebhooks(); await clearWebhooks(config);
await registerWebhook(); await registerWebhook(config, port);
} catch (err: any) { } catch (err: any) {
log('error', 'Webhook setup failed:', err.message); console.error('Webhook setup failed:', err.message);
} }
}); });

View File

@@ -0,0 +1,44 @@
import { Application } from 'express';
import {
deleteConversation,
getConversation,
listConversations,
buildConversation,
} from '../store/conversation';
export function registerConversationRoutes(app: Application, openWaUrl: string | undefined) {
app.get('/conversations', (req, res) => {
console.log('[routes] GET /conversations');
res.json({ conversations: listConversations() });
});
app.get('/conversations/:id', async (req, res) => {
console.log(`[routes] GET /conversations/${req.params.id}`);
if (!openWaUrl) return res.status(500).json({ error: 'Service URLs not configured' });
try {
const conv = await getConversation(req.params.id, openWaUrl);
res.json(conv);
} catch (err: any) {
console.error('Failed to get conversation:', err.message);
res.status(500).json({ error: err.message });
}
});
app.post('/conversations/:id/update', async (req, res) => {
console.log(`[routes] POST /conversations/${req.params.id}/update`);
if (!openWaUrl) return res.status(500).json({ error: 'Service URLs not configured' });
try {
const conv = await buildConversation(req.params.id, openWaUrl);
res.json(conv);
} catch (err: any) {
console.error('Failed to update conversation:', err.message);
res.status(500).json({ error: err.message });
}
});
app.delete('/conversations/:id', (req, res) => {
console.log(`[routes] DELETE /conversations/${req.params.id}`);
const deleted = deleteConversation(req.params.id);
res.json({ success: deleted });
});
}

View File

@@ -0,0 +1,73 @@
import axios from 'axios';
import { WhatsAppMessage, Conversation } from '../types';
const conversations = new Map<string, Conversation>();
async function loadMessages(
chatId: string,
openWaUrl: string
): Promise<WhatsAppMessage[]> {
console.log(`[conversationStore] Loading messages for ${chatId}`);
const { data } = await axios.post(`${openWaUrl}/loadAndGetAllMessagesInChat`, {
args: {
chatId,
includeMe: true,
includeNotifications: true,
},
});
const msgs: WhatsAppMessage[] = data?.response || data || [];
return msgs;
}
export async function getConversation(
chatId: string,
openWaUrl: string
): Promise<Conversation> {
console.log(`[conversationStore] Retrieving conversation for ${chatId}`);
let conv = conversations.get(chatId);
if (!conv) {
conv = await buildConversation(chatId, openWaUrl);
}
return conv;
}
export function listConversations(): Conversation[] {
console.log('[conversationStore] Listing conversations');
return Array.from(conversations.values());
}
export async function buildConversation(
chatId: string,
openWaUrl: string
): Promise<Conversation> {
console.log(`[conversationStore] Building conversation for ${chatId}`);
const messages = await loadMessages(chatId, openWaUrl);
const now = Date.now();
const conv: Conversation = {
chatId,
messages,
createdAt: conversations.get(chatId)?.createdAt || now,
updatedAt: now,
messageCount: messages.length,
};
conversations.set(chatId, conv);
return conv;
}
export function deleteConversation(chatId: string): boolean {
console.log(`[conversationStore] Deleting conversation ${chatId}`);
return conversations.delete(chatId);
}
export async function addMessageToConversation(
chatId: string,
msg: WhatsAppMessage,
openWaUrl: string
): Promise<Conversation> {
console.log(`[conversationStore] Adding message to ${chatId}`);
const conv = await getConversation(chatId, openWaUrl);
conv.messages.push(msg);
conv.messageCount = conv.messages.length;
conv.updatedAt = Date.now();
return conv;
}

View File

@@ -192,3 +192,11 @@ export interface WhatsAppMessage {
mediaData: Record<string, unknown>; mediaData: Record<string, unknown>;
text: string; text: string;
} }
export interface Conversation {
chatId: string;
messages: WhatsAppMessage[];
createdAt: number;
updatedAt: number;
messageCount: number;
}

View File

@@ -0,0 +1,159 @@
import express, { Application } from 'express';
import axios from 'axios';
import { getHandler } from './chatHandlers';
import { addMessageToConversation } from './store/conversation';
import { WhatsAppMessage, Conversation } from './types';
export interface WebhookConfig {
API_URL: string;
MAX_ATTEMPTS: number;
RETRY_MS: number;
}
export function registerWebhookRoutes(
app: Application,
config: WebhookConfig,
openWaUrl: string | undefined,
agentUrl: string | undefined
) {
app.post('/webhook', async (req: express.Request, res: express.Response) => {
let message: WhatsAppMessage | undefined;
let from: string | undefined;
try {
if (req.body && req.body.data) {
message = req.body.data as WhatsAppMessage;
from = req.body.data.from;
} else {
throw new Error('Invalid webhook format');
}
} catch {}
if (message) {
const origen = from || message.chatId || 'desconocido';
console.log(`📩 Mensaje recibido (${message.text}) de ${origen}`);
}
try {
if (!message) return res.sendStatus(200);
if (!openWaUrl) throw new Error('Service URLs not configured');
const chatId = message.chatId || from;
let conv: Conversation | undefined;
if (chatId) {
try {
conv = await addMessageToConversation(chatId, message, openWaUrl);
} catch (err: any) {
console.warn('Failed updating conversation:', err.message);
}
}
if (!conv) throw new Error('Conversation unavailable');
const handler = getHandler(chatId, agentUrl);
if (!handler) throw new Error('No handler configured');
let reply: string;
if (typeof handler === 'string') {
const agentRes = await axios.post(handler, { conversation: conv });
reply = agentRes.data.reply || agentRes.data;
} else {
reply = await handler(conv);
}
await axios.post(`${openWaUrl}/sendText`, { args: { to: from, content: reply } });
} catch (err: any) {
console.error('Error processing message', err.message);
}
res.sendStatus(200);
});
}
export async function waitForGateway(config: WebhookConfig) {
for (let i = 1; i <= config.MAX_ATTEMPTS; i++) {
try {
await axios.get(`${config.API_URL}/api-docs/`);
console.log('🟢 nucleo-whatsapp ready');
return;
} catch (e) {
console.warn(
'Gateway not responding',
`connecting to: ${config.API_URL}/api-docs/ (attempt ${i}/${config.MAX_ATTEMPTS})…`,
e
);
await new Promise((r) => setTimeout(r, config.RETRY_MS));
}
}
throw new Error('nucleo-whatsapp did not respond in time');
}
export async function clearWebhooks(config: WebhookConfig) {
try {
const { data } = await axios.post(`${config.API_URL}/listWebhooks`);
const hooks = data?.response || [];
if (!hooks.length) {
console.log('No existing webhooks to remove');
return;
}
console.log(`Removing ${hooks.length} webhooks…`);
const results = await Promise.allSettled(
hooks.map((h: any) => axios.post(`${config.API_URL}/removeWebhook`, { args: { webhookId: h.id } }))
);
results.forEach((r: PromiseSettledResult<any>, i: number) => {
const id = hooks[i].id;
if (r.status === 'fulfilled' && (r as PromiseFulfilledResult<any>).value?.data?.response === true) {
console.log(`✔️ Removed webhook ${id}`);
} else {
console.warn(`⚠️ Failed to remove webhook ${id}`);
}
});
const ok = results.filter(
(r: PromiseSettledResult<any>) => r.status === 'fulfilled' && (r as PromiseFulfilledResult<any>).value?.data?.response === true
).length;
console.log(`Cleanup OK (${ok}/${hooks.length} removed)`);
} catch (e: any) {
console.error('Failed cleaning webhooks:', e.response?.data || e.message);
}
}
export async function registerWebhook(config: WebhookConfig, port: number) {
const url = process.env.WEBHOOK_URL || `http://whatsapp-router:${port}/webhook`;
const eventConfig = {
onAck: false,
onAddedToGroup: true,
onAnyMessage: false,
onBattery: true,
onBroadcast: true,
onButton: true,
onCallState: false,
onChatDeleted: true,
onChatOpened: true,
onChatState: true,
onContactAdded: true,
onGlobalParticipantsChanged: true,
onGroupApprovalRequest: true,
onGroupChange: true,
onIncomingCall: false,
onLabel: true,
onLogout: true,
onMessage: true,
onMessageDeleted: true,
onNewProduct: true,
onOrder: true,
onPlugged: false,
onPollVote: true,
onReaction: true,
onRemovedFromGroup: false,
onStateChanged: false,
onStory: false,
} as const;
const events = Object.entries(eventConfig)
.filter(([_, enabled]) => enabled)
.map(([event]) => event);
const { data } = await axios.post(`${config.API_URL}/registerWebhook`, {
args: { url, events },
});
console.log('✔️ Webhook registered:', data);
}