From 8de86fd21ec864addce5c761518d04b6e93e12ee Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 09:44:47 +0200 Subject: [PATCH 01/21] First version --- api/main.py | 99 +++++++++-- config.yml.example | 3 - docker/structure.sql | 30 +++- frontend/authentication.html | 95 ++++++++++ frontend/js/authentication.js | 33 ++++ frontend/js/helpers/main.js | 17 +- frontend/request.html | 10 +- lib/job/base.py | 20 ++- lib/job/run.py | 8 + lib/secure_variable.py | 32 ++++ lib/user.py | 166 ++++++++++++++++++ migrations/2024_008_22_authentication.sql | 27 +++ optimization_providers/durations/container.py | 4 +- runner.py | 64 ++++--- tests/lib/test_jobs.py | 11 +- tools/client.py | 1 + tools/jobs.py | 1 + tools/timeline_projects.py | 22 +-- 18 files changed, 564 insertions(+), 79 deletions(-) create mode 100644 frontend/authentication.html create mode 100644 frontend/js/authentication.js create mode 100644 lib/secure_variable.py create mode 100644 lib/user.py create mode 100644 migrations/2024_008_22_authentication.sql diff --git a/api/main.py b/api/main.py index e0c38e5a..d23576f8 100644 --- a/api/main.py +++ b/api/main.py @@ -11,15 +11,20 @@ from typing import List from xml.sax.saxutils import escape as xml_escape import math -from fastapi import FastAPI, Request, Response +from urllib.parse import urlparse + +from fastapi import FastAPI, Request, Response, Depends, HTTPException from fastapi.responses import ORJSONResponse from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import APIKeyHeader + from datetime import date from starlette.responses import RedirectResponse from starlette.exceptions import HTTPException as StarletteHTTPException +from starlette.datastructures import Headers as StarletteHeaders from pydantic import BaseModel, ValidationError, field_validator from typing import Optional @@ -38,6 +43,8 @@ from lib.diff import get_diffable_row, diff_rows from lib import error_helpers from lib.job.base import Job +from lib.user import User, UserAuthenticationError +from lib.secure_variable import SecureVariable from tools.timeline_projects import TimelineProject from enum import Enum @@ -53,7 +60,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=exc.body, details=exc.errors(), exception=exc @@ -71,7 +78,7 @@ async def http_exception_handler(request, exc): url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=body, details=exc.detail, exception=exc @@ -84,6 +91,7 @@ async def http_exception_handler(request, exc): async def catch_exceptions_middleware(request: Request, call_next): #pylint: disable=broad-except body = None + try: body = await request.body() return await call_next(request) @@ -93,7 +101,7 @@ async def catch_exceptions_middleware(request: Request, call_next): url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=body, exception=exc ) @@ -122,6 +130,37 @@ async def catch_exceptions_middleware(request: Request, call_next): allow_headers=['*'], ) +header_scheme = APIKeyHeader( + name='X-Authentication', + scheme_name='Header', + 
description='Authentication key - See https://docs.green-coding.io/authentication', + auto_error=False +) + +def obfuscate_authentication_token(headers: StarletteHeaders): + headers_mut = headers.mutablecopy() + if 'X-Authentication' in headers_mut: + headers_mut['X-Authentication'] = '****OBFUSCATED****' + return headers_mut + +def authenticate(authentication_token=Depends(header_scheme), request: Request = None): + parsed_url = urlparse(str(request.url)) + try: + user = User.authenticate(SecureVariable(authentication_token)) # Note that if no token is supplied this will authenticate as the DEFAULT user, which in FOSS systems has full capabilities + + if parsed_url.path not in user._capabilities['api']['routes']: + raise HTTPException(status_code=401, detail="Route not allowed") from UserAuthenticationError + + if parsed_url.path in user._capabilities['api']['quotas']: + if user._capabilities['api']['quotas'][parsed_url.path] <= 0: + raise HTTPException(status_code=401, detail="Quota exceeded") from UserAuthenticationError + user._capabilities['api']['quotas'][parsed_url.path] -= 1 + user.update() + + except UserAuthenticationError: + raise HTTPException(status_code=401, detail="Invalid token") from UserAuthenticationError + return user + @app.get('/') async def home(): @@ -220,6 +259,7 @@ async def get_repositories(uri: str | None = None, branch: str | None = None, ma return ORJSONResponse({'success': True, 'data': escaped_data}) + # A route to return all of the available entries in our catalog. @app.get('/v1/runs') async def get_runs(uri: str | None = None, branch: str | None = None, machine_id: int | None = None, machine: str | None = None, filename: str | None = None, limit: int | None = None, uri_mode = 'none'): @@ -597,7 +637,6 @@ async def get_jobs(machine_id: int | None = None, state: str | None = None): return ORJSONResponse({'success': True, 'data': data}) -#### class HogMeasurement(BaseModel): time: int @@ -665,7 +704,10 @@ def validate_measurement_data(data): return True @app.post('/v1/hog/add') -async def hog_add(measurements: List[HogMeasurement]): +async def hog_add( + measurements: List[HogMeasurement], + user: User = Depends(authenticate), # pylint: disable=unused-argument + ): for measurement in measurements: decoded_data = base64.b64decode(measurement.data) @@ -1025,9 +1067,6 @@ async def hog_get_task_details(machine_uuid: str, measurements_id_start: int, me return ORJSONResponse({'success': True, 'tasks_data': tasks_data, 'coalitions_data': coalitions_data}) - -#### - class Software(BaseModel): name: str url: str @@ -1038,7 +1077,7 @@ class Software(BaseModel): schedule_mode: str @app.post('/v1/software/add') -async def software_add(software: Software): +async def software_add(software: Software, user: User = Depends(authenticate)): software = html_escape_multi(software) @@ -1064,22 +1103,26 @@ async def software_add(software: Software): if not DB().fetch_one('SELECT id FROM machines WHERE id=%s AND available=TRUE', params=(software.machine_id,)): raise RequestValidationError('Machine does not exist') + if not user.can_use_machine(software.machine_id): + raise RequestValidationError('Your user does not have the permissions to use that machine.') if software.schedule_mode not in ['one-off', 'daily', 'weekly', 'commit', 'variance']: raise RequestValidationError(f"Please select a valid measurement interval. 
({software.schedule_mode}) is unknown.") - # notify admin of new add - if notification_email := GlobalConfig().config['admin']['notification_email']: - Job.insert('email', name='New run added from Web Interface', message=str(software), email=notification_email) - + if not user.can_schedule_job(software.schedule_mode): + raise RequestValidationError('Your user does not have the permissions to use that schedule mode.') if software.schedule_mode in ['daily', 'weekly', 'commit']: - TimelineProject.insert(software.name, software.url, software.branch, software.filename, software.machine_id, software.schedule_mode) + TimelineProject.insert(name=software.name, url=software.url, branch=software.branch, filename=software.filename, machine_id=software.machine_id, user_id=user._id, schedule_mode=software.schedule_mode) # even for timeline projects we do at least one run amount = 10 if software.schedule_mode == 'variance' else 1 for _ in range(0,amount): - Job.insert('run', name=software.name, url=software.url, email=software.email, branch=software.branch, filename=software.filename, machine_id=software.machine_id) + Job.insert('run', user_id=user._id, name=software.name, url=software.url, email=software.email, branch=software.branch, filename=software.filename, machine_id=software.machine_id) + + # notify admin of new add + if notification_email := GlobalConfig().config['admin']['notification_email']: + Job.insert('email', user_id=user._id, name='New run added from Web Interface', message=str(software), email=notification_email) return ORJSONResponse({'success': True}, status_code=202) @@ -1172,7 +1215,11 @@ class CI_Measurement(BaseModel): co2eq: Optional[str] = '' @app.post('/v1/ci/measurement/add') -async def post_ci_measurement_add(request: Request, measurement: CI_Measurement): +async def post_ci_measurement_add( + request: Request, + measurement: CI_Measurement, + user: User = Depends(authenticate) # pylint: disable=unused-argument + ): for key, value in measurement.model_dump().items(): match key: case 'unit': @@ -1384,7 +1431,11 @@ def empty_str_to_none(cls, values, _): return values @app.post('/v1/carbondb/add') -async def add_carbondb(request: Request, energydatas: List[EnergyData]): +async def add_carbondb( + request: Request, + energydatas: List[EnergyData], + user: User = Depends(authenticate) # pylint: disable=unused-argument + ): client_ip = request.headers.get("x-forwarded-for") if client_ip: @@ -1448,5 +1499,17 @@ async def carbondb_get_company_project_details(cptype: str, uuid: str): return ORJSONResponse({'success': True, 'data': data}) +# @app.get('/v1/authentication/new') +# async def get_authentication_token(name: str = None): +# if name is not None and name.strip() == '': +# name = None +# return ORJSONResponse({'success': True, 'data': User.get_new(name)}) + +@app.get('/v1/authentication/data') +async def read_authentication_token(user: User = Depends(authenticate)): + return ORJSONResponse({'success': True, 'data': user.__dict__}) + + + if __name__ == '__main__': app.run() # pylint: disable=no-member diff --git a/config.yml.example b/config.yml.example index 18b9dbb4..a478989c 100644 --- a/config.yml.example +++ b/config.yml.example @@ -26,7 +26,6 @@ admin: email_bcc: False - cluster: api_url: __API_URL__ metrics_url: __METRICS_URL__ @@ -62,8 +61,6 @@ measurement: pre-test-sleep: 5 idle-duration: 5 baseline-duration: 5 - flow-process-duration: 1800 # half hour - total-duration: 3600 # one hour post-test-sleep: 5 phase-transition-time: 1 boot: diff --git a/docker/structure.sql 
b/docker/structure.sql index c8dedcbf..079f0a90 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -4,6 +4,30 @@ CREATE DATABASE "green-coding"; CREATE EXTENSION "uuid-ossp"; CREATE EXTENSION "moddatetime"; +CREATE TABLE users ( + id SERIAL PRIMARY KEY, + name text, + token text NOT NULL, + capabilities JSONB NOT NULL, + created_at timestamp with time zone DEFAULT now(), + updated_at timestamp with time zone +); + +CREATE UNIQUE INDEX name_unique ON users(name text_ops); +CREATE UNIQUE INDEX token_unique ON users(token text_ops); + +CREATE TRIGGER users_moddatetime + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE PROCEDURE moddatetime (updated_at); + +# Default password for authentication is DEFAULT +INSERT INTO "public"."users"("name","token","capabilities","created_at","updated_at") +VALUES +(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); + + + CREATE TABLE machines ( id SERIAL PRIMARY KEY, description text, @@ -36,6 +60,7 @@ CREATE TABLE jobs ( categories int[], machine_id int REFERENCES machines(id) ON DELETE SET NULL ON UPDATE CASCADE, message text, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -67,6 +92,7 @@ CREATE TABLE runs ( logs text, invalid_run text, failed boolean DEFAULT false, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -183,6 +209,7 @@ CREATE TABLE ci_measurements ( city text, co2i text, co2eq text, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -217,6 +244,7 @@ CREATE TABLE timeline_projects ( machine_id integer REFERENCES machines(id) ON DELETE RESTRICT ON UPDATE CASCADE NOT NULL, schedule_mode text NOT NULL, last_scheduled timestamp with time zone, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -238,6 +266,7 @@ CREATE TABLE hog_measurements ( thermal_pressure text, settings jsonb, data jsonb, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -287,7 +316,6 @@ CREATE TABLE hog_tasks ( diskio_byteswritten bigint, intr_wakeups bigint, idle_wakeups bigint, - data jsonb, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone diff --git a/frontend/authentication.html b/frontend/authentication.html new file mode 100644 index 00000000..9d538428 --- /dev/null 
+++ b/frontend/authentication.html @@ -0,0 +1,95 @@ + + + + + + + + + + + + + Green Metrics Tool + + + + + + + + + + + + + + +
+

+ + Authentication +

+
+
+
Authentication in GMT
+
+

The Green Metrics Tool supports restricting certain functionalities for fair-use or premium access.

+

If you already have a token, you can enter it here to use it with all API calls in this dashboard and/or see your current capabilities and quotas.

+
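As a minimal sketch (assuming a reachable GMT API and the Python 'requests' package; the base URL below is a placeholder), a client with a token could query its capabilities directly. The X-Authentication header, the DEFAULT token and the /v1/authentication/data route are taken from the API changes in this patch series:

    import requests  # assumption: the 'requests' package is installed

    API_URL = 'https://api.example.com'  # hypothetical deployment URL
    token = 'DEFAULT'                    # built-in open-source default token

    # Same header the dashboard sends (see frontend/js/helpers/main.js below)
    response = requests.get(
        f'{API_URL}/v1/authentication/data',
        headers={'X-Authentication': token},
        timeout=10,
    )
    response.raise_for_status()
    print(response.json()['data'])  # capabilities: routes, quotas, retention settings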

If you want to acquire a new authentication token to access certain premium features that are not distributed with the open-source version, contact us at info@green-coding.io.

+
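Note that tokens are never stored server-side in the clear; only their SHA-256 hex digest is kept in the users table (see lib/user.py later in this patch). A short sketch of that mapping for the seeded DEFAULT user:

    import hashlib

    token = 'DEFAULT'  # raw token as entered by the user
    digest = hashlib.sha256(token.encode('UTF-8')).hexdigest()
    # per the seed data in docker/structure.sql this yields:
    # 89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5
    print(digest)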
+
+
+
+
+
Your authentication token
+
+
+
+ +
+

+ + +
+
+
+
+ +
+
+ + \ No newline at end of file diff --git a/frontend/js/authentication.js b/frontend/js/authentication.js new file mode 100644 index 00000000..cd3d678d --- /dev/null +++ b/frontend/js/authentication.js @@ -0,0 +1,33 @@ +(() => { + var authentication_token = localStorage.getItem('authentication_token'); + if(authentication_token == null) authentication_token = 'DEFAULT'; + + $(window).on('load', function() { + $("#authentication-token").val(authentication_token); + }) + + // $('#create-authentication-token').on('click', async function(){ + // try { + // $('#new-token-message').hide(); + // var new_authentication_token = await makeAPICall(`/v1/authentication/new?name=${$("#new-token-name").val()}`); + // $('#new-token-message').show(); + // $('#new-token').text(new_authentication_token.data); + // } catch (err) { + // showNotification('Could not create new authentication token', err); + // } + // }) + + $('#save-authentication-token').on('click', async function(){ + localStorage.setItem('authentication_token', $("#authentication-token").val()); + try { + $('#token-details-message').hide(); + var user_data = await makeAPICall('/v1/authentication/data'); + + $('#token-details-message').show(); + $('#token-details').text(JSON.stringify(user_data.data, null, 2)); + } catch (err) { + showNotification('Could not read authentication token data', err); + } + }) + +})(); diff --git a/frontend/js/helpers/main.js b/frontend/js/helpers/main.js index c32b62b4..d26aa3d8 100644 --- a/frontend/js/helpers/main.js +++ b/frontend/js/helpers/main.js @@ -25,15 +25,18 @@ class GMTMenu extends HTMLElement { Eco-CI - - Status - Power Hog CarbonDB + + Status + + + Authentication + Settings @@ -140,6 +143,7 @@ const escapeString = (string) =>{ async function makeAPICall(path, values=null) { + if(values != null ) { var options = { method: "POST", @@ -149,9 +153,14 @@ async function makeAPICall(path, values=null) { } } } else { - var options = { method: 'GET' } + var options = { method: 'GET', headers: {} } } + let authentication_token = localStorage.getItem('authentication_token'); + if(authentication_token == null) authentication_token = 'DEFAULT'; + + options.headers['X-Authentication'] = authentication_token; + let json_response = null; if(localStorage.getItem('remove_idle') == 'true') path += "?remove_idle=true" await fetch(API_URL + path, options) diff --git a/frontend/request.html b/frontend/request.html index e114ead6..1e8cc42b 100644 --- a/frontend/request.html +++ b/frontend/request.html @@ -54,15 +54,16 @@

Submit software for measurement
-
+
@@ -84,6 +85,7 @@

+

Find the specifications of the machines in our documentation

-

Find the specifications of the machines in our documentation

-
diff --git a/lib/job/base.py b/lib/job/base.py index dee5c270..79f0cf2a 100644 --- a/lib/job/base.py +++ b/lib/job/base.py @@ -23,7 +23,7 @@ """ class Job(ABC): - def __init__(self, state, name, email, url, branch, filename, machine_id, run_id, job_id, machine_description, message): + def __init__(self, *, state, name, email, url, branch, filename, machine_id, user_id, run_id, job_id, machine_description, message): self._id = job_id self._state = state self._name = name @@ -32,6 +32,7 @@ def __init__(self, state, name, email, url, branch, filename, machine_id, run_i self._branch = branch self._filename = filename self._machine_id = machine_id + self._user_id = user_id self._machine_description = machine_description self._run_id = run_id self._message = message @@ -71,18 +72,18 @@ def _process(self, **kwargs): pass @classmethod - def insert(cls, job_type, *, name=None, url=None, email=None, branch=None, filename=None, machine_id=None, message=None): + def insert(cls, job_type, *, user_id, name=None, url=None, email=None, branch=None, filename=None, machine_id=None, message=None): if job_type == 'run' and (not branch or not url or not filename or not machine_id): raise RuntimeError('For adding runs branch, url, filename and machine_id must be set') query = """ INSERT INTO - jobs (type, name, url, email, branch, filename, machine_id, message, state, created_at) + jobs (type, name, url, email, branch, filename, machine_id, user_id, message, state, created_at) VALUES - (%s, %s, %s, %s, %s, %s, %s, %s, 'WAITING', NOW()) RETURNING id; + (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'WAITING', NOW()) RETURNING id; """ - params = (job_type, name, url, email, branch, filename, machine_id, message) + params = (job_type, name, url, email, branch, filename, machine_id, user_id, message) return DB().fetch_one(query, params=params)[0] # A static method to get a job object @@ -93,7 +94,7 @@ def get_job(cls, job_type): query = ''' SELECT j.id, j.state, j.name, j.email, j.url, j.branch, - j.filename, j.machine_id, m.description, j.message, r.id as run_id + j.filename, j.machine_id, j.user_id, m.description, j.message, r.id as run_id FROM jobs as j LEFT JOIN machines as m on m.id = j.machine_id LEFT JOIN runs as r on r.job_id = j.id @@ -132,9 +133,10 @@ def get_job(cls, job_type): branch=job[5], filename=job[6], machine_id=job[7], - machine_description=job[8], - message=job[9], - run_id=job[10], + user_id=job[8], + machine_description=job[9], + message=job[10], + run_id=job[11], ) @classmethod diff --git a/lib/job/run.py b/lib/job/run.py index ceb72afa..f205e7ba 100644 --- a/lib/job/run.py +++ b/lib/job/run.py @@ -11,6 +11,7 @@ from lib.job.base import Job from lib.global_config import GlobalConfig from lib.db import DB +from lib.user import User from lib.terminal_colors import TerminalColors from lib.system_checks import ConfigurationCheckError from tools.phase_stats import build_and_store_phase_stats @@ -27,6 +28,8 @@ def check_job_running(self): #pylint: disable=arguments-differ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_prune=False): + user = User(self._user_id) + runner = Runner( name=self._name, uri=self._url, @@ -38,6 +41,9 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru full_docker_prune=full_docker_prune, docker_prune=docker_prune, job_id=self._id, + user_id=self._user_id, + measurement_flow_process_duration=user._capabilities['measurements']['settings']['flow-process-duration'], + 
measurement_total_duration=user._capabilities['measurements']['settings']['total-duration'], ) try: # Start main code. Only URL is allowed for cron jobs @@ -53,6 +59,7 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru if self._email: Job.insert( 'email', + user_id=self._user_id, email=self._email, name='Measurement Job successfully processed on Green Metrics Tool Cluster', message=f"Your report is now accessible under the URL: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={self._run_id}" @@ -64,6 +71,7 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru Job.insert( 'email', + user_id=self._user_id, email=self._email, name='Measurement Job on Green Metrics Tool Cluster failed', message=f"Run-ID: {self._run_id}\nName: {self._name}\n\nDetails can also be found in the log under: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={self._run_id}\n\nError message: {exc}\n" diff --git a/lib/secure_variable.py b/lib/secure_variable.py new file mode 100644 index 00000000..b205042a --- /dev/null +++ b/lib/secure_variable.py @@ -0,0 +1,32 @@ +import json + +class SecureVariable: + def __init__(self, value): + self._value = value + + def __repr__(self): + return '****OBFUSCATED****' + + def __str__(self): + return self.__repr__() + + def get_value(self): + return self._value + +class SecureVariableEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, SecureVariable): + return repr(o) + return super().default(o) + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('variable', help='Please supply a variable') + + args = parser.parse_args() # script will exit if arguments not present + + variable = SecureVariable(args.variable) + print("Variable print output looks like this:", variable) diff --git a/lib/user.py b/lib/user.py new file mode 100644 index 00000000..d72094c1 --- /dev/null +++ b/lib/user.py @@ -0,0 +1,166 @@ +import json +import hashlib +import uuid + +from lib.secure_variable import SecureVariable +from lib.db import DB + +class User(): + + def __init__(self, user_id: int): + user = DB().fetch_one(""" + SELECT id, name, capabilities + FROM users + WHERE id = %s + """, params=(user_id, )) + if not user: + raise RuntimeError(f"User with id {user_id} not found in database") + + self._id = user[0] + self._name = user[1] + self._capabilities = user[2] + + + def __repr__(self): + return str(self.__dict__) + + def update(self): + DB().query(""" + UPDATE users + SET capabilities = %s + WHERE id = %s + """, params=(json.dumps(self._capabilities), self._id, )) + + def can_use_machine(self, machine_id: int): + return machine_id in self._capabilities['machines'] + + def can_use_route(self, route: str): + return route in self._capabilities['api']['routes'] + + def measurement_quota(self): + if 'quota' in self._capabilities['measurement']: + return self._capabilities['measurement']['quota'] + return None # None means infinite amounts + + def api_quota(self, route: str): + if route in self._capabilities['measurement']['quota']: + return self._capabilities['measurement']['quota'][route] + return None # None means infinite amounts + + def can_schedule_job(self, schedule_mode: str): + return schedule_mode in self._capabilities['jobs']['schedule_modes'] + + @staticmethod + def authenticate(token: SecureVariable | None, silent=False): + sha256_hash = hashlib.sha256() + if token is None or token.get_value() is None: + 
sha256_hash.update("DEFAULT".encode('UTF-8')) + print(sha256_hash.hexdigest()) + else: + sha256_hash.update(token.get_value().encode('UTF-8')) + + user = DB().fetch_one(""" + SELECT id, name + FROM users + WHERE token = %s + """, params=((sha256_hash.hexdigest()), )) + if not user: + raise UserAuthenticationError('User with corresponding token not found') # do never output token everywhere cause it might land in logs + + print('Successfully authenticated user ', user[1]) + + return user[0] + + @staticmethod + def get_new(name=None): + + token = str(uuid.uuid4()).upper() + sha256_hash = hashlib.sha256() + sha256_hash.update(token.encode('UTF-8')) + + default_capabilities = { + "api": { + "quotas": { # An empty dictionary here means that no quotas apply + }, + "routes": [ # This will be dynamically loaded from the current main.py for all applicable routes + "/v1/carbondb/add", + "/v1/ci/measurement/add", + "/v1/software/add", + "/v1/hog/add", + "/v1/authentication/data", + ] + }, + "jobs": { + "schedule_modes": [ + "one-off", + "daily", + "weekly", + "commit", + "variance", + ], + }, + "measurements": { + "settings": { + "flow-process-duration": 3600, + "total-duration": 3600, + }, + "quotas": { # An empty dictionary here means that no quotas apply + "default": 10_000 + } + }, + "data": { + "runs": { + "retention": 3600, + }, + "measurements": { + "retention": 3600, + }, + "ci_measurements": { + "retention": 3600, + }, + "hog_measurements": { + "retention": 3600, + }, + "hog_coalitions": { + "retention": 3600, + }, + "hog_tasks": { + "retention": 3600, + }, + }, + "machines": [ # This will be dynamically loaded from the current database + 1, + ], + "optimizations": [ # This will be dynamically loaded from the current filesystem + "container_memory_utilization", + "container_cpu_utilization", + "message_optimization", + "container_build_time", + "container_boot_time", + "container_image_size", + ], + } + + user = DB().query(""" + INSERT INTO users + (name, token, capabilities) + VALUES + (%s, %s, %s) + """, params=((name, sha256_hash.hexdigest(), json.dumps(default_capabilities), ))) + + return token + +class UserAuthenticationError(Exception): + pass + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('token', help='Please supply a token to get the user') + + args = parser.parse_args() # script will exit if arguments not present + + fetched_user = User(args.token) + print("Users name is ", fetched_user._name, "and the caps are", fetched_user._capabilities) diff --git a/migrations/2024_008_22_authentication.sql b/migrations/2024_008_22_authentication.sql new file mode 100644 index 00000000..76cc39ed --- /dev/null +++ b/migrations/2024_008_22_authentication.sql @@ -0,0 +1,27 @@ +ALTER TABLE "jobs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "timeline_projects" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "runs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "ci_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "hog_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; + +CREATE TABLE users ( + id SERIAL PRIMARY KEY, + name text, + token text NOT NULL, + capabilities JSONB NOT NULL, + created_at timestamp with time zone DEFAULT now(), + updated_at timestamp with 
time zone +); + +CREATE UNIQUE INDEX name_unique ON users(name text_ops); +CREATE UNIQUE INDEX token_unique ON users(token text_ops); + +INSERT INTO "users"("id","name","token","capabilities","created_at","updated_at") +VALUES +(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); + +CREATE TRIGGER users_moddatetime + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE PROCEDURE moddatetime (updated_at); + diff --git a/optimization_providers/durations/container.py b/optimization_providers/durations/container.py index 3d1fd0a6..cb394096 100644 --- a/optimization_providers/durations/container.py +++ b/optimization_providers/durations/container.py @@ -8,7 +8,7 @@ MAX_BOOT_DURATION = 5 # 5 seconds # pylint: disable=unused-argument -@register_reporter('container-build-time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) +@register_reporter('container_build_time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) def container_build_time(self, run, measurements, repo_path, network, notes, phases): installation_phase = run['phases'][1] @@ -25,7 +25,7 @@ def container_build_time(self, run, measurements, repo_path, network, notes, pha ) # pylint: disable=unused-argument -@register_reporter('container-boot-time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) +@register_reporter('container_boot_time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) def container_boot_time(self, run, measurements, repo_path, network, notes, phases): boot_phase = run['phases'][2] diff --git a/runner.py b/runner.py index 92ec4cc2..f80cedc4 100755 --- a/runner.py +++ b/runner.py @@ -52,7 +52,8 @@ def __init__(self, debug_mode=False, allow_unsafe=False, skip_system_checks=False, skip_unsafe=False, verbose_provider_boot=False, full_docker_prune=False, dev_no_sleeps=False, dev_no_build=False, dev_no_metrics=False, - dev_flow_timetravel=False, dev_no_optimizations=False, docker_prune=False, job_id=None): + dev_flow_timetravel=False, dev_no_optimizations=False, docker_prune=False, job_id=None, + user_id=None, measurement_flow_process_duration=None, measurement_total_duration=None): if skip_unsafe is True and allow_unsafe is True: raise RuntimeError('Cannot specify both --skip-unsafe and --allow-unsafe') @@ -88,6 +89,9 @@ def __init__(self, self._run_id = None self._commit_hash = None self._commit_timestamp = None + self._user_id = user_id + self._measurement_flow_process_duration = measurement_flow_process_duration + self._measurement_total_duration = measurement_total_duration del self._arguments['self'] # self is not needed and also cannot be serialzed. 
We remove it @@ -125,10 +129,19 @@ def initialize_run(self): # we also update the branch here again, as this might not be main in case of local filesystem self._run_id = DB().fetch_one(""" - INSERT INTO runs (job_id, name, uri, email, branch, filename, commit_hash, commit_timestamp, runner_arguments, created_at) - VALUES (%s, %s, %s, 'manual', %s, %s, %s, %s, %s, NOW()) + INSERT INTO runs ( + job_id, name, uri, email, branch, filename, commit_hash, + commit_timestamp, runner_arguments, user_id, created_at + ) + VALUES ( + %s, %s, %s, 'manual', %s, %s, %s, + %s, %s, %s, NOW() + ) RETURNING id - """, params=(self._job_id, self._name, self._uri, self._branch, self._original_filename, self._commit_hash, self._commit_timestamp, json.dumps(self._arguments)))[0] + """, params=( + self._job_id, self._name, self._uri, self._branch, self._original_filename, self._commit_hash, + self._commit_timestamp, json.dumps(self._arguments), self._user_id + ))[0] return self._run_id def get_optimizations_ignore(self): @@ -538,7 +551,6 @@ def clean_image_name(self, name): return name def build_docker_images(self): - config = GlobalConfig().config print(TerminalColors.HEADER, '\nBuilding Docker images', TerminalColors.ENDC) # Create directory /tmp/green-metrics-tool/docker_images @@ -594,7 +606,10 @@ def build_docker_images(self): print(' '.join(docker_build_command)) - ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', timeout=config['measurement']['total-duration'], check=False) + if self._measurement_total_duration: + ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', timeout=self._measurement_total_duration, check=False) + else: + ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False) if ps.returncode != 0: print(f"Error: {ps.stderr} \n {ps.stdout}") @@ -1055,9 +1070,8 @@ def start_metric_providers(self, allow_container=True, allow_other=True): raise RuntimeError(f"Stderr on {metric_provider.__class__.__name__} was NOT empty: {stderr_read}") def check_total_runtime_exceeded(self): - config = GlobalConfig().config - if (time.time() - self.__start_measurement_seconds) > config['measurement']['total-duration']: - raise TimeoutError(f"Timeout of {config['measurement']['total-duration']} s was exceeded. This can be configured in 'total-duration'.") + if self._measurement_total_duration and (time.time() - self.__start_measurement_seconds) > self._measurement_total_duration: + raise TimeoutError(f"Timeout of {self._measurement_total_duration} s was exceeded. This can be configured in the user authentication for 'total-duration'.") def start_phase(self, phase, transition = True): config = GlobalConfig().config @@ -1101,8 +1115,6 @@ def end_phase(self, phase): self.__notes_helper.add_note({'note': f"Ending phase {phase}", 'detail_name': '[NOTES]', 'timestamp': phase_time}) def run_flows(self): - config = GlobalConfig().config - # run the flows ps_to_kill_tmp = [] ps_to_read_tmp = [] exception_occured = False @@ -1152,7 +1164,7 @@ def run_flows(self): if cmd_obj.get('detach', False) is True: - print('Process should be detached. 
Running asynchronously and detaching ...') + print('Executing process asynchronously and detaching ...') #pylint: disable=consider-using-with,subprocess-popen-preexec-fn ps = subprocess.Popen( docker_exec_command, @@ -1168,15 +1180,25 @@ def run_flows(self): ps_to_kill_tmp.append({'ps': ps, 'cmd': cmd_obj['command'], 'ps_group': False}) else: - print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-duration']}s runtime ...") - ps = subprocess.run( - docker_exec_command, - stderr=stderr_behaviour, - stdout=stdout_behaviour, - encoding='UTF-8', - check=False, # cause it will be checked later and also ignore-errors checked - timeout=config['measurement']['flow-process-duration'], - ) + print('Executing process synchronously.') + if self._measurement_flow_process_duration: + print(f"Alloting {self._measurement_flow_process_duration}s runtime ...") + ps = subprocess.run( + docker_exec_command, + stderr=stderr_behaviour, + stdout=stdout_behaviour, + encoding='UTF-8', + check=False, # cause it will be checked later and also ignore-errors checked + timeout=self._measurement_flow_process_duration, + ) + else: + ps = subprocess.run( + docker_exec_command, + stderr=stderr_behaviour, + stdout=stdout_behaviour, + encoding='UTF-8', + check=False, # cause it will be checked later and also ignore-errors checked + ) ps_to_read_tmp.append({ 'cmd': docker_exec_command, diff --git a/tests/lib/test_jobs.py b/tests/lib/test_jobs.py index 6b0534bf..e1ac354c 100644 --- a/tests/lib/test_jobs.py +++ b/tests/lib/test_jobs.py @@ -71,7 +71,7 @@ def test_insert_job(): branch = 'main' machine_id = 1 - job_id = Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + job_id = Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) assert job_id is not None job = Job.get_job('run') assert job._state == 'WAITING' @@ -83,7 +83,7 @@ def test_simple_run_job(): branch = 'main' machine_id = 1 - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) ps = subprocess.run( ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'], @@ -106,7 +106,7 @@ def test_simple_cluster_run(): branch = 'main' machine_id = 1 - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) ps = subprocess.run( ['python3', '../tools/client.py', '--testing', '--config-override', 'test-config.yml'], @@ -128,7 +128,7 @@ def test_simple_run_job_missing_filename_branch(): machine_id = 1 with pytest.raises(RuntimeError): - Job.insert('run', name=name, url=url, email=None, machine_id=machine_id) + Job.insert('run', user_id=1, name=name, url=url, email=None, machine_id=machine_id) def test_simple_run_job_wrong_machine_id(): @@ -139,7 +139,7 @@ def test_simple_run_job_wrong_machine_id(): machine_id = 100 with pytest.raises(psycopg.errors.ForeignKeyViolation): - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) #pylint: disable=unused-variable # for the time being, until I get the mocking to 
work @@ -152,6 +152,7 @@ def todo_test_simple_email_job(): Job.insert( 'email', + user_id=1, email=email, name=subject, message=message, diff --git a/tools/client.py b/tools/client.py index 8f22f2af..7d8ae035 100644 --- a/tools/client.py +++ b/tools/client.py @@ -154,6 +154,7 @@ def do_cleanup(cur_temp, cooldown_time_after_job): if client_main['send_control_workload_status_mail'] and config_main['admin']['notification_email']: Job.insert( 'email', + user_id=None, email=config_main['admin']['notification_email'], name=f"{config_main['machine']['description']} is operating normally. All STDDEV below {cwl['threshold'] * 100} %", message='\n'.join(message) diff --git a/tools/jobs.py b/tools/jobs.py index 4a211630..359f806d 100644 --- a/tools/jobs.py +++ b/tools/jobs.py @@ -77,6 +77,7 @@ if job_main._email and not isinstance(exception, ConfigurationCheckError): Job.insert( 'email', + user_id=job_main._user_id, email=job_main._email, name='Measurement Job on Green Metrics Tool Cluster failed', message=f"Run-ID: {job_main._run_id}\nName: {job_main._name}\nMachine: {job_main._machine_description}\n\nDetails can also be found in the log under: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={job_main._run_id}\n\nError message: {exception}\n" diff --git a/tools/timeline_projects.py b/tools/timeline_projects.py index f5e0cb93..fc5ff49c 100644 --- a/tools/timeline_projects.py +++ b/tools/timeline_projects.py @@ -22,16 +22,16 @@ class TimelineProject(): #pylint:disable=redefined-outer-name @classmethod - def insert(cls, name, url, branch, filename, machine_id, schedule_mode): + def insert(cls, *, name, url, branch, filename, machine_id, user_id, schedule_mode): # Timeline projects never insert / use emails as they are always premium and made by admin # So they need no notification on success / add insert_query = """ INSERT INTO - timeline_projects (name, url, branch, filename, machine_id, schedule_mode, created_at) + timeline_projects (name, url, branch, filename, machine_id, user_id, schedule_mode, created_at) VALUES - (%s, %s, %s, %s, %s, %s, NOW()) RETURNING id; + (%s, %s, %s, %s, %s, %s, %s, NOW()) RETURNING id; """ - params = (name, url, branch, filename, machine_id, schedule_mode,) + params = (name, url, branch, filename, machine_id, user_id, schedule_mode,) return DB().fetch_one(insert_query, params=params)[0] @@ -63,33 +63,33 @@ def insert(cls, name, url, branch, filename, machine_id, schedule_mode): else: query = """ SELECT - id, name, url, branch, filename, machine_id, schedule_mode, last_scheduled, + id, name, url, branch, filename, machine_id, user_id, schedule_mode, last_scheduled, DATE(last_scheduled) >= DATE(NOW()) as "scheduled_today", DATE(last_scheduled) >= DATE(NOW() - INTERVAL '7 DAYS') as "scheduled_last_week" FROM timeline_projects """ data = DB().fetch_all(query) - for [project_id, name, url, branch, filename, machine_id, schedule_mode, last_scheduled, scheduled_today, scheduled_last_week] in data: + for [project_id, name, url, branch, filename, machine_id, user_id, schedule_mode, last_scheduled, scheduled_today, scheduled_last_week] in data: if not last_scheduled: print('Project was not scheduled yet ', url, branch, filename, machine_id) DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,)) - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, 
machine_id=machine_id) print('\tInserted ') elif schedule_mode == 'daily': print('Project is on daily schedule', url, branch, filename, machine_id) if scheduled_today is False: print('\tProject was not scheduled today', scheduled_today) DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,)) - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) print('\tInserted') elif schedule_mode == 'weekly': print('Project is on daily schedule', url, branch, filename, machine_id) if scheduled_last_week is False: print('\tProject was not scheduled in last 7 days', scheduled_last_week) DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,)) - Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) print('\tInserted') elif schedule_mode == 'commit': - print('Project is on time schedule', url, branch, filename, machine_id) - print('This functionality is not yet implemented ...') + print('Project is on commit schedule', url, branch, filename, machine_id) + raise NotImplementedError('This functionality is not yet implemented ...') From fe8fed7b34c88fe1fb36a1a83838f100e80d04f9 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 10:12:20 +0200 Subject: [PATCH 02/21] Added daily to allowed schedule modes for default user --- docker/structure.sql | 2 +- migrations/2024_008_22_authentication.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/structure.sql b/docker/structure.sql index 079f0a90..7d3d1f32 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -24,7 +24,7 @@ CREATE TRIGGER users_moddatetime # Default password for authentication is DEFAULT INSERT INTO "public"."users"("name","token","capabilities","created_at","updated_at") VALUES -(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); 
+(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); diff --git a/migrations/2024_008_22_authentication.sql b/migrations/2024_008_22_authentication.sql index 76cc39ed..492901a3 100644 --- a/migrations/2024_008_22_authentication.sql +++ b/migrations/2024_008_22_authentication.sql @@ -18,7 +18,7 @@ CREATE UNIQUE INDEX token_unique ON users(token text_ops); INSERT INTO "users"("id","name","token","capabilities","created_at","updated_at") VALUES -(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); +(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); CREATE TRIGGER users_moddatetime BEFORE UPDATE ON users From c3b584efa8f922742b02736b60799dc0365d1243 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 13:06:56 +0200 Subject: [PATCH 03/21] REmoving user_id from object repr --- lib/user.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/user.py b/lib/user.py index d72094c1..9058b7d8 100644 --- a/lib/user.py +++ b/lib/user.py @@ -22,7 +22,9 @@ def __init__(self, user_id: int): def __repr__(self): - return str(self.__dict__) + values = self.__dict__.copy() + del values['_id'] + return str(values) def update(self): DB().query(""" @@ -162,5 
+164,5 @@ class UserAuthenticationError(Exception): args = parser.parse_args() # script will exit if arguments not present - fetched_user = User(args.token) - print("Users name is ", fetched_user._name, "and the caps are", fetched_user._capabilities) + authenticated_user_id = User.authenticate(SecureVariable(args.token)) + print("User is", User(authenticated_user_id)) From 4f7b2f976d5ff4083b640d687f8f1ef9e1de69ec Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 13:18:46 +0200 Subject: [PATCH 04/21] SQL typo --- docker/structure.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/structure.sql b/docker/structure.sql index 7d3d1f32..9b9ac107 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -21,7 +21,7 @@ CREATE TRIGGER users_moddatetime FOR EACH ROW EXECUTE PROCEDURE moddatetime (updated_at); -# Default password for authentication is DEFAULT +-- Default password for authentication is DEFAULT INSERT INTO "public"."users"("name","token","capabilities","created_at","updated_at") VALUES (E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); From dd13b9cd71b6465c38d0c0b58c460beff63e6301 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 13:29:20 +0200 Subject: [PATCH 05/21] Fixed timeout tests --- tests/test_config_opts.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/test_config_opts.py b/tests/test_config_opts.py index bfe2d412..e8f04dc2 100644 --- a/tests/test_config_opts.py +++ b/tests/test_config_opts.py @@ -16,11 +16,9 @@ def test_global_timeout(): - total_duration_new = 1 - total_duration_before = GlobalConfig().config['measurement']['total-duration'] - GlobalConfig().config['measurement']['total-duration'] = total_duration_new + measurement_total_duration = 1 - runner = Runner(uri=CURRENT_DIR, uri_type='folder', filename='data/usage_scenarios/basic_stress.yml', skip_system_checks=True, dev_no_build=False, dev_no_sleeps=True, dev_no_metrics=True) + runner = Runner(uri=CURRENT_DIR, uri_type='folder', filename='data/usage_scenarios/basic_stress.yml', skip_system_checks=True, dev_no_build=False, dev_no_sleeps=True, dev_no_metrics=True, measurement_total_duration=1) out = io.StringIO() err = io.StringIO() @@ -28,15 +26,13 @@ def test_global_timeout(): with redirect_stdout(out), redirect_stderr(err): runner.run() except subprocess.TimeoutExpired as e: - assert str(e).startswith("Command '['docker', 'run', '--rm', '-v',") and f"timed out after {total_duration_new} seconds" in str(e), \ - Tests.assertion_info(f"Command '['docker', 'run', '--rm', '-v', ... 
timed out after {total_duration_new} seconds", str(e)) + assert str(e).startswith("Command '['docker', 'run', '--rm', '-v',") and f"timed out after {measurement_total_duration} seconds" in str(e), \ + Tests.assertion_info(f"Command '['docker', 'run', '--rm', '-v', ... timed out after {measurement_total_duration} seconds", str(e)) return except TimeoutError as e: - assert str(e) == f"Timeout of {total_duration_new} s was exceeded. This can be configured in 'total-duration'.", \ - Tests.assertion_info(f"Timeout of {total_duration_new} s was exceeded. This can be configured in 'total-duration'.", str(e)) + assert str(e) == f"Timeout of {measurement_total_duration} s was exceeded. This can be configured in the user authentication for 'total-duration'.", \ + Tests.assertion_info(f"Timeout of {measurement_total_duration} s was exceeded. This can be configured in the user authentication for 'total-duration'.", str(e)) return - finally: - GlobalConfig().config['measurement']['total-duration'] = total_duration_before # reset assert False, \ Tests.assertion_info('Timeout was not raised', str(out.getvalue())) From 7767047f216696eccdc647e0d8f7623622b1d0a7 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 14:34:17 +0200 Subject: [PATCH 06/21] Database is now reloaded from structure file instead of truncate --- docker/structure.sql | 2 ++ lib/db.py | 11 +++++++++++ tests/conftest.py | 14 ++++++++++---- tests/smoke_test.py | 12 ++++++++---- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/docker/structure.sql b/docker/structure.sql index 9b9ac107..1d9a9937 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -1,6 +1,8 @@ CREATE DATABASE "green-coding"; \c green-coding; +CREATE SCHEMA IF NOT EXISTS "public"; + CREATE EXTENSION "uuid-ossp"; CREATE EXTENSION "moddatetime"; diff --git a/lib/db.py b/lib/db.py index 163fff91..11274968 100644 --- a/lib/db.py +++ b/lib/db.py @@ -66,6 +66,17 @@ def fetch_one(self, query, params=None, row_factory=None): def fetch_all(self, query, params=None, row_factory=None): return self.__query(query, params=params, return_type='all', row_factory=row_factory) + def import_csv(self, filename): + with self._pool.connection() as conn: + conn.autocommit = True + cur = conn.cursor() + with open(filename, 'r', encoding='utf-8') as sql_file: + sql_script = sql_file.read() + for statement in sql_script.split(';'): + if statement.strip(): + cur.execute(statement) + conn.autocommit = False + def copy_from(self, file, table, columns, sep=','): with self._pool.connection() as conn: conn.autocommit = False # is implicit default diff --git a/tests/conftest.py b/tests/conftest.py index 111c76f9..d0e1d56b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import pytest +import subprocess from lib.db import DB @@ -16,10 +17,15 @@ def pytest_collection_modifyitems(items): @pytest.fixture(autouse=True) def cleanup_after_test(): yield - tables = DB().fetch_all("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") - for table in tables: - table_name = table[0] - DB().query(f'TRUNCATE TABLE "{table_name}" RESTART IDENTITY CASCADE') + DB().query('DROP schema "public" CASCADE') + subprocess.run( + ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'], + check=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='UTF-8' + ) + ### If you wish to turn off the above auto-cleanup per test, include 
the following in your ### test module: diff --git a/tests/smoke_test.py b/tests/smoke_test.py index 784d6733..c73814d1 100644 --- a/tests/smoke_test.py +++ b/tests/smoke_test.py @@ -28,10 +28,14 @@ def cleanup_after_test(): @pytest.fixture(autouse=True, scope='module') def cleanup_after_module(): yield - tables = DB().fetch_all("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") - for table in tables: - table_name = table[0] - DB().query(f'TRUNCATE TABLE "{table_name}" RESTART IDENTITY CASCADE') + DB().query('DROP schema "public" CASCADE') + subprocess.run( + ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'], + check=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='UTF-8' + ) # Runs once per file before any test( #pylint: disable=expression-not-assigned From 0b1c4d3a903197a6e7b54aa854fc616553aa3778 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 14:34:45 +0200 Subject: [PATCH 07/21] Authenticate now returns user obj instead of just ID --- lib/user.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/user.py b/lib/user.py index 9058b7d8..e2213f3c 100644 --- a/lib/user.py +++ b/lib/user.py @@ -52,8 +52,8 @@ def api_quota(self, route: str): def can_schedule_job(self, schedule_mode: str): return schedule_mode in self._capabilities['jobs']['schedule_modes'] - @staticmethod - def authenticate(token: SecureVariable | None, silent=False): + @classmethod + def authenticate(cls, token: SecureVariable | None, silent=False): sha256_hash = hashlib.sha256() if token is None or token.get_value() is None: sha256_hash.update("DEFAULT".encode('UTF-8')) @@ -71,7 +71,7 @@ def authenticate(token: SecureVariable | None, silent=False): print('Successfully authenticated user ', user[1]) - return user[0] + return cls(user[0]) @staticmethod def get_new(name=None): From 70f0bf5d0fbe627f25ae64d7b96af730253f1710 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 25 Aug 2024 14:35:01 +0200 Subject: [PATCH 08/21] Updated diff test signature --- tests/lib/test_diff.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib/test_diff.py b/tests/lib/test_diff.py index 86b27ce9..6872f5b6 100644 --- a/tests/lib/test_diff.py +++ b/tests/lib/test_diff.py @@ -11,7 +11,7 @@ # to the diffing. 
To prevent this, this Unit test checks if the table column signature is unchanged def test_run_signature(): - expected_signature = 'id,job_id,name,uri,branch,commit_hash,commit_timestamp,email,categories,usage_scenario,filename,machine_specs,runner_arguments,machine_id,gmt_hash,measurement_config,start_measurement,end_measurement,phases,logs,invalid_run,failed,created_at,updated_at' + expected_signature = 'id,job_id,name,uri,branch,commit_hash,commit_timestamp,email,categories,usage_scenario,filename,machine_specs,runner_arguments,machine_id,gmt_hash,measurement_config,start_measurement,end_measurement,phases,logs,invalid_run,failed,user_id,created_at,updated_at' current_signature = DB().fetch_all("SELECT column_name FROM information_schema.columns WHERE table_name = 'runs' ORDER BY ordinal_position;") current_signature = ",".join([x[0] for x in current_signature]) From 50904a03a94f77a50051f8d5cfa95d9906448128 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 11:22:34 +0200 Subject: [PATCH 09/21] Using TRUNCATE CASCADE to clear DB --- tools/prune_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/prune_db.py b/tools/prune_db.py index f83d2d27..c05367e8 100644 --- a/tools/prune_db.py +++ b/tools/prune_db.py @@ -20,7 +20,7 @@ print("This will remove ALL runs and measurement data from the DB. Continue? (y/N)") answer = sys.stdin.readline() if answer.strip().lower() == 'y': - DB().query('DELETE FROM runs') + DB().query('TRUNCATE runs CASCADE') print("Done") else: print("This will remove all runs that have not ended, which includes failed ones, but also possibly running, so be sure no measurement is currently active. Continue? (y/N)") From deda343f6511cddc395085afb22e9c2790dcb755 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 13:01:34 +0200 Subject: [PATCH 10/21] DELETE script for retention expired --- tools/prune_db.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/tools/prune_db.py b/tools/prune_db.py index c05367e8..4f928992 100644 --- a/tools/prune_db.py +++ b/tools/prune_db.py @@ -12,19 +12,36 @@ import argparse parser = argparse.ArgumentParser() - parser.add_argument('--all', action='store_true', default=False, help='Will also remove successful runs') + parser.add_argument('mode', choices=['all', 'failed-runs', 'retention-expired'], default=False, help='Will also remove successful runs if all is used') args = parser.parse_args() # script will exit if arguments not present - if args.all: - print("This will remove ALL runs and measurement data from the DB. Continue? (y/N)") + if args.mode == 'all': + print("This will remove ALL runs, measurement, CI, carbonDB and hog data from the DB. Continue? (y/N)") answer = sys.stdin.readline() if answer.strip().lower() == 'y': DB().query('TRUNCATE runs CASCADE') + DB().query('TRUNCATE ci_measurements CASCADE') + DB().query('TRUNCATE hog_measurements CASCADE') + DB().query('TRUNCATE carbondb_energy_data CASCADE') + DB().query('TRUNCATE carbondb_energy_data_day CASCADE') print("Done") - else: + elif args.mode == 'failed-runs': print("This will remove all runs that have not ended, which includes failed ones, but also possibly running, so be sure no measurement is currently active. Continue? 
(y/N)") answer = sys.stdin.readline() if answer.strip().lower() == 'y': DB().query('DELETE FROM runs WHERE end_measurement IS NULL') print("Done") + elif args.mode == 'retention-expired': + print("Getting all users on the system ...") + users = DB().fetch_all('SELECT * FROM users', fetch_mode='dict') + for user in users: + print('User:', user['name']) + print('Retention periods:') + for table, retention in user['capabilities']['data'].items(): + print("\t-", table, retention['retention']) + join_condition = 'WHERE' + if table == 'measurements': + join_condition = 'USING runs WHERE measurements.run_id = runs.id AND' + DB().query(f"DELETE FROM {table} {join_condition} user_id = {user['id']} AND {table}.created_at < NOW() - INTERVAL '{retention['retention']} SECONDS'") + print("Done") From f0a7c4359fffeba98b8f8788fe1774412ad803de Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 15:43:15 +0200 Subject: [PATCH 11/21] Implemented measurement quota with many tests and refactorings --- api/main.py | 11 ++- docker/structure.sql | 8 +- lib/job/run.py | 12 ++- lib/user.py | 35 ++++++--- ...tion.sql => 2024_08_22_authentication.sql} | 6 +- runner.py | 3 + tests/api/test_api.py | 31 ++++++-- tests/conftest.py | 17 ++--- tests/lib/test_client.py | 35 +++++++++ tests/lib/test_jobs.py | 75 +++++++++++++++---- tests/smoke_test.py | 10 +-- tests/test_functions.py | 15 ++++ 12 files changed, 197 insertions(+), 61 deletions(-) rename migrations/{2024_008_22_authentication.sql => 2024_08_22_authentication.sql} (77%) create mode 100644 tests/lib/test_client.py diff --git a/api/main.py b/api/main.py index 367c0dab..695ec406 100644 --- a/api/main.py +++ b/api/main.py @@ -143,14 +143,13 @@ def authenticate(authentication_token=Depends(header_scheme), request: Request = try: user = User.authenticate(SecureVariable(authentication_token)) # Note that if no token is supplied this will authenticate as the DEFAULT user, which in FOSS systems has full capabilities - if parsed_url.path not in user._capabilities['api']['routes']: + if not user.can_use_route(parsed_url.path): raise HTTPException(status_code=401, detail="Route not allowed") from UserAuthenticationError - if parsed_url.path in user._capabilities['api']['quotas']: - if user._capabilities['api']['quotas'][parsed_url.path] <= 0: - raise HTTPException(status_code=401, detail="Quota exceeded") from UserAuthenticationError - user._capabilities['api']['quotas'][parsed_url.path] -= 1 - user.update() + if not user.has_api_quota(parsed_url.path): + raise HTTPException(status_code=401, detail="Quota exceeded") from UserAuthenticationError + + user.deduct_api_quota(parsed_url.path, 1) except UserAuthenticationError: raise HTTPException(status_code=401, detail="Invalid token") from UserAuthenticationError diff --git a/docker/structure.sql b/docker/structure.sql index 1d9a9937..0e2a8d08 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -26,7 +26,7 @@ CREATE TRIGGER users_moddatetime -- Default password for authentication is DEFAULT INSERT INTO "public"."users"("name","token","capabilities","created_at","updated_at") VALUES 
-(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); +(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); @@ -50,6 +50,12 @@ CREATE TRIGGER machines_moddatetime FOR EACH ROW EXECUTE PROCEDURE moddatetime (updated_at); +-- Default password for authentication is DEFAULT +INSERT INTO "public"."machines"("description", "available") +VALUES +(E'Local machine', true); + + CREATE TABLE jobs ( id SERIAL PRIMARY KEY, type text, diff --git a/lib/job/run.py b/lib/job/run.py index f205e7ba..584f77eb 100644 --- a/lib/job/run.py +++ b/lib/job/run.py @@ -30,6 +30,12 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru user = User(self._user_id) + if not user.can_use_machine(self._machine_id): + raise RuntimeError(f"Your user does not have the permissions to use the selected machine. Machine ID: {self._machine_id}") + + if not user.has_measurement_quota(self._machine_id): + raise RuntimeError(f"Your user does not have enough measurement quota to run a job on the selected machine. Machine ID: {self._machine_id}") + runner = Runner( name=self._name, uri=self._url, @@ -42,12 +48,14 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru docker_prune=docker_prune, job_id=self._id, user_id=self._user_id, - measurement_flow_process_duration=user._capabilities['measurements']['settings']['flow-process-duration'], - measurement_total_duration=user._capabilities['measurements']['settings']['total-duration'], + measurement_flow_process_duration=user._capabilities['measurement']['settings']['flow-process-duration'], + measurement_total_duration=user._capabilities['measurement']['settings']['total-duration'], ) try: # Start main code. Only URL is allowed for cron jobs self._run_id = runner.run() + user.deduct_measurement_quota(self._machine_id, int(runner._last_measurement_duration/1_000_000)) # duration in runner is in microseconds. 
We need seconds + build_and_store_phase_stats(self._run_id, runner._sci) # We need to import this here as we need the correct config file diff --git a/lib/user.py b/lib/user.py index e2213f3c..8892677f 100644 --- a/lib/user.py +++ b/lib/user.py @@ -39,19 +39,32 @@ def can_use_machine(self, machine_id: int): def can_use_route(self, route: str): return route in self._capabilities['api']['routes'] - def measurement_quota(self): - if 'quota' in self._capabilities['measurement']: - return self._capabilities['measurement']['quota'] - return None # None means infinite amounts - - def api_quota(self, route: str): - if route in self._capabilities['measurement']['quota']: - return self._capabilities['measurement']['quota'][route] - return None # None means infinite amounts - def can_schedule_job(self, schedule_mode: str): return schedule_mode in self._capabilities['jobs']['schedule_modes'] + + def has_api_quota(self, route: str): + if route in self._capabilities['api']['quotas']: + return self._capabilities['api']['quotas'][route] > 0 + return True # None means infinite amounts + + def deduct_api_quota(self, route: str, amount: int): + if route in self._capabilities['api']['quotas']: + self._capabilities['api']['quotas'][route] -= amount + self.update() + + def has_measurement_quota(self, machine_id: int): + machine_id = str(machine_id) # json does not support integer keys + if machine_id in self._capabilities['measurement']['quotas']: + return self._capabilities['measurement']['quotas'][machine_id] > 0 + return True # None means infinite amounts + + def deduct_measurement_quota(self, machine_id: int, amount: int): + machine_id = str(machine_id) # json does not support integer keys + if machine_id in self._capabilities['measurement']['quotas']: + self._capabilities['measurement']['quotas'][machine_id] -= amount + self.update() + @classmethod def authenticate(cls, token: SecureVariable | None, silent=False): sha256_hash = hashlib.sha256() @@ -101,7 +114,7 @@ def get_new(name=None): "variance", ], }, - "measurements": { + "measurement": { "settings": { "flow-process-duration": 3600, "total-duration": 3600, diff --git a/migrations/2024_008_22_authentication.sql b/migrations/2024_08_22_authentication.sql similarity index 77% rename from migrations/2024_008_22_authentication.sql rename to migrations/2024_08_22_authentication.sql index 492901a3..6b065187 100644 --- a/migrations/2024_008_22_authentication.sql +++ b/migrations/2024_08_22_authentication.sql @@ -18,10 +18,14 @@ CREATE UNIQUE INDEX token_unique ON users(token text_ops); INSERT INTO "users"("id","name","token","capabilities","created_at","updated_at") VALUES -(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurements":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); 
+(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); CREATE TRIGGER users_moddatetime BEFORE UPDATE ON users FOR EACH ROW EXECUTE PROCEDURE moddatetime (updated_at); +-- Default password for authentication is DEFAULT +INSERT INTO "public"."machines"("description", "available") +VALUES +(E'Local machine', true); \ No newline at end of file diff --git a/runner.py b/runner.py index f5678688..c35428e4 100755 --- a/runner.py +++ b/runner.py @@ -92,6 +92,7 @@ def __init__(self, self._user_id = user_id self._measurement_flow_process_duration = measurement_flow_process_duration self._measurement_total_duration = measurement_total_duration + self._last_measurement_duration = 0 del self._arguments['self'] # self is not needed and also cannot be serialzed. We remove it @@ -1380,6 +1381,8 @@ def update_start_and_end_times(self): SET start_measurement=%s, end_measurement=%s WHERE id = %s """, params=(self.__start_measurement, self.__end_measurement, self._run_id)) + self._last_measurement_duration = self.__end_measurement - self.__start_measurement + def set_run_failed(self): if not self._run_id: diff --git a/tests/api/test_api.py b/tests/api/test_api.py index d889d634..fbb8fcb2 100644 --- a/tests/api/test_api.py +++ b/tests/api/test_api.py @@ -2,15 +2,14 @@ import os import time from uuid import UUID -import pytest import requests CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +from lib.user import User from lib.db import DB from lib import utils from lib.global_config import GlobalConfig -from tools.machine import Machine from tests import test_functions as Tests config = GlobalConfig(config_name='test-config.yml').config @@ -21,11 +20,6 @@ import hog_data -@pytest.fixture(autouse=True, name="register_machine") -def register_machine_fixture(): - machine = Machine(machine_id=1, description='test-machine') - machine.register() - def get_job_id(run_name): query = """ SELECT @@ -212,3 +206,26 @@ def test_carbonDB_add(): data = DB().fetch_one('SELECT * FROM carbondb_energy_data', fetch_mode='dict') assert data is not None or data != [] assert exp_data == {key: data[key] for key in exp_data if key in data}, "The specified keys do not have the same values in both dictionaries." 
+ +def test_route_forbidden(): + user = User(1) + user._capabilities['api']['routes'] = [] + user.update() + + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 401 + assert response.text == '{"success":false,"err":"Route not allowed"}' + +def test_can_read_authentication_data(): + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 200 + assert response.text == '{"success":true,"data":{"_id":1,"_name":"DEFAULT","_capabilities":{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}}}' + +def test_api_quota_exhausted(): + user = User(1) + user._capabilities['api']['quotas'] = {'/v1/authentication/data': 0} + user.update() + + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 401 + assert response.text == '{"success":false,"err":"Quota exceeded"}' diff --git a/tests/conftest.py b/tests/conftest.py index d0e1d56b..667c6039 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,6 @@ import pytest -import subprocess -from lib.db import DB +from tests import test_functions as Tests ## VERY IMPORTANT to override the config file here ## otherwise it will automatically connect to non-test DB and delete all your real data @@ -13,18 +12,14 @@ def pytest_collection_modifyitems(items): if item.fspath.basename == 'test_functions.py': item.add_marker(pytest.mark.skip(reason='Skipping this file')) -# should we hardcode test-db here? 
+ +# Note: This fixture runs always +# Pytest collects all fixtures before running any tests +# no matter which order they are loaded in @pytest.fixture(autouse=True) def cleanup_after_test(): yield - DB().query('DROP schema "public" CASCADE') - subprocess.run( - ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'], - check=True, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - encoding='UTF-8' - ) + Tests.reset_db() ### If you wish to turn off the above auto-cleanup per test, include the following in your diff --git a/tests/lib/test_client.py b/tests/lib/test_client.py new file mode 100644 index 00000000..19cc232f --- /dev/null +++ b/tests/lib/test_client.py @@ -0,0 +1,35 @@ +import os +import subprocess + +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) + +from lib import utils +from lib.global_config import GlobalConfig +from lib.job.base import Job +from tests import test_functions as Tests + +GlobalConfig().override_config(config_name='test-config.yml') +config = GlobalConfig().config + +def test_simple_cluster_run(): + name = utils.randomword(12) + url = 'https://github.com/green-coding-berlin/pytest-dummy-repo' + filename = 'usage_scenario.yml' + branch = 'main' + machine_id = 1 + + Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id) + + ps = subprocess.run( + ['python3', '../tools/client.py', '--testing', '--config-override', 'test-config.yml'], + check=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='UTF-8' + ) + assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr) + assert 'Successfully ended testing run of client.py' in ps.stdout,\ + Tests.assertion_info('Successfully ended testing run of client.py', ps.stdout) + + assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\ + Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout) diff --git a/tests/lib/test_jobs.py b/tests/lib/test_jobs.py index 0046d6b2..bdb3e491 100644 --- a/tests/lib/test_jobs.py +++ b/tests/lib/test_jobs.py @@ -9,23 +9,18 @@ from lib.db import DB from lib import utils from lib.global_config import GlobalConfig -from tools.machine import Machine from lib.job.base import Job +from lib.user import User from tests import test_functions as Tests GlobalConfig().override_config(config_name='test-config.yml') config = GlobalConfig().config -@pytest.fixture(autouse=True, name="register_machine") -def register_machine_fixture(): - machine = Machine(machine_id=1, description='test-machine') - machine.register() - - # This should be done once per module @pytest.fixture(autouse=True, scope="module", name="build_image") def build_image_fixture(): - subprocess.run(['docker', 'compose', '-f', f"{CURRENT_DIR}/../data/stress-application/compose.yml", 'build'], check=True) + Tests.build_image_fixture() + def get_job(job_id): query = """ @@ -76,7 +71,7 @@ def test_insert_job(): job = Job.get_job('run') assert job._state == 'WAITING' -def test_simple_run_job(): +def test_simple_run_job_no_quota(): name = utils.randomword(12) url = 'https://github.com/green-coding-berlin/pytest-dummy-repo' filename = 'usage_scenario.yml' @@ -99,7 +94,7 @@ def test_simple_run_job(): assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\ Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout) -def test_simple_cluster_run(): +def test_simple_run_job_quota_gets_deducted(): name = utils.randomword(12) 
url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
     filename = 'usage_scenario.yml'
@@ -108,19 +103,24 @@
     Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
 
+    user = User(1)
+    user._capabilities['measurement']['quotas'] = {'1': 10_000 * 60} # typical quota is 10,000 minutes
+    user.update()
+
     ps = subprocess.run(
-        ['python3', '../tools/client.py', '--testing', '--config-override', 'test-config.yml'],
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
         check=True,
         stderr=subprocess.PIPE,
         stdout=subprocess.PIPE,
         encoding='UTF-8'
     )
-    assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr)
-    assert 'Successfully ended testing run of client.py' in ps.stdout,\
-        Tests.assertion_info('Successfully ended testing run of client.py', ps.stdout)
+    assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr)
+    assert 'Successfully processed jobs queue item.' in ps.stdout,\
+        Tests.assertion_info('Successfully processed jobs queue item.', ps.stdout)
 
     assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\
         Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout)
+    assert User(1)._capabilities['measurement']['quotas']['1'] < 10_000 * 60
 
 def test_simple_run_job_missing_filename_branch():
     name = utils.randomword(12)
@@ -141,6 +141,53 @@ def test_simple_run_job_wrong_machine_id():
     with pytest.raises(psycopg.errors.ForeignKeyViolation):
         Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
 
+def test_measurement_quota_exhausted():
+    name = utils.randomword(12)
+    url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
+    filename = 'usage_scenario.yml'
+    branch = 'main'
+    machine_id = 1
+
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    user = User(1)
+    user._capabilities['measurement']['quotas'] = {'1': 2678400}
+    user.update()
+    user.deduct_measurement_quota(machine_id=machine_id, amount=2678400)
+
+    ps = subprocess.run(
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+
+    assert 'Your user does not have enough measurement quota to run a job on the selected machine. Machine ID: 1' in ps.stderr, Tests.assertion_info('Quota exhausted', ps.stderr)
+
+def test_machine_not_allowed():
+    name = utils.randomword(12)
+    url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
+    filename = 'usage_scenario.yml'
+    branch = 'main'
+    machine_id = 1
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    user = User(1)
+    user._capabilities['machines'] = []
+    user.update()
+
+    ps = subprocess.run(
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+
+    assert 'Your user does not have the permissions to use the selected machine. 
Machine ID: 1' in ps.stderr, Tests.assertion_info('Machine forbidden', ps.stderr) + + #pylint: disable=unused-variable # for the time being, until I get the mocking to work ## This test doesn't really make sense anymore as is, since we don't have "email jobs" in the same way, diff --git a/tests/smoke_test.py b/tests/smoke_test.py index 25a7ef65..f6d88d2e 100644 --- a/tests/smoke_test.py +++ b/tests/smoke_test.py @@ -11,6 +11,7 @@ from lib.db import DB from lib import utils from lib.global_config import GlobalConfig +from tests import test_functions as Tests from runner import Runner run_stderr = None @@ -28,14 +29,7 @@ def cleanup_after_test(): @pytest.fixture(autouse=True, scope='module') def cleanup_after_module(): yield - DB().query('DROP schema "public" CASCADE') - subprocess.run( - ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'], - check=True, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - encoding='UTF-8' - ) + Tests.reset_db() # Runs once per file before any test( #pylint: disable=expression-not-assigned diff --git a/tests/test_functions.py b/tests/test_functions.py index ee4541ca..aed9c33c 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -1,5 +1,6 @@ import os import subprocess +from lib.db import DB CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -20,6 +21,20 @@ def check_if_container_running(container_name): return False return True +def build_image_fixture(): + subprocess.run(['docker', 'compose', '-f', f"{CURRENT_DIR}/data/stress-application/compose.yml", 'build'], check=True) + +# should be preceded by a yield statement and on autouse +def reset_db(): + DB().query('DROP schema "public" CASCADE') + subprocess.run( + ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'], + check=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='UTF-8' + ) + class RunUntilManager: def __init__(self, runner): self.__runner = runner From 49ed0c1cf41adc4f6b78d4755b18c308aabe79e8 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 18:53:20 +0200 Subject: [PATCH 12/21] Removed noise --- lib/user.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/user.py b/lib/user.py index 8892677f..9766cc8c 100644 --- a/lib/user.py +++ b/lib/user.py @@ -70,7 +70,6 @@ def authenticate(cls, token: SecureVariable | None, silent=False): sha256_hash = hashlib.sha256() if token is None or token.get_value() is None: sha256_hash.update("DEFAULT".encode('UTF-8')) - print(sha256_hash.hexdigest()) else: sha256_hash.update(token.get_value().encode('UTF-8')) @@ -82,8 +81,6 @@ def authenticate(cls, token: SecureVariable | None, silent=False): if not user: raise UserAuthenticationError('User with corresponding token not found') # do never output token everywhere cause it might land in logs - print('Successfully authenticated user ', user[1]) - return cls(user[0]) @staticmethod From 8d35b4777408e378a083f2adff523877d2198123 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 18:53:38 +0200 Subject: [PATCH 13/21] Email adding was not possible without user_id --- lib/error_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/error_helpers.py b/lib/error_helpers.py index 0085e76c..9726767b 100644 --- a/lib/error_helpers.py +++ b/lib/error_helpers.py @@ -39,4 +39,4 @@ def log_error(*messages, **kwargs): 
print(TerminalColors.FAIL, err, TerminalColors.ENDC, file=sys.stderr)
 
     if error_email := GlobalConfig().config['admin']['error_email']:
-        Job.insert('email', email=error_email, name='Green Metrics Tool Error', message=err)
+        Job.insert('email', user_id=None, email=error_email, name='Green Metrics Tool Error', message=err)

From 06cfb732a5cb6b9391cfd1b67a772d1ed94dded3 Mon Sep 17 00:00:00 2001
From: Arne Tarara
Date: Sun, 8 Sep 2024 18:58:03 +0200
Subject: [PATCH 14/21] Migration reordered: users table must be created before
 the ALTER TABLE statements that reference it [skip ci]

---
 migrations/2024_08_22_authentication.sql | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/migrations/2024_08_22_authentication.sql b/migrations/2024_08_22_authentication.sql
index 6b065187..07c0be56 100644
--- a/migrations/2024_08_22_authentication.sql
+++ b/migrations/2024_08_22_authentication.sql
@@ -1,9 +1,3 @@
-ALTER TABLE "jobs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
-ALTER TABLE "timeline_projects" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
-ALTER TABLE "runs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
-ALTER TABLE "ci_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
-ALTER TABLE "hog_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
-
 CREATE TABLE users (
     id SERIAL PRIMARY KEY,
     name text,
@@ -13,6 +7,13 @@ CREATE TABLE users (
     updated_at timestamp with time zone
 );
 
+ALTER TABLE "jobs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+ALTER TABLE "timeline_projects" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+ALTER TABLE "runs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+ALTER TABLE "ci_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+ALTER TABLE "hog_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+
+
 CREATE UNIQUE INDEX name_unique ON users(name text_ops);
 CREATE UNIQUE INDEX token_unique ON users(token text_ops);

From 37f3432405d7c6e309a88aacfa7cea239e56cd5d Mon Sep 17 00:00:00 2001
From: Arne Tarara
Date: Sun, 8 Sep 2024 19:07:35 +0200
Subject: [PATCH 15/21] user_id added to hog, ci and carbondb

---
 api/api_helpers.py |  6 +++---
 api/main.py        | 17 ++++++++++-------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/api/api_helpers.py b/api/api_helpers.py
index 2a2a9310..9515cdfa 100644
--- a/api/api_helpers.py
+++ b/api/api_helpers.py
@@ -729,7 +729,7 @@ def get_carbon_intensity(latitude, longitude):
 
     return None
 
-def carbondb_add(client_ip, energydatas):
+def carbondb_add(client_ip, energydatas, user_id):
 
     latitude, longitude = get_geo(client_ip)
     carbon_intensity = get_carbon_intensity(latitude, longitude)
@@ -765,12 +765,12 @@
             project_uuid = e['project'] if e['project'] is not None else ''
             tags_clean = "{" + ",".join([f'"{tag.strip()}"' for tag in e['tags'].split(',') if e['tags']]) + "}" if e['tags'] is not None else ''
 
-            row = f"{e['type']}|{company_uuid}|{e['machine']}|{project_uuid}|{tags_clean}|{int(e['time_stamp'])}|{e['energy_value']}|{co2_value}|{carbon_intensity}|{latitude}|{longitude}|{client_ip}"
+            row = 
f"{e['type']}|{company_uuid}|{e['machine']}|{project_uuid}|{tags_clean}|{int(e['time_stamp'])}|{e['energy_value']}|{co2_value}|{carbon_intensity}|{latitude}|{longitude}|{client_ip}|{user_id}" data_rows.append(row) data_str = "\n".join(data_rows) data_file = io.StringIO(data_str) - columns = ['type', 'company', 'machine', 'project', 'tags', 'time_stamp', 'energy_value', 'co2_value', 'carbon_intensity', 'latitude', 'longitude', 'ip_address'] + columns = ['type', 'company', 'machine', 'project', 'tags', 'time_stamp', 'energy_value', 'co2_value', 'carbon_intensity', 'latitude', 'longitude', 'ip_address', 'user_id'] DB().copy_from(file=data_file, table='carbondb_energy_data', columns=columns, sep='|') diff --git a/api/main.py b/api/main.py index 695ec406..fdc93046 100644 --- a/api/main.py +++ b/api/main.py @@ -778,8 +778,9 @@ async def hog_add( ane_energy, energy_impact, thermal_pressure, - settings) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + settings, + user_id) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """ params = ( @@ -793,6 +794,7 @@ async def hog_add( cpu_energy_data['energy_impact'], measurement_data['thermal_pressure'], measurement.settings, + user._id, ) measurement_db_id = DB().fetch_one(query=query, params=params)[0] @@ -1254,14 +1256,15 @@ async def post_ci_measurement_add( lon, city, co2i, - co2eq + co2eq, + user_id ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = (measurement.energy_value, measurement.energy_unit, measurement.repo, measurement.branch, measurement.workflow, measurement.run_id, measurement.label, measurement.source, measurement.cpu, measurement.commit_hash, measurement.duration, measurement.cpu_util_avg, measurement.workflow_name, - measurement.lat, measurement.lon, measurement.city, measurement.co2i, measurement.co2eq) + measurement.lat, measurement.lon, measurement.city, measurement.co2i, measurement.co2eq, user._id) DB().query(query=query, params=params) @@ -1288,7 +1291,7 @@ async def post_ci_measurement_add( } # If there is an error the function will raise an Error - carbondb_add(client_ip, [energydata]) + carbondb_add(client_ip, [energydata], user._id) return ORJSONResponse({'success': True}, status_code=201) @@ -1437,7 +1440,7 @@ async def add_carbondb( else: client_ip = request.client.host - carbondb_add(client_ip, energydatas) + carbondb_add(client_ip, energydatas, user._id) return Response(status_code=204) From 83484d6f7a144be872efaa4530a5d5418ffb1144 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Sun, 8 Sep 2024 19:11:59 +0200 Subject: [PATCH 16/21] CarbonDB user_id column [skip ci] --- migrations/2024_08_22_authentication.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/migrations/2024_08_22_authentication.sql b/migrations/2024_08_22_authentication.sql index 07c0be56..1ab87f2c 100644 --- a/migrations/2024_08_22_authentication.sql +++ b/migrations/2024_08_22_authentication.sql @@ -12,6 +12,8 @@ ALTER TABLE "timeline_projects" ADD COLUMN "user_id" integer REFERENCES users(id ALTER TABLE "runs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; ALTER TABLE "ci_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; ALTER TABLE "hog_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "carbondb_energy_data" ADD COLUMN "user_id" integer 
REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
+ALTER TABLE "carbondb_energy_data_day" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE;
 
 
 CREATE UNIQUE INDEX name_unique ON users(name text_ops);

From 6cb1c5b8354eca3c578e51729391aa1f6535daf9 Mon Sep 17 00:00:00 2001
From: Arne Tarara
Date: Sun, 8 Sep 2024 19:26:03 +0200
Subject: [PATCH 17/21] Adding user_id to structure

---
 docker/structure.sql | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docker/structure.sql b/docker/structure.sql
index 0e2a8d08..e637c448 100644
--- a/docker/structure.sql
+++ b/docker/structure.sql
@@ -372,6 +372,7 @@ CREATE TABLE carbondb_energy_data (
     latitude DOUBLE PRECISION,
     longitude DOUBLE PRECISION,
     ip_address INET,
+    user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE,
     created_at timestamp with time zone DEFAULT now(),
     updated_at timestamp with time zone
 );
@@ -401,7 +402,7 @@ CREATE TABLE carbondb_energy_data_day (
     co2_sum FLOAT,
     carbon_intensity_avg FLOAT,
     record_count INT,
-
+    user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE,
     created_at timestamp with time zone DEFAULT now(),
     updated_at timestamp with time zone
 );

From 70d16273baffe78fb928f814223be6676a1e7160 Mon Sep 17 00:00:00 2001
From: Arne Tarara
Date: Sun, 8 Sep 2024 22:31:12 +0200
Subject: [PATCH 18/21] Added more JOINs for delete [skip ci]

---
 tools/prune_db.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tools/prune_db.py b/tools/prune_db.py
index 4f928992..bed803fc 100644
--- a/tools/prune_db.py
+++ b/tools/prune_db.py
@@ -43,5 +43,9 @@
                 join_condition = 'WHERE'
                 if table == 'measurements':
                     join_condition = 'USING runs WHERE measurements.run_id = runs.id AND'
+                elif table == 'hog_coalitions':
+                    join_condition = 'USING hog_measurements WHERE hog_coalitions.measurement = hog_measurements.id AND'
+                elif table == 'hog_tasks':
+                    join_condition = 'USING hog_measurements, hog_coalitions WHERE hog_coalitions.measurement = hog_measurements.id AND hog_tasks.coalition = hog_coalitions.id AND'
                 DB().query(f"DELETE FROM {table} {join_condition} user_id = {user['id']} AND {table}.created_at < NOW() - INTERVAL '{retention['retention']} SECONDS'")
     print("Done")

From 05077b752dc66ff1ffc835cf73dfd2afe4829fa6 Mon Sep 17 00:00:00 2001
From: Arne Tarara
Date: Mon, 16 Sep 2024 13:12:50 +0200
Subject: [PATCH 19/21] Added machine to error in client.py [skip ci]

---
 tools/client.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tools/client.py b/tools/client.py
index 7d8ae035..f2e32175 100644
--- a/tools/client.py
+++ b/tools/client.py
@@ -106,7 +106,7 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
     while True:
         job = Job.get_job('run')
         if job and job.check_job_running():
-            error_helpers.log_error('Job is still running. This is usually an error case! Continuing for now ...')
+            error_helpers.log_error('Job is still running. This is usually an error case! Continuing for now ...', machine=config_main['machine']['description'])
             time.sleep(client_main['sleep_time_no_job'])
             continue
@@ -177,18 +177,18 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
         except ConfigurationCheckError as exc: # ConfigurationChecks indicate that before the job ran, some setup with the machine was incorrect. So we soft-fail here with sleeps
             set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
             if exc.status == Status.WARN: # Warnings are something like CPU% too high. 
Here short sleep - error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, sleep_duration=600) + error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, machine=config_main['machine']['description'], sleep_duration=600) time.sleep(600) else: # Hard fails won't resolve on it's own. We sleep until next cluster validation - error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, sleep_duration=client_main['time_between_control_workload_validations']) + error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, machine=config_main['machine']['description'], sleep_duration=client_main['time_between_control_workload_validations']) time.sleep(client_main['time_between_control_workload_validations']) except subprocess.CalledProcessError as exc: set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id) - error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, stdout=exc.stdout, stderr=exc.stderr, run_id=job._run_id, name=job._name, url=job._url) + error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, stdout=exc.stdout, stderr=exc.stderr, run_id=job._run_id, machine=config_main['machine']['description'], name=job._name, url=job._url) except Exception as exc: set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id) - error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, run_id=job._run_id, name=job._name, url=job._url) + error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, run_id=job._run_id, machine=config_main['machine']['description'], name=job._name, url=job._url) finally: if not args.testing: do_cleanup(current_temperature, last_cooldown_time) From 35c3ff12fe1e1e6dd9c6b58e0d94888f4e79bfb3 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Wed, 18 Sep 2024 14:49:09 +0200 Subject: [PATCH 20/21] Run-ID link and class name added to errors --- lib/error_helpers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/error_helpers.py b/lib/error_helpers.py index 9726767b..b56ea871 100644 --- a/lib/error_helpers.py +++ b/lib/error_helpers.py @@ -13,7 +13,9 @@ def end_error(*messages, **kwargs): def format_error(*messages, **kwargs): err = '\n'.join(messages) err += '\n\n' - err += '\n'.join([f"{key.capitalize()} ({value.__class__}): {value}" for key, value in kwargs.items()]) + err += '\n'.join([f"{key.capitalize()} ({value.__class__.__name__}): {value}" for key, value in kwargs.items()]) + if 'run_id' in kwargs: + err += f"Run-ID Link: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={kwargs['run_id']}" error_string = f""" \n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n From df1dee2596a9dbf2d0a1800fc1e8ce3a27a46022 Mon Sep 17 00:00:00 2001 From: Arne Tarara Date: Wed, 18 Sep 2024 18:15:40 +0200 Subject: [PATCH 21/21] Run-ID Link only of not empty [skip ci] --- lib/error_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/error_helpers.py b/lib/error_helpers.py index b56ea871..b67e0f21 100644 --- 
a/lib/error_helpers.py +++ b/lib/error_helpers.py @@ -14,8 +14,8 @@ def format_error(*messages, **kwargs): err = '\n'.join(messages) err += '\n\n' err += '\n'.join([f"{key.capitalize()} ({value.__class__.__name__}): {value}" for key, value in kwargs.items()]) - if 'run_id' in kwargs: - err += f"Run-ID Link: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={kwargs['run_id']}" + if 'run_id' in kwargs and kwargs['run_id']: + err += f"\nRun-ID Link: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={kwargs['run_id']}" error_string = f""" \n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n
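
For readers who want to trace the quota mechanics from the patches above without hopping between hunks, here is a minimal, self-contained sketch of the check-then-deduct flow. It assumes only the capability layout shown in docker/structure.sql; the standalone functions are illustrative stand-ins for the corresponding User methods and omit the update() call that persists the mutated capabilities back to the users table.

def has_measurement_quota(capabilities: dict, machine_id: int) -> bool:
    # JSON object keys are always strings, so machine IDs are stored as strings
    machine_id = str(machine_id)
    quotas = capabilities['measurement']['quotas']
    if machine_id in quotas:
        return quotas[machine_id] > 0
    return True  # a machine without an entry has an unlimited quota

def deduct_measurement_quota(capabilities: dict, machine_id: int, amount: int) -> None:
    # amount is in seconds, matching the runner duration converted from microseconds
    machine_id = str(machine_id)
    quotas = capabilities['measurement']['quotas']
    if machine_id in quotas:
        quotas[machine_id] -= amount  # may go negative; the next check then rejects the job

capabilities = {'measurement': {'quotas': {'1': 600}}}  # 10 minutes left on machine 1
assert has_measurement_quota(capabilities, 1)           # job on machine 1 is accepted
deduct_measurement_quota(capabilities, 1, 720)          # a 12-minute run still completes
assert not has_measurement_quota(capabilities, 1)       # but machine 1 is now exhausted
assert has_measurement_quota(capabilities, 2)           # machine 2 has no entry: unlimited

The same check-then-deduct pattern drives has_api_quota and deduct_api_quota, keyed by API route instead of machine ID, which is why a run is allowed to finish even when it overshoots its remaining quota: the check happens before runner.run(), the deduction with the actual duration afterwards.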