fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
index cfe2772eb241229849546a24440fe050402900c9..b8fd458acbfb484fc102e0bb7bdf61204caf26db 100644 (file)
@@ -9,8 +9,10 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
 import unittest
-import json
 import time
 import requests
 from common import test_utils
@@ -27,6 +29,7 @@ class TransportPCEtesting(unittest.TestCase):
 
     @classmethod
     def tearDownClass(cls):
+        # pylint: disable=not-an-iterable
         for process in cls.processes:
             test_utils.shutdown_process(process)
         print("all processes killed")
@@ -39,35 +42,29 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
         time.sleep(10)
 
-        url = "{}/operational/network-topology:network-topology/topology/topology-netconf/node/SPDR-SA1"
-        response = test_utils.get_request(url)
+        response = test_utils.get_netconf_oper_request("SPDR-SA1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         self.assertEqual(
             res['node'][0]['netconf-node-topology:connection-status'],
             'connected')
 
-    def test_02_get_portmapping_CLIENT1(self):
-        url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
-        response = test_utils.get_request(url)
+    def test_02_get_portmapping_CLIENT4(self):
+        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
         self.assertEqual(response.status_code, requests.codes.ok)
-        res = response.json()
-        self.assertIn(
-            {'supported-interface-capability': [
-                'org-openroadm-port-types:if-10GE-ODU2e',
-                'org-openroadm-port-types:if-10GE-ODU2',
-                'org-openroadm-port-types:if-10GE'],
-             'supporting-port': 'CP1-SFP4-P1',
-             'supporting-circuit-pack-name': 'CP1-SFP4',
-             'logical-connection-point': 'XPDR1-CLIENT1',
-             'port-direction': 'bidirectional',
-             'port-qual': 'xpdr-client',
-             'lcp-hash-val': 'FqlcrxV7p30='},
-            res['mapping'])
+        res_mapping = (response.json())['mapping'][0]
+        self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
+        self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
+        self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
+        self.assertEqual('bidirectional', res_mapping['port-direction'])
+        self.assertEqual('xpdr-client', res_mapping['port-qual'])
+        self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
+        self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
+        self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
+        self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
 
     def test_03_get_portmapping_NETWORK1(self):
-        url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
-        response = test_utils.get_request(url)
+        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         self.assertIn(
@@ -84,16 +81,8 @@ class TransportPCEtesting(unittest.TestCase):
             res['mapping'])
 
     def test_04_service_path_create_OCH_OTU4(self):
-        url = "{}/operations/transportpce-device-renderer:service-path"
-        data = {"renderer:input": {
-            "service-name": "service_OCH_OTU4",
-            "wave-number": "1",
-            "modulation-format": "qpsk",
-            "operation": "create",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "dest-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
+                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -105,8 +94,7 @@ class TransportPCEtesting(unittest.TestCase):
              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
 
     def test_05_get_portmapping_NETWORK1(self):
-        url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
-        response = test_utils.get_request(url)
+        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         self.assertIn(
@@ -123,11 +111,7 @@ class TransportPCEtesting(unittest.TestCase):
             res['mapping'])
 
     def test_06_check_interface_och(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-1"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
 
@@ -141,15 +125,11 @@ class TransportPCEtesting(unittest.TestCase):
 
         self.assertDictEqual(
             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
-             u'transmit-power': -5},
+             u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
 
     def test_07_check_interface_OTU(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-OTU"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
@@ -175,16 +155,8 @@ class TransportPCEtesting(unittest.TestCase):
                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
 
     def test_08_otn_service_path_create_ODU4(self):
-        url = "{}/operations/transportpce-device-renderer:otn-service-path"
-        data = {"renderer:input": {
-            "service-name": "service_ODU4",
-            "operation": "create",
-            "service-rate": "100G",
-            "service-type": "ODU",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "network-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
+                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -195,8 +167,7 @@ class TransportPCEtesting(unittest.TestCase):
              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
 
     def test_09_get_portmapping_NETWORK1(self):
-        url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
-        response = test_utils.get_request(url)
+        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         self.assertIn(
@@ -215,11 +186,7 @@ class TransportPCEtesting(unittest.TestCase):
             res['mapping'])
 
     def test_10_check_interface_ODU4(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-ODU4"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
@@ -245,40 +212,28 @@ class TransportPCEtesting(unittest.TestCase):
             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
 
     def test_11_otn_service_path_create_10GE(self):
-        url = "{}/operations/transportpce-device-renderer:otn-service-path"
-        data = {"renderer:input": {
-            "service-name": "service1",
-            "operation": "create",
-            "service-rate": "10G",
-            "service-type": "Ethernet",
-            "ethernet-encoding": "eth encode",
-            "trib-slot": ["1"],
-            "trib-port-number": "1",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "client-tp": "XPDR1-CLIENT1",
-                 "network-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
+                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
+                                                           "network-tp": "XPDR1-NETWORK1"}],
+                                                       {"ethernet-encoding": "eth encode",
+                                                        "trib-slot": ["1"], "trib-port-number": "1"})
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
         self.assertTrue(res["output"]["success"])
-        self.assertIn(
-            {'node-id': 'SPDR-SA1',
-             'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
-             'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
-             'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
+        self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id'])
+        self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
+                      res["output"]['node-interface'][0]['connection-id'])
+        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id'])
+        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
+        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
 
     def test_12_check_interface_10GE_CLIENT(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-CLIENT1-ETHERNET10G"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
-        input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
+        input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
                       'administrative-state': 'inService',
                       'supporting-circuit-pack-name': 'CP1-SFP4',
                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
@@ -291,18 +246,14 @@ class TransportPCEtesting(unittest.TestCase):
             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
 
     def test_13_check_interface_ODU2E_CLIENT(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-CLIENT1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
 
-        input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
+        input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
                         'administrative-state': 'inService',
                         'supporting-circuit-pack-name': 'CP1-SFP4',
-                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
+                        'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
                         'type': 'org-openroadm-interfaces:otnOdu',
                         'supporting-port': 'CP1-SFP4-P1'}
         input_dict_2 = {
@@ -320,11 +271,7 @@ class TransportPCEtesting(unittest.TestCase):
             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
 
     def test_14_check_interface_ODU2E_NETWORK(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
@@ -355,16 +302,14 @@ class TransportPCEtesting(unittest.TestCase):
                           'parent-odu-allocation']['trib-slots'])
 
     def test_15_check_ODU2E_connection(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
+        response = test_utils.check_netconf_node_request(
+            "SPDR-SA1",
+            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
         input_dict_1 = {
             'connection-name':
-            'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
+            'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
             'direction': 'bidirectional'
         }
 
@@ -372,24 +317,15 @@ class TransportPCEtesting(unittest.TestCase):
                              res['odu-connection'][0])
         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
                              res['odu-connection'][0]['destination'])
-        self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
+        self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
                              res['odu-connection'][0]['source'])
 
     def test_16_otn_service_path_delete_10GE(self):
-        url = "{}/operations/transportpce-device-renderer:otn-service-path"
-        data = {"renderer:input": {
-            "service-name": "service1",
-            "operation": "delete",
-            "service-rate": "10G",
-            "service-type": "Ethernet",
-            "ethernet-encoding": "eth encode",
-            "trib-slot": ["1"],
-            "trib-port-number": "1",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "client-tp": "XPDR1-CLIENT1",
-                 "network-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
+                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
+                                                           "network-tp": "XPDR1-NETWORK1"}],
+                                                       {"ethernet-encoding": "eth encode",
+                                                        "trib-slot": ["1"], "trib-port-number": "1"})
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -397,48 +333,26 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertTrue(res["output"]["success"])
 
     def test_17_check_no_ODU2E_connection(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request(
+            "SPDR-SA1",
+            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_18_check_no_interface_ODU2E_NETWORK(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_19_check_no_interface_ODU2E_CLIENT(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-CLIENT1-ODU2e-service1"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_20_check_no_interface_10GE_CLIENT(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-CLIENT1-ETHERNET10G"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_21_otn_service_path_delete_ODU4(self):
-        url = "{}/operations/transportpce-device-renderer:otn-service-path"
-        data = {"renderer:input": {
-            "service-name": "service_ODU4",
-            "operation": "delete",
-            "service-rate": "100G",
-            "service-type": "ODU",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "network-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
+                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -446,24 +360,12 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertTrue(res["output"]["success"])
 
     def test_22_check_no_interface_ODU4(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-ODU4"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_23_service_path_delete_OCH_OTU4(self):
-        url = "{}/operations/transportpce-device-renderer:service-path"
-        data = {"renderer:input": {
-            "service-name": "service_OTU4",
-            "wave-number": "1",
-            "modulation-format": "qpsk",
-            "operation": "delete",
-            "nodes": [
-                {"node-id": "SPDR-SA1",
-                 "dest-tp": "XPDR1-NETWORK1"}]}}
-        response = test_utils.post_request(url, data)
+        response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
+                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
         time.sleep(3)
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -471,20 +373,12 @@ class TransportPCEtesting(unittest.TestCase):
         self.assertTrue(res["output"]["success"])
 
     def test_24_check_no_interface_OTU4(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-OTU"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_25_check_no_interface_OCH(self):
-        url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
-               "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
-               "interface/XPDR1-NETWORK1-1"
-               )
-        response = test_utils.get_request(url)
-        self.assertEqual(response.status_code, requests.codes.not_found)
+        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
+        self.assertEqual(response.status_code, requests.codes.conflict)
 
     def test_26_disconnect_SPDR_SA1(self):
         response = test_utils.unmount_device("SPDR-SA1")