"""
This script runs a local Luigi pipeline that goes from zero to a fully trained and tested Ludwig model.
The pipeline stores the intermediate models and feature files in a timestamp-based folder:
to re-run specific steps, the folder name can be specified in the init process at the bottom.
The pipeline has four steps:
1. train a prod2vec model on user behavior
2. prepare the training and test dataset as a Ludwig-friendly csv file
3. define and train the Ludwig model
4. re-load and test the Ludwig model
If you already have files for the embeddings and the dataset, you can also just try out Ludwig with the stand-alone
script in the ludwig_playground folder.
"""
import os
import csv
import json
import numpy as np
from time import time
import luigi
from snowflake_client import SnowflakeClient
from dotenv import load_dotenv
# now import all the scripts we are gluing together
from prod2vec_train import calculate_prod_to_vecs
from ludwig_wrapper import train_with_ludwig, run_test_with_ludwig
from data_service import prepare_training_and_test_set
# load env variables from local file
load_dotenv(dotenv_path='.env', verbose=True)
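# the .env file is expected to provide, among others: SNOWFLAKE_USER, SNOWFLAKE_PWD, SNOWFLAKE_ACCOUNT,
# ENV_ID, TRAIN_START, TRAIN_END, CATALOG_FILE and MODELS_DIR - all read through os.getenv below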
# get a Snowflake instance for the entire pipeline
sf_client = SnowflakeClient(
    user=os.getenv('SNOWFLAKE_USER'),
    pwd=os.getenv('SNOWFLAKE_PWD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    keep_alive=False
)


def convert(o):
    """
    Helper function to json dump np.int64 objects to a local JSON
    :param o: object to convert
    :return: a Python int if o is a np.int64; a TypeError is raised otherwise
    """
    if isinstance(o, np.int64):
        return int(o)
    # mimic the standard behavior of json.dump for unserializable objects
    raise TypeError('Object of type {} is not JSON serializable'.format(type(o)))
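

# usage sketch: convert is meant to be passed as the "default" hook of json.dump / json.dumps,
# e.g. json.dumps({'hits': np.int64(3)}, default=convert) -> '{"hits": 3}' (illustrative values only)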


# Task 0 - Calculate Product Embeddings
class Prod2Vec(luigi.Task):
    """
    This class encapsulates the function training product embeddings on browsing data; prod2vec embeddings are
    used by the final model to represent the in-session intent of the shopper
    """

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget(os.path.join(os.getenv('PIPELINE_FOLDER'), 'prod2vec.tsv'))

    def run(self):
        """
        The function calculate_prod_to_vecs returns a gensim KeyedVectors object, which is then saved locally
        in the GloVe format
        """
        prod2vec_model = calculate_prod_to_vecs(
            env_id=os.getenv('ENV_ID'),
            train_start=os.getenv('TRAIN_START'),
            train_end=os.getenv('TRAIN_END'),
            snowflake_client=sf_client
        )
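        # the .tsv output has one SKU per line, tab-separated from its vector components, e.g.
        # "SKU123\t0.0123456789\t-0.0987654321\t..." (SKU and values are purely illustrative)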
        with open(self.output().path, "w") as f:
            for sku in prod2vec_model.vocab:
                f.write("{}\t{}\n".format(sku, '\t'.join(['{:.10f}'.format(_) for _ in prod2vec_model[sku]])))
        return


# Task 1 - Prepare Training and Test Dataset
class PrepareDataset(luigi.Task):
    """
    This class encapsulates the function responsible for retrieving the dataset for training/testing.
    """

    def requires(self):
        return Prod2Vec()

    def output(self):
        return luigi.LocalTarget(os.path.join(os.getenv('PIPELINE_FOLDER'), 'data.csv'))

    def run(self):
        """
        The function prepare_training_and_test_set returns a list of dictionaries. Each dictionary is a row
        with a field: value structure - fields are: "skus_in_session" for in-session product interactions, "query"
        for the query made by the user, "path" for the target taxonomy path.
        """
        data_set = prepare_training_and_test_set(snowflake_client=sf_client,
                                                 env_id=os.getenv('ENV_ID'),
                                                 start_date=os.getenv('TRAIN_START'),
                                                 end_date=os.getenv('TRAIN_END'),
                                                 category_map_file=os.path.join('data', os.getenv('CATALOG_FILE')))
        # make sure there is data!
        assert len(data_set) > 0
        # write to final csv
        with open(self.output().path, 'w') as f:
            w = csv.DictWriter(f, data_set[0].keys())
            w.writeheader()
            for d in data_set:
                w.writerow(d)
        return


# Task 2 - Train Wide-and-Deep Encoder-Decoder
class LudwigTrain(luigi.Task):

    def requires(self):
        return {
            'data': PrepareDataset(),
            'embeddings': Prod2Vec()
        }

    def output(self):
        return luigi.LocalTarget(os.path.join(os.getenv('PIPELINE_FOLDER'), 'train_stats.json'))

    def run(self):
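        # declarative Ludwig definition: SKUs go through a set encoder initialized with the frozen
        # prod2vec embeddings, the query through a char-level RNN; a concat combiner feeds an LSTM
        # decoder that generates the taxonomy path (embedding_size=48 assumes 48-dim prod2vec vectors)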
        ludwig_model_definition = {
            'input_features': [
                {'name': 'skus_in_session', 'type': 'set',
                 'pretrained_embeddings': self.input()['embeddings'].path, 'embedding_size': 48,
                 'embeddings_trainable': False},
                {'name': 'query', 'type': 'text', 'encoder': 'rnn', 'level': 'char'}
            ],
            'combiner': {'type': 'concat', 'num_fc_layers': 2},
            'output_features': [
                {'name': 'path', 'cell_type': 'lstm', 'type': 'sequence'}
            ],
            'training': {'epochs': 100, 'early_stopping': 5}
        }
        train_stats = train_with_ludwig(model_definition=ludwig_model_definition,
                                        train_file_csv=self.input()['data'].path,
                                        target_model_path=os.getenv('PIPELINE_FOLDER'))
        with open(self.output().path, 'w') as outfile:
            json.dump(train_stats, outfile)
        return


# Task 3 - Test Deep Model
class LudwigTest(luigi.Task):

    def requires(self):
        return {
            'train': LudwigTrain(),
            'data': PrepareDataset()
        }

    def output(self):
        return luigi.LocalTarget(os.path.join(os.getenv('PIPELINE_FOLDER'), 'test_stats.json'))

    def run(self):
        predictions, test_stats = run_test_with_ludwig(model_path=os.getenv('PIPELINE_FOLDER'),
                                                       test_file_csv=self.input()['data'].path)
        print("\n ======> Test accuracy (last): {}".format(test_stats['path']['last_accuracy']))
        with open(self.output().path, 'w') as outfile:
            json.dump(test_stats['path'], outfile, default=convert)
        return


if __name__ == '__main__':
    # just some paths we need
    MODELS_DIR = os.getenv('MODELS_DIR')
    print("Target folder is {}".format(MODELS_DIR))
    # create a local output folder
    if not os.path.exists(MODELS_DIR):
        os.makedirs(MODELS_DIR)
    # attach a specific timestamp to the run
    pipeline_timestamp = str(int(time()))
    PIPELINE_FOLDER = os.path.join(MODELS_DIR, pipeline_timestamp)
    # create it if it does not exist
    if not os.path.exists(PIPELINE_FOLDER):
        os.makedirs(PIPELINE_FOLDER)
    # set the folder as an env variable for the entire process
    os.environ['PIPELINE_FOLDER'] = PIPELINE_FOLDER
    # just run the pipeline locally
    luigi.build([LudwigTest()], local_scheduler=True)
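
    # typical invocation, assuming the .env file described at the top is in place:
    #   python model_pipeline.py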