Skip to content

Commit 1e376f4

Browse files
committed
Reduce worker idle queries
1 parent 6d96607 commit 1e376f4

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

tasktiger/worker.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,17 @@ def _filter_queues(self, queues):
160160
)
161161
]
162162

163+
def _worker_perform_secondary_tasks(self):
164+
# We should queue scheduled tasks every QUEUE_SCHEDULED_TASKS_TIME time
165+
# and expired tasks every REQUEUE_EXPIRED_TASKS_INTERVAL time. Use only
166+
# one Redis query to enter this block since every single worker calls
167+
# this every second.
168+
# XXX: Ideally, we should keep track of workers and take turns.
169+
key = self._key("lock", "secondary_tasks")
170+
if self.connection.set(key, "1", ex=1, nx=True):
171+
self._worker_queue_scheduled_tasks()
172+
self._worker_queue_expired_tasks()
173+
163174
def _worker_queue_scheduled_tasks(self):
164175
"""
165176
Helper method that takes due tasks from the SCHEDULED queue and puts
@@ -1046,8 +1057,7 @@ def _worker_run(self):
10461057
time.time() - self._last_task_check > self.config['SELECT_TIMEOUT']
10471058
and not self._stop_requested
10481059
):
1049-
self._worker_queue_scheduled_tasks()
1050-
self._worker_queue_expired_tasks()
1060+
self._worker_perform_secondary_tasks()
10511061
self._last_task_check = time.time()
10521062

10531063
def _queue_periodic_tasks(self):

0 commit comments

Comments
 (0)