Improve some func. tests variables naming
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test02_topo_portmapping.py
index 76c2aae4c54c74a66d47bb49fd6066e248652d7c..586b882f1868933b174bae4144b0974485f0f862 100644 (file)
 import unittest
 import time
 import requests
+# pylint: disable=wrong-import-order
 import sys
 sys.path.append('transportpce_tests/common/')
-import test_utils  # nopep8
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040  # nopep8
 
 
 class TransportPCEtesting(unittest.TestCase):
@@ -27,14 +30,14 @@ class TransportPCEtesting(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.processes = test_utils.start_tpce()
-        cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
+        cls.processes = test_utils_rfc8040.start_tpce()
+        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
 
     @classmethod
     def tearDownClass(cls):
         # pylint: disable=not-an-iterable
         for process in cls.processes:
-            test_utils.shutdown_process(process)
+            test_utils_rfc8040.shutdown_process(process)
         print("all processes killed")
 
     def setUp(self):
@@ -42,33 +45,32 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Connect the ROADMA
     def test_01_connect_rdm(self):
-        response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
-        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+        response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
 
     # Verify the termination points of the ROADMA
     def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
-        responseTopo = test_utils.get_ordm_topo_request("")
-        resTopo = responseTopo.json()
-        nbNode = len(resTopo['network'][0]['node'])
+        resTopo = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+        self.assertEqual(resTopo['status_code'], requests.codes.ok)
         nbMapCumul = 0
         nbMappings = 0
-        for i in range(0, nbNode):
-            nodeId = resTopo['network'][0]['node'][i]['node-id']
+        for node in resTopo['network'][0]['node']:
+            nodeId = node['node-id']
+            # pylint: disable=consider-using-f-string
             print("nodeId={}".format(nodeId))
             nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
             print("nodeMapId={}".format(nodeMapId))
-            responseMapList = test_utils.portmapping_request(nodeMapId)
-            resMapList = responseMapList.json()
-
-            nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
-            nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+            response = test_utils_rfc8040.get_portmapping_node_info(nodeMapId)
+            self.assertEqual(response['status_code'], requests.codes.ok)
+            responseMapList = test_utils_rfc8040.get_portmapping(nodeMapId)
+            nbMappings = len(responseMapList['nodes'][0]['mapping']) - nbMapCumul
             nbMapCurrent = 0
-            for j in range(0, nbTp):
-                tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+            for tp in node['ietf-network-topology:termination-point']:
+                tpId = tp['tp-id']
                 if (not "CP" in tpId) and (not "CTP" in tpId):
-                    responseMap = test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
-                    self.assertEqual(responseMap.status_code, requests.codes.ok)
-                    if responseMap.status_code == requests.codes.ok:
+                    responseMap = test_utils_rfc8040.portmapping_request(nodeMapId, tpId)
+                    self.assertEqual(responseMap['status_code'], requests.codes.ok)
+                    if responseMap['status_code'] == requests.codes.ok:
                         nbMapCurrent += 1
             nbMapCumul += nbMapCurrent
         nbMappings -= nbMapCurrent
@@ -76,13 +78,13 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Disconnect the ROADMA
     def test_03_disconnect_rdm(self):
-        response = test_utils.unmount_device("ROADM-A1")
-        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+        response = test_utils_rfc8040.unmount_device("ROADM-A1")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
 #     #Connect the XPDRA
     def test_04_connect_xpdr(self):
-        response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
-        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+        response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
+        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
 
 #     #Verify the termination points related to XPDR
     def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
@@ -90,8 +92,8 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Disconnect the XPDRA
     def test_06_disconnect_device(self):
-        response = test_utils.unmount_device("XPDR-A1")
-        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+        response = test_utils_rfc8040.unmount_device("XPDR-A1")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
 
 if __name__ == "__main__":