Merge "Debug tool for openconfig proprietary extensions"
[transportpce.git] / tests / transportpce_tests / with_docker / test02_nbinotifications.py
index f7dd69e95ee5200e39e9ec9c6bf81aec64e87cc4..053b16b5ca7b985b5342e57690c7eb486df4f143 100644 (file)
@@ -12,7 +12,6 @@
 # pylint: disable=too-many-public-methods
 
 import os
-import json
 # pylint: disable=wrong-import-order
 import sys
 import unittest
@@ -85,7 +84,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         cls.init_failed = False
         cls.processes = test_utils.start_tpce()
         # NBI notification feature is not installed by default in Karaf
-        if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
+        if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
             print("installing NBI notification feature...")
             result = test_utils.install_karaf_feature("odl-transportpce-nbinotifications")
             if result.returncode != 0:
@@ -130,7 +129,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertEqual(response.status_code,
                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
-    def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
+    def test_05_connect_xpdrA_N1_to_roadmA_PP1(self):
         response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-xpdr-rdm-links',
             {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
@@ -148,7 +147,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
         time.sleep(2)
 
-    def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
+    def test_07_connect_xpdrC_N1_to_roadmC_PP1(self):
         response = test_utils.transportpce_api_rpc_request(
             'transportpce-networkutils', 'init-xpdr-rdm-links',
             {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
@@ -262,20 +261,16 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                          'The service is now inService')
 
     def test_18_change_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION), '3/0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "outOfService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
-        time.sleep(2)
+            "port-qual": "roadm-external"
+        }))
+        time.sleep(5)
 
     def test_19_get_notifications_alarm_service1(self):
         response = test_utils.transportpce_api_rpc_request(
@@ -288,20 +283,16 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                          'The service is now outOfService')
 
     def test_20_restore_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION), '3/0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "inService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
-        time.sleep(2)
+            "port-qual": "roadm-external"
+        }))
+        time.sleep(5)
 
     def test_21_get_notifications_alarm_service1(self):
         self.test_17_get_notifications_alarm_service1()