import unittest
import time
import requests
+# pylint: disable=wrong-import-order
import sys
sys.path.append('transportpce_tests/common/')
-import test_utils
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
print("all processes killed")
def setUp(self):
- time.sleep(10)
+ time.sleep(2)
# Connect the ROADMA
def test_01_connect_rdm(self):
# Verify the termination points of the ROADMA
def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
- responseTopo = test_utils.get_ordm_topo_request("")
- resTopo = responseTopo.json()
- nbNode = len(resTopo['network'][0]['node'])
+ resTopo = test_utils.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(resTopo['status_code'], requests.codes.ok)
nbMapCumul = 0
nbMappings = 0
- for i in range(0, nbNode):
- nodeId = resTopo['network'][0]['node'][i]['node-id']
+ for node in resTopo['network'][0]['node']:
+ nodeId = node['node-id']
+ # pylint: disable=consider-using-f-string
print("nodeId={}".format(nodeId))
nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
print("nodeMapId={}".format(nodeMapId))
- responseMapList = test_utils.portmapping_request(nodeMapId)
- resMapList = responseMapList.json()
-
- nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
- nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+ response = test_utils.get_portmapping_node_attr(nodeMapId, "node-info", None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ responseMapList = test_utils.get_portmapping_node_attr(nodeMapId, None, None)
+ nbMappings = len(responseMapList['nodes'][0]['mapping']) - nbMapCumul
nbMapCurrent = 0
- for j in range(0, nbTp):
- tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+ for tp in node['ietf-network-topology:termination-point']:
+ tpId = tp['tp-id']
if (not "CP" in tpId) and (not "CTP" in tpId):
- responseMap = test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
- self.assertEqual(responseMap.status_code, requests.codes.ok)
- if responseMap.status_code == requests.codes.ok:
+ responseMap = test_utils.get_portmapping_node_attr(nodeMapId, "mapping", tpId)
+ self.assertEqual(responseMap['status_code'], requests.codes.ok)
+ if responseMap['status_code'] == requests.codes.ok:
nbMapCurrent += 1
nbMapCumul += nbMapCurrent
nbMappings -= nbMapCurrent
# Disconnect the ROADMA
def test_03_disconnect_rdm(self):
response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
# #Connect the XPDRA
def test_04_connect_xpdr(self):
# Disconnect the XPDRA
def test_06_disconnect_device(self):
response = test_utils.unmount_device("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":