Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ The following options can be only specified in the task decorator:
the initial task execution date when a worker is initialized, and to determine
the next execution date when the task is about to get executed.

For most common scenarios, the ``periodic`` built-in function can be passed:
For most common scenarios, the below mentioned built-in functions can be passed:

- ``periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None,
end_date=None)``
Expand All @@ -401,6 +401,17 @@ The following options can be only specified in the task decorator:
every Sunday at 4am UTC, you could use
``schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4))``.

- ``cron_expr(expr, start_date=None, end_date=None)``

``start_date``, to specify the periodic task start date. It defaults to
``2000-01-01T00:00Z``, a Saturday, if not given.
``end_date``, to specify the periodic task end date. The task repeats
forever if ``end_date`` is not given.
For example, to run a task every hour indefinitely,
use ``schedule=cron_expr("0 * * * *")``. To run a task every Sunday at
4am UTC, you could use ``schedule=cron_expr("0 4 * * 0")``.


Custom retrying
---------------

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
click==8.1.3
redis==4.5.2
structlog==22.3.0
croniter
3 changes: 2 additions & 1 deletion tasktiger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
TaskNotFound,
)
from .retry import exponential, fixed, linear
from .schedule import periodic
from .schedule import cron_expr, periodic
from .task import Task
from .tasktiger import TaskTiger, run_worker
from .worker import Worker
Expand All @@ -31,6 +31,7 @@
"exponential",
# Schedules
"periodic",
"cron_expr",
]


Expand Down
59 changes: 57 additions & 2 deletions tasktiger/schedule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
from typing import Callable, Optional, Tuple

__all__ = ["periodic"]
__all__ = ["periodic", "cron_expr"]

START_DATE = datetime.datetime(2000, 1, 1)


def _periodic(
Expand Down Expand Up @@ -54,5 +56,58 @@ def periodic(
assert period > 0, "Must specify a positive period."
if not start_date:
# Saturday at midnight
start_date = datetime.datetime(2000, 1, 1)
start_date = START_DATE
return (_periodic, (period, start_date, end_date))


def _cron_expr(
dt: datetime.datetime,
expr: str,
start_date: datetime.datetime,
end_date: Optional[datetime.datetime] = None,
) -> Optional[datetime.datetime]:
import croniter # type: ignore
import pytz # type: ignore

localize = pytz.utc.localize

if end_date and dt >= end_date:
return None

if dt < start_date:
return start_date

assert croniter.croniter.is_valid(expr), "Cron expression is not valid."

start_date = localize(start_date)
dt = localize(dt)

next_utc = croniter.croniter(expr, dt).get_next(ret_type=datetime.datetime)
next_utc = next_utc.replace(tzinfo=None)

# Make sure the time is still within bounds.
if end_date and next_utc > end_date:
return None

return next_utc


def cron_expr(
expr: str,
start_date: Optional[datetime.datetime] = None,
end_date: Optional[datetime.datetime] = None,
) -> Tuple[Callable[..., Optional[datetime.datetime]], Tuple]:
"""
Periodic task schedule via cron expression: Use to schedule a task to run periodically,
starting from start_date (or None to be active immediately) until end_date
(or None to repeat forever).

This function behaves similar to the cron jobs, which run with a minimum of 1 minute
granularity. So specifying "* * * * *" expression will the run the task every
minute.

For more details, see README.
"""
if not start_date:
start_date = START_DATE
return (_cron_expr, (expr, start_date, end_date))
46 changes: 45 additions & 1 deletion tests/test_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import time

from tasktiger import Task, Worker, periodic
from tasktiger import Task, Worker, cron_expr, periodic
from tasktiger._internal import (
QUEUED,
SCHEDULED,
Expand Down Expand Up @@ -64,6 +64,50 @@ def test_periodic_schedule(self):
f = periodic(minutes=1, end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None

def test_cron_schedule(self):
"""
Test the cron_expr() schedule function.
"""
dt = datetime.datetime(2010, 1, 1)

f = cron_expr("* * * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 0, 1)

f = cron_expr("0 * * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 1)

f = cron_expr("0 0 * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2)

f = cron_expr("0 0 * * 6")
# 2010-01-02 is a Saturday
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2)

f = cron_expr("0 0 * * 0", start_date=datetime.datetime(2000, 1, 2))
# 2000-01-02 is a Sunday and 2010-01-02 is a Saturday
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 3)

f = cron_expr("2 3 * * *", start_date=dt)
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 3, 2)
# Make sure we return the start_date if the current date is earlier.
assert f[0](datetime.datetime(1990, 1, 1), *f[1]) == dt

f = cron_expr("* * * * *", end_date=dt)
assert f[0](
datetime.datetime(2009, 12, 31, 23, 58), *f[1]
) == datetime.datetime(2009, 12, 31, 23, 59)

f = cron_expr("* * * * *", end_date=dt)
assert f[0](
datetime.datetime(2009, 12, 31, 23, 59), *f[1]
) == datetime.datetime(2010, 1, 1, 0, 0)

f = cron_expr("* * * * *", end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 0), *f[1]) is None

f = cron_expr("* * * * *", end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None

def test_periodic_execution(self):
"""
Test periodic task execution.
Expand Down