Skip to content
36 changes: 31 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ Create a `.env`file and put configuration if you need change default value:
This a general env:

```env
CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 30000
CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 15000
AVAILABLE_LOCALES=default is `["en", "pt_BR", "pt"]`
DEFAULT_LOCALE=locale for notifications status, now possibile is en, pt_BR and pt, default is en, to add new, use docker volume for exempla `/app/dist/src/locales/custom.json` and add `custom` in `AVAILABLE_LOCALES`
ONLY_HELLO_TEMPLATE=true sets hello template as the only default template, default false.
Expand Down Expand Up @@ -487,7 +487,13 @@ WEBHOOK_URL_ABSOLUTE=the webhook absolute url, not use this if already use WEBHO
WEBHOOK_URL=the webhook url, this config attribute put phone number on the end, no use if use WEBHOOK_URL_ABSOLUTE
WEBHOOK_TOKEN=the webhook header token
WEBHOOK_HEADER=the webhook header name
WEBHOOK_TIMEOUT_MS=webhook request timeout, default 5000 ms
WEBHOOK_TIMEOUT_MS=webhook request timeout, default 60000 ms
WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when endpoint is offline, default true
WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1
WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000
WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000
WEBHOOK_CB_REQUEUE_DELAY_MS=delay (ms) used to requeue when circuit is open, default 300000
WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=local CB map cleanup interval (ms), default 3600000
WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false
WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true
WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true
Expand Down Expand Up @@ -521,7 +527,27 @@ WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID=the business account id of whatsapp cloud ap
WEBHOOK_FORWARD_TOKEN=the token of whatsapp cloud api, default is empty
WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0
WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com
WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000
WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 60000
```
Circuit breaker behavior:
- Counts consecutive webhook failures within `WEBHOOK_CB_FAILURE_TTL_MS`.
- When the count reaches `WEBHOOK_CB_FAILURE_THRESHOLD`, the circuit opens for `WEBHOOK_CB_OPEN_MS` and sends are skipped.
- After the open window, delivery is attempted again automatically.
- When the circuit is open, the message is requeued with a longer delay (`WEBHOOK_CB_REQUEUE_DELAY_MS`) to avoid retry storms.

Why keep `WEBHOOK_TIMEOUT_MS` low:
- A high timeout blocks the consumer for too long when the endpoint is offline.
- With lower timeout, failures are detected faster and the circuit opens sooner, reducing backlog.

Example (circuit breaker):
```env
WEBHOOK_CB_ENABLED=true
WEBHOOK_CB_FAILURE_THRESHOLD=1
WEBHOOK_CB_FAILURE_TTL_MS=300000
WEBHOOK_CB_OPEN_MS=120000
WEBHOOK_CB_REQUEUE_DELAY_MS=300000
WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=3600000
WEBHOOK_TIMEOUT_MS=60000
```

### Config session with redis
Expand Down Expand Up @@ -552,7 +578,6 @@ The `.env` can be save one config, but on redis use different webhook by session
"token": "kslflkhlkwq",
"header": "api_access_token",
"sendGroupMessages": false,
"sendGroupMessages": false,
"sendNewMessages": false,
}
],
Expand Down Expand Up @@ -808,4 +833,5 @@ Mail to sales@unoapi.cloud
- Connect with pairing code: https://github.com/WhiskeySockets/Baileys#starting-socket-with-pairing-code
- Counting connection retry attempts even when restarting to prevent looping messages
- Message delete endpoint
- Send reply message with please to send again, when any error and message enqueue in .dead
- Send reply message with please to send again, when any error and message enqueue in .dead

10 changes: 9 additions & 1 deletion src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
UNOAPI_EXCHANGE_BRIDGE_NAME,
IGNORED_TO_NUMBERS,
UNOAPI_QUEUE_LISTENER,
WEBHOOK_CB_REQUEUE_DELAY_MS,
} from './defaults'
import logger from './services/logger'
import { version } from '../package.json'
Expand Down Expand Up @@ -352,7 +353,14 @@ export const amqpConsume = async (
await amqpPublish(exchange, queue, routingKey, { ...data, traces }, { dead: true, type: options.type })
} else {
logger.info('Publish retry %s of %s', countRetries, maxRetries)
const delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries
let delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries
try {
const err: any = error as any
if (err && (err.code === 'WEBHOOK_CB_OPEN' || err.name === 'WebhookCircuitOpenError')) {
delay = err.delayMs || WEBHOOK_CB_REQUEUE_DELAY_MS || delay
logger.info('WEBHOOK_CB requeue delay %s ms (queue=%s)', delay, queue)
}
} catch {}
await amqpPublish(exchange, queue, routingKey, data, { delay, maxRetries, countRetries, type: options.type })
}
await channel?.ack(payload)
Expand Down
16 changes: 12 additions & 4 deletions src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ export const WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID = process.env.WEBHOOK_FORWARD_B
export const WEBHOOK_FORWARD_TOKEN = process.env.WEBHOOK_FORWARD_TOKEN || ''
export const WEBHOOK_FORWARD_VERSION = process.env.WEBHOOK_FORWARD_VERSION || 'v17.0'
export const WEBHOOK_FORWARD_URL = process.env.WEBHOOK_FORWARD_URL || 'https://graph.facebook.com'
export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000')
export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000')

// comunication
export const UNOAPI_URL = process.env.UNOAPI_URL || 'http://localhost:9876'
export const WEBHOOK_URL_ABSOLUTE = process.env.WEBHOOK_URL_ABSOLUTE || ''
export const WEBHOOK_URL = process.env.WEBHOOK_URL || 'http://localhost:9876/webhooks/fake'
export const WEBHOOK_HEADER = process.env.WEBHOOK_HEADER || 'Authorization'
export const WEBHOOK_TOKEN = process.env.WEBHOOK_TOKEN || UNOAPI_AUTH_TOKEN || '123abc'
export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000')
export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '360000')
export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000')
export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '60000')
export const CONNECTION_TYPE = process.env.CONNECTION_TYPE || 'qrcode'

export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '360000')
export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '15000')
export const WEBHOOK_SEND_NEW_MESSAGES = process.env.WEBHOOK_SEND_NEW_MESSAGES == _undefined ? false : process.env.WEBHOOK_SEND_NEW_MESSAGES == 'true'
export const WEBHOOK_SEND_INCOMING_MESSAGES =
process.env.WEBHOOK_SEND_INCOMING_MESSAGES == _undefined ? true : process.env.WEBHOOK_SEND_INCOMING_MESSAGES == 'true'
Expand All @@ -65,6 +65,14 @@ export const WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL =
? undefined
: parseInt(process.env.WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL!)
export const WEBHOOK_SESSION = process.env.WEBHOOK_SESSION || ''
// Webhook circuit breaker (fail fast when endpoints are offline)
export const WEBHOOK_CB_ENABLED =
process.env.WEBHOOK_CB_ENABLED == _undefined ? true : process.env.WEBHOOK_CB_ENABLED == 'true'
export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAILURE_THRESHOLD || '1')
export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000')
export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000')
export const WEBHOOK_CB_REQUEUE_DELAY_MS = parseInt(process.env.WEBHOOK_CB_REQUEUE_DELAY_MS || '300000')
export const WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS = parseInt(process.env.WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || '3600000')
export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672'
export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'
export const PROXY_URL = process.env.PROXY_URL
Expand Down
133 changes: 132 additions & 1 deletion src/services/outgoing_cloud_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ import logger from './logger'
import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer'
import { addToBlacklist, isInBlacklist } from './blacklist'
import { PublishOption } from '../amqp'
import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS, WEBHOOK_CB_REQUEUE_DELAY_MS, WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS } from '../defaults'
import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis'

class WebhookCircuitOpenError extends Error {
public code = 'WEBHOOK_CB_OPEN'
public delayMs: number
constructor(message: string, delayMs: number) {
super(message)
this.delayMs = delayMs
}
}

export class OutgoingCloudApi implements Outgoing {
private getConfig: getConfig
Expand All @@ -29,6 +40,24 @@ export class OutgoingCloudApi implements Outgoing {
}

public async sendHttp(phone: string, webhook: Webhook, message: object, _options: Partial<PublishOption> = {}) {
const cbEnabled = !!WEBHOOK_CB_ENABLED && WEBHOOK_CB_FAILURE_THRESHOLD > 0 && WEBHOOK_CB_OPEN_MS > 0
const cbId = (webhook && (webhook.id || webhook.url || webhook.urlAbsolute)) ? `${webhook.id || webhook.url || webhook.urlAbsolute}` : 'default'
const cbKey = `${phone}:${cbId}`
const now = Date.now()
if (cbEnabled) {
let open = false
try {
open = await isWebhookCircuitOpen(phone, cbId)
} catch {}
if (open) {
logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
}
if (isCircuitOpenLocal(cbKey, now)) {
logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId)
throw new WebhookCircuitOpenError(`WEBHOOK_CB open (local) for ${cbId}`, this.cbRequeueDelayMs())
}
}
const destinyPhone = await this.isInBlacklist(phone, webhook.id, message)
if (destinyPhone) {
logger.info(`Session phone %s webhook %s and destiny phone %s are in blacklist`, phone, webhook.id, destinyPhone)
Expand Down Expand Up @@ -89,11 +118,113 @@ export class OutgoingCloudApi implements Outgoing {
} catch (error) {
logger.error('Error on send to url %s with headers %s and body %s', url, JSON.stringify(headers), body)
logger.error(error)
if (cbEnabled) {
const opened = await this.handleCircuitFailure(phone, cbId, cbKey, error as any)
if (opened) {
throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs())
}
}
throw error
}
logger.debug('Response: %s', response?.status)
if (!response?.ok) {
throw await response?.text()
const errText = await response?.text()
const err = new Error(`Webhook response ${response?.status} ${response?.statusText}: ${errText}`)
if (cbEnabled) {
const opened = await this.handleCircuitFailure(phone, cbId, cbKey, err)
if (opened) {
throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs())
}
}
throw err
}
if (cbEnabled) {
try {
await closeWebhookCircuit(phone, cbId)
} catch {}
resetCircuitLocal(cbKey)
}
}

private cbRequeueDelayMs() {
return WEBHOOK_CB_REQUEUE_DELAY_MS || WEBHOOK_CB_OPEN_MS || 120000
}

private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any): Promise<boolean> {
try {
const threshold = WEBHOOK_CB_FAILURE_THRESHOLD || 1
const openMs = WEBHOOK_CB_OPEN_MS || 120000
const ttlMs = WEBHOOK_CB_FAILURE_TTL_MS || openMs
const count = await bumpWebhookCircuitFailure(phone, cbId, ttlMs)
const localCount = bumpCircuitFailureLocal(cbKey, ttlMs)
const finalCount = Math.max(count || 0, localCount || 0)
if (finalCount >= threshold) {
await openWebhookCircuit(phone, cbId, openMs)
openCircuitLocal(cbKey, openMs)
logger.warn('WEBHOOK_CB opened (phone=%s webhook=%s count=%s openMs=%s)', phone, cbId, finalCount, openMs)
return true
} else {
logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold)
try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {}
return false
}
} catch (e) {
logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId)
try { logger.warn(error as any, 'WEBHOOK_CB original error (phone=%s webhook=%s)', phone, cbId) } catch {}
// If the CB handler fails, fall back to the original error path (no circuit open)
return false
}
}
}

const cbOpenUntil: Map<string, number> = new Map()
const cbFailState: Map<string, { count: number; exp: number }> = new Map()
let cbLastCleanup = 0
const CB_CLEANUP_INTERVAL_MS = WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || 60 * 60 * 1000

const isCircuitOpenLocal = (key: string, now: number) => {
maybeCleanupLocalCircuit(now)
const until = cbOpenUntil.get(key)
if (!until) return false
if (now >= until) {
cbOpenUntil.delete(key)
return false
}
return true
}

const openCircuitLocal = (key: string, openMs: number) => {
maybeCleanupLocalCircuit(Date.now())
cbOpenUntil.set(key, Date.now() + Math.max(1, openMs || 0))
}

const resetCircuitLocal = (key: string) => {
maybeCleanupLocalCircuit(Date.now())
cbOpenUntil.delete(key)
cbFailState.delete(key)
}

const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => {
const now = Date.now()
maybeCleanupLocalCircuit(now)
const ttl = Math.max(1, ttlMs || 0)
const current = cbFailState.get(key)
if (!current || now >= current.exp) {
cbFailState.set(key, { count: 1, exp: now + ttl })
return 1
}
current.count += 1
return current.count
}

const maybeCleanupLocalCircuit = (now: number) => {
if (now - cbLastCleanup < CB_CLEANUP_INTERVAL_MS) return
cbLastCleanup = now
for (const [key, until] of cbOpenUntil) {
if (now >= until) cbOpenUntil.delete(key)
}
for (const [key, st] of cbFailState) {
if (now >= st.exp) cbFailState.delete(key)
}
}

61 changes: 61 additions & 0 deletions src/services/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,67 @@ const redisSetAndExpire = async function (key: string, value: any, ttl: number)
}
}

export const redisDelKey = async (key: string) => redisDel(key)

// Atomic increment with TTL (seconds). Sets TTL on first increment.
export const redisIncrWithTtl = async (key: string, ttlSec: number): Promise<number> => {
logger.trace(`INCR ${key} with ttl ${ttlSec}s`)
try {
const v = await client.incr(key)
if (v === 1 && ttlSec > 0) {
try { await client.expire(key, ttlSec) } catch {}
}
return v
} catch (error) {
if (!client) {
await getRedis()
const v = await client.incr(key)
if (v === 1 && ttlSec > 0) {
try { await client.expire(key, ttlSec) } catch {}
}
return v
}
throw error
}
}

// Webhook circuit breaker keys
export const webhookCircuitOpenKey = (session: string, webhookId: string) =>
`${BASE_KEY}webhook-cb:${session}:${webhookId}:open`
export const webhookCircuitFailKey = (session: string, webhookId: string) =>
`${BASE_KEY}webhook-cb:${session}:${webhookId}:fail`

export const isWebhookCircuitOpen = async (session: string, webhookId: string): Promise<boolean> => {
const key = webhookCircuitOpenKey(session, webhookId)
try {
const v = await redisGet(key)
return !!v
} catch {
return false
}
}

export const openWebhookCircuit = async (session: string, webhookId: string, openMs: number): Promise<void> => {
const ttlSec = Math.max(1, Math.ceil((openMs || 0) / 1000))
try {
await redisSetAndExpire(webhookCircuitOpenKey(session, webhookId), '1', ttlSec)
} catch {}
}

export const closeWebhookCircuit = async (session: string, webhookId: string): Promise<void> => {
try { await redisDel(webhookCircuitOpenKey(session, webhookId)) } catch {}
try { await redisDel(webhookCircuitFailKey(session, webhookId)) } catch {}
}

export const bumpWebhookCircuitFailure = async (session: string, webhookId: string, ttlMs: number): Promise<number> => {
const ttlSec = Math.max(1, Math.ceil((ttlMs || 0) / 1000))
try {
return await redisIncrWithTtl(webhookCircuitFailKey(session, webhookId), ttlSec)
} catch {
return 0
}
}

export const authKey = (phone: string) => {
return `${BASE_KEY}auth:${phone}`
}
Expand Down