Skip to content

Commit 3875bb2

Browse files
authored
feat(engine): run debounce system (#2794)
Adds support for **debounced task runs** - when triggering a task with a debounce key, subsequent triggers with the same key will reschedule the existing delayed run instead of creating new runs. This continues until no new triggers occur within the delay window. ## Usage ```typescript await myTask.trigger({ userId: "123" }, { debounce: { key: "user-123-update", delay: "5s", mode: "leading", // default } }); ``` - **key**: Scoped to the task identifier - **delay**: How long to wait before executing (supports duration strings like `"5s"`, `"1m"`) - **mode**: Either `"leading"` or `"trailing"`. Leading debounce will use the payload and options from the first run created with the debounce key. Trailing will use payload and options from the last run. ### "trailing" mode overrides When using `mode: "trailing"` with debounce, the following options are updated from the **last** trigger: - **`payload`** - The task input data - **`metadata`** - Run metadata - **`tags`** - Run tags (replaces existing tags) - **`maxAttempts`** - Maximum retry attempts - **`maxDuration`** - Maximum compute time - **`machine`** - Machine preset (cpu/memory) ## Behavior - **First run wins**: The first trigger creates the run, subsequent triggers push its execution time later - **Idempotency keys take precedence**: If both are specified, idempotency is checked first - **Max duration**: Configurable via `DEBOUNCE_MAX_DURATION_MS` env var (default: 10 minutes) Works with `triggerAndWait` - parent runs correctly block on the debounced run.
1 parent ff80742 commit 3875bb2

File tree

32 files changed

+5170
-86
lines changed

32 files changed

+5170
-86
lines changed

.changeset/ninety-cows-lay.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
feat(sdk): Support debouncing runs when triggering with new debounce options

.cursor/rules/migrations.mdc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
description: how to create and apply database migrations
3+
alwaysApply: false
4+
---
5+
6+
Follow our [migrations.md](mdc:ai/references/migrations.md) guide for how to create and apply database migrations.

ai/references/migrations.md

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
## Creating and applying migrations
2+
3+
We use prisma migrations to manage the database schema. Please follow the following steps when editing the `internal-packages/database/prisma/schema.prisma` file:
4+
5+
Edit the `schema.prisma` file to add or modify the schema.
6+
7+
Create a new migration file but don't apply it yet:
8+
9+
```bash
10+
cd internal-packages/database
11+
pnpm run db:migrate:dev:create --name "add_new_column_to_table"
12+
```
13+
14+
The migration file will be created in the `prisma/migrations` directory, but it will have a bunch of edits to the schema that are not needed and will need to be removed before we can apply the migration. Here's an example of what the migration file might look like:
15+
16+
```sql
17+
-- AlterEnum
18+
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';
19+
20+
-- AlterTable
21+
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;
22+
23+
-- AlterTable
24+
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");
25+
26+
-- DropIndex
27+
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";
28+
29+
-- AlterTable
30+
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");
31+
32+
-- DropIndex
33+
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";
34+
35+
-- AlterTable
36+
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");
37+
38+
-- DropIndex
39+
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";
40+
41+
-- AlterTable
42+
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");
43+
44+
-- DropIndex
45+
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";
46+
47+
-- AlterTable
48+
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");
49+
50+
-- DropIndex
51+
DROP INDEX "public"."_completedWaitpoints_AB_unique";
52+
53+
-- CreateIndex
54+
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);
55+
56+
-- CreateIndex
57+
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);
58+
59+
-- CreateIndex
60+
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
61+
```
62+
63+
All the following lines should be removed:
64+
65+
```sql
66+
-- AlterTable
67+
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");
68+
69+
-- DropIndex
70+
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";
71+
72+
-- AlterTable
73+
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");
74+
75+
-- DropIndex
76+
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";
77+
78+
-- AlterTable
79+
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");
80+
81+
-- DropIndex
82+
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";
83+
84+
-- AlterTable
85+
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");
86+
87+
-- DropIndex
88+
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";
89+
90+
-- AlterTable
91+
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");
92+
93+
-- DropIndex
94+
DROP INDEX "public"."_completedWaitpoints_AB_unique";
95+
96+
-- CreateIndex
97+
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);
98+
99+
-- CreateIndex
100+
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);
101+
102+
-- CreateIndex
103+
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
104+
```
105+
106+
Leaving only this:
107+
108+
```sql
109+
-- AlterEnum
110+
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';
111+
112+
-- AlterTable
113+
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;
114+
```
115+
116+
After editing the migration file, apply the migration:
117+
118+
```bash
119+
cd internal-packages/database
120+
pnpm run db:migrate:deploy && pnpm run generate
121+
```

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,12 @@ const EnvironmentSchema = z
611611
.default(60_000),
612612
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),
613613

614+
/** Maximum duration in milliseconds that a run can be debounced. Default: 1 hour (3,600,000ms) */
615+
RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS: z.coerce
616+
.number()
617+
.int()
618+
.default(60_000 * 60), // 1 hour
619+
614620
RUN_ENGINE_WORKER_REDIS_HOST: z
615621
.string()
616622
.optional()

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ export class SpanPresenter extends BasePresenter {
234234
environmentId: run.runtimeEnvironment.id,
235235
idempotencyKey: run.idempotencyKey,
236236
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
237+
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
237238
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
238239
queue: {
239240
name: run.queue,
@@ -357,6 +358,8 @@ export class SpanPresenter extends BasePresenter {
357358
//idempotency
358359
idempotencyKey: true,
359360
idempotencyKeyExpiresAt: true,
361+
//debounce
362+
debounce: true,
360363
//delayed
361364
delayUntil: true,
362365
//ttl

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,19 @@ function RunBody({
556556
)}
557557
</Property.Value>
558558
</Property.Item>
559+
<Property.Item>
560+
<Property.Label>Debounce</Property.Label>
561+
<Property.Value>
562+
{run.debounce ? (
563+
<div>
564+
<div className="break-all">Key: {run.debounce.key}</div>
565+
<div>Delay: {run.debounce.delay}</div>
566+
</div>
567+
) : (
568+
"–"
569+
)}
570+
</Property.Value>
571+
</Property.Item>
559572
<Property.Item>
560573
<Property.Label>Version</Property.Label>
561574
<Property.Value>

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
5151
traceparent,
5252
setAttribute: (key, value) => event.setAttribute(key as any, value),
5353
failWithError: event.failWithError.bind(event),
54+
stop: event.stop.bind(event),
5455
},
5556
store
5657
);
@@ -116,6 +117,73 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
116117
traceparent,
117118
setAttribute: (key, value) => event.setAttribute(key as any, value),
118119
failWithError: event.failWithError.bind(event),
120+
stop: event.stop.bind(event),
121+
},
122+
store
123+
);
124+
}
125+
);
126+
}
127+
128+
async traceDebouncedRun<T>(
129+
request: TriggerTaskRequest,
130+
parentStore: string | undefined,
131+
options: {
132+
existingRun: TaskRun;
133+
debounceKey: string;
134+
incomplete: boolean;
135+
isError: boolean;
136+
},
137+
callback: (span: TracedEventSpan, store: string) => Promise<T>
138+
): Promise<T> {
139+
const { existingRun, debounceKey, incomplete, isError } = options;
140+
const { repository, store } = await this.#getEventRepository(request, parentStore);
141+
142+
return await repository.traceEvent(
143+
`${request.taskId} (debounced)`,
144+
{
145+
context: request.options?.traceContext,
146+
spanParentAsLink: request.options?.spanParentAsLink,
147+
kind: "SERVER",
148+
environment: request.environment,
149+
taskSlug: request.taskId,
150+
attributes: {
151+
properties: {
152+
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
153+
},
154+
style: {
155+
icon: "task-cached",
156+
},
157+
runId: existingRun.friendlyId,
158+
},
159+
incomplete,
160+
isError,
161+
immediate: true,
162+
},
163+
async (event, traceContext, traceparent) => {
164+
// Log a message about the debounced trigger
165+
await repository.recordEvent(
166+
`Debounced: using existing run with key "${debounceKey}"`,
167+
{
168+
taskSlug: request.taskId,
169+
environment: request.environment,
170+
attributes: {
171+
runId: existingRun.friendlyId,
172+
},
173+
context: request.options?.traceContext,
174+
parentId: event.spanId,
175+
}
176+
);
177+
178+
return await callback(
179+
{
180+
traceId: event.traceId,
181+
spanId: event.spanId,
182+
traceContext,
183+
traceparent,
184+
setAttribute: (key, value) => event.setAttribute(key as any, value),
185+
failWithError: event.failWithError.bind(event),
186+
stop: event.stop.bind(event),
119187
},
120188
store
121189
);

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,34 @@ export class RunEngineTriggerTaskService {
160160
}
161161
}
162162

163-
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));
163+
// Parse delay from either explicit delay option or debounce.delay
164+
const delaySource = body.options?.delay ?? body.options?.debounce?.delay;
165+
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(delaySource));
164166

165167
if (parseDelayError) {
166-
throw new ServiceValidationError(`Invalid delay ${body.options?.delay}`);
168+
throw new ServiceValidationError(`Invalid delay ${delaySource}`);
169+
}
170+
171+
// Validate debounce options
172+
if (body.options?.debounce) {
173+
if (!delayUntil) {
174+
throw new ServiceValidationError(
175+
`Debounce requires a valid delay duration. Provided: ${body.options.debounce.delay}`
176+
);
177+
}
178+
179+
// Always validate debounce.delay separately since it's used for rescheduling
180+
// This catches the case where options.delay is valid but debounce.delay is invalid
181+
const [debounceDelayError, debounceDelayUntil] = await tryCatch(
182+
parseDelay(body.options.debounce.delay)
183+
);
184+
185+
if (debounceDelayError || !debounceDelayUntil) {
186+
throw new ServiceValidationError(
187+
`Invalid debounce delay: ${body.options.debounce.delay}. ` +
188+
`Supported formats: {number}s, {number}m, {number}h, {number}d, {number}w`
189+
);
190+
}
167191
}
168192

169193
const ttl =
@@ -340,10 +364,48 @@ export class RunEngineTriggerTaskService {
340364
bulkActionId: body.options?.bulkActionId,
341365
planType,
342366
realtimeStreamsVersion: options.realtimeStreamsVersion,
367+
debounce: body.options?.debounce,
368+
// When debouncing with triggerAndWait, create a span for the debounced trigger
369+
onDebounced:
370+
body.options?.debounce && body.options?.resumeParentOnCompletion
371+
? async ({ existingRun, waitpoint, debounceKey }) => {
372+
return await this.traceEventConcern.traceDebouncedRun(
373+
triggerRequest,
374+
parentRun?.taskEventStore,
375+
{
376+
existingRun,
377+
debounceKey,
378+
incomplete: waitpoint.status === "PENDING",
379+
isError: waitpoint.outputIsError,
380+
},
381+
async (spanEvent) => {
382+
const spanId =
383+
options?.parentAsLinkType === "replay"
384+
? spanEvent.spanId
385+
: spanEvent.traceparent?.spanId
386+
? `${spanEvent.traceparent.spanId}:${spanEvent.spanId}`
387+
: spanEvent.spanId;
388+
return spanId;
389+
}
390+
);
391+
}
392+
: undefined,
343393
},
344394
this.prisma
345395
);
346396

397+
// If the returned run has a different friendlyId, it was debounced.
398+
// For triggerAndWait: stop the outer span since a replacement debounced span was created via onDebounced.
399+
// For regular trigger: let the span complete normally - no replacement span needed since the
400+
// original run already has its span from when it was first created.
401+
if (
402+
taskRun.friendlyId !== runFriendlyId &&
403+
body.options?.debounce &&
404+
body.options?.resumeParentOnCompletion
405+
) {
406+
event.stop();
407+
}
408+
347409
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
348410

349411
if (error) {

apps/webapp/app/runEngine/types.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ export type TracedEventSpan = {
131131
};
132132
setAttribute: (key: string, value: string) => void;
133133
failWithError: (error: TaskRunError) => void;
134+
/**
135+
* Stop the span without writing any event.
136+
* Used when a debounced run is returned - the span for the debounced
137+
* trigger is created separately via traceDebouncedRun.
138+
*/
139+
stop: () => void;
134140
};
135141

136142
export interface TraceEventConcern {
@@ -150,6 +156,17 @@ export interface TraceEventConcern {
150156
},
151157
callback: (span: TracedEventSpan, store: string) => Promise<T>
152158
): Promise<T>;
159+
traceDebouncedRun<T>(
160+
request: TriggerTaskRequest,
161+
parentStore: string | undefined,
162+
options: {
163+
existingRun: TaskRun;
164+
debounceKey: string;
165+
incomplete: boolean;
166+
isError: boolean;
167+
},
168+
callback: (span: TracedEventSpan, store: string) => Promise<T>
169+
): Promise<T>;
153170
}
154171

155172
export type TriggerRacepoints = "idempotencyKey";

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ function createRunEngine() {
182182
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
183183
: undefined,
184184
},
185+
// Debounce configuration
186+
debounce: {
187+
maxDebounceDurationMs: env.RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS,
188+
},
185189
});
186190

187191
return engine;

0 commit comments

Comments
 (0)