ROADM To ROADM Path Calculation on unidir ports
[transportpce.git] / tests / transportpce_tests / test_pce.py
index 8d60017be414ce7c2879e2e8b82370bc5169afb7..e45b4f1e3f7c9a4383884ef17a3cf1863cb5989c 100644 (file)
@@ -22,20 +22,25 @@ import unittest
 class TransportPCEtesting(unittest.TestCase):
 
     odl_process = None
-    simple_data = None
-    complex_data = None
+    simple_topo_bi_dir_data = None
+    simple_topo_uni_dir_data = None
+    complex_topo_uni_dir_data = None
     restconf_baseurl = "http://localhost:8181/restconf"
 
     @classmethod
     def _get_file(cls):
-        simple_topology_file = "sample_configs/NW-simple-topology.xml"
-        if os.path.isfile(simple_topology_file):
-            with open(simple_topology_file, 'r') as simple_file:
-                cls.simple_data = simple_file.read();
-        complex_topology_file = "sample_configs/NW-for-test-5-4.xml"
-        if os.path.isfile(complex_topology_file):
-            with open(complex_topology_file, 'r') as complex_file:
-                cls.complex_data = complex_file.read();
+        topo_bi_dir_file = "sample_configs/honeynode-topo.xml"
+        if os.path.isfile(topo_bi_dir_file):
+            with open(topo_bi_dir_file, 'r') as topo_bi_dir:
+                cls.simple_topo_bi_dir_data = topo_bi_dir.read();
+        topo_uni_dir_file = "sample_configs/NW-simple-topology.xml"
+        if os.path.isfile(topo_uni_dir_file):
+            with open(topo_uni_dir_file, 'r') as topo_uni_dir:
+                cls.simple_topo_uni_dir_data = topo_uni_dir.read();
+        topo_uni_dir_complex_file = "sample_configs/NW-for-test-5-4.xml"
+        if os.path.isfile(topo_uni_dir_complex_file):
+            with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
+                cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read();
 
     @classmethod
     def __start_odl(cls):
@@ -62,11 +67,11 @@ class TransportPCEtesting(unittest.TestCase):
     def setUp(self):  # instruction executed before each test method
         time.sleep(1)
 
-    # Load simple topology
-    def test_01_load_simple_topology(self):
+     # Load simple bidirectional topology
+    def test_01_load_simple_topology_bi(self):
         url = ("{}/config/ietf-network:network/openroadm-topology"
               .format(self.restconf_baseurl))
-        body = self.simple_data
+        body = self.simple_topo_bi_dir_data
         headers = {'content-type': 'application/xml',
         "Accept": "application/json"}
         response = requests.request(
@@ -77,6 +82,138 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Get existing nodeId
     def test_02_get_nodeId(self):
+        url = ("{}/config/ietf-network:network/openroadm-topology/node/ROADMA-SRG1"
+                .format(self.restconf_baseurl))
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "GET", url, headers=headers, auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(
+            res['node'][0]['node-id'], 'ROADMA-SRG1')
+        time.sleep(1)
+
+    # Get existing linkId
+    def test_03_get_linkId(self):
+        url = ("{}/config/ietf-network:network/openroadm-topology/link/XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX"
+              .format(self.restconf_baseurl))
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "GET", url, headers=headers, auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(
+            res['ietf-network-topology:link'][0]['link-id'],
+            'XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX')
+        time.sleep(1)
+
+    # Path Computation success
+    def test_04_path_computation_xpdr_bi(self):
+        url = ("{}/operations/transportpce-pce:path-computation-request"
+              .format(self.restconf_baseurl))
+        body = {"input": {
+                "service-name": "service-1",
+                "resource-reserve": "true",
+                "pce-metric": "hop-count",
+                "service-handler-header": {
+                    "request-id": "request-1"
+                },
+                "service-a-end": {
+                    "node-id": "XPDRA",
+                    "service-rate": "0",
+                    "clli": "nodeA"
+                },
+                "service-z-end": {
+                    "node-id": "XPDRC",
+                    "service-rate": "0",
+                    "clli": "nodeC"
+                }
+            }
+        }
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "POST", url, data=json.dumps(body), headers=headers,
+            auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertIn('Path is calculated',
+            res['output']['configuration-response-common']['response-message'])
+        time.sleep(5)
+
+    # Path Computation success
+    def test_05_path_computation_rdm_bi(self):
+        url = ("{}/operations/transportpce-pce:path-computation-request"
+              .format(self.restconf_baseurl))
+        body = {"input": {
+                "service-name": "service-1",
+                "resource-reserve": "true",
+                "pce-metric": "hop-count",
+                "service-handler-header": {
+                    "request-id": "request-1"
+                },
+                "service-a-end": {
+                    "node-id": "ROADMA",
+                    "service-rate": "0",
+                    "clli": "nodeA"
+                },
+                "service-z-end": {
+                    "node-id": "ROADMC",
+                    "service-rate": "0",
+                    "clli": "nodeC"
+                }
+            }
+        }
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "POST", url, data=json.dumps(body), headers=headers,
+            auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertIn('Path is calculated',
+            res['output']['configuration-response-common']['response-message'])
+        time.sleep(5)
+
+    # Delete topology
+    def test_06_delete_simple_topology_bi(self):
+        url = ("{}/config/ietf-network:network/openroadm-topology"
+              .format(self.restconf_baseurl))
+        headers = {'content-type': 'application/xml',
+        "Accept": "application/json"}
+        response = requests.request(
+            "DELETE", url, headers=headers, auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        time.sleep(2)
+
+    # Test deleted topology
+    def test_07_test_topology_simple_bi_deleted(self):
+        url = ("{}/config/ietf-network:network/openroadm-topology/node/ROADMA-SRG1"
+                .format(self.restconf_baseurl))
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "GET", url, headers=headers, auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, 404)
+        time.sleep(1)
+
+    # Load simple bidirectional topology
+    def test_08_load_simple_topology_uni(self):
+        url = ("{}/config/ietf-network:network/openroadm-topology"
+              .format(self.restconf_baseurl))
+        body = self.simple_topo_uni_dir_data
+        headers = {'content-type': 'application/xml',
+        "Accept": "application/json"}
+        response = requests.request(
+            "PUT", url, data=body, headers=headers,
+            auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, 201)
+        time.sleep(2)
+
+    # Get existing nodeId
+    def test_09_get_nodeId(self):
         url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-1-2"
                 .format(self.restconf_baseurl))
         headers = {'content-type': 'application/json',
@@ -91,7 +228,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(1)
 
     # Get existing linkId
-    def test_03_get_linkId(self):
+    def test_10_get_linkId(self):
         url = ("{}/config/ietf-network:network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
               .format(self.restconf_baseurl))
         headers = {'content-type': 'application/json',
@@ -106,7 +243,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(1)
 
     # Path Computation success
-    def test_04_path_computation(self):
+    def test_11_path_computation_xpdr_uni(self):
         url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
         body = {"input": {
@@ -139,8 +276,54 @@ class TransportPCEtesting(unittest.TestCase):
             res['output']['configuration-response-common']['response-message'])
         time.sleep(5)
 
+    # Path Computation success
+    def test_12_path_computation_rdm_uni(self):
+        url = ("{}/operations/transportpce-pce:path-computation-request"
+              .format(self.restconf_baseurl))
+        body = {"input": {
+                "service-name": "service1",
+                "resource-reserve": "true",
+                "service-handler-header": {
+                  "request-id": "request1"
+                },
+                "service-a-end": {
+                  "service-rate": "0",
+                  "clli": "cll21",
+                  "node-id": "OpenROADM-2-1"
+                },
+                "service-z-end": {
+                  "service-rate": "0",
+                  "clli": "ncli22",
+                  "node-id": "OpenROADM-2-2"
+                  },
+                "pce-metric": "hop-count"
+            }
+        }
+        headers = {'content-type': 'application/json',
+        "Accept": "application/json"}
+        response = requests.request(
+            "POST", url, data=json.dumps(body), headers=headers,
+            auth=('admin', 'admin'))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertIn('Path is calculated',
+            res['output']['configuration-response-common']['response-message'])
+        #ZtoA path test
+        atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
+        ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
+        self.assertEqual(atozList,15)
+        self.assertEqual(ztoaList,15)
+        for i in range(0,15):
+            atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
+            ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
+            if (atoz['id'] == '14'):
+                self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
+            if (ztoa['id'] == '0'):
+                self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
+        time.sleep(5)
+
     # Delete topology
-    def test_05_delete_simple_topology(self):
+    def test_13_delete_simple_topology(self):
         url = ("{}/config/ietf-network:network/openroadm-topology"
               .format(self.restconf_baseurl))
         headers = {'content-type': 'application/xml',
@@ -151,7 +334,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(2)
 
     # Test deleted topology
-    def test_06_test_topology_simple_deleted(self):
+    def test_14_test_topology_simple_deleted(self):
         url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-1-2"
                 .format(self.restconf_baseurl))
         headers = {'content-type': 'application/json',
@@ -162,10 +345,10 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(1)
 
     # Load simple topology
-    def test_07_load_complex_topology(self):
+    def test_15_load_complex_topology(self):
         url = ("{}/config/ietf-network:network/openroadm-topology"
               .format(self.restconf_baseurl))
-        body = self.complex_data
+        body = self.complex_topo_uni_dir_data
         headers = {'content-type': 'application/xml',
         "Accept": "application/json"}
         response = requests.request(
@@ -175,7 +358,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(2)
 
     # Get existing nodeId
-    def test_08_get_nodeId(self):
+    def test_16_get_nodeId(self):
         url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-3-2"
                 .format(self.restconf_baseurl))
         headers = {'content-type': 'application/json',
@@ -190,7 +373,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(1)
 
     # Test failed path computation
-    def test_09_fail_path_computation(self):
+    def test_17_fail_path_computation(self):
         url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
         body = {"input": {
@@ -211,7 +394,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(2)
 
     # Test1 success path computation
-    def test_10_success1_path_computation(self):
+    def test_18_success1_path_computation(self):
         url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
         body = {"input": {
@@ -312,7 +495,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(5)
 
     # Test2 success path computation with path description
-    def test_11_success2_path_computation(self):
+    def test_19_success2_path_computation(self):
         url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
         body = {"input": {
@@ -348,7 +531,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(5)
 
     # Test3 success path computation with hard-constraints exclude
-    def test_12_success3_path_computation(self):
+    def test_20_success3_path_computation(self):
         url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
         body = {"input": {
@@ -389,7 +572,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(5)
 
     # Delete complex topology
-    def test_13_delete_complex_topology(self):
+    def test_21_delete_complex_topology(self):
         url = ("{}/config/ietf-network:network/openroadm-topology"
               .format(self.restconf_baseurl))
         headers = {'content-type': 'application/xml',
@@ -400,7 +583,7 @@ class TransportPCEtesting(unittest.TestCase):
         time.sleep(2)
 
     # Test deleted complex topology
-    def test_14_test_topology_complex_deleted(self):
+    def test_22_test_topology_complex_deleted(self):
         url = ("{}/config/ietf-network:network/openroadm-topology/node/XPONDER-3-2"
                 .format(self.restconf_baseurl))
         headers = {'content-type': 'application/json',