Merge changes I082f82ac,I951224b0,Icecd1b21,Ifafa74b6,I9b384c72, ...
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test08_otn_sh_renderer.py
index ce6da1da515e42fa795176544f4cbee7fb67d17c..c3a1dfdebcfd40d52eb93337db4d97dcdc38ddd3 100644 (file)
@@ -20,7 +20,7 @@ import sys
 sys.path.append('transportpce_tests/common/')
 # pylint: disable=wrong-import-position
 # pylint: disable=import-error
-import test_utils_rfc8040  # nopep8
+import test_utils  # nopep8
 
 
 class TransportPCEtesting(unittest.TestCase):
@@ -30,42 +30,43 @@ class TransportPCEtesting(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.processes = test_utils_rfc8040.start_tpce()
-        cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
-                                                       ('spdrc', cls.NODE_VERSION)])
+        cls.processes = test_utils.start_tpce()
+        cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+                                               ('spdrc', cls.NODE_VERSION)])
 
     @classmethod
     def tearDownClass(cls):
         # pylint: disable=not-an-iterable
         for process in cls.processes:
-            test_utils_rfc8040.shutdown_process(process)
+            test_utils.shutdown_process(process)
         print("all processes killed")
 
     def setUp(self):
-        time.sleep(5)
+        time.sleep(2)
 
     def test_01_connect_SPDR_SA1(self):
-        response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+        response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
         self.assertEqual(response.status_code, requests.codes.created,
-                         test_utils_rfc8040.CODE_SHOULD_BE_201)
-        time.sleep(10)
+                         test_utils.CODE_SHOULD_BE_201)
+        time.sleep(5)
 
-        response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+        response = test_utils.check_device_connection("SPDR-SA1")
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['connection-status'], 'connected')
 
     def test_02_connect_SPDR_SC1(self):
-        response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+        response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
         self.assertEqual(response.status_code, requests.codes.created,
-                         test_utils_rfc8040.CODE_SHOULD_BE_201)
-        time.sleep(10)
+                         test_utils.CODE_SHOULD_BE_201)
+        time.sleep(5)
 
-        response = test_utils_rfc8040.check_device_connection("SPDR-SC1")
+        response = test_utils.check_device_connection("SPDR-SC1")
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['connection-status'], 'connected')
 
     def test_03_service_create_OTU4(self):
-        response = test_utils_rfc8040.renderer_service_implementation_request(
+        response = test_utils.transportpce_api_rpc_request(
+            'transportpce-renderer', 'service-implementation-request',
             {
                 'service-name': 'SPDRA-SPDRC-OTU4-ODU4',
                 'connection-type': 'infrastructure',
@@ -165,10 +166,11 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertIn('Operation Successful',
                       response['output']['configuration-response-common']['response-message'])
+        time.sleep(2)
 
     # Test OCH-OTU interfaces on SPDR-A1
     def test_04_check_interface_och(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+        response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
                                    'administrative-state': 'inService',
@@ -177,15 +179,14 @@ class TransportPCEtesting(unittest.TestCase):
                                    'supporting-port': 'CP1-CFP0-P1'
                                    }, **response['interface'][0]),
                              response['interface'][0])
-        self.assertIn(
-            response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
-            [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
-              'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
-             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
-              'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
+        intf = response['interface'][0]['org-openroadm-optical-channel-interfaces:och']
+        self.assertEqual(intf['rate'], 'org-openroadm-common-types:R100G')
+        self.assertEqual(intf['modulation-format'], 'dp-qpsk')
+        self.assertEqual(float(intf['frequency']), 196.1)
+        self.assertEqual(float(intf['transmit-power']), -5)
 
     def test_05_check_interface_OTU(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+        response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                         'administrative-state': 'inService',
@@ -208,7 +209,7 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Test OCH-OTU interfaces on SPDR-C1
     def test_06_check_interface_och(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-761:768")
+        response = test_utils.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-761:768")
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
                                    'administrative-state': 'inService',
@@ -217,15 +218,14 @@ class TransportPCEtesting(unittest.TestCase):
                                    'supporting-port': 'CP1-CFP0-P1'
                                    }, **response['interface'][0]),
                              response['interface'][0])
-        self.assertIn(
-            response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
-            [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
-              'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
-             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
-              'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
+        intf = response['interface'][0]['org-openroadm-optical-channel-interfaces:och']
+        self.assertEqual(intf['rate'], 'org-openroadm-common-types:R100G')
+        self.assertEqual(intf['modulation-format'], 'dp-qpsk')
+        self.assertEqual(float(intf['frequency']), 196.1)
+        self.assertEqual(float(intf['transmit-power']), -5)
 
     def test_07_check_interface_OTU(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-OTU")
+        response = test_utils.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-OTU")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                         'administrative-state': 'inService',
@@ -248,7 +248,8 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Test creation of ODU4 service
     def test_08_service_create_ODU4(self):
-        response = test_utils_rfc8040.renderer_service_implementation_request(
+        response = test_utils.transportpce_api_rpc_request(
+            'transportpce-renderer', 'service-implementation-request',
             {
                 'service-name':
                 'SPDRA-SPDRC-OTU4-ODU4',
@@ -343,10 +344,11 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertIn('Operation Successful',
                       response['output']['configuration-response-common']['response-message'])
+        time.sleep(2)
 
     # Test ODU4 interfaces on SPDR-A1 and SPDR-C1
     def test_09_check_interface_ODU4(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+        response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                         'administrative-state': 'inService',
@@ -373,7 +375,7 @@ class TransportPCEtesting(unittest.TestCase):
             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
 
     def test_10_check_interface_ODU4(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU4")
+        response = test_utils.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU4")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                         'administrative-state': 'inService',
@@ -401,7 +403,8 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Test creation of 10G service
     def test_11_service_create_10GE(self):
-        response = test_utils_rfc8040.renderer_service_implementation_request(
+        response = test_utils.transportpce_api_rpc_request(
+            'transportpce-renderer', 'service-implementation-request',
             {
                 'service-name': 'SPDRA-SPDRC-10G',
                 'connection-type': 'service',
@@ -498,10 +501,11 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertIn('Operation Successful',
                       response['output']['configuration-response-common']['response-message'])
+        time.sleep(2)
 
     # Test the interfaces on SPDR-A1
     def test_12_check_interface_10GE_CLIENT(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+        response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
                       'administrative-state': 'inService',
@@ -514,10 +518,10 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
 
     def test_13_check_interface_ODU2E_CLIENT(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
-            "SPDR-SA1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+        response = test_utils.check_node_attribute_request(
+            "SPDR-SA1", "interface", "XPDR1-CLIENT1-ODU2e:SPDRA-SPDRC-10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
-        input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
+        input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e:SPDRA-SPDRC-10G',
                         'administrative-state': 'inService',
                         'supporting-circuit-pack-name': 'CP1-SFP4',
                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
@@ -538,27 +542,27 @@ class TransportPCEtesting(unittest.TestCase):
             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
 
     def test_14_check_ODU2E_connection(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "SPDR-SA1",
-            "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+            "odu-connection", "XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {
             'connection-name':
-            'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
+            'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
             'direction': 'bidirectional'
         }
         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
                              response['odu-connection'][0])
-        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
+        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G'},
                              response['odu-connection'][0]['destination'])
-        self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
+        self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:SPDRA-SPDRC-10G'},
                              response['odu-connection'][0]['source'])
 
     def test_15_check_interface_ODU2E_NETWORK(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
-            "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+        response = test_utils.check_node_attribute_request(
+            "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
-        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
+        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA:SPDRC-10G',
                         'administrative-state': 'inService',
                         'supporting-circuit-pack-name': 'CP1-CFP0',
                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
@@ -586,10 +590,10 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Test the interfaces on SPDR-C1
     def test_16_check_interface_ODU2E_NETWORK(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
-            "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+        response = test_utils.check_node_attribute_request(
+            "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
-        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
+        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G',
                         'administrative-state': 'inService',
                         'supporting-circuit-pack-name': 'CP1-CFP0',
                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
@@ -616,7 +620,7 @@ class TransportPCEtesting(unittest.TestCase):
                           'parent-odu-allocation']['trib-slots'])
 
     def test_17_check_interface_10GE_CLIENT(self):
-        response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+        response = test_utils.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
                       'administrative-state': 'inService',
@@ -629,8 +633,8 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
 
     def test_18_check_interface_ODU2E_CLIENT(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
-            "SPDR-SC1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+        response = test_utils.check_node_attribute_request(
+            "SPDR-SC1", "interface", "XPDR1-CLIENT1-ODU2e:SPDRA-SPDRC-10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
                         'administrative-state': 'inService',
@@ -652,27 +656,27 @@ class TransportPCEtesting(unittest.TestCase):
             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
 
     def test_19_check_ODU2E_connection(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "SPDR-SC1",
-            "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+            "odu-connection", "XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
         self.assertEqual(response['status_code'], requests.codes.ok)
         input_dict_1 = {
             'connection-name':
-            'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
+            'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
             'direction': 'bidirectional'
         }
         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
                              response['odu-connection'][0])
-        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
+        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G'},
                              response['odu-connection'][0]['destination'])
-        self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
+        self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:SPDRA-SPDRC-10G'},
                              response['odu-connection'][0]['source'])
 
     def test_20_check_interface_ODU2E_NETWORK(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
-            "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+        response = test_utils.check_node_attribute_request(
+            "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G")
         self.assertEqual(response['status_code'], requests.codes.ok)
-        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
+        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:SPDRA-SPDRC-10G',
                         'administrative-state': 'inService',
                         'supporting-circuit-pack-name': 'CP1-CFP0',
                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
@@ -704,11 +708,11 @@ class TransportPCEtesting(unittest.TestCase):
     # TODO: Delete interfaces (SPDR-A1, SPDR-C1)
 
     def test_21_disconnect_SPDR_SA1(self):
-        response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+        response = test_utils.unmount_device("SPDR-SA1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_22_disconnect_SPDR_SC1(self):
-        response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+        response = test_utils.unmount_device("SPDR-SC1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))