Outsource sims DS direct op on CP to test_utils
[transportpce.git] / tests / transportpce_tests / hybrid / test01_device_change_notifications.py
index 88347bf3d8a0c5fae40ffb80929d44bcafe49959..c0d8c3e6474758e1ee147130b9d5651679ff1508 100644 (file)
@@ -10,7 +10,6 @@
 
 # pylint: disable=no-member
 # pylint: disable=too-many-public-methods
-import json
 import unittest
 import time
 import requests
@@ -198,19 +197,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         time.sleep(1)
 
     def test_13_change_status_line_port_xpdra(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('xpdra', self.NODE_VERSION_121), '1%2F0%2F1-PLUG-NET', '1',
+                                                       {
             "port-name": "1",
             "logical-connection-point": "XPDR1-NETWORK1",
             "port-type": "CFP2",
             "circuit-id": "XPDRA-NETWORK",
             "administrative-state": "outOfService",
-            "port-qual": "xpdr-network"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "xpdr-network"
+        }))
         time.sleep(2)
 
     def test_14_check_update_portmapping(self):
@@ -272,19 +267,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         time.sleep(1)
 
     def test_17_restore_status_line_port_xpdra(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('xpdra', self.NODE_VERSION_121), '1%2F0%2F1-PLUG-NET', '1',
+                                                       {
             "port-name": "1",
             "logical-connection-point": "XPDR1-NETWORK1",
             "port-type": "CFP2",
             "circuit-id": "XPDRA-NETWORK",
             "administrative-state": "inService",
-            "port-qual": "xpdr-network"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "xpdr-network"
+        }))
         time.sleep(2)
 
     def test_18_check_update_portmapping_ok(self):
@@ -320,19 +311,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         self.test_12_get_eth_service1()
 
     def test_21_change_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3%2F0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "outOfService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "roadm-external"
+        }))
         time.sleep(2)
 
     def test_22_check_update_portmapping(self):
@@ -387,19 +374,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         time.sleep(1)
 
     def test_24_restore_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3%2F0', 'C1',
+                                                       {
             "port-name": "C1",
             "logical-connection-point": "SRG1-PP1",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "inService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "roadm-external"
+        }))
         time.sleep(2)
 
     def test_25_check_update_portmapping_ok(self):
@@ -412,19 +395,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         self.test_12_get_eth_service1()
 
     def test_28_change_status_line_port_roadma_deg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2%2F0', 'L1',
+                                                       {
             "port-name": "L1",
             "logical-connection-point": "DEG2-TTP-TXRX",
             "port-type": "LINE",
             "circuit-id": "1",
             "administrative-state": "outOfService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "roadm-external"
+        }))
         time.sleep(2)
 
     def test_29_check_update_portmapping(self):
@@ -479,19 +458,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         time.sleep(1)
 
     def test_31_restore_status_line_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2%2F0', 'L1',
+                                                       {
             "port-name": "L1",
             "logical-connection-point": "DEG2-TTP-TXRX",
             "port-type": "LINE",
             "circuit-id": "1",
             "administrative-state": "inService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "roadm-external"
+        }))
         time.sleep(2)
 
     def test_32_check_update_portmapping_ok(self):
@@ -504,17 +479,13 @@ class TransportPCEFulltesting(unittest.TestCase):
         self.test_12_get_eth_service1()
 
     def test_35_change_status_line_port_xpdrc(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('xpdrc', self.NODE_VERSION_71), '1%2F0%2F1-PLUG-NET', '1',
+                                                       {
             "port-name": "1",
             "port-type": "CFP2",
             "administrative-state": "outOfService",
-            "port-qual": "xpdr-network"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "xpdr-network"
+        }))
         time.sleep(2)
 
     def test_36_check_update_portmapping(self):
@@ -569,17 +540,13 @@ class TransportPCEFulltesting(unittest.TestCase):
         time.sleep(1)
 
     def test_38_restore_status_line_port_xpdrc(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('xpdrc', self.NODE_VERSION_71), '1%2F0%2F1-PLUG-NET', '1',
+                                                       {
             "port-name": "1",
             "port-type": "CFP2",
             "administrative-state": "inService",
-            "port-qual": "xpdr-network"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "xpdr-network"
+        }))
         time.sleep(2)
 
     def test_39_check_update_portmapping_ok(self):
@@ -592,19 +559,15 @@ class TransportPCEFulltesting(unittest.TestCase):
         self.test_12_get_eth_service1()
 
     def test_42_change_status_port_roadma_srg(self):
-        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
-        body = {"ports": [{
+        self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3%2F0', 'C2',
+                                                       {
             "port-name": "C2",
             "logical-connection-point": "SRG1-PP2",
             "port-type": "client",
             "circuit-id": "SRG1",
             "administrative-state": "outOfService",
-            "port-qual": "roadm-external"}]}
-        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
-                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
-                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
-                                    timeout=test_utils.REQUEST_TIMEOUT)
-        self.assertEqual(response.status_code, requests.codes.ok)
+            "port-qual": "roadm-external"
+        }))
         time.sleep(2)
 
     def test_43_check_update_portmapping(self):