Manage 100GE on XPDR with a list of xponder
[transportpce.git] / tests / transportpce_tests / hybrid / test01_device_change_notifications.py
index a585886771782555c63d2b9bd9b69ff1c9d17b5d..c489551b1be95dba2a1e7a5e963d92a4d1f12ae1 100644 (file)
 # pylint: disable=no-member
 # pylint: disable=too-many-public-methods
 import json
-import base64
 import unittest
 import time
 import requests
+# pylint: disable=wrong-import-order
 import sys
 sys.path.append('transportpce_tests/common/')
-import test_utils
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils  # nopep8
 
 
 class TransportPCEFulltesting(unittest.TestCase):
@@ -40,9 +42,7 @@ class TransportPCEFulltesting(unittest.TestCase):
             "clli": "SNJSCAMCJP8",
                     "tx-direction": {
                         "port": {
-                            "port-device-name": "1/0/C1",
                             "port-type": "fixed",
-                            "port-name": "1",
                             "port-rack": "000000.00",
                             "port-shelf": "Chassis#1"
                         },
@@ -55,9 +55,7 @@ class TransportPCEFulltesting(unittest.TestCase):
                     },
             "rx-direction": {
                         "port": {
-                            "port-device-name": "1/0/C1",
                             "port-type": "fixed",
-                            "port-name": "1",
                             "port-rack": "000000.00",
                             "port-shelf": "Chassis#1"
                         },
@@ -77,9 +75,7 @@ class TransportPCEFulltesting(unittest.TestCase):
             "clli": "SNJSCAMCJT4",
                     "tx-direction": {
                         "port": {
-                            "port-device-name": "1/0/C1",
                             "port-type": "fixed",
-                            "port-name": "1",
                             "port-rack": "000000.00",
                             "port-shelf": "Chassis#1"
                         },
@@ -92,9 +88,7 @@ class TransportPCEFulltesting(unittest.TestCase):
                     },
             "rx-direction": {
                         "port": {
-                            "port-device-name": "1/0/C1",
                             "port-type": "fixed",
-                            "port-name": "1",
                             "port-rack": "000000.00",
                             "port-shelf": "Chassis#1"
                         },
@@ -131,8 +125,10 @@ class TransportPCEFulltesting(unittest.TestCase):
         for process in cls.processes:
             test_utils.shutdown_process(process)
         print("all processes killed")
+        time.sleep(10)
 
     def setUp(self):  # instruction executed before each test method
+        # pylint: disable=consider-using-f-string
         print("execution of {}".format(self.id().split(".")[-1]))
 
     def test_01_connect_xpdrA(self):
@@ -240,12 +236,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_13_change_status_line_port_xpdra(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
         body = {"ports": [{
-                    "port-name": "1",
-                    "logical-connection-point": "XPDR1-NETWORK1",
-                    "port-type": "CFP2",
-                    "circuit-id": "XPDRA-NETWORK",
-                    "administrative-state": "outOfService",
-                    "port-qual": "xpdr-network"}]}
+            "port-name": "1",
+            "logical-connection-point": "XPDR1-NETWORK1",
+            "port-type": "CFP2",
+            "circuit-id": "XPDRA-NETWORK",
+            "administrative-state": "outOfService",
+            "port-qual": "xpdr-network"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -317,12 +313,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_17_restore_status_line_port_xpdra(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
         body = {"ports": [{
-                    "port-name": "1",
-                    "logical-connection-point": "XPDR1-NETWORK1",
-                    "port-type": "CFP2",
-                    "circuit-id": "XPDRA-NETWORK",
-                    "administrative-state": "inService",
-                    "port-qual": "xpdr-network"}]}
+            "port-name": "1",
+            "logical-connection-point": "XPDR1-NETWORK1",
+            "port-type": "CFP2",
+            "circuit-id": "XPDRA-NETWORK",
+            "administrative-state": "inService",
+            "port-qual": "xpdr-network"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -367,12 +363,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_21_change_status_port_roadma_srg(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
         body = {"ports": [{
-                    "port-name": "C1",
-                    "logical-connection-point": "SRG1-PP1",
-                    "port-type": "client",
-                    "circuit-id": "SRG1",
-                    "administrative-state": "outOfService",
-                    "port-qual": "roadm-external"}]}
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "outOfService",
+            "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -436,12 +432,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_24_restore_status_port_roadma_srg(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
         body = {"ports": [{
-                    "port-name": "C1",
-                    "logical-connection-point": "SRG1-PP1",
-                    "port-type": "client",
-                    "circuit-id": "SRG1",
-                    "administrative-state": "inService",
-                    "port-qual": "roadm-external"}]}
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "inService",
+            "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -460,12 +456,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_28_change_status_line_port_roadma_deg(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
         body = {"ports": [{
-                    "port-name": "L1",
-                    "logical-connection-point": "DEG2-TTP-TXRX",
-                    "port-type": "LINE",
-                    "circuit-id": "1",
-                    "administrative-state": "outOfService",
-                    "port-qual": "roadm-external"}]}
+            "port-name": "L1",
+            "logical-connection-point": "DEG2-TTP-TXRX",
+            "port-type": "LINE",
+            "circuit-id": "1",
+            "administrative-state": "outOfService",
+            "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -529,12 +525,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_31_restore_status_line_port_roadma_srg(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
         body = {"ports": [{
-                    "port-name": "L1",
-                    "logical-connection-point": "DEG2-TTP-TXRX",
-                    "port-type": "LINE",
-                    "circuit-id": "1",
-                    "administrative-state": "inService",
-                    "port-qual": "roadm-external"}]}
+            "port-name": "L1",
+            "logical-connection-point": "DEG2-TTP-TXRX",
+            "port-type": "LINE",
+            "circuit-id": "1",
+            "administrative-state": "inService",
+            "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -553,10 +549,10 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_35_change_status_line_port_xpdrc(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
         body = {"ports": [{
-                    "port-name": "1",
-                    "port-type": "CFP2",
-                    "administrative-state": "outOfService",
-                    "port-qual": "xpdr-network"}]}
+            "port-name": "1",
+            "port-type": "CFP2",
+            "administrative-state": "outOfService",
+            "port-qual": "xpdr-network"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -620,10 +616,10 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_38_restore_status_line_port_xpdrc(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
         body = {"ports": [{
-                    "port-name": "1",
-                    "port-type": "CFP2",
-                    "administrative-state": "inService",
-                    "port-qual": "xpdr-network"}]}
+            "port-name": "1",
+            "port-type": "CFP2",
+            "administrative-state": "inService",
+            "port-qual": "xpdr-network"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -642,12 +638,12 @@ class TransportPCEFulltesting(unittest.TestCase):
     def test_42_change_status_port_roadma_srg(self):
         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
         body = {"ports": [{
-                    "port-name": "C2",
-                    "logical-connection-point": "SRG1-PP2",
-                    "port-type": "client",
-                    "circuit-id": "SRG1",
-                    "administrative-state": "outOfService",
-                    "port-qual": "roadm-external"}]}
+            "port-name": "C2",
+            "logical-connection-point": "SRG1-PP2",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "outOfService",
+            "port-qual": "roadm-external"}]}
         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
@@ -719,8 +715,7 @@ class TransportPCEFulltesting(unittest.TestCase):
         res = response.json()
         links = res['network'][0]['ietf-network-topology:link']
         for link in links:
-            if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
-                    link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
+            if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
                 link_name = link["link-id"]
                 response = test_utils.delete_request(url+link_name)
                 self.assertEqual(response.status_code, requests.codes.ok)