Skip to content

Conversation

@ViperTecCorporation
Copy link
Contributor

@ViperTecCorporation ViperTecCorporation commented Jan 23, 2026

Summary\n- Add webhook circuit breaker to fail fast when endpoints are offline\n- Add Redis-backed counters/state (with in-memory fallback)\n- Document new env vars and behavior, plus example config\n- Reduce default webhook timeout to 60s to avoid long consumer blocking\n\n## Why\nOffline webhook endpoints were causing long blocking and queue backlog. The circuit breaker short-circuits repeated failures and auto-recovers after a cool-down.\n\n## Env vars\n- WEBHOOK_CB_ENABLED (default true)\n- WEBHOOK_CB_FAILURE_THRESHOLD (default 1)\n- WEBHOOK_CB_FAILURE_TTL_MS (default 300000)\n- WEBHOOK_CB_OPEN_MS (default 120000)\n- WEBHOOK_TIMEOUT_MS (default 60000)\n\n## Notes\n- When the circuit is open, sends to that endpoint are skipped until the open window expires.\n- After the window, sends resume automatically.\n

Summary by CodeRabbit

  • New Features

    • Add configurable webhook circuit breaker (thresholds, open window, failure TTL, requeue delay, local cleanup) and per-session behavior.
    • New webhook controls to fine‑tune which events/messages are forwarded.
  • Bug Fixes / Behavior

    • Adjusted default timeouts for faster fail‑fast behavior and reduced backlog; retry/requeue delays now respect circuit‑open conditions.
  • Documentation

    • README expanded with circuit breaker guidance, examples, and extended webhook/Redis configuration samples.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 23, 2026

📝 Walkthrough

Walkthrough

Adds a Redis-backed webhook circuit breaker with local in-process checks integrated into outgoing HTTP sends; shortens consumer and webhook/fetch timeouts; introduces WEBHOOK_CB_* env vars and many webhook-related flags; adjusts AMQP retry delay for circuit-open errors; updates README docs and examples.

Changes

Cohort / File(s) Change Summary
Documentation & Defaults
README.md, src/defaults.ts
Shortened default timeouts (CONSUMER_TIMEOUT_MS → 15000 ms; WEBHOOK/FETCH/FORWARD → 60000 ms). Added webhook circuit-breaker env vars (WEBHOOK_CB_*) and expanded webhook/Redis-related flags and README examples/docs.
Redis utilities
src/services/redis.ts
Added Redis helpers and circuit keys: redisDelKey, redisIncrWithTtl, webhookCircuitOpenKey, webhookCircuitFailKey, isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure. Implements resilient increment + TTL logic.
Outgoing HTTP (circuit logic)
src/services/outgoing_cloud_api.ts
Integrated circuit-breaker into sendHttp(): global (Redis) and local checks, local failure tracking and open/close, WebhookCircuitOpenError short-circuiting, failure bumping, and requeue-delay derivation.
AMQP retry behavior
src/amqp.ts
When processing errors indicate an open webhook circuit, override requeue delay using error.delayMs or WEBHOOK_CB_REQUEUE_DELAY_MS before republishing.
Examples & per-session config
README.md
Expanded per-session Redis webhook config example, added circuit-breaker docs and sample settings, removed duplicate Redis sample line, minor formatting adjustments.

Sequence Diagram

sequenceDiagram
    participant Producer as Client/Producer
    participant AMQP as AMQP Consumer
    participant API as OutgoingCloudApi
    participant Redis as Redis
    participant Webhook as Webhook Endpoint

    Producer->>AMQP: publish webhook message
    AMQP->>API: process message -> sendHttp()

    API->>Redis: isWebhookCircuitOpen(session, webhookId)?
    Redis-->>API: open / closed

    alt Circuit Open
        API-->>AMQP: throw WebhookCircuitOpenError (with delayMs)
        AMQP->>AMQP: schedule retry using error.delayMs or WEBHOOK_CB_REQUEUE_DELAY_MS
    else Circuit Closed
        API->>Webhook: HTTP POST (with WEBHOOK_TIMEOUT_MS / FETCH_TIMEOUT_MS)
        alt Success (2xx)
            Webhook-->>API: success
            API->>Redis: closeWebhookCircuit(session, webhookId)
            API-->>AMQP: ack
        else Failure / timeout / non-2xx
            Webhook-->>API: error
            API->>Redis: bumpWebhookCircuitFailure(session, webhookId, ttl)
            Redis-->>API: new failure count
            alt Threshold reached
                API->>Redis: openWebhookCircuit(session, webhookId, openMs)
                API-->>AMQP: throw WebhookCircuitOpenError (with requeue delayMs)
            else Below threshold
                API-->>AMQP: nack / standard retry
            end
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nudged the keys and watched the counters grow,
I closed the gate when distant endpoints doze,
I queue a patient hop, then wait a beat,
Reopen when the sun warms sleeping feet,
A rabbit guard for webhooks, soft and slow.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'Add webhook circuit breaker' directly and concisely summarizes the main change: introducing a webhook circuit breaker system to handle offline endpoints.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 111-131: sendHttp currently swallows failures when the circuit
breaker is enabled by calling handleCircuitFailure(...) and returning, which
prevents the AMQP consumer from seeing errors and triggering
retries/dead-lettering; after calling handleCircuitFailure(phone, cbId, cbKey,
error) (on fetch/network error) and after handleCircuitFailure(phone, cbId,
cbKey, errText) (on non-OK response), rethrow the original error/errText so the
exception contract is preserved; keep the existing closeWebhookCircuit(phone,
cbId) try/catch and resetCircuitLocal(cbKey) behavior but ensure errors are not
silently returned in sendHttp so the amqpConsume framework can handle them.
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)

156-188: Consider adding cleanup for stale local circuit state entries.

The module-level cbOpenUntil and cbFailState Maps grow unbounded as new webhook URLs are encountered. While isCircuitOpenLocal cleans up expired entries on access, entries for webhooks that are no longer called will accumulate indefinitely.

For most deployments with stable webhook configurations this is negligible, but high-churn scenarios could see gradual memory growth.

♻️ Optional: Add periodic cleanup or size cap
+const MAX_LOCAL_CB_ENTRIES = 1000
+
 const resetCircuitLocal = (key: string) => {
   cbOpenUntil.delete(key)
   cbFailState.delete(key)
 }
+
+const cleanupStaleEntries = () => {
+  const now = Date.now()
+  for (const [key, until] of cbOpenUntil) {
+    if (now >= until) cbOpenUntil.delete(key)
+  }
+  for (const [key, state] of cbFailState) {
+    if (now >= state.exp) cbFailState.delete(key)
+  }
+}

Then call cleanupStaleEntries() periodically or when maps exceed MAX_LOCAL_CB_ENTRIES.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 47-58: The try/catch around the Redis circuit breaker check is too
broad and swallows the WebhookCircuitOpenError; narrow the try to only call
isWebhookCircuitOpen(phone, cbId) and handle its errors, then perform the throw
new WebhookCircuitOpenError(...) outside the catch so the exception is not
consumed. Specifically, in the cbEnabled branch ensure isWebhookCircuitOpen is
awaited inside a try that only catches/handles Redis/network errors, and if it
returns true then call logger.warn(...) and throw WebhookCircuitOpenError (using
this.cbRequeueDelayMs()) outside the catch; keep the local check using
isCircuitOpenLocal(cbKey, now) unchanged.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
README.md (1)

580-582: Duplicate key in JSON example.

Line 581 has "sendGroupMessages" defined twice, which is invalid JSON (the second value would override the first). This appears to be a pre-existing issue, but it should be corrected.

🐛 Proposed fix
       "token": "kslflkhlkwq",
       "header": "api_access_token",
       "sendGroupMessages": false,
-      "sendGroupMessages": false,
       "sendNewMessages": false,
♻️ Duplicate comments (1)
src/services/outgoing_cloud_api.ts (1)

47-59: Critical: WebhookCircuitOpenError is swallowed by the empty catch block.

The throw new WebhookCircuitOpenError(...) on line 52 is inside the try block, and the empty catch {} on line 54 swallows it. When Redis reports the circuit is open, the error is caught and discarded, and execution continues—completely bypassing the Redis-based circuit breaker check.

The try/catch should only wrap the isWebhookCircuitOpen call, not the throw statement.

🐛 Proposed fix
     if (cbEnabled) {
+      let redisOpen = false
       try {
-        const open = await isWebhookCircuitOpen(phone, cbId)
-        if (open) {
-          logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
-          throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
-        }
-      } catch {}
+        redisOpen = await isWebhookCircuitOpen(phone, cbId)
+      } catch {
+        // Redis unavailable - fall through to local check
+      }
+      if (redisOpen) {
+        logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
+        throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
+      }
       if (isCircuitOpenLocal(cbKey, now)) {
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)

151-173: Consider logging the original error details when the circuit handler itself fails.

When the catch block at line 168 handles an exception from the circuit breaker logic, the original webhook error (error parameter) is logged at line 171, but the circuit handler exception (e) is logged with a generic message. This could make debugging difficult if both Redis and logging fail.

Also, the method returns false when an exception occurs (line 172), which means the original error will be thrown but the circuit won't open. This is reasonable as a fallback, but worth documenting.

♻️ Minor improvement for clarity
     } catch (e) {
-      logger.warn(e as any, 'WEBHOOK_CB failure handler error')
+      logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId)
     }
-    try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {}
+    try { 
+      logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) 
+    } catch {
+      // Logging failed - nothing more we can do
+    }
     return false

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 148-176: The trailing logging and return after the try/catch in
handleCircuitFailure are unreachable because both the try and catch blocks
always return; remove the final try { logger.warn(error...) } catch {} and final
return false, or move that log into the catch block before its return (and
ensure it references the correct error variable), so that there is no dead code
after the try/catch in the handleCircuitFailure method.
- Around line 131-139: The code currently throws a raw string (errText); instead
create and throw an Error object containing response metadata and body so
upstream retains stack/context. Build an Error (e.g., new Error(`HTTP
${response?.status}: ${response?.statusText}`)) and attach properties like
status = response?.status, statusText = response?.statusText, and body =
errText, then pass that Error object into handleCircuitFailure(phone, cbId,
cbKey, err) (matching the earlier catch usage) and, if opened, throw the
WebhookCircuitOpenError as before; otherwise throw the constructed Error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant