|
3 | 3 | import datetime |
4 | 4 | import time |
5 | 5 |
|
6 | | -from tasktiger import Task, Worker, periodic |
| 6 | +from tasktiger import Task, Worker, periodic, cron_expr |
7 | 7 | from tasktiger._internal import ( |
8 | 8 | QUEUED, |
9 | 9 | SCHEDULED, |
@@ -64,6 +64,50 @@ def test_periodic_schedule(self): |
64 | 64 | f = periodic(minutes=1, end_date=dt) |
65 | 65 | assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None |
66 | 66 |
|
| 67 | + def test_cron_schedule(self): |
| 68 | + """ |
| 69 | + Test the cron_expr() schedule function. |
| 70 | + """ |
| 71 | + dt = datetime.datetime(2010, 1, 1) |
| 72 | + |
| 73 | + f = cron_expr("* * * * *") |
| 74 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 0, 1) |
| 75 | + |
| 76 | + f = cron_expr("0 * * * *") |
| 77 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 1) |
| 78 | + |
| 79 | + f = cron_expr("0 0 * * *") |
| 80 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2) |
| 81 | + |
| 82 | + f = cron_expr("0 0 * * 6") |
| 83 | + # 2010-01-02 is a Saturday |
| 84 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2) |
| 85 | + |
| 86 | + f = cron_expr("0 0 * * 0", start_date=datetime.datetime(2000, 1, 2)) |
| 87 | + # 2000-01-02 is a Sunday and 2010-01-02 is a Saturday |
| 88 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 3) |
| 89 | + |
| 90 | + f = cron_expr("2 3 * * *", start_date=dt) |
| 91 | + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 3, 2) |
| 92 | + # Make sure we return the start_date if the current date is earlier. |
| 93 | + assert f[0](datetime.datetime(1990, 1, 1), *f[1]) == dt |
| 94 | + |
| 95 | + f = cron_expr("* * * * *", end_date=dt) |
| 96 | + assert f[0]( |
| 97 | + datetime.datetime(2009, 12, 31, 23, 58), *f[1] |
| 98 | + ) == datetime.datetime(2009, 12, 31, 23, 59) |
| 99 | + |
| 100 | + f = cron_expr("* * * * *", end_date=dt) |
| 101 | + assert f[0]( |
| 102 | + datetime.datetime(2009, 12, 31, 23, 59), *f[1] |
| 103 | + ) == datetime.datetime(2010, 1, 1, 0, 0) |
| 104 | + |
| 105 | + f = cron_expr("* * * * *", end_date=dt) |
| 106 | + assert f[0](datetime.datetime(2010, 1, 1, 0, 0), *f[1]) is None |
| 107 | + |
| 108 | + f = cron_expr("* * * * *", end_date=dt) |
| 109 | + assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None |
| 110 | + |
67 | 111 | def test_periodic_execution(self): |
68 | 112 | """ |
69 | 113 | Test periodic task execution. |
|
0 commit comments