class TransportPCEtesting(unittest.TestCase):
odl_process = None
- simple_data = None
- complex_data = None
+ simple_topo_bi_dir_data = None
+ simple_topo_uni_dir_data = None
+ complex_topo_uni_dir_data = None
restconf_baseurl = "http://localhost:8181/restconf"
@classmethod
def _get_file(cls):
- simple_topology_file = "sample_configs/NW-simple-topology.xml"
- if os.path.isfile(simple_topology_file):
- with open(simple_topology_file, 'r') as simple_file:
- cls.simple_data = simple_file.read();
- complex_topology_file = "sample_configs/NW-for-test-5-4.xml"
- if os.path.isfile(complex_topology_file):
- with open(complex_topology_file, 'r') as complex_file:
- cls.complex_data = complex_file.read();
+ topo_bi_dir_file = "sample_configs/honeynode-topo.xml"
+ if os.path.isfile(topo_bi_dir_file):
+ with open(topo_bi_dir_file, 'r') as topo_bi_dir:
+ cls.simple_topo_bi_dir_data = topo_bi_dir.read();
+ topo_uni_dir_file = "sample_configs/NW-simple-topology.xml"
+ if os.path.isfile(topo_uni_dir_file):
+ with open(topo_uni_dir_file, 'r') as topo_uni_dir:
+ cls.simple_topo_uni_dir_data = topo_uni_dir.read();
+ topo_uni_dir_complex_file = "sample_configs/NW-for-test-5-4.xml"
+ if os.path.isfile(topo_uni_dir_complex_file):
+ with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
+ cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read();
@classmethod
def __start_odl(cls):
def setUp(self): # instruction executed before each test method
time.sleep(1)
- # Load simple topology
- def test_01_load_simple_topology(self):
+ # Load simple bidirectional topology
+ def test_01_load_simple_topology_bi(self):
url = ("{}/config/ietf-network:network/openroadm-topology"
.format(self.restconf_baseurl))
- body = self.simple_data
+ body = self.simple_topo_bi_dir_data
headers = {'content-type': 'application/xml',
"Accept": "application/json"}
response = requests.request(
# Get existing nodeId
def test_02_get_nodeId(self):
+ url = ("{}/config/ietf-network:network/openroadm-topology/node/ROADMA-SRG1"
+ .format(self.restconf_baseurl))
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "GET", url, headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(
+ res['node'][0]['node-id'], 'ROADMA-SRG1')
+ time.sleep(1)
+
+ # Get existing linkId
+ def test_03_get_linkId(self):
+ url = ("{}/config/ietf-network:network/openroadm-topology/link/XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX"
+ .format(self.restconf_baseurl))
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "GET", url, headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(
+ res['ietf-network-topology:link'][0]['link-id'],
+ 'XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX')
+ time.sleep(1)
+
+ # Path Computation success
+ def test_04_path_computation_xpdr_bi(self):
+ url = ("{}/operations/transportpce-pce:path-computation-request"
+ .format(self.restconf_baseurl))
+ body = {"input": {
+ "service-name": "service-1",
+ "resource-reserve": "true",
+ "pce-metric": "hop-count",
+ "service-handler-header": {
+ "request-id": "request-1"
+ },
+ "service-a-end": {
+ "node-id": "XPDRA",
+ "service-rate": "0",
+ "clli": "nodeA"
+ },
+ "service-z-end": {
+ "node-id": "XPDRC",
+ "service-rate": "0",
+ "clli": "nodeC"
+ }
+ }
+ }
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "POST", url, data=json.dumps(body), headers=headers,
+ auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Path is calculated',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(5)
+
+ # Path Computation success
+ def test_05_path_computation_rdm_bi(self):
+ url = ("{}/operations/transportpce-pce:path-computation-request"
+ .format(self.restconf_baseurl))
+ body = {"input": {
+ "service-name": "service-1",
+ "resource-reserve": "true",
+ "pce-metric": "hop-count",
+ "service-handler-header": {
+ "request-id": "request-1"
+ },
+ "service-a-end": {
+ "node-id": "ROADMA",
+ "service-rate": "0",
+ "clli": "nodeA"
+ },
+ "service-z-end": {
+ "node-id": "ROADMC",
+ "service-rate": "0",
+ "clli": "nodeC"
+ }
+ }
+ }
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "POST", url, data=json.dumps(body), headers=headers,
+ auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Path is calculated',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(5)
+
+ # Delete topology
+ def test_06_delete_simple_topology_bi(self):
+ url = ("{}/config/ietf-network:network/openroadm-topology"
+ .format(self.restconf_baseurl))
+ headers = {'content-type': 'application/xml',
+ "Accept": "application/json"}
+ response = requests.request(
+ "DELETE", url, headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ time.sleep(2)
+
+ # Test deleted topology
+ def test_07_test_topology_simple_bi_deleted(self):
+ url = ("{}/config/ietf-network:network/openroadm-topology/node/ROADMA-SRG1"
+ .format(self.restconf_baseurl))
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "GET", url, headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, 404)
+ time.sleep(1)
+
+ # Load simple bidirectional topology
+ def test_08_load_simple_topology_uni(self):
+ url = ("{}/config/ietf-network:network/openroadm-topology"
+ .format(self.restconf_baseurl))
+ body = self.simple_topo_uni_dir_data
+ headers = {'content-type': 'application/xml',
+ "Accept": "application/json"}
+ response = requests.request(
+ "PUT", url, data=body, headers=headers,
+ auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, 201)
+ time.sleep(2)
+
+ # Get existing nodeId
+ def test_09_get_nodeId(self):
url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-1-2"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json',
time.sleep(1)
# Get existing linkId
- def test_03_get_linkId(self):
+ def test_10_get_linkId(self):
url = ("{}/config/ietf-network:network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json',
time.sleep(1)
# Path Computation success
- def test_04_path_computation(self):
+ def test_11_path_computation_xpdr_uni(self):
url = ("{}/operations/transportpce-pce:path-computation-request"
.format(self.restconf_baseurl))
body = {"input": {
res['output']['configuration-response-common']['response-message'])
time.sleep(5)
+ # Path Computation success
+ def test_12_path_computation_rdm_uni(self):
+ url = ("{}/operations/transportpce-pce:path-computation-request"
+ .format(self.restconf_baseurl))
+ body = {"input": {
+ "service-name": "service1",
+ "resource-reserve": "true",
+ "service-handler-header": {
+ "request-id": "request1"
+ },
+ "service-a-end": {
+ "service-rate": "0",
+ "clli": "cll21",
+ "node-id": "OpenROADM-2-1"
+ },
+ "service-z-end": {
+ "service-rate": "0",
+ "clli": "ncli22",
+ "node-id": "OpenROADM-2-2"
+ },
+ "pce-metric": "hop-count"
+ }
+ }
+ headers = {'content-type': 'application/json',
+ "Accept": "application/json"}
+ response = requests.request(
+ "POST", url, data=json.dumps(body), headers=headers,
+ auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Path is calculated',
+ res['output']['configuration-response-common']['response-message'])
+ #ZtoA path test
+ atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
+ ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
+ self.assertEqual(atozList,15)
+ self.assertEqual(ztoaList,15)
+ for i in range(0,15):
+ atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
+ ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
+ if (atoz['id'] == '14'):
+ self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
+ if (ztoa['id'] == '0'):
+ self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
+ time.sleep(5)
+
# Delete topology
- def test_05_delete_simple_topology(self):
+ def test_13_delete_simple_topology(self):
url = ("{}/config/ietf-network:network/openroadm-topology"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/xml',
time.sleep(2)
# Test deleted topology
- def test_06_test_topology_simple_deleted(self):
+ def test_14_test_topology_simple_deleted(self):
url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-1-2"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json',
time.sleep(1)
# Load simple topology
- def test_07_load_complex_topology(self):
+ def test_15_load_complex_topology(self):
url = ("{}/config/ietf-network:network/openroadm-topology"
.format(self.restconf_baseurl))
- body = self.complex_data
+ body = self.complex_topo_uni_dir_data
headers = {'content-type': 'application/xml',
"Accept": "application/json"}
response = requests.request(
time.sleep(2)
# Get existing nodeId
- def test_08_get_nodeId(self):
+ def test_16_get_nodeId(self):
url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-3-2"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json',
time.sleep(1)
# Test failed path computation
- def test_09_fail_path_computation(self):
+ def test_17_fail_path_computation(self):
url = ("{}/operations/transportpce-pce:path-computation-request"
.format(self.restconf_baseurl))
body = {"input": {
time.sleep(2)
# Test1 success path computation
- def test_10_success1_path_computation(self):
+ def test_18_success1_path_computation(self):
url = ("{}/operations/transportpce-pce:path-computation-request"
.format(self.restconf_baseurl))
body = {"input": {
time.sleep(5)
# Test2 success path computation with path description
- def test_11_success2_path_computation(self):
+ def test_19_success2_path_computation(self):
url = ("{}/operations/transportpce-pce:path-computation-request"
.format(self.restconf_baseurl))
body = {"input": {
time.sleep(5)
# Test3 success path computation with hard-constraints exclude
- def test_12_success3_path_computation(self):
+ def test_20_success3_path_computation(self):
url = ("{}/operations/transportpce-pce:path-computation-request"
.format(self.restconf_baseurl))
body = {"input": {
time.sleep(5)
# Delete complex topology
- def test_13_delete_complex_topology(self):
+ def test_21_delete_complex_topology(self):
url = ("{}/config/ietf-network:network/openroadm-topology"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/xml',
time.sleep(2)
# Test deleted complex topology
- def test_14_test_topology_complex_deleted(self):
+ def test_22_test_topology_complex_deleted(self):
url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-3-2"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json',