Skip to content

Commit 11e4d54

Browse files
committed
Updated all json.dump calls to use custom serializer
Better json serialization/deserialization
1 parent 4f8068c commit 11e4d54

File tree

4 files changed

+29
-12
lines changed

4 files changed

+29
-12
lines changed

tasktiger/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from collections import defaultdict
33
import importlib
44
import logging
5+
from json import JSONDecoder, JSONEncoder
56
import redis
67
import structlog
78

@@ -159,6 +160,13 @@ def __init__(self, connection=None, config=None, setup_structlog=False):
159160

160161
# If non-empty, a worker excludes the given queues from processing.
161162
'EXCLUDE_QUEUES': [],
163+
164+
# Serializer / Deserializer to use for serializing/deserializing tasks
165+
166+
'JSON_ENCODER': JSONEncoder,
167+
168+
'JSON_DECODER': JSONDecoder
169+
162170
}
163171
if config:
164172
self.config.update(config)

tasktiger/_internal.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def gen_id():
5353
"""
5454
return binascii.b2a_hex(os.urandom(32)).decode('utf8')
5555

56-
def gen_unique_id(serialized_name, args, kwargs):
56+
def gen_unique_id(serialized_name, args, kwargs, cls=None):
5757
"""
5858
Generates and returns a hex-encoded 256-bit ID for the given task name and
5959
args. Used to generate IDs for unique tasks or for task locks.
@@ -62,7 +62,7 @@ def gen_unique_id(serialized_name, args, kwargs):
6262
'func': serialized_name,
6363
'args': args,
6464
'kwargs': kwargs,
65-
}, sort_keys=True).encode('utf8')).hexdigest()
65+
}, sort_keys=True, cls=cls).encode('utf8')).hexdigest()
6666

6767
def serialize_func_name(func):
6868
"""

tasktiger/task.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None,
2828
self._state = _state
2929
self._ts = _ts
3030
self._executions = _executions or []
31+
self.json_encoder = self.tiger.config.get('JSON_ENCODER')
32+
self.json_decoder = self.tiger.config.get('JSON_DECODER')
3133

3234
# Internal initialization based on raw data.
3335
if _data is not None:
@@ -57,7 +59,7 @@ def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None,
5759
retry_method = getattr(func, '_task_retry_method', None)
5860

5961
if unique:
60-
task_id = gen_unique_id(serialized_name, args, kwargs)
62+
task_id = gen_unique_id(serialized_name, args, kwargs, cls=self.json_encoder)
6163
else:
6264
task_id = gen_id()
6365

@@ -280,7 +282,7 @@ def delay(self, when=None):
280282

281283
# When using ALWAYS_EAGER, make sure we have serialized the task to
282284
# ensure there are no serialization errors.
283-
serialized_task = json.dumps(self._data)
285+
serialized_task = json.dumps(self._data, cls=self.json_encoder)
284286

285287
if tiger.config['ALWAYS_EAGER'] and state == QUEUED:
286288
return self.execute()
@@ -341,8 +343,9 @@ def from_id(self, tiger, queue, state, task_id, load_executions=0):
341343
serialized_executions = []
342344
# XXX: No timestamp for now
343345
if serialized_data:
344-
data = json.loads(serialized_data)
345-
executions = [json.loads(e) for e in serialized_executions if e]
346+
json_decoder = tiger.config.get('JSON_DECODER')
347+
data = json.loads(serialized_data, cls=json_decoder)
348+
executions = [json.loads(e, cls=json_decoder) for e in serialized_executions if e]
346349
return Task(tiger, queue=queue, _data=data, _state=state,
347350
_executions=executions)
348351
else:
@@ -370,6 +373,8 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
370373

371374
tasks = []
372375

376+
json_decoder = tiger.config.get('JSON_DECODER')
377+
373378
if items:
374379
tss = [datetime.datetime.utcfromtimestamp(item[1]) for item in items]
375380
if load_executions:
@@ -380,8 +385,8 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
380385
results = pipeline.execute()
381386

382387
for serialized_data, serialized_executions, ts in zip(results[0], results[1:], tss):
383-
data = json.loads(serialized_data)
384-
executions = [json.loads(e) for e in serialized_executions if e]
388+
data = json.loads(serialized_data, cls=json_decoder)
389+
executions = [json.loads(e, cls=json_decoder) for e in serialized_executions if e]
385390

386391
task = Task(tiger, queue=queue, _data=data, _state=state,
387392
_ts=ts, _executions=executions)
@@ -390,7 +395,7 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
390395
else:
391396
data = tiger.connection.mget([tiger._key('task', item[0]) for item in items])
392397
for serialized_data, ts in zip(data, tss):
393-
data = json.loads(serialized_data)
398+
data = json.loads(serialized_data, cls=json_decoder)
394399
task = Task(tiger, queue=queue, _data=data, _state=state,
395400
_ts=ts)
396401
tasks.append(task)

tasktiger/worker.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ def __init__(self, tiger, queues=None, exclude_queues=None,
4444
self._did_work = True
4545
self._last_task_check = 0
4646
self.stats_thread = None
47+
self.json_encoder = tiger.config.get('JSON_ENCODER')
48+
self.json_decoder = tiger.config.get('JSON_DECODER')
4749

4850
if queues:
4951
self.only_queues = set(queues)
@@ -327,7 +329,7 @@ def _execute_forked(self, tasks, log):
327329
''.join(traceback.format_exception(*exc_info))
328330
execution['success'] = success
329331
execution['host'] = socket.gethostname()
330-
serialized_execution = json.dumps(execution)
332+
serialized_execution = json.dumps(execution, cls=self.json_encoder)
331333
for task in tasks:
332334
self.connection.rpush(self._key('task', task.id, 'executions'),
333335
serialized_execution)
@@ -544,7 +546,7 @@ def _process_queue_tasks(self, queue, queue_lock, task_ids, now, log):
544546
tasks = []
545547
for task_id, serialized_task in zip(task_ids, serialized_tasks):
546548
if serialized_task:
547-
task_data = json.loads(serialized_task)
549+
task_data = json.loads(serialized_task, cls=self.json_decoder)
548550
else:
549551
# In the rare case where we don't find the task which is
550552
# queued (see ReliabilityTestCase.test_task_disappears),
@@ -671,12 +673,14 @@ def _execute_task_group(self, queue, tasks, all_task_ids, queue_lock):
671673
task.serialized_func,
672674
None,
673675
{key: kwargs.get(key) for key in task.lock_key},
676+
cls=self.json_encoder
674677
)
675678
else:
676679
lock_id = gen_unique_id(
677680
task.serialized_func,
678681
task.args,
679682
task.kwargs,
683+
cls=self.json_encoder
680684
)
681685

682686
if lock_id not in lock_ids:
@@ -739,7 +743,7 @@ def _mark_done():
739743
self._key('task', task.id, 'executions'), -1)
740744

741745
if execution:
742-
execution = json.loads(execution)
746+
execution = json.loads(execution, cls=self.json_decoder)
743747

744748
if execution and execution.get('retry'):
745749
if 'retry_method' in execution:

0 commit comments

Comments
 (0)