Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added VNF resource consumption functions #78

Merged
merged 11 commits into from
Aug 22, 2019
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ install:
script:
- flake8 src
- nose2
- coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -c params/config/sim_config.yaml
- coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
2 changes: 2 additions & 0 deletions params/services/abc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ sf_list:
a:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
resource_function_id: A
b:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
resource_function_id: B
c:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
2 changes: 2 additions & 0 deletions params/services/resource_functions/A.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def resource_function(load):
return load
2 changes: 2 additions & 0 deletions params/services/resource_functions/B.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def resource_function(load):
return load
4 changes: 3 additions & 1 deletion src/coordsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main():

# Getting current SFC list, and the SF list of each SFC, and config
sfc_list = reader.get_sfc(args.sf)
sf_list = reader.get_sf(args.sf)
sf_list = reader.get_sf(args.sf, args.sfr)
config = reader.get_config(args.config)

# use dummy placement and schedule for running simulator without algorithm
Expand Down Expand Up @@ -66,6 +66,8 @@ def parse_args():
help="The duration of the simulation (simulates milliseconds).")
parser.add_argument('-sf', '--sf', required=True, dest="sf",
help="VNF file which contains the SFCs and their respective SFs and their properties.")
parser.add_argument('-sfr', '--sfr', required=False, default='', dest='sfr',
help="Path which contains the SF resource consumption functions.")
parser.add_argument('-n', '--network', required=True, dest='network',
help="The GraphML network file that specifies the nodes and edges of the network.")
parser.add_argument('-c', '--config', required=True, dest='config', help="Path to the simulator config file.")
Expand Down
34 changes: 32 additions & 2 deletions src/coordsim/reader/reader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import networkx as nx
from geopy.distance import distance as dist
import numpy as np
import logging as log
import logging
import yaml
import math
from collections import defaultdict
import importlib

log = logging.getLogger(__name__)

# Disclaimer: Some snippets of the following file were imported/modified from B-JointSP on GitHub.
# Original code can be found on https://github.com/CN-UPB/B-JointSP
Expand Down Expand Up @@ -40,7 +42,21 @@ def get_sfc(sfc_file):
return sfc_list


def get_sf(sf_file):
def load_resource_function(name, path):
try:
spec = importlib.util.spec_from_file_location(name, path + '/' + name + '.py')
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
except Exception:
raise Exception(f'Cannot load file "{name}.py" from specified location "{path}".')

try:
return getattr(module, 'resource_function')
except Exception:
raise Exception(f'There is no "resource_function" defined in file "{name}.py."')


def get_sf(sf_file, resource_functions_path):
"""
Get the list of SFs and their properties from the yaml data.
"""
Expand All @@ -50,6 +66,7 @@ def get_sf(sf_file):
# Configureable default mean and stdev defaults
default_processing_delay_mean = 1.0
default_processing_delay_stdev = 1.0
def default_resource_function(x): return x
sf_list = defaultdict(None)
for sf_name, sf_details in sf_data['sf_list'].items():
sf_list[sf_name] = sf_details
Expand All @@ -58,6 +75,19 @@ def get_sf(sf_file):
default_processing_delay_mean)
sf_list[sf_name]["processing_delay_stdev"] = sf_list[sf_name].get("processing_delay_stdev",
default_processing_delay_stdev)
if 'resource_function_id' in sf_list[sf_name]:
try:
sf_list[sf_name]['resource_function'] = load_resource_function(sf_list[sf_name]['resource_function_id'],
resource_functions_path)
except Exception as ex:
sf_list[sf_name]['resource_function_id'] = 'default'
sf_list[sf_name]['resource_function'] = default_resource_function
log.warning(f'{str(ex)} SF {sf_name} will use default resource function instead.')
else:
sf_list[sf_name]["resource_function_id"] = 'default'
sf_list[sf_name]["resource_function"] = default_resource_function
log.info(
f'No resource function specified for SF {sf_name}. Default resource function will be used instead.')
return sf_list


Expand Down
31 changes: 27 additions & 4 deletions src/coordsim/simulation/flowsimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,32 @@ def process_flow(self, flow, sfc):
# Add the delay to the flow's end2end delay
metrics.add_processing_delay(processing_delay)
flow.end2end_delay += processing_delay

# Calculate the demanded capacity when the flow is processed at this node
demanded_total_capacity = 0.0
for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items():
if sf == sf_i:
# Include flows data rate in requested sf capacity calculation
demanded_total_capacity += self.params.sf_list[sf]['resource_function'](sf_data['load'] + flow.dr)
else:
demanded_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load'])

# Get node capacities
node_cap = self.params.network.nodes[current_node_id]["cap"]
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]
assert node_remaining_cap >= 0, "Remaining node capacity cannot be less than 0 (zero)!"
# Metrics: Add active flow to the SF once the flow has begun processing.
metrics.add_active_flow(flow, current_node_id, current_sf)
if flow.dr <= node_remaining_cap:
if demanded_total_capacity <= node_cap:
log.info(
"Flow {} started proccessing at sf {} at node {}. Time: {}, "
"Flow {} started processing at sf {} at node {}. Time: {}, "
"Processing delay: {}".format(flow.flow_id, current_sf, current_node_id, self.env.now,
processing_delay))

self.params.network.nodes[current_node_id]["remaining_cap"] -= flow.dr
# Add load to sf
self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] += flow.dr
# Set remaining node capacity
self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - demanded_total_capacity
# Just for the sake of keeping lines small, the node_remaining_cap is updated again.
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]

Expand All @@ -223,7 +236,17 @@ def process_flow(self, flow, sfc):
.format(flow.flow_id, current_sf, current_node_id, self.env.now))
# Remove the active flow from the SF after it departed the SF
metrics.remove_active_flow(flow, current_node_id, current_sf)
self.params.network.nodes[current_node_id]["remaining_cap"] += flow.dr

# Remove load from sf
self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] -= flow.dr
stefanbschneider marked this conversation as resolved.
Show resolved Hide resolved
assert self.params.network.nodes[current_node_id]['available_sf'][sf][
'load'] >= 0, 'SF load cannot be less than 0!'
# Recalculation is necessary because other flows could have already arrived or departed at the node
used_total_capacity = 0.0
for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items():
used_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load'])
# Set remaining node capacity
self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - used_total_capacity
# Just for the sake of keeping lines small, the node_remaining_cap is updated again.
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]

Expand Down
7 changes: 6 additions & 1 deletion src/coordsim/simulation/simulatorparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ def __init__(self, network, ing_nodes, sfc_list, sf_list, config, seed, schedule
self.schedule = schedule
# Placement of SFs in each node: defaultdict(list)
self.sf_placement = sf_placement

# Update which sf is available at which node
for node_id, placed_sf_list in sf_placement.items():
available_sf = {}
for sf in placed_sf_list:
available_sf[sf] = self.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0})
self.network.nodes[node_id]['available_sf'] = available_sf
# Flow interarrival exponential distribution mean: float
self.inter_arr_mean = config['inter_arrival_mean']
# Flow data rate normal distribution mean: float
Expand Down
10 changes: 8 additions & 2 deletions src/siminterface/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, test_mode=False):
# Create CSV writer
self.writer = ResultWriter(self.test_mode)

def init(self, network_file, service_functions_file, config_file, seed):
def init(self, network_file, service_functions_file, config_file, seed, resource_functions_path=""):

# Initialize metrics, record start time
metrics.reset()
Expand All @@ -31,7 +31,7 @@ def init(self, network_file, service_functions_file, config_file, seed):
# Parse network and SFC + SF file
self.network, self.ing_nodes = reader.read_network(network_file, node_cap=10, link_cap=10)
self.sfc_list = reader.get_sfc(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
self.config = reader.get_config(config_file)

# Generate SimPy simulation environment
Expand Down Expand Up @@ -78,6 +78,12 @@ def apply(self, actions: SimulatorAction):
# Get the new placement from the action passed by the RL agent
# Modify and set the placement parameter of the instantiated simulator object.
self.simulator.params.sf_placement = actions.placement
# Update which sf is available at which node
for node_id, placed_sf_list in actions.placement.items():
available_sf = {}
for sf in placed_sf_list:
available_sf[sf] = self.simulator.params.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do exactly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each node stores a list of SFs that where placed and there current load as a node attribute. When changing the placement the list has to be updated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the .get(sf, {'load': 0.0}) gets the current load of the SF at the node and defaults to 0 if the SF wasn't available?

self.simulator.params.network.nodes[node_id]['available_sf'] = available_sf

# Get the new schedule from the SimulatorAction
# Set it in the params of the instantiated simulator object.
Expand Down
3 changes: 2 additions & 1 deletion tests/test_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

NETWORK_FILE = "params/networks/triangle.graphml"
SERVICE_FUNCTIONS_FILE = "params/services/abc.yaml"
RESOURCE_FUNCTION_PATH = "params/services/resource_functions"
CONFIG_FILE = "params/config/sim_config.yaml"
SIMULATION_DURATION = 1000
SEED = 1234
Expand All @@ -30,7 +31,7 @@ def setUp(self):
# Configure simulator parameters
network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
config = reader.get_config(CONFIG_FILE)

sf_placement = dummy_data.triangle_placement
Expand Down
5 changes: 3 additions & 2 deletions tests/test_simulatorInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

NETWORK_FILE = "params/networks/triangle.graphml"
SERVICE_FUNCTIONS_FILE = "params/services/3sfcs.yaml"
RESOURCE_FUNCTION_PATH = "params/services/resource_functions"
CONFIG_FILE = "params/config/sim_config.yaml"

SIMULATOR_MODULE_NAME = "siminterface.simulator"
Expand All @@ -27,7 +28,7 @@ def setUp(self):
"""
# TODO: replace SimulatorInterface with implementation
self.simulator = SIMULATOR_CLS(TEST_MODE)
self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234)
self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234, resource_functions_path=RESOURCE_FUNCTION_PATH)

def test_apply(self):
# test if placement and schedule can be applied
Expand Down Expand Up @@ -246,7 +247,7 @@ def test_apply(self):
}
"""
network_stats = simulator_state.network_stats
self.assertIs(len(network_stats), 5)
self.assertIs(len(network_stats), 7)
self.assertIn('total_flows', network_stats)
self.assertIn('successful_flows', network_stats)
self.assertIn('dropped_flows', network_stats)
Expand Down