Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,39 @@ def _poll_for_queues(self) -> None:

This is only used when using polling to get queues with queued tasks.
"""
if not self._did_work:
time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])

self._wait_to_refresh_queue_set()
self._refresh_queue_set()

def _wait_to_refresh_queue_set(self):
interval = self.config["POLL_TASK_QUEUES_INTERVAL"]

if self._did_work:
self.log.info("Queue poll: No delay")
return

def throttle_queue_poll():
return self.connection.exists(
self._key("throttle_queue_poll", self.worker_group_name)
)

lock = self.connection.lock(
self._key("lockv2", "queue_poll", self.worker_group_name),
timeout=interval * 0.5,
)

while True:
if not throttle_queue_poll():
self.log.info(f"Queue poll: Sleeping {interval}s")
time.sleep(interval)
return

if lock.acquire(blocking=False):
self.log.info("Queue poll: Acquired lock")
return

time.sleep(interval * 0.1)

def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None:
"""
Check activity channel for new queues and wait as necessary.
Expand Down Expand Up @@ -1155,6 +1184,17 @@ def _refresh_queue_set(self) -> None:
self._filter_queues(self._retrieve_queues(self._key(QUEUED)))
)

self.log.info("Queue poll: Done")

throttle_key = self._key("throttle_queue_poll", self.worker_group_name)
interval = self.config["POLL_TASK_QUEUES_INTERVAL"]

if interval:
if self._queue_set:
self.connection.delete(throttle_key)
else:
self.connection.set(throttle_key, 1, px=int(interval * 1000))

def _retrieve_queues(self, key) -> Set[str]:
if len(self.only_queues) != 1:
return self.connection.smembers(key)
Expand Down