From 465007e75588e39812f019c2b0a49e375041e3ce Mon Sep 17 00:00:00 2001
From: Young
Date: Tue, 9 Aug 2022 11:09:34 +0800
Subject: [PATCH] Refine type hint and recorder

- Add the `InstDictConf` / `InstConf` types to qlib/typehint.py and use
  `InstConf` in `get_callable_kwargs` and `init_instance_by_config`.
- Raise `ModuleNotFoundError` when `get_module_by_module_path` is given None.
- Add `log_artifact` / `download_artifact` to `Recorder`, `MLflowRecorder`
  and the recorder wrapper in qlib/workflow/__init__.py.
- Annotate the return type of `get_recorder` in qlib/workflow/exp.py.
- Fix a typo in the re-initialization error message.
---
NOTE(review): this copy of the patch had its whitespace collapsed; the
indentation and blank context lines below are reconstructed from the hunk
line counts, so verify with `git apply --check` before applying. Angle-bracket
placeholders inside docstrings appear to have been stripped; they are restored
on added lines only, removed lines are reproduced as found.

 qlib/typehint.py          | 50 +++++++++++++++++++++++++++++++++++++++
 qlib/utils/__init__.py    | 40 ++++++++++---------------------
 qlib/workflow/__init__.py | 40 ++++++++++++++++++++++++++++++-
 qlib/workflow/exp.py      |  2 +-
 qlib/workflow/recorder.py | 50 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 153 insertions(+), 29 deletions(-)

diff --git a/qlib/typehint.py b/qlib/typehint.py
index 3a584b4b75..89f4d6f66b 100644
--- a/qlib/typehint.py
+++ b/qlib/typehint.py
@@ -4,6 +4,8 @@
 """Commonly used types."""
 
 import sys
+from typing import Union
+from pathlib import Path
 
 __all__ = ["Literal", "TypedDict", "final"]
 
@@ -11,3 +13,51 @@
     from typing import Literal, TypedDict, final  # type: ignore  # pylint: disable=no-name-in-module
 else:
     from typing_extensions import Literal, TypedDict, final
+
+
+class InstDictConf(TypedDict):
+    """
+    InstDictConf is a Dict-based config to describe an instance
+
+        case 1)
+        {
+            'class': 'ClassName',
+            'kwargs': dict,  # It is optional. {} will be used if not given
+            'module_path': path,  # It is optional if module is given in the class
+        }
+        case 2)
+        {
+            'class': <the class itself>,
+            'kwargs': dict,  # It is optional. {} will be used if not given
+        }
+    """
+    # class: str  # because class is a keyword of Python. We have to comment it
+    kwargs: dict  # It is optional. {} will be used if not given
+    module_path: str  # It is optional if module is given in the class
+
+
+
+InstConf = Union[InstDictConf, str, object, Path]
+"""
+InstConf is a type to describe an instance; it will be passed into init_instance_by_config for Qlib
+
+    config : Union[str, dict, object, Path]
+
+        InstDictConf example.
+            please refer to the docs of InstDictConf
+
+        str example.
+            1) specify a pickle object
+                - path like 'file:///<path to pickle file>/obj.pkl'
+            2) specify a class name
+                - "ClassName": getattr(module, "ClassName")() will be used.
+            3) specify module path with class name
+                - "a.b.c.ClassName" getattr(<a.b.c module>, "ClassName")() will be used.
+
+        object example:
+            instance of accept_types
+
+        Path example:
+            specify a pickle object
+                - it will be treated like 'file:///<path to pickle file>/obj.pkl'
+"""
diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py
index 433f82be76..ad4feffc00 100644
--- a/qlib/utils/__init__.py
+++ b/qlib/utils/__init__.py
@@ -11,6 +11,7 @@
 import sys
 import copy
 import json
+from qlib.typehint import InstConf
 import yaml
 import redis
 import bisect
@@ -291,7 +292,11 @@ def get_module_by_module_path(module_path: Union[str, ModuleType]):
 
     :param module_path:
     :return:
+    :raises: ModuleNotFoundError
     """
+    if module_path is None:
+        raise ModuleNotFoundError("None is passed in as parameters as module_path")
+
     if isinstance(module_path, ModuleType):
         module = module_path
     else:
@@ -324,7 +329,7 @@ def split_module_path(module_path: str) -> Tuple[str, str]:
     return m_path, cls
 
 
-def get_callable_kwargs(config: Union[dict, str], default_module: Union[str, ModuleType] = None) -> (type, dict):
+def get_callable_kwargs(config: InstConf, default_module: Union[str, ModuleType] = None) -> (type, dict):
     """
     extract class/func and kwargs from config info
 
@@ -343,6 +348,10 @@ def get_callable_kwargs(config: Union[dict, str], default_module: Union[str, Mod
     -------
     (type, dict):
         the class/func object and it's arguments.
+
+    Raises
+    ------
+        ModuleNotFoundError
     """
     if isinstance(config, dict):
         key = "class" if "class" in config else "func"
@@ -376,7 +385,7 @@ def get_callable_kwargs(config: Union[dict, str], default_module: Union[str, Mod
 
 
 def init_instance_by_config(
-    config: Union[str, dict, object, Path],  # TODO: use a user-defined type to replace this Union.
+    config: InstConf,
     default_module=None,
     accept_types: Union[type, Tuple[type]] = (),
     try_kwargs: Dict = {},
@@ -387,31 +396,8 @@ def init_instance_by_config(
 
     Parameters
     ----------
-    config : Union[str, dict, object]
-        dict example.
-            case 1)
-            {
-                'class': 'ClassName',
-                'kwargs': dict, # It is optional. {} will be used if not given
-                'model_path': path, # It is optional if module is given
-            }
-            case 2)
-            {
-                'class': ,
-                'kwargs': dict, # It is optional. {} will be used if not given
-            }
-        str example.
-            1) specify a pickle object
-                - path like 'file:////obj.pkl'
-            2) specify a class name
-                - "ClassName": getattr(module, "ClassName")() will be used.
-            3) specify module path with class name
-                - "a.b.c.ClassName" getattr(, "ClassName")() will be used.
-        object example:
-            instance of accept_types
-        Path example:
-            specify a pickle object
-                - it will be treated like 'file:////obj.pkl'
+    config : InstConf
+
     default_module : Python module
         Optional. It should be a python module.
         NOTE: the "module_path" will be override by `module` arguments
diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py
index a528fa67a9..e969d4664a 100644
--- a/qlib/workflow/__init__.py
+++ b/qlib/workflow/__init__.py
@@ -575,6 +575,44 @@ def log_metrics(self, step=None, **kwargs):
         """
         self.get_exp(start=True).get_recorder(start=True).log_metrics(step, **kwargs)
 
+    def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
+        """
+        Log a local file or directory as an artifact of the currently active run
+
+        - If `active recorder` exists: it will log the artifact through the active recorder.
+        - If `active recorder` not exists: the system will create a default experiment as well as a new recorder, and log the artifact under it.
+
+        Parameters
+        ----------
+        local_path : str
+            Path to the file to write.
+        artifact_path : Optional[str]
+            If provided, the directory in ``artifact_uri`` to write to.
+        """
+        self.get_exp(start=True).get_recorder(start=True).log_artifact(local_path, artifact_path)
+
+    def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
+        """
+        Download an artifact file or directory from a run to a local directory if applicable,
+        and return a local path for it.
+
+        Parameters
+        ----------
+        path : str
+            Relative source path to the desired artifact.
+        dst_path : Optional[str]
+            Absolute path of the local filesystem destination directory to which to
+            download the specified artifacts. This directory must already exist.
+            If unspecified, the artifacts will be downloaded to a new
+            uniquely-named directory on the local filesystem.
+
+        Returns
+        -------
+        str
+            Local path of desired artifact.
+        """
+        return self.get_exp(start=True).get_recorder(start=True).download_artifact(path, dst_path)
+
     def set_tags(self, **kwargs):
         """
         Method for setting tags for a recorder. In addition to using ``R``, one can also set the tag to a specific recorder after getting it with `get_recorder` API.
@@ -611,7 +649,7 @@ def register(self, provider):
             expm = getattr(self._provider, "exp_manager")
             if expm.active_experiment is not None:
                 raise RecorderInitializationError(
-                    "Please don't reinitialize Qlib if QlibRecorder is already acivated. Otherwise, the experiment stored location will be modified."
+                    "Please don't reinitialize Qlib if QlibRecorder is already activated. Otherwise, the experiment stored location will be modified."
                 )
         self._provider = provider
 
diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py
index b9f4200155..19bec02dd2 100644
--- a/qlib/workflow/exp.py
+++ b/qlib/workflow/exp.py
@@ -111,7 +111,7 @@ def delete_recorder(self, recorder_id):
         """
         raise NotImplementedError(f"Please implement the `delete_recorder` method.")
 
-    def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True, start: bool = False):
+    def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) -> Recorder:
         """
         Retrieve a Recorder for user. When user specify recorder id and name, the method will try to return the
         specific recorder. When user does not provide recorder id or name, the method will try to return the current
diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py
index 9d6e03b4e5..b210dd052e 100644
--- a/qlib/workflow/recorder.py
+++ b/qlib/workflow/recorder.py
@@ -3,6 +3,7 @@
 
 import os
 import sys
+from typing import Optional
 import mlflow
 import logging
 import shutil
@@ -138,6 +139,20 @@ def log_metrics(self, step=None, **kwargs):
         """
         raise NotImplementedError(f"Please implement the `log_metrics` method.")
 
+
+    def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
+        """
+        Log a local file or directory as an artifact of the currently active run.
+
+        Parameters
+        ----------
+        local_path : str
+            Path to the file to write.
+        artifact_path : Optional[str]
+            If provided, the directory in ``artifact_uri`` to write to.
+        """
+        raise NotImplementedError(f"Please implement the `log_artifact` method.")
+
     def set_tags(self, **kwargs):
         """
         Log a batch of tags for the current run.
@@ -175,6 +190,28 @@ def list_artifacts(self, artifact_path: str = None):
         """
         raise NotImplementedError(f"Please implement the `list_artifacts` method.")
 
+    def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
+        """
+        Download an artifact file or directory from a run to a local directory if applicable,
+        and return a local path for it.
+
+        Parameters
+        ----------
+        path : str
+            Relative source path to the desired artifact.
+        dst_path : Optional[str]
+            Absolute path of the local filesystem destination directory to which to
+            download the specified artifacts. This directory must already exist.
+            If unspecified, the artifacts will be downloaded to a new
+            uniquely-named directory on the local filesystem.
+
+        Returns
+        -------
+        str
+            Local path of desired artifact.
+        """
+        raise NotImplementedError(f"Please implement the `download_artifact` method.")
+
     def list_metrics(self):
         """
         List all the metrics of a recorder.
@@ -212,6 +249,13 @@ class MLflowRecorder(Recorder):
 
     Due to the fact that mlflow will only log artifact from a file or directory, we decide to
     use file manager to help maintain the objects in the project.
+
+    Instead of using mlflow directly, we use another interface wrapping mlflow to log experiments.
+    Though it takes extra effort, it brings users benefits for the following reasons.
+    - It will be more convenient to change the experiment logging backend without changing any code in upper level
+    - We can provide more convenience to automatically do some extra things and make interface easier. For example:
+        - It can automatically log the uncommitted code
+        - User can control several different runs by just creating different Recorder (in mlflow, you always have to switch artifact_uri and pass in run ids frequently)
     """
 
     def __init__(self, experiment_id, uri, name=None, mlflow_run=None):
@@ -398,6 +442,9 @@ def log_metrics(self, step=None, **kwargs):
         for name, data in kwargs.items():
             self.client.log_metric(self.id, name, data, step=step)
 
+    def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
+        self.client.log_artifact(self.id, local_path=local_path, artifact_path=artifact_path)
+
     @AsyncCaller.async_dec(ac_attr="async_log")
     def set_tags(self, **kwargs):
         for name, data in kwargs.items():
@@ -420,6 +467,9 @@ def list_artifacts(self, artifact_path=None):
         artifacts = self.client.list_artifacts(self.id, artifact_path)
         return [art.path for art in artifacts]
 
+    def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
+        return self.client.download_artifacts(self.id, path, dst_path)
+
     def list_metrics(self):
         run = self.client.get_run(self.id)
         return run.data.metrics