Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple freq support #580

Merged
merged 15 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ jobs:
- name: Lint with Black
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
python.exe -m pip install pip --upgrade
python.exe -m pip install wheel --upgrade
python.exe -m pip install black
python.exe -m black qlib -l 120 --check --diff
else
python -m pip install pip --upgrade
python -m pip install wheel --upgrade
python -m pip install black
python -m black qlib -l 120 --check --diff
fi
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ jobs:
- name: Lint with Black
run: |
cd ..
python -m pip install pip --upgrade
python -m pip install wheel --upgrade
python -m pip install black
python -m black qlib -l 120 --check --diff
# Test Qlib installed with pip
Expand Down
16 changes: 16 additions & 0 deletions examples/benchmarks/LightGBM/features_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import datetime
import pandas as pd

from qlib.data.inst_processor import InstProcessor


class Resample1minProcessor(InstProcessor):
    """Down-sample intraday rows to a single row per day.

    Only the rows whose timestamp has the wall-clock time ``hour:minute`` are
    kept; the surviving timestamps are then normalized to midnight so the
    result is indexed by date.
    """

    def __init__(self, hour: int, minute: int, **kwargs):
        """
        Parameters
        ----------
        hour : int
            hour of the bar to keep
        minute : int
            minute of the bar to keep
        **kwargs
            accepted for call-site compatibility; not used by this processor
        """
        self.hour = hour
        self.minute = minute

    def __call__(self, df: pd.DataFrame, *args, **kwargs):
        """Return the rows of ``df`` stamped at ``hour:minute``, re-indexed by day.

        Parameters
        ----------
        df : pd.DataFrame
            frame whose index can be converted with ``pd.to_datetime``

        Returns
        -------
        pd.DataFrame
            a new frame; ``df`` itself is left untouched
        """
        # Convert into a local index instead of assigning to ``df.index``:
        # the caller's frame must not be modified as a side effect.
        index = pd.to_datetime(df.index)
        mask = index.time == datetime.time(self.hour, self.minute)
        sampled = df.loc[mask]
        # Boolean-mask selection yields a new frame, so replacing its index is safe.
        sampled.index = index[mask].normalize()
        return sampled
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
qlib_init:
provider_uri:
day: "~/.qlib/qlib_data/cn_data"
1min: "~/.qlib/qlib_data/cn_data_1min"
region: cn
dataset_cache: null
maxtasksperchild: 1
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
# 1min closing time is 15:00:00
end_time: "2020-08-01 15:00:00"
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
freq:
label: day
feature: 1min
# with label as reference
inst_processor:
feature:
- class: Resample1minProcessor
module_path: features_sample.py
kwargs:
hour: 14
minute: 56

port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy.strategy
kwargs:
topk: 50
n_drop: 5
backtest:
verbose: False
limit_threshold: 0.095
account: 100000000
benchmark: *benchmark
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: LGBModel
module_path: qlib.contrib.model.gbdt
kwargs:
loss: mse
colsample_bytree: 0.8879
learning_rate: 0.2
subsample: 0.8789
lambda_l1: 205.6999
lambda_l2: 580.9768
max_depth: 8
num_leaves: 210
num_threads: 20
dataset:
class: DatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs: {}
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config
67 changes: 33 additions & 34 deletions qlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,69 +33,70 @@ def init(default_conf="client", **kwargs):
H.clear()
C.set(default_conf, **kwargs)

# check path if server/local
if C.get_uri_type() == C.LOCAL_URI:
if not os.path.exists(C["provider_uri"]):
if C["auto_mount"]:
logger.error(
f"Invalid provider uri: {C['provider_uri']}, please check if a valid provider uri has been set. This path does not exist."
)
else:
logger.warning(f"auto_path is False, please make sure {C['mount_path']} is mounted")
elif C.get_uri_type() == C.NFS_URI:
_mount_nfs_uri(C)
else:
raise NotImplementedError(f"This type of URI is not supported")
# mount nfs
for _freq, provider_uri in C.provider_uri.items():
mount_path = C["mount_path"][_freq]
# check path if server/local
uri_type = C.dpm.get_uri_type(provider_uri)
if uri_type == C.LOCAL_URI:
if not Path(provider_uri).exists():
if C["auto_mount"]:
logger.error(
f"Invalid provider uri: {provider_uri}, please check if a valid provider uri has been set. This path does not exist."
)
else:
logger.warning(f"auto_path is False, please make sure {mount_path} is mounted")
elif uri_type == C.NFS_URI:
_mount_nfs_uri(provider_uri, mount_path, C["auto_mount"])
else:
raise NotImplementedError(f"This type of URI is not supported")

C.register()

if "flask_server" in C:
logger.info(f"flask_server={C['flask_server']}, flask_port={C['flask_port']}")
logger.info("qlib successfully initialized based on %s settings." % default_conf)
logger.info(f"data_path={C.get_data_path()}")
data_path = {_freq: C.dpm.get_data_path(_freq) for _freq in C.dpm.provider_uri.keys()}
logger.info(f"data_path={data_path}")


def _mount_nfs_uri(C):
def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):

LOG = get_module_logger("mount nfs", level=logging.INFO)

# FIXME: the C["provider_uri"] is modified in this function
# If it is not modified, we can pass only provider_uri or mount_path instead of C
mount_command = "sudo mount.nfs %s %s" % (C["provider_uri"], C["mount_path"])
mount_command = "sudo mount.nfs %s %s" % (provider_uri, mount_path)
# If the provider uri looks like this 172.23.233.89//data/csdesign'
# It will be a nfs path. The client provider will be used
if not C["auto_mount"]:
if not os.path.exists(C["mount_path"]):
if not auto_mount:
if not Path(mount_path).exists():
raise FileNotFoundError(
f"Invalid mount path: {C['mount_path']}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`"
f"Invalid mount path: {mount_path}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`"
)
else:
# Judging system type
sys_type = platform.system()
if "win" in sys_type.lower():
# system: window
exec_result = os.popen("mount -o anon %s %s" % (C["provider_uri"], C["mount_path"] + ":"))
exec_result = os.popen("mount -o anon %s %s" % (provider_uri, mount_path + ":"))
result = exec_result.read()
if "85" in result:
LOG.warning("already mounted or window mount path already exists")
LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted")
elif "53" in result:
raise OSError("not find network path")
elif "error" in result or "错误" in result:
raise OSError("Invalid mount path")
elif C["provider_uri"] in result:
elif provider_uri in result:
LOG.info("window success mount..")
else:
raise OSError(f"unknown error: {result}")

# config mount path
C["mount_path"] = C["mount_path"] + ":\\"
else:
# system: linux/Unix/Mac
# check mount
_remote_uri = C["provider_uri"]
_remote_uri = _remote_uri[:-1] if _remote_uri.endswith("/") else _remote_uri
_mount_path = C["mount_path"]
_mount_path = _mount_path[:-1] if _mount_path.endswith("/") else _mount_path
_remote_uri = provider_uri[:-1] if provider_uri.endswith("/") else provider_uri
_mount_path = mount_path[:-1] if mount_path.endswith("/") else mount_path
_check_level_num = 2
_is_mount = False
while _check_level_num:
Expand All @@ -121,11 +122,9 @@ def _mount_nfs_uri(C):

if not _is_mount:
try:
os.makedirs(C["mount_path"], exist_ok=True)
Path(mount_path).mkdir(parents=True, exist_ok=True)
except Exception:
raise OSError(
f"Failed to create directory {C['mount_path']}, please create {C['mount_path']} manually!"
)
raise OSError(f"Failed to create directory {mount_path}, please create {mount_path} manually!")

# check nfs-common
command_res = os.popen("dpkg -l | grep nfs-common")
Expand All @@ -136,11 +135,11 @@ def _mount_nfs_uri(C):
command_status = os.system(mount_command)
if command_status == 256:
raise OSError(
f"mount {C['provider_uri']} on {C['mount_path']} error! Needs SUDO! Please mount manually: {mount_command}"
f"mount {provider_uri} on {mount_path} error! Needs SUDO! Please mount manually: {mount_command}"
)
elif command_status == 32512:
# LOG.error("Command error")
raise OSError(f"mount {C['provider_uri']} on {C['mount_path']} error! Command error")
raise OSError(f"mount {provider_uri} on {mount_path} error! Command error")
elif command_status == 0:
LOG.info("Mount finished")
else:
Expand Down
101 changes: 76 additions & 25 deletions qlib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import re
import copy
import logging
import platform
import multiprocessing
from pathlib import Path
from typing import Union


class Config:
Expand Down Expand Up @@ -82,6 +84,15 @@ def set_conf_from_C(self, config_c):
"dataset_provider": "LocalDatasetProvider",
"provider": "LocalProvider",
# config it in qlib.init()
# "provider_uri" str or dict:
# # str
# "~/.qlib/stock_data/cn_data"
# # dict
# {"day": "~/.qlib/stock_data/cn_data", "1min": "~/.qlib/stock_data/cn_data_1min"}
# NOTE: provider_uri priority:
# 1. backend_config: backend_obj["kwargs"]["provider_uri"]
# 2. backend_config: backend_obj["kwargs"]["provider_uri_map"]
# 3. qlib.init: provider_uri
"provider_uri": "",
# cache
"expression_cache": None,
Expand Down Expand Up @@ -228,11 +239,43 @@ class QlibConfig(Config):
# URI_TYPE
LOCAL_URI = "local"
NFS_URI = "nfs"
DEFAULT_FREQ = "__DEFAULT_FREQ"

def __init__(self, default_conf):
super().__init__(default_conf)
self._registered = False

class DataPathManager:
    """Map a data frequency to the local path its data is read from.

    ``provider_uri`` and ``mount_path`` are stored as given; ``get_data_path``
    indexes both by frequency name, falling back to ``QlibConfig.DEFAULT_FREQ``.
    """

    def __init__(self, provider_uri: Union[str, Path, dict], mount_path: Union[str, Path, dict]):
        self.provider_uri = provider_uri
        self.mount_path = mount_path

    @staticmethod
    def get_uri_type(uri: Union[str, Path]):
        """Classify ``uri`` as ``QlibConfig.NFS_URI`` or ``QlibConfig.LOCAL_URI``."""
        uri = uri if isinstance(uri, str) else str(uri.expanduser().resolve())
        is_win = re.match(r"^[a-zA-Z]:.*", uri) is not None  # such as 'C:\\data', 'D:'
        # such as 'host:/data/' (User may define short hostname by themselves or use localhost)
        is_nfs_or_win = re.match(r"^[^/]+:.+", uri) is not None

        if is_nfs_or_win and not is_win:
            return QlibConfig.NFS_URI
        return QlibConfig.LOCAL_URI

    def get_data_path(self, freq: str = None) -> Path:
        """Return the path holding the data of ``freq``.

        Parameters
        ----------
        freq : str
            frequency name; ``None`` or an unconfigured name falls back to
            ``QlibConfig.DEFAULT_FREQ``

        Raises
        ------
        KeyError
            neither ``freq`` nor the default frequency is configured
        NotImplementedError
            the uri type is neither local nor nfs
        """
        if freq is None or freq not in self.provider_uri:
            freq = QlibConfig.DEFAULT_FREQ
        try:
            _provider_uri = self.provider_uri[freq]
        except KeyError as err:
            raise KeyError(
                f"provider_uri has no entry for freq `{freq}`; configured freqs: {list(self.provider_uri)}"
            ) from err

        uri_type = self.get_uri_type(_provider_uri)
        if uri_type == QlibConfig.LOCAL_URI:
            return Path(_provider_uri)
        if uri_type == QlibConfig.NFS_URI:
            # A substring test ("win" in ...) would also match "darwin" (macOS),
            # so compare against the start of the system name instead.
            if platform.system().lower().startswith("win"):
                # windows, mount_path is the drive
                return Path(f"{self.mount_path[freq]}:\\")
            return Path(self.mount_path[freq])
        raise NotImplementedError("This type of uri is not supported")

def set_mode(self, mode):
# raise KeyError
self.update(MODE_CONF[mode])
Expand All @@ -242,32 +285,39 @@ def set_region(self, region):
# raise KeyError
self.update(_default_region_config[region])

@property
def dpm(self):
    """Build a ``DataPathManager`` from the current ``provider_uri`` and ``mount_path`` settings."""
    uri_setting = self["provider_uri"]
    mount_setting = self["mount_path"]
    return self.DataPathManager(uri_setting, mount_setting)

def resolve_path(self):
# resolve path
if self["mount_path"] is not None:
self["mount_path"] = str(Path(self["mount_path"]).expanduser().resolve())

if self.get_uri_type() == QlibConfig.LOCAL_URI:
self["provider_uri"] = str(Path(self["provider_uri"]).expanduser().resolve())

def get_uri_type(self):
is_win = re.match("^[a-zA-Z]:.*", self["provider_uri"]) is not None # such as 'C:\\data', 'D:'
is_nfs_or_win = (
re.match("^[^/]+:.+", self["provider_uri"]) is not None
) # such as 'host:/data/' (User may define short hostname by themselves or use localhost)

if is_nfs_or_win and not is_win:
return QlibConfig.NFS_URI
else:
return QlibConfig.LOCAL_URI

def get_data_path(self):
if self.get_uri_type() == QlibConfig.LOCAL_URI:
return self["provider_uri"]
elif self.get_uri_type() == QlibConfig.NFS_URI:
return self["mount_path"]
else:
raise NotImplementedError(f"This type of uri is not supported")
_mount_path = self["mount_path"]
_provider_uri = self["provider_uri"]
if _provider_uri is None:
raise ValueError("provider_uri cannot be None")
if not isinstance(_provider_uri, dict):
_provider_uri = {self.DEFAULT_FREQ: _provider_uri}
if not isinstance(_mount_path, dict):
_mount_path = {_freq: _mount_path for _freq in _provider_uri.keys()}

# check provider_uri and mount_path
_miss_freq = set(_provider_uri.keys()) - set(_mount_path.keys())
assert len(_miss_freq) == 0, f"mount_path is missing freq: {_miss_freq}"

# resolve
for _freq, _uri in _provider_uri.items():
# provider_uri
if self.DataPathManager.get_uri_type(_uri) == QlibConfig.LOCAL_URI:
_provider_uri[_freq] = str(Path(_uri).expanduser().resolve())
# mount_path
_mount_path[_freq] = (
_mount_path[_freq]
if _mount_path[_freq] is None
else str(Path(_mount_path[_freq]).expanduser().resolve())
)

self["provider_uri"] = _provider_uri
self["mount_path"] = _mount_path

def set(self, default_conf="client", **kwargs):
from .utils import set_log_with_config, get_module_logger, can_use_cache
Expand Down Expand Up @@ -300,7 +350,8 @@ def set(self, default_conf="client", **kwargs):
# check redis
if not can_use_cache():
logger.warning(
f"redis connection failed(host={self['redis_host']} port={self['redis_port']}), cache will not be used!"
f"redis connection failed(host={self['redis_host']} port={self['redis_port']}), "
f"cache will not be used!"
)
self["expression_cache"] = None
self["dataset_cache"] = None
Expand Down
Loading