# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
+import base64
import unittest
import time
import requests
from common import test_utils
+from common.test_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP
class TransportPCEtesting(unittest.TestCase):
@classmethod
def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
for process in cls.processes:
test_utils.shutdown_process(process)
print("all processes killed")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
- self.assertEqual(nbNode, 4)
+ self.assertEqual(nbNode, 6, 'There should be 6 nodes')
self.assertNotIn('ietf-network-topology:link', res['network'][0])
def test_12_create_OCH_OTU4_service(self):
def test_18_check_no_interface_ODU4_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
res = response.json()
self.assertIn(
{"error-type": "application", "error-tag": "data-missing",
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
+ self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
+ u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
+ ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'])
if ele['tp-id'] == 'DEG2-TTP-TXRX':
self.assertIn({u'index': 1, u'frequency': 196.1,
u'width': 40},
self.assertEqual(nb_links, 4)
for link in res['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
- if (linkId == 'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
- linkId == 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
+ if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
+ 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
- elif (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
- linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
+ elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
+ 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
response = test_utils.get_otn_topo_request()
res = response.json()
for node in res['network'][0]['node']:
- if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
+ if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
- if (tp['tp-id'] == 'XPDR1-NETWORK1'):
+ if tp['tp-id'] == 'XPDR1-NETWORK1':
xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
self.assertEqual(nb_links, 4)
for link in res['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
- if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
- linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
+ if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
+ 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
response = test_utils.get_otn_topo_request()
res = response.json()
for node in res['network'][0]['node']:
- if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
+ if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
- if (tp['tp-id'] == 'XPDR1-NETWORK1'):
+ if tp['tp-id'] == 'XPDR1-NETWORK1':
xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
- tsPoolList = [i for i in range(1, 9)]
+ tsPoolList = list(range(1, 9))
self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1-10GE",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1-10GE")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
self.assertEqual(nb_links, 4)
for link in res['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
- if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
- linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
+ if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
+ 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
- if (tp['tp-id'] == 'XPDR1-NETWORK1'):
+ if tp['tp-id'] == 'XPDR1-NETWORK1':
xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1-ODU4",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1-ODU4")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_51_check_no_interface_ODU4_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
self.test_22_check_otn_topo_otu4_links()
response = test_utils.get_otn_topo_request()
res = response.json()
for node in res['network'][0]['node']:
- if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
+ if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
- if (tp['tp-id'] == 'XPDR1-NETWORK1'):
+ if tp['tp-id'] == 'XPDR1-NETWORK1':
xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1-OCH-OTU4",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1-OCH-OTU4")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_55_get_no_service(self):
response = test_utils.get_service_list_request("")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
res = response.json()
self.assertIn(
{"error-type": "application", "error-tag": "data-missing",
def test_56_check_no_interface_OTU4_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
response = test_utils.get_otn_topo_request()
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ print(res)
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
+ self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
+ AVAILABLE_FREQ_MAP)
if ele['tp-id'] == 'DEG2-TTP-TXRX':
self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
time.sleep(3)