Switch functional tests to RFC8040
[transportpce.git] / tests / transportpce_tests / with_docker / test03_tapi_nbinotifications.py
index 54f3ddf865f355d45781f131649d384d4dfd97a1..b557610f355c88dd55982cfcf470f209bcaea894 100644 (file)
@@ -21,7 +21,7 @@ import requests
 sys.path.append('transportpce_tests/common/')
 # pylint: disable=wrong-import-position
 # pylint: disable=import-error
-import test_utils_rfc8040  # nopep8
+import test_utils  # nopep8
 
 
 # pylint: disable=too-few-public-methods
@@ -145,41 +145,41 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         cls.init_failed_tapi = False
         os.environ['JAVA_MIN_MEM'] = '1024M'
         os.environ['JAVA_MAX_MEM'] = '4096M'
-        cls.processes = test_utils_rfc8040.start_tpce()
+        cls.processes = test_utils.start_tpce()
         # NBI notification feature is not installed by default in Karaf
         if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
             print("installing NBI notification feature...")
-            result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-nbinotifications")
+            result = test_utils.install_karaf_feature("odl-transportpce-nbinotifications")
             if result.returncode != 0:
                 cls.init_failed_nbi = True
             print("installing tapi feature...")
-            result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
+            result = test_utils.install_karaf_feature("odl-transportpce-tapi")
             if result.returncode != 0:
                 cls.init_failed_tapi = True
             print("Restarting OpenDaylight...")
-            test_utils_rfc8040.shutdown_process(cls.processes[0])
-            cls.processes[0] = test_utils_rfc8040.start_karaf()
-            test_utils_rfc8040.process_list[0] = cls.processes[0]
-            cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
-                test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
+            test_utils.shutdown_process(cls.processes[0])
+            cls.processes[0] = test_utils.start_karaf()
+            test_utils.process_list[0] = cls.processes[0]
+            cls.init_failed = not test_utils.wait_until_log_contains(
+                test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
         if cls.init_failed_nbi:
             print("NBI notification installation feature failed...")
-            test_utils_rfc8040.shutdown_process(cls.processes[0])
+            test_utils.shutdown_process(cls.processes[0])
             sys.exit(2)
         if cls.init_failed_tapi:
             print("tapi installation feature failed...")
-            test_utils_rfc8040.shutdown_process(cls.processes[0])
+            test_utils.shutdown_process(cls.processes[0])
             sys.exit(2)
-        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_221),
-                                                       ('roadma', cls.NODE_VERSION_221),
-                                                       ('roadmc', cls.NODE_VERSION_221),
-                                                       ('xpdrc', cls.NODE_VERSION_221)])
+        cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
+                                               ('roadma', cls.NODE_VERSION_221),
+                                               ('roadmc', cls.NODE_VERSION_221),
+                                               ('xpdrc', cls.NODE_VERSION_221)])
 
     @classmethod
     def tearDownClass(cls):
         # pylint: disable=not-an-iterable
         for process in cls.processes:
-            test_utils_rfc8040.shutdown_process(process)
+            test_utils.shutdown_process(process)
         print("all processes killed")
 
     def setUp(self):  # instruction executed before each test method
@@ -187,27 +187,27 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         print("execution of {}".format(self.id().split(".")[-1]))
 
     def test_01_connect_xpdrA(self):
-        response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
+        response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
         self.assertEqual(response.status_code,
-                         requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
     def test_02_connect_xpdrC(self):
-        response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
+        response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
         self.assertEqual(response.status_code,
-                         requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
     def test_03_connect_rdmA(self):
-        response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+        response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
         self.assertEqual(response.status_code,
-                         requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
     def test_04_connect_rdmC(self):
-        response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+        response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
         self.assertEqual(response.status_code,
-                         requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
     def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-xpdr-rdm-links',
             {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
@@ -216,7 +216,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(2)
 
     def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-rdm-xpdr-links',
             {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
@@ -225,7 +225,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(2)
 
     def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-xpdr-rdm-links',
             {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
@@ -234,7 +234,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(2)
 
     def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-rdm-xpdr-links',
             {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
@@ -254,7 +254,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                 "fiber-type": "smf",
                 "SRLG-length": 100000,
                 "pmd": 0.5}]}}
-        response = test_utils_rfc8040.add_oms_attr_request(
+        response = test_utils.add_oms_attr_request(
             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
         self.assertEqual(response.status_code, requests.codes.created)
 
@@ -270,13 +270,13 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                 "fiber-type": "smf",
                 "SRLG-length": 100000,
                 "pmd": 0.5}]}}
-        response = test_utils_rfc8040.add_oms_attr_request(
+        response = test_utils.add_oms_attr_request(
             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
         self.assertEqual(response.status_code, requests.codes.created)
 
     # test service-create for Eth service from xpdr to xpdr
     def test_11_create_connectivity_service_Ethernet(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
         time.sleep(self.WAITING)
         self.assertEqual(response['status_code'], requests.codes.ok)
@@ -305,7 +305,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(self.WAITING)
 
     def test_12_get_service_Ethernet(self):
-        response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
+        response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
         self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth))
@@ -315,7 +315,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
 
     def test_13_get_connectivity_service_Ethernet(self):
         self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['output']['service']['operational-state'], 'ENABLED')
@@ -326,7 +326,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
     def test_14_create_notifications_subscription_service(self):
         self.cr_notif_subs_input_data["subscription-filter"]["requested-object-identifier"][0] =\
             str(self.uuid_services.eth)
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-notification', 'create-notification-subscription-service', self.cr_notif_subs_input_data)
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.uuid_subscriptions.eth = response['output']['subscription-service']['uuid']
@@ -348,16 +348,16 @@ class TransportNbiNotificationstesting(unittest.TestCase):
             "administrative-state": "outOfService",
             "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
-                                    timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+                                    timeout=test_utils.REQUEST_TIMEOUT)
         self.assertEqual(response.status_code, requests.codes.ok)
         # If the gate fails is because of the waiting time not being enough
         time.sleep(2)
 
     def test_16_get_tapi_notifications_connectivity_service_Ethernet(self):
         self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-notification', 'get-notification-list', self.cr_get_notif_list_input_data)
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['output']['notification'][0]['target-object-identifier'], str(self.uuid_services.eth))
@@ -376,16 +376,16 @@ class TransportNbiNotificationstesting(unittest.TestCase):
             "administrative-state": "inService",
             "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
-                                    timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+                                    timeout=test_utils.REQUEST_TIMEOUT)
         self.assertEqual(response.status_code, requests.codes.ok)
         # If the gate fails is because of the waiting time not being enough
         time.sleep(2)
 
     def test_18_get_tapi_notifications_connectivity_service_Ethernet(self):
         self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-notification', 'get-notification-list', self.cr_get_notif_list_input_data)
         self.assertEqual(response['status_code'], requests.codes.ok)
         self.assertEqual(response['output']['notification'][1]['target-object-identifier'], str(self.uuid_services.eth))
@@ -396,25 +396,25 @@ class TransportNbiNotificationstesting(unittest.TestCase):
 
     def test_19_delete_connectivity_service_Ethernet(self):
         self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             'tapi-connectivity', 'delete-connectivity-service', self.tapi_serv_details)
         self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content))
         time.sleep(self.WAITING)
 
     def test_20_disconnect_XPDRA(self):
-        response = test_utils_rfc8040.unmount_device("XPDR-A1")
+        response = test_utils.unmount_device("XPDR-A1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_21_disconnect_XPDRC(self):
-        response = test_utils_rfc8040.unmount_device("XPDR-C1")
+        response = test_utils.unmount_device("XPDR-C1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_22_disconnect_ROADMA(self):
-        response = test_utils_rfc8040.unmount_device("ROADM-A1")
+        response = test_utils.unmount_device("ROADM-A1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_23_disconnect_ROADMC(self):
-        response = test_utils_rfc8040.unmount_device("ROADM-C1")
+        response = test_utils.unmount_device("ROADM-C1")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))