Skip to content

Commit

Permalink
added task validation
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasbisurgi authored and MariusWirtz committed Jun 26, 2023
1 parent ee6a2a0 commit c203664
Showing 1 changed file with 69 additions and 10 deletions.
79 changes: 69 additions & 10 deletions rushti.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
MSG_PROCESS_ABORTED_UNCOMPLETE_PREDECESSOR = (
"Execution aborted. Process: '{process}' with parameters: {parameters} is not run "
"due to uncompleted predecessor {predecessor}, on instance: '{instance}'")
MSG_PROCESS_NOT_EXISTS = (
"Validation Failed. Process: '{process}' does not existon instance: '{instance}'")
MSG_PROCESS_PARAMS_INCORRECT = (
"Validation Failed. Process: '{process}' does not have: {parameters}, "
"on instance: '{instance}'")

# used to wrap blackslashes before using
UNIQUE_STRING = uuid.uuid4().hex[:8].upper()
Expand All @@ -73,7 +78,7 @@

def setup_tm1_services(max_workers: int, tasks_file_path: str, execution_mode: ExecutionMode) -> dict:
""" Return Dictionary with TM1ServerName (as in config.ini) : Instantiated TM1Service
:return: Dictionary server_names and TM1py.TM1Service instances pairs
"""
if not os.path.isfile(CONFIG):
Expand Down Expand Up @@ -119,7 +124,7 @@ def get_instances_from_tasks_file(execution_mode, max_workers, tasks_file_path):

def decrypt_password(encrypted_password: str) -> str:
""" b64 decoding
:param encrypted_password: encrypted password with b64
:return: password in plain text
"""
Expand Down Expand Up @@ -410,8 +415,8 @@ def execute_task(task: Task, retries: int, tm1_services: Dict[str, TM1Service])
""" Execute one line from the txt file
:param task:
:param retries:
:param tm1_services:
:return:
:param tm1_services:
:return:
"""

# check predecessors success
Expand Down Expand Up @@ -493,17 +498,71 @@ def verify_predecessors_ok(task: OptimizedTask) -> bool:
return True


def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bool:
validated_tasks = []
validation_ok = True

tasks = [task for task in tasks if isinstance(task, Task)] # --> ignore Wait(s)
for task in tasks:
current_task = {
'instance': task.instance_name,
'process': task.process_name,
'parameters': task.parameters.keys()
}

tm1 = tm1_services[task.instance_name]

# avoid repeated validations
if current_task in validated_tasks:
continue

# check for process exiatance
if not tm1.processes.exists(task.process_name):
msg = MSG_PROCESS_NOT_EXISTS.format(
process=task.process_name,
instance=task.instance_name
)
logger.error(msg)
validated_tasks.append(current_task)
validation_ok = False
continue

# check for parameters
task_params = task.parameters.keys()
if task_params:
process_params = [param['Name'] for param in tm1.processes.get(task.process_name).parameters]

# check for missing parameter names
missing_params = [param for param in task_params if param not in process_params]
if len(missing_params) > 0:
msg = MSG_PROCESS_PARAMS_INCORRECT.format(
process=task.process_name,
parameters=missing_params,
instance=task.instance_name
)
logger.error(msg)
validation_ok = False

validated_tasks.append(current_task)

return validation_ok


async def work_through_tasks(file_path: str, max_workers: int, mode: ExecutionMode, retries: int, tm1_services: dict):
""" loop through file. Add all lines to the execution queue.
:param file_path:
:param max_workers:
:param mode:
:param retries:
:param tm1_services:
:return:
:param tm1_services:
:return:
"""
tasks = get_ordered_tasks_and_waits(file_path, max_workers, mode)

if not validate_tasks(tasks, tm1_services):
logout(tm1_services)
return [False]

# split lines into the blocks separated by 'wait' line
task_sets = [
list(y)
Expand All @@ -530,17 +589,17 @@ async def work_through_tasks(file_path: str, max_workers: int, mode: ExecutionMo

def logout(tm1_services):
""" logout from all instances
:param tm1_services:
:return:
:param tm1_services:
:return:
"""
for tm1 in tm1_services.values():
tm1.logout()


def translate_cmd_arguments(*args):
""" Translation and Validity-checks for command line arguments.
:param args:
:return: tasks_file_path, maximum_workers, execution_mode
"""
Expand Down

0 comments on commit c203664

Please sign in to comment.