diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py index 41e243b435..18b389c734 100644 --- a/qlib/workflow/task/manage.py +++ b/qlib/workflow/task/manage.py @@ -47,6 +47,14 @@ class TaskManager: The tasks manager assumes that you will only update the tasks you fetched. The mongo fetch one and update will make it date updating secure. + This class can be used as a tool from commandline. Here are serveral examples + + .. code-block:: shell + + python -m qlib.workflow.task.manage -t wait + python -m qlib.workflow.task.manage -t task_stat + + .. note:: Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded @@ -80,7 +88,7 @@ def __init__(self, task_pool: str): task_pool: str the name of Collection in MongoDB """ - self.task_pool = getattr(get_mongodb(), task_pool) + self.task_pool: pymongo.collection.Collection = getattr(get_mongodb(), task_pool) self.logger = get_module_logger(self.__class__.__name__) @staticmethod @@ -101,6 +109,20 @@ def _encode_task(self, task): return task def _decode_task(self, task): + """ + _decode_task is Serialization tool. + Mongodb needs JSON, so it needs to convert Python objects into JSON objects through pickle + + Parameters + ---------- + task : dict + task information + + Returns + ------- + dict + JSON required by mongodb + """ for prefix in self.ENCODE_FIELDS_PREFIX: for k in list(task.keys()): if k.startswith(prefix): @@ -211,6 +233,7 @@ def create_task(self, task_def_l, dry_run=False, print_nt=False) -> List[str]: r = self.task_pool.find_one({"filter": t}) except InvalidDocument: r = self.task_pool.find_one({"filter": self._dict_to_str(t)}) + # When r is none, it indicates that r s a new task if r is None: new_tasks.append(t) if not dry_run: @@ -461,11 +484,11 @@ def run_task( After running this method, here are 4 situations (before_status -> after_status): - STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param + STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param, it means that the task has not been started STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as `task_func` param - STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param + STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param, it means that the task has been started but not completed STATUS_PART_DONE -> STATUS_DONE: use task["res"] as `task_func` param