diff --git a/.travis.yml b/.travis.yml index fe06ce958..e79f53ee8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,4 +8,4 @@ install: script: - flake8 src - nose2 - - coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -c params/config/sim_config.yaml + - coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml diff --git a/params/services/abc.yaml b/params/services/abc.yaml index 2cca18594..153beeee9 100644 --- a/params/services/abc.yaml +++ b/params/services/abc.yaml @@ -12,9 +12,11 @@ sf_list: a: processing_delay_mean: 5.0 processing_delay_stdev: 0.0 + resource_function_id: A b: processing_delay_mean: 5.0 processing_delay_stdev: 0.0 + resource_function_id: B c: processing_delay_mean: 5.0 processing_delay_stdev: 0.0 diff --git a/params/services/resource_functions/A.py b/params/services/resource_functions/A.py new file mode 100644 index 000000000..a01d98c50 --- /dev/null +++ b/params/services/resource_functions/A.py @@ -0,0 +1,2 @@ +def resource_function(load): + return load diff --git a/params/services/resource_functions/B.py b/params/services/resource_functions/B.py new file mode 100644 index 000000000..a01d98c50 --- /dev/null +++ b/params/services/resource_functions/B.py @@ -0,0 +1,2 @@ +def resource_function(load): + return load diff --git a/src/coordsim/main.py b/src/coordsim/main.py index 059363311..bebb86208 100644 --- a/src/coordsim/main.py +++ b/src/coordsim/main.py @@ -32,7 +32,7 @@ def main(): # Getting current SFC list, and the SF list of each SFC, and config sfc_list = reader.get_sfc(args.sf) - sf_list = reader.get_sf(args.sf) + sf_list = reader.get_sf(args.sf, args.sfr) config = reader.get_config(args.config) # use dummy placement and schedule for running simulator without algorithm @@ -66,6 +66,8 @@ def parse_args(): help="The duration of the simulation (simulates milliseconds).") parser.add_argument('-sf', '--sf', required=True, dest="sf", help="VNF file which contains the SFCs and their respective SFs and their properties.") + parser.add_argument('-sfr', '--sfr', required=False, default='', dest='sfr', + help="Path which contains the SF resource consumption functions.") parser.add_argument('-n', '--network', required=True, dest='network', help="The GraphML network file that specifies the nodes and edges of the network.") parser.add_argument('-c', '--config', required=True, dest='config', help="Path to the simulator config file.") diff --git a/src/coordsim/reader/reader.py b/src/coordsim/reader/reader.py index 2542c0e8f..ad571b247 100644 --- a/src/coordsim/reader/reader.py +++ b/src/coordsim/reader/reader.py @@ -1,11 +1,13 @@ import networkx as nx from geopy.distance import distance as dist import numpy as np -import logging as log +import logging import yaml import math from collections import defaultdict +import importlib +log = logging.getLogger(__name__) # Disclaimer: Some snippets of the following file were imported/modified from B-JointSP on GitHub. # Original code can be found on https://github.com/CN-UPB/B-JointSP @@ -40,7 +42,21 @@ def get_sfc(sfc_file): return sfc_list -def get_sf(sf_file): +def load_resource_function(name, path): + try: + spec = importlib.util.spec_from_file_location(name, path + '/' + name + '.py') + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + except Exception: + raise Exception(f'Cannot load file "{name}.py" from specified location "{path}".') + + try: + return getattr(module, 'resource_function') + except Exception: + raise Exception(f'There is no "resource_function" defined in file "{name}.py."') + + +def get_sf(sf_file, resource_functions_path): """ Get the list of SFs and their properties from the yaml data. """ @@ -50,6 +66,7 @@ def get_sf(sf_file): # Configureable default mean and stdev defaults default_processing_delay_mean = 1.0 default_processing_delay_stdev = 1.0 + def default_resource_function(x): return x sf_list = defaultdict(None) for sf_name, sf_details in sf_data['sf_list'].items(): sf_list[sf_name] = sf_details @@ -58,6 +75,19 @@ def get_sf(sf_file): default_processing_delay_mean) sf_list[sf_name]["processing_delay_stdev"] = sf_list[sf_name].get("processing_delay_stdev", default_processing_delay_stdev) + if 'resource_function_id' in sf_list[sf_name]: + try: + sf_list[sf_name]['resource_function'] = load_resource_function(sf_list[sf_name]['resource_function_id'], + resource_functions_path) + except Exception as ex: + sf_list[sf_name]['resource_function_id'] = 'default' + sf_list[sf_name]['resource_function'] = default_resource_function + log.warning(f'{str(ex)} SF {sf_name} will use default resource function instead.') + else: + sf_list[sf_name]["resource_function_id"] = 'default' + sf_list[sf_name]["resource_function"] = default_resource_function + log.info( + f'No resource function specified for SF {sf_name}. Default resource function will be used instead.') return sf_list diff --git a/src/coordsim/simulation/flowsimulator.py b/src/coordsim/simulation/flowsimulator.py index b6dc4ad73..35355b6c2 100644 --- a/src/coordsim/simulation/flowsimulator.py +++ b/src/coordsim/simulation/flowsimulator.py @@ -187,19 +187,32 @@ def process_flow(self, flow, sfc): # Add the delay to the flow's end2end delay metrics.add_processing_delay(processing_delay) flow.end2end_delay += processing_delay + + # Calculate the demanded capacity when the flow is processed at this node + demanded_total_capacity = 0.0 + for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items(): + if sf == sf_i: + # Include flows data rate in requested sf capacity calculation + demanded_total_capacity += self.params.sf_list[sf]['resource_function'](sf_data['load'] + flow.dr) + else: + demanded_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load']) + # Get node capacities node_cap = self.params.network.nodes[current_node_id]["cap"] node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] assert node_remaining_cap >= 0, "Remaining node capacity cannot be less than 0 (zero)!" # Metrics: Add active flow to the SF once the flow has begun processing. metrics.add_active_flow(flow, current_node_id, current_sf) - if flow.dr <= node_remaining_cap: + if demanded_total_capacity <= node_cap: log.info( - "Flow {} started proccessing at sf {} at node {}. Time: {}, " + "Flow {} started processing at sf {} at node {}. Time: {}, " "Processing delay: {}".format(flow.flow_id, current_sf, current_node_id, self.env.now, processing_delay)) - self.params.network.nodes[current_node_id]["remaining_cap"] -= flow.dr + # Add load to sf + self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] += flow.dr + # Set remaining node capacity + self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - demanded_total_capacity # Just for the sake of keeping lines small, the node_remaining_cap is updated again. node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] @@ -223,7 +236,17 @@ def process_flow(self, flow, sfc): .format(flow.flow_id, current_sf, current_node_id, self.env.now)) # Remove the active flow from the SF after it departed the SF metrics.remove_active_flow(flow, current_node_id, current_sf) - self.params.network.nodes[current_node_id]["remaining_cap"] += flow.dr + + # Remove load from sf + self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] -= flow.dr + assert self.params.network.nodes[current_node_id]['available_sf'][sf][ + 'load'] >= 0, 'SF load cannot be less than 0!' + # Recalculation is necessary because other flows could have already arrived or departed at the node + used_total_capacity = 0.0 + for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items(): + used_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load']) + # Set remaining node capacity + self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - used_total_capacity # Just for the sake of keeping lines small, the node_remaining_cap is updated again. node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] diff --git a/src/coordsim/simulation/simulatorparams.py b/src/coordsim/simulation/simulatorparams.py index 52d531963..2d3386ebc 100644 --- a/src/coordsim/simulation/simulatorparams.py +++ b/src/coordsim/simulation/simulatorparams.py @@ -26,7 +26,12 @@ def __init__(self, network, ing_nodes, sfc_list, sf_list, config, seed, schedule self.schedule = schedule # Placement of SFs in each node: defaultdict(list) self.sf_placement = sf_placement - + # Update which sf is available at which node + for node_id, placed_sf_list in sf_placement.items(): + available_sf = {} + for sf in placed_sf_list: + available_sf[sf] = self.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0}) + self.network.nodes[node_id]['available_sf'] = available_sf # Flow interarrival exponential distribution mean: float self.inter_arr_mean = config['inter_arrival_mean'] # Flow data rate normal distribution mean: float diff --git a/src/siminterface/simulator.py b/src/siminterface/simulator.py index 464d21da8..6518c2d73 100644 --- a/src/siminterface/simulator.py +++ b/src/siminterface/simulator.py @@ -21,7 +21,7 @@ def __init__(self, test_mode=False): # Create CSV writer self.writer = ResultWriter(self.test_mode) - def init(self, network_file, service_functions_file, config_file, seed): + def init(self, network_file, service_functions_file, config_file, seed, resource_functions_path=""): # Initialize metrics, record start time metrics.reset() @@ -31,7 +31,7 @@ def init(self, network_file, service_functions_file, config_file, seed): # Parse network and SFC + SF file self.network, self.ing_nodes = reader.read_network(network_file, node_cap=10, link_cap=10) self.sfc_list = reader.get_sfc(service_functions_file) - self.sf_list = reader.get_sf(service_functions_file) + self.sf_list = reader.get_sf(service_functions_file, resource_functions_path) self.config = reader.get_config(config_file) # Generate SimPy simulation environment @@ -78,6 +78,12 @@ def apply(self, actions: SimulatorAction): # Get the new placement from the action passed by the RL agent # Modify and set the placement parameter of the instantiated simulator object. self.simulator.params.sf_placement = actions.placement + # Update which sf is available at which node + for node_id, placed_sf_list in actions.placement.items(): + available_sf = {} + for sf in placed_sf_list: + available_sf[sf] = self.simulator.params.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0}) + self.simulator.params.network.nodes[node_id]['available_sf'] = available_sf # Get the new schedule from the SimulatorAction # Set it in the params of the instantiated simulator object. diff --git a/tests/test_simulator.py b/tests/test_simulator.py index fcb870d36..dc0f370bd 100644 --- a/tests/test_simulator.py +++ b/tests/test_simulator.py @@ -10,6 +10,7 @@ NETWORK_FILE = "params/networks/triangle.graphml" SERVICE_FUNCTIONS_FILE = "params/services/abc.yaml" +RESOURCE_FUNCTION_PATH = "params/services/resource_functions" CONFIG_FILE = "params/config/sim_config.yaml" SIMULATION_DURATION = 1000 SEED = 1234 @@ -30,7 +31,7 @@ def setUp(self): # Configure simulator parameters network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10) sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE) - sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE) + sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH) config = reader.get_config(CONFIG_FILE) sf_placement = dummy_data.triangle_placement diff --git a/tests/test_simulatorInterface.py b/tests/test_simulatorInterface.py index 16d098867..9717597ab 100644 --- a/tests/test_simulatorInterface.py +++ b/tests/test_simulatorInterface.py @@ -8,6 +8,7 @@ NETWORK_FILE = "params/networks/triangle.graphml" SERVICE_FUNCTIONS_FILE = "params/services/3sfcs.yaml" +RESOURCE_FUNCTION_PATH = "params/services/resource_functions" CONFIG_FILE = "params/config/sim_config.yaml" SIMULATOR_MODULE_NAME = "siminterface.simulator" @@ -27,7 +28,7 @@ def setUp(self): """ # TODO: replace SimulatorInterface with implementation self.simulator = SIMULATOR_CLS(TEST_MODE) - self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234) + self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234, resource_functions_path=RESOURCE_FUNCTION_PATH) def test_apply(self): # test if placement and schedule can be applied @@ -246,7 +247,7 @@ def test_apply(self): } """ network_stats = simulator_state.network_stats - self.assertIs(len(network_stats), 5) + self.assertIs(len(network_stats), 7) self.assertIn('total_flows', network_stats) self.assertIn('successful_flows', network_stats) self.assertIn('dropped_flows', network_stats)