diff --git a/README.md b/README.md index 198b174a..9779ec68 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ Create a `.env`file and put configuration if you need change default value: This a general env: ```env -CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 30000 +CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 15000 AVAILABLE_LOCALES=default is `["en", "pt_BR", "pt"]` DEFAULT_LOCALE=locale for notifications status, now possibile is en, pt_BR and pt, default is en, to add new, use docker volume for exempla `/app/dist/src/locales/custom.json` and add `custom` in `AVAILABLE_LOCALES` ONLY_HELLO_TEMPLATE=true sets hello template as the only default template, default false. @@ -487,7 +487,13 @@ WEBHOOK_URL_ABSOLUTE=the webhook absolute url, not use this if already use WEBHO WEBHOOK_URL=the webhook url, this config attribute put phone number on the end, no use if use WEBHOOK_URL_ABSOLUTE WEBHOOK_TOKEN=the webhook header token WEBHOOK_HEADER=the webhook header name -WEBHOOK_TIMEOUT_MS=webhook request timeout, default 5000 ms +WEBHOOK_TIMEOUT_MS=webhook request timeout, default 60000 ms +WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when endpoint is offline, default true +WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1 +WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000 +WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000 +WEBHOOK_CB_REQUEUE_DELAY_MS=delay (ms) used to requeue when circuit is open, default 300000 +WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=local CB map cleanup interval (ms), default 3600000 WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true @@ -521,7 +527,27 @@ WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID=the business account id of whatsapp cloud ap WEBHOOK_FORWARD_TOKEN=the token of whatsapp cloud api, default is empty WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0 WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com -WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000 +WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 60000 +``` +Circuit breaker behavior: +- Counts consecutive webhook failures within `WEBHOOK_CB_FAILURE_TTL_MS`. +- When the count reaches `WEBHOOK_CB_FAILURE_THRESHOLD`, the circuit opens for `WEBHOOK_CB_OPEN_MS` and sends are skipped. +- After the open window, delivery is attempted again automatically. +- When the circuit is open, the message is requeued with a longer delay (`WEBHOOK_CB_REQUEUE_DELAY_MS`) to avoid retry storms. + +Why keep `WEBHOOK_TIMEOUT_MS` low: +- A high timeout blocks the consumer for too long when the endpoint is offline. +- With lower timeout, failures are detected faster and the circuit opens sooner, reducing backlog. + +Example (circuit breaker): +```env +WEBHOOK_CB_ENABLED=true +WEBHOOK_CB_FAILURE_THRESHOLD=1 +WEBHOOK_CB_FAILURE_TTL_MS=300000 +WEBHOOK_CB_OPEN_MS=120000 +WEBHOOK_CB_REQUEUE_DELAY_MS=300000 +WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=3600000 +WEBHOOK_TIMEOUT_MS=60000 ``` ### Config session with redis @@ -552,7 +578,6 @@ The `.env` can be save one config, but on redis use different webhook by session "token": "kslflkhlkwq", "header": "api_access_token", "sendGroupMessages": false, - "sendGroupMessages": false, "sendNewMessages": false, } ], @@ -808,4 +833,5 @@ Mail to sales@unoapi.cloud - Connect with pairing code: https://github.com/WhiskeySockets/Baileys#starting-socket-with-pairing-code - Counting connection retry attempts even when restarting to prevent looping messages - Message delete endpoint -- Send reply message with please to send again, when any error and message enqueue in .dead \ No newline at end of file +- Send reply message with please to send again, when any error and message enqueue in .dead + diff --git a/src/amqp.ts b/src/amqp.ts index b954edb6..df540255 100644 --- a/src/amqp.ts +++ b/src/amqp.ts @@ -16,6 +16,7 @@ import { UNOAPI_EXCHANGE_BRIDGE_NAME, IGNORED_TO_NUMBERS, UNOAPI_QUEUE_LISTENER, + WEBHOOK_CB_REQUEUE_DELAY_MS, } from './defaults' import logger from './services/logger' import { version } from '../package.json' @@ -352,7 +353,14 @@ export const amqpConsume = async ( await amqpPublish(exchange, queue, routingKey, { ...data, traces }, { dead: true, type: options.type }) } else { logger.info('Publish retry %s of %s', countRetries, maxRetries) - const delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries + let delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries + try { + const err: any = error as any + if (err && (err.code === 'WEBHOOK_CB_OPEN' || err.name === 'WebhookCircuitOpenError')) { + delay = err.delayMs || WEBHOOK_CB_REQUEUE_DELAY_MS || delay + logger.info('WEBHOOK_CB requeue delay %s ms (queue=%s)', delay, queue) + } + } catch {} await amqpPublish(exchange, queue, routingKey, data, { delay, maxRetries, countRetries, type: options.type }) } await channel?.ack(payload) diff --git a/src/defaults.ts b/src/defaults.ts index 8e32a59a..2c0f0446 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -34,7 +34,7 @@ export const WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID = process.env.WEBHOOK_FORWARD_B export const WEBHOOK_FORWARD_TOKEN = process.env.WEBHOOK_FORWARD_TOKEN || '' export const WEBHOOK_FORWARD_VERSION = process.env.WEBHOOK_FORWARD_VERSION || 'v17.0' export const WEBHOOK_FORWARD_URL = process.env.WEBHOOK_FORWARD_URL || 'https://graph.facebook.com' -export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000') +export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000') // comunication export const UNOAPI_URL = process.env.UNOAPI_URL || 'http://localhost:9876' @@ -42,11 +42,11 @@ export const WEBHOOK_URL_ABSOLUTE = process.env.WEBHOOK_URL_ABSOLUTE || '' export const WEBHOOK_URL = process.env.WEBHOOK_URL || 'http://localhost:9876/webhooks/fake' export const WEBHOOK_HEADER = process.env.WEBHOOK_HEADER || 'Authorization' export const WEBHOOK_TOKEN = process.env.WEBHOOK_TOKEN || UNOAPI_AUTH_TOKEN || '123abc' -export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000') -export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '360000') +export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000') +export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '60000') export const CONNECTION_TYPE = process.env.CONNECTION_TYPE || 'qrcode' -export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '360000') +export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '15000') export const WEBHOOK_SEND_NEW_MESSAGES = process.env.WEBHOOK_SEND_NEW_MESSAGES == _undefined ? false : process.env.WEBHOOK_SEND_NEW_MESSAGES == 'true' export const WEBHOOK_SEND_INCOMING_MESSAGES = process.env.WEBHOOK_SEND_INCOMING_MESSAGES == _undefined ? true : process.env.WEBHOOK_SEND_INCOMING_MESSAGES == 'true' @@ -65,6 +65,14 @@ export const WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL = ? undefined : parseInt(process.env.WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL!) export const WEBHOOK_SESSION = process.env.WEBHOOK_SESSION || '' +// Webhook circuit breaker (fail fast when endpoints are offline) +export const WEBHOOK_CB_ENABLED = + process.env.WEBHOOK_CB_ENABLED == _undefined ? true : process.env.WEBHOOK_CB_ENABLED == 'true' +export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAILURE_THRESHOLD || '1') +export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000') +export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000') +export const WEBHOOK_CB_REQUEUE_DELAY_MS = parseInt(process.env.WEBHOOK_CB_REQUEUE_DELAY_MS || '300000') +export const WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS = parseInt(process.env.WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || '3600000') export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672' export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379' export const PROXY_URL = process.env.PROXY_URL diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index cdbff65e..f45b0d37 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -5,6 +5,17 @@ import logger from './logger' import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer' import { addToBlacklist, isInBlacklist } from './blacklist' import { PublishOption } from '../amqp' +import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS, WEBHOOK_CB_REQUEUE_DELAY_MS, WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS } from '../defaults' +import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis' + +class WebhookCircuitOpenError extends Error { + public code = 'WEBHOOK_CB_OPEN' + public delayMs: number + constructor(message: string, delayMs: number) { + super(message) + this.delayMs = delayMs + } +} export class OutgoingCloudApi implements Outgoing { private getConfig: getConfig @@ -29,6 +40,24 @@ export class OutgoingCloudApi implements Outgoing { } public async sendHttp(phone: string, webhook: Webhook, message: object, _options: Partial = {}) { + const cbEnabled = !!WEBHOOK_CB_ENABLED && WEBHOOK_CB_FAILURE_THRESHOLD > 0 && WEBHOOK_CB_OPEN_MS > 0 + const cbId = (webhook && (webhook.id || webhook.url || webhook.urlAbsolute)) ? `${webhook.id || webhook.url || webhook.urlAbsolute}` : 'default' + const cbKey = `${phone}:${cbId}` + const now = Date.now() + if (cbEnabled) { + let open = false + try { + open = await isWebhookCircuitOpen(phone, cbId) + } catch {} + if (open) { + logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId) + throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs()) + } + if (isCircuitOpenLocal(cbKey, now)) { + logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId) + throw new WebhookCircuitOpenError(`WEBHOOK_CB open (local) for ${cbId}`, this.cbRequeueDelayMs()) + } + } const destinyPhone = await this.isInBlacklist(phone, webhook.id, message) if (destinyPhone) { logger.info(`Session phone %s webhook %s and destiny phone %s are in blacklist`, phone, webhook.id, destinyPhone) @@ -89,11 +118,113 @@ export class OutgoingCloudApi implements Outgoing { } catch (error) { logger.error('Error on send to url %s with headers %s and body %s', url, JSON.stringify(headers), body) logger.error(error) + if (cbEnabled) { + const opened = await this.handleCircuitFailure(phone, cbId, cbKey, error as any) + if (opened) { + throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs()) + } + } throw error } logger.debug('Response: %s', response?.status) if (!response?.ok) { - throw await response?.text() + const errText = await response?.text() + const err = new Error(`Webhook response ${response?.status} ${response?.statusText}: ${errText}`) + if (cbEnabled) { + const opened = await this.handleCircuitFailure(phone, cbId, cbKey, err) + if (opened) { + throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs()) + } + } + throw err + } + if (cbEnabled) { + try { + await closeWebhookCircuit(phone, cbId) + } catch {} + resetCircuitLocal(cbKey) + } + } + + private cbRequeueDelayMs() { + return WEBHOOK_CB_REQUEUE_DELAY_MS || WEBHOOK_CB_OPEN_MS || 120000 + } + + private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any): Promise { + try { + const threshold = WEBHOOK_CB_FAILURE_THRESHOLD || 1 + const openMs = WEBHOOK_CB_OPEN_MS || 120000 + const ttlMs = WEBHOOK_CB_FAILURE_TTL_MS || openMs + const count = await bumpWebhookCircuitFailure(phone, cbId, ttlMs) + const localCount = bumpCircuitFailureLocal(cbKey, ttlMs) + const finalCount = Math.max(count || 0, localCount || 0) + if (finalCount >= threshold) { + await openWebhookCircuit(phone, cbId, openMs) + openCircuitLocal(cbKey, openMs) + logger.warn('WEBHOOK_CB opened (phone=%s webhook=%s count=%s openMs=%s)', phone, cbId, finalCount, openMs) + return true + } else { + logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold) + try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} + return false + } + } catch (e) { + logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId) + try { logger.warn(error as any, 'WEBHOOK_CB original error (phone=%s webhook=%s)', phone, cbId) } catch {} + // If the CB handler fails, fall back to the original error path (no circuit open) + return false } } } + +const cbOpenUntil: Map = new Map() +const cbFailState: Map = new Map() +let cbLastCleanup = 0 +const CB_CLEANUP_INTERVAL_MS = WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || 60 * 60 * 1000 + +const isCircuitOpenLocal = (key: string, now: number) => { + maybeCleanupLocalCircuit(now) + const until = cbOpenUntil.get(key) + if (!until) return false + if (now >= until) { + cbOpenUntil.delete(key) + return false + } + return true +} + +const openCircuitLocal = (key: string, openMs: number) => { + maybeCleanupLocalCircuit(Date.now()) + cbOpenUntil.set(key, Date.now() + Math.max(1, openMs || 0)) +} + +const resetCircuitLocal = (key: string) => { + maybeCleanupLocalCircuit(Date.now()) + cbOpenUntil.delete(key) + cbFailState.delete(key) +} + +const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => { + const now = Date.now() + maybeCleanupLocalCircuit(now) + const ttl = Math.max(1, ttlMs || 0) + const current = cbFailState.get(key) + if (!current || now >= current.exp) { + cbFailState.set(key, { count: 1, exp: now + ttl }) + return 1 + } + current.count += 1 + return current.count +} + +const maybeCleanupLocalCircuit = (now: number) => { + if (now - cbLastCleanup < CB_CLEANUP_INTERVAL_MS) return + cbLastCleanup = now + for (const [key, until] of cbOpenUntil) { + if (now >= until) cbOpenUntil.delete(key) + } + for (const [key, st] of cbFailState) { + if (now >= st.exp) cbFailState.delete(key) + } +} + diff --git a/src/services/redis.ts b/src/services/redis.ts index 6dc14c9c..8827d8e3 100644 --- a/src/services/redis.ts +++ b/src/services/redis.ts @@ -131,6 +131,67 @@ const redisSetAndExpire = async function (key: string, value: any, ttl: number) } } +export const redisDelKey = async (key: string) => redisDel(key) + +// Atomic increment with TTL (seconds). Sets TTL on first increment. +export const redisIncrWithTtl = async (key: string, ttlSec: number): Promise => { + logger.trace(`INCR ${key} with ttl ${ttlSec}s`) + try { + const v = await client.incr(key) + if (v === 1 && ttlSec > 0) { + try { await client.expire(key, ttlSec) } catch {} + } + return v + } catch (error) { + if (!client) { + await getRedis() + const v = await client.incr(key) + if (v === 1 && ttlSec > 0) { + try { await client.expire(key, ttlSec) } catch {} + } + return v + } + throw error + } +} + +// Webhook circuit breaker keys +export const webhookCircuitOpenKey = (session: string, webhookId: string) => + `${BASE_KEY}webhook-cb:${session}:${webhookId}:open` +export const webhookCircuitFailKey = (session: string, webhookId: string) => + `${BASE_KEY}webhook-cb:${session}:${webhookId}:fail` + +export const isWebhookCircuitOpen = async (session: string, webhookId: string): Promise => { + const key = webhookCircuitOpenKey(session, webhookId) + try { + const v = await redisGet(key) + return !!v + } catch { + return false + } +} + +export const openWebhookCircuit = async (session: string, webhookId: string, openMs: number): Promise => { + const ttlSec = Math.max(1, Math.ceil((openMs || 0) / 1000)) + try { + await redisSetAndExpire(webhookCircuitOpenKey(session, webhookId), '1', ttlSec) + } catch {} +} + +export const closeWebhookCircuit = async (session: string, webhookId: string): Promise => { + try { await redisDel(webhookCircuitOpenKey(session, webhookId)) } catch {} + try { await redisDel(webhookCircuitFailKey(session, webhookId)) } catch {} +} + +export const bumpWebhookCircuitFailure = async (session: string, webhookId: string, ttlMs: number): Promise => { + const ttlSec = Math.max(1, Math.ceil((ttlMs || 0) / 1000)) + try { + return await redisIncrWithTtl(webhookCircuitFailKey(session, webhookId), ttlSec) + } catch { + return 0 + } +} + export const authKey = (phone: string) => { return `${BASE_KEY}auth:${phone}` }