Skip to content
46 changes: 46 additions & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ begin
'r_' || p_queue_name
);

-- Partial index for claim candidate ORDER BY (available_at, run_id).
-- Matches the exact ordering used in the claim query for ready runs.
execute format(
'create index if not exists %I on durable.%I (available_at, run_id) include (task_id)
where state in (''pending'', ''sleeping'')',
('r_' || p_queue_name) || '_ready',
'r_' || p_queue_name
);

execute format(
'create index if not exists %I on durable.%I (task_id)',
('r_' || p_queue_name) || '_ti',
Expand All @@ -172,12 +181,46 @@ begin
'w_' || p_queue_name
);

-- Speed up cleanup_task_terminal wait deletion by task_id.
execute format(
'create index if not exists %I on durable.%I (task_id)',
('w_' || p_queue_name) || '_ti',
'w_' || p_queue_name
);

-- Index for finding children of a parent task (for cascade cancellation)
execute format(
'create index if not exists %I on durable.%I (parent_task_id) where parent_task_id is not null',
('t_' || p_queue_name) || '_pti',
't_' || p_queue_name
);

-- Speed up claim timeout scans.
execute format(
'create index if not exists %I on durable.%I (claim_expires_at)
where state = ''running'' and claim_expires_at is not null',
('r_' || p_queue_name) || '_cei',
'r_' || p_queue_name
);

-- Speed up cancellation sweep: only index tasks that have cancellation policies.
execute format(
'create index if not exists %I on durable.%I (task_id)
where state in (''pending'', ''sleeping'', ''running'')
and cancellation is not null
and (cancellation ? ''max_delay'' or cancellation ? ''max_duration'')',
('t_' || p_queue_name) || '_cxlpol',
't_' || p_queue_name
);

-- Composite index for active task state lookups.
-- Enables Index Only Scans for claim_task join, emit_event, and cancel propagation.
execute format(
'create index if not exists %I on durable.%I (state, task_id)
where state in (''pending'', ''sleeping'', ''running'', ''cancelled'')',
('t_' || p_queue_name) || '_state_tid',
't_' || p_queue_name
);
end;
$$;

Expand Down Expand Up @@ -346,6 +389,7 @@ begin
-- These are max_delay (delay before starting) and
-- max_duration (duration from created to finished)
-- Use a loop so we can cleanup each cancelled task properly.
-- Only scan tasks that actually have cancellation policies set.
for v_cancelled_task in
execute format(
'with limits as (
Expand All @@ -357,6 +401,8 @@ begin
state
from durable.%I
where state in (''pending'', ''sleeping'', ''running'')
and cancellation is not null
and (cancellation ? ''max_delay'' or cancellation ? ''max_duration'')
),
to_cancel as (
select task_id
Expand Down
Loading