Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ begin
'r_' || p_queue_name
);

-- Partial index for claim candidate ORDER BY (available_at, run_id).
-- Matches the exact ordering used in the claim query for ready runs.
execute format(
'create index if not exists %I on durable.%I (available_at, run_id) include (task_id)
where state in (''pending'', ''sleeping'')',
('r_' || p_queue_name) || '_ready',
'r_' || p_queue_name
);

execute format(
'create index if not exists %I on durable.%I (task_id)',
('r_' || p_queue_name) || '_ti',
Expand All @@ -172,12 +181,46 @@ begin
'w_' || p_queue_name
);

-- Speed up cleanup_task_terminal wait deletion by task_id.
execute format(
'create index if not exists %I on durable.%I (task_id)',
('w_' || p_queue_name) || '_ti',
'w_' || p_queue_name
);

-- Index for finding children of a parent task (for cascade cancellation)
execute format(
'create index if not exists %I on durable.%I (parent_task_id) where parent_task_id is not null',
('t_' || p_queue_name) || '_pti',
't_' || p_queue_name
);

-- Speed up claim timeout scans.
execute format(
'create index if not exists %I on durable.%I (claim_expires_at)
where state = ''running'' and claim_expires_at is not null',
('r_' || p_queue_name) || '_cei',
'r_' || p_queue_name
);

-- Speed up cancellation sweep: only index tasks that have cancellation policies.
execute format(
'create index if not exists %I on durable.%I (task_id)
where state in (''pending'', ''sleeping'', ''running'')
and cancellation is not null
and (cancellation ? ''max_delay'' or cancellation ? ''max_duration'')',
('t_' || p_queue_name) || '_cxlpol',
't_' || p_queue_name
);

-- Composite index for active task state lookups.
-- Enables Index Only Scans for claim_task join, emit_event, and cancel propagation.
execute format(
'create index if not exists %I on durable.%I (state, task_id)
where state in (''pending'', ''sleeping'', ''running'', ''cancelled'')',
('t_' || p_queue_name) || '_state_tid',
't_' || p_queue_name
);
end;
$$;

Expand Down Expand Up @@ -346,6 +389,7 @@ begin
-- These are max_delay (delay before starting) and
-- max_duration (duration from created to finished)
-- Use a loop so we can cleanup each cancelled task properly.
-- Only scan tasks that actually have cancellation policies set.
for v_cancelled_task in
execute format(
'with limits as (
Expand All @@ -357,6 +401,8 @@ begin
state
from durable.%I
where state in (''pending'', ''sleeping'', ''running'')
and cancellation is not null
and (cancellation ? ''max_delay'' or cancellation ? ''max_duration'')
),
to_cancel as (
select task_id
Expand Down
Loading