Outsource sims DS direct op on CP to test_utils
[transportpce.git] / tests / transportpce_tests / with_docker / test03_tapi_nbinotifications.py
index 1a82ceaba55bd0a84fda7fb83498b0d009c9afc4..a58ef7a0553e077f5a3ced54751362f593030c81 100644 (file)
@@ -12,7 +12,6 @@
 # pylint: disable=too-many-public-methods
 
 import os
-import json
 # pylint: disable=wrong-import-order
 import sys
 import unittest
@@ -331,21 +330,15 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(2)
 
     def test_15_change_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3%2F0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "outOfService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
-        # If the gate fails is because of the waiting time not being enough
-        time.sleep(2)
+            "port-qual": "roadm-external"
+        }))
 
     def test_16_get_tapi_notifications_connectivity_service_Ethernet(self):
         self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)
@@ -359,21 +352,15 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         time.sleep(2)
 
     def test_17_restore_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3%2F0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "inService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
-        # If the gate fails is because of the waiting time not being enough
-        time.sleep(2)
+            "port-qual": "roadm-external"
+        }))
 
     def test_18_get_tapi_notifications_connectivity_service_Ethernet(self):
         self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)