Refactor func tests transportpce api rpc calls 1/2
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test05_flex_grid.py
index c82a0243f86840e1ad3e85862e74162254cc92b1..2d5674bc5b8119632abe81e3a8fd95a585b91402 100644 (file)
@@ -9,12 +9,17 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
+# pylint: disable=no-member
+
 import unittest
 import time
 import requests
+# pylint: disable=wrong-import-order
 import sys
 sys.path.append('transportpce_tests/common/')
-import test_utils  # nopep8
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040  # nopep8
 
 
 class TransportPCEPortMappingTesting(unittest.TestCase):
@@ -24,133 +29,102 @@ class TransportPCEPortMappingTesting(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.processes = test_utils.start_tpce()
-        cls.processes = test_utils.start_sims([('roadmd', cls.NODE_VERSION)])
+        cls.processes = test_utils_rfc8040.start_tpce()
+        cls.processes = test_utils_rfc8040.start_sims([('roadmd', cls.NODE_VERSION)])
 
     @classmethod
     def tearDownClass(cls):
+        # pylint: disable=not-an-iterable
         for process in cls.processes:
-            test_utils.shutdown_process(process)
+            test_utils_rfc8040.shutdown_process(process)
         print("all processes killed")
 
     def setUp(self):
+        # pylint: disable=consider-using-f-string
         print("execution of {}".format(self.id().split(".")[-1]))
         time.sleep(10)
 
     def test_01_rdm_device_connection(self):
-        response = test_utils.mount_device("ROADM-D1", ('roadmd', self.NODE_VERSION))
-        self.assertEqual(response.status_code, requests.codes.created,
-                         test_utils.CODE_SHOULD_BE_201)
+        response = test_utils_rfc8040.mount_device("ROADM-D1", ('roadmd', self.NODE_VERSION))
+        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
 
     def test_02_rdm_device_connected(self):
-        response = test_utils.get_netconf_oper_request("ROADM-D1")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertEqual(
-            res['node'][0]['netconf-node-topology:connection-status'],
-            'connected')
+        response = test_utils_rfc8040.check_device_connection("ROADM-D1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertEqual(response['connection-status'], 'connected')
         time.sleep(10)
 
     def test_03_rdm_portmapping_info(self):
-        response = test_utils.portmapping_request("ROADM-D1/node-info")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
+        response = test_utils_rfc8040.get_portmapping_node_info("ROADM-D1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(
-            {u'node-info': {u'node-type': u'rdm',
-                            u'node-ip-address': u'127.0.0.14',
-                            u'node-clli': u'NodeD',
-                            u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorD',
-                            u'node-model': u'model2',
-                            }},
-            res)
+            {'node-type': 'rdm',
+             'node-ip-address': '127.0.0.14',
+             'node-clli': 'NodeD',
+             'openroadm-version': '2.2.1',
+             'node-vendor': 'vendorD',
+             'node-model': 'model2'},
+            response['node-info'])
         time.sleep(3)
 
     def test_04_rdm_deg1_lcp(self):
-        response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG1-TTP")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertEqual(
-            {
-                "mc-capabilities": [
-                    {
-                        "mc-node-name": "DEG1-TTP",
-                        "center-freq-granularity": 6.25,
-                        "slot-width-granularity": 12.5
-                    }
-                ]
-            }, res)
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "DEG1-TTP")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn(response['mc-capabilities'],
+                      [[{'mc-node-name': 'DEG1-TTP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
+                       [{'mc-node-name': 'DEG1-TTP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
         time.sleep(3)
 
     def test_05_rdm_deg2_lcp(self):
-        response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG2-TTP")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertEqual(
-            {
-                "mc-capabilities": [
-                    {
-                        "mc-node-name": "DEG2-TTP",
-                        "center-freq-granularity": 6.25,
-                        "slot-width-granularity": 12.5
-                    }
-                ]
-            }, res)
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "DEG2-TTP")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn(response['mc-capabilities'],
+                      [[{'mc-node-name': 'DEG2-TTP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
+                       [{'mc-node-name': 'DEG2-TTP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
         time.sleep(3)
 
     def test_06_rdm_srg1_lcp(self):
-        response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/SRG1-PP")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertEqual(
-            {
-                "mc-capabilities": [
-                    {
-                        "mc-node-name": "SRG1-PP",
-                        "center-freq-granularity": 6.25,
-                        "slot-width-granularity": 12.5
-                    }
-                ]
-            }, res)
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "SRG1-PP")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn(response['mc-capabilities'],
+                      [[{'mc-node-name': 'SRG1-PP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
+                       [{'mc-node-name': 'SRG1-PP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
         time.sleep(3)
 
     # Renderer interface creations
     def test_07_device_renderer(self):
-        data = {
-            "transportpce-device-renderer:input": {
-                "transportpce-device-renderer:modulation-format": "dp-qpsk",
-                "transportpce-device-renderer:operation": "create",
-                "transportpce-device-renderer:service-name": "testNMC-MC",
-                "transportpce-device-renderer:wave-number": "0",
-                "transportpce-device-renderer:center-freq": "196.05",
-                "transportpce-device-renderer:width": "80",
-                "transportpce-device-renderer:nodes": [
+        response = test_utils_rfc8040.transportpce_api_rpc_request(
+            'transportpce-device-renderer', 'service-path',
+            {
+                'modulation-format': 'dp-qpsk',
+                'operation': 'create',
+                'service-name': 'testNMC-MC',
+                'wave-number': '0',
+                'center-freq': '196.05',
+                'nmc-width': '80',
+                'nodes': [
                     {
-                        "transportpce-device-renderer:node-id": "ROADM-D1",
-                        "transportpce-device-renderer:src-tp": "SRG1-PP1-TXRX",
-                        "transportpce-device-renderer:dest-tp": "DEG1-TTP-TXRX"
+                        'node-id': 'ROADM-D1',
+                        'src-tp': 'SRG1-PP1-TXRX',
+                        'dest-tp': 'DEG1-TTP-TXRX'
                     }
                 ],
-                "transportpce-device-renderer:min-freq": 196.00625,
-                "transportpce-device-renderer:max-freq": 196.09375,
-                "transportpce-device-renderer:lower-spectral-slot-number": 749,
-                "transportpce-device-renderer:higher-spectral-slot-number": 763
-            }
-        }
-        url = test_utils.RESTCONF_BASE_URL + \
-            "/operations/transportpce-device-renderer:service-path"
-        response = test_utils.post_request(url, data)
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertIn('Roadm-connection successfully created for nodes',
-                      res['output']['result'])
+                'min-freq': 196.00625,
+                'max-freq': 196.09375,
+                'lower-spectral-slot-number': 749,
+                'higher-spectral-slot-number': 763
+            })
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
         time.sleep(10)
 
     # Get Degree MC interface and check
     def test_08_degree_mc_interface(self):
-        response = test_utils.check_netconf_node_request("ROADM-D1",
-                                                         "interface/DEG1-TTP-TXRX-mc-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
+        response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
+        self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertDictEqual(
             dict({"name": "DEG1-TTP-TXRX-mc-749:763",
                   "supporting-interface": "OMS-DEG1-TTP-TXRX",
@@ -159,23 +133,18 @@ class TransportPCEPortMappingTesting(unittest.TestCase):
                   "description": "TBD",
                   "supporting-port": "L1",
                   "type": "org-openroadm-interfaces:mediaChannelTrailTerminationPoint"},
-                 **res['interface'][0]),
-            res['interface'][0])
+                 **response['interface'][0]), response['interface'][0])
 
         # Check the mc-ttp max and min-freq
-        self.assertEqual({
-            "min-freq": 196.00625,
-            "max-freq": 196.09375
-        },
-            res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
+        self.assertIn(response['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'],
+                      [{'min-freq': '196.00625', 'max-freq': '196.09375'},
+                       {'min-freq': 196.00625, 'max-freq': 196.09375}])
         time.sleep(3)
 
     # get DEG-NMC interface and check
     def test_09_degree_nmc_interface(self):
-        response = test_utils.check_netconf_node_request("ROADM-D1",
-                                                         "interface/DEG1-TTP-TXRX-nmc-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
+        response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
+        self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertDictEqual(
             dict({"name": "DEG1-TTP-TXRX-nmc-749:763",
                   "supporting-interface": "DEG1-TTP-TXRX-mc-749:763",
@@ -184,23 +153,18 @@ class TransportPCEPortMappingTesting(unittest.TestCase):
                   "description": "TBD",
                   "supporting-port": "L1",
                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
-                 **res['interface'][0]),
-            res['interface'][0])
+                 **response['interface'][0]), response['interface'][0])
 
         # Check the mc-ttp max and min-freq
-        self.assertEqual({
-            "frequency": 196.05,
-            "width": 80
-        },
-            res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
+        self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
+                      [{'frequency': '196.05000', 'width': '80'},
+                       {'frequency': 196.05, 'width': 80}])
         time.sleep(3)
 
     # get SRG-NMC interface
     def test_10_srg_nmc_interface(self):
-        response = test_utils.check_netconf_node_request("ROADM-D1",
-                                                         "interface/SRG1-PP1-TXRX-nmc-749:763")
-        res = response.json()
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
+        self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(
             dict({"name": "SRG1-PP1-TXRX-nmc-749:763",
                   "supporting-circuit-pack-name": "3/0",
@@ -208,63 +172,56 @@ class TransportPCEPortMappingTesting(unittest.TestCase):
                   "description": "TBD",
                   "supporting-port": "C1",
                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
-                 **res['interface'][0]), res['interface'][0])
-        self.assertEqual({"frequency": 196.05, "width": 80},
-                         res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
+                 **response['interface'][0]), response['interface'][0])
+        self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
+                      [{'frequency': '196.05000', 'width': '80'},
+                       {'frequency': 196.05, 'width': 80}])
         time.sleep(3)
 
     # Create ROADM-connection
     def test_11_roadm_connection(self):
-        response = test_utils.check_netconf_node_request("ROADM-D1",
-                                                         "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
+        self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual("SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763",
-                         res['roadm-connections'][0]['connection-name'])
+                         response['roadm-connections'][0]['connection-name'])
         self.assertEqual("SRG1-PP1-TXRX-nmc-749:763",
-                         res['roadm-connections'][0]['source']['src-if'])
+                         response['roadm-connections'][0]['source']['src-if'])
         self.assertEqual("DEG1-TTP-TXRX-nmc-749:763",
-                         res['roadm-connections'][0]['destination']['dst-if'])
+                         response['roadm-connections'][0]['destination']['dst-if'])
         time.sleep(3)
 
     # Delete ROADM connections and interfaces
 
     # delete ROADM connection
     def test_12_delete_roadm_connection(self):
-        response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
-                                             "node/ROADM-D1/yang-ext:mount/" +
-                                             "org-openroadm-device:org-openroadm-device/" +
-                                             "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils_rfc8040.del_node_attribute_request(
+            "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
         time.sleep(3)
 
     # Delete NMC SRG interface
     def test_13_delete_srg_interface(self):
-        response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
-                                             "node/ROADM-D1/yang-ext:mount/" +
-                                             "org-openroadm-device:org-openroadm-device/" +
-                                             "interface/SRG1-PP1-TXRX-nmc-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
         time.sleep(3)
 
     # Delete NMC Degree interface
     def test_14_delete_degree_interface(self):
-        response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
-                                             "node/ROADM-D1/yang-ext:mount/" +
-                                             "org-openroadm-device:org-openroadm-device/" +
-                                             "interface/DEG1-TTP-TXRX-nmc-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
         time.sleep(3)
 
     # Delete MC Degree interface
     def test_15_delete_degree_interface(self):
-        response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
-                                             "node/ROADM-D1/yang-ext:mount/" +
-                                             "org-openroadm-device:org-openroadm-device/" +
-                                             "interface/DEG1-TTP-TXRX-mc-749:763")
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
         time.sleep(3)
 
+    def test_16_disconnect_ROADM_D1(self):
+        response = test_utils_rfc8040.unmount_device("ROADM-D1")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
 
 if __name__ == "__main__":
     unittest.main(verbosity=2)