fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_pce.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 import unittest
15 import os
16 import sys
17 import time
18 import requests
19 from common import test_utils
20
21
class TransportPCEtesting(unittest.TestCase):
    """Path computation tests run against processes started by test_utils.start_tpce().

    The test methods are numbered because they build on one another: a
    topology is loaded, queried, used for path computation requests and
    finally deleted before the next topology is loaded.
    """

    # Content of sample_configs/honeynode-topo.xml, read in setUpClass.
    simple_topo_bi_dir_data = None
    # Content of sample_configs/NW-simple-topology.xml, read in setUpClass.
    simple_topo_uni_dir_data = None
    # Content of sample_configs/NW-for-test-5-4.xml, read in setUpClass.
    complex_topo_uni_dir_data = None
    # Value returned by test_utils.start_tpce(); iterated over in tearDownClass.
    processes = None
28
29     @classmethod
30     def setUpClass(cls):
31         # pylint: disable=bare-except
32         try:
33             sample_files_parsed = False
34             TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
35                                             "..", "..", "sample_configs", "honeynode-topo.xml")
36             with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
37                 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
38
39             TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
40                                              "..", "..", "sample_configs", "NW-simple-topology.xml")
41
42             with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
43                 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
44
45             TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
46                                                      "..", "..", "sample_configs", "NW-for-test-5-4.xml")
47             with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
48                 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
49             sample_files_parsed = True
50         except PermissionError as err:
51             print("Permission Error when trying to read sample files\n", err)
52             sys.exit(2)
53         except FileNotFoundError as err:
54             print("File Not found Error when trying to read sample files\n", err)
55             sys.exit(2)
56         except:
57             print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
58             sys.exit(2)
59         finally:
60             if sample_files_parsed:
61                 print("sample files content loaded")
62
63         cls.processes = test_utils.start_tpce()
64
65     @classmethod
66     def tearDownClass(cls):
67         # pylint: disable=not-an-iterable
68         for process in cls.processes:
69             test_utils.shutdown_process(process)
70         print("all processes killed")
71
72     def setUp(self):  # instruction executed before each test method
73         time.sleep(1)
74
    # Load simple bidirectional topology
76     def test_01_load_simple_topology_bi(self):
77         response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
78         self.assertEqual(response.status_code, requests.codes.ok)
79         time.sleep(2)
80
81     # Get existing nodeId
82     def test_02_get_nodeId(self):
83         response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
84         self.assertEqual(response.status_code, requests.codes.ok)
85         res = response.json()
86         self.assertEqual(
87             res['node'][0]['node-id'], 'ROADMA01-SRG1')
88         time.sleep(1)
89
90     # Get existing linkId
91     def test_03_get_linkId(self):
92         response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
93         self.assertEqual(response.status_code, requests.codes.ok)
94         res = response.json()
95         self.assertEqual(
96             res['ietf-network-topology:link'][0]['link-id'],
97             'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
98         time.sleep(1)
99
100     # Path Computation success
101     def test_04_path_computation_xpdr_bi(self):
102         response = test_utils.path_computation_request("request-1", "service-1",
103                                                        {"node-id": "XPDRA01", "service-rate": "100",
104                                                            "service-format": "Ethernet", "clli": "nodeA"},
105                                                        {"node-id": "XPDRC01", "service-rate": "100",
106                                                            "service-format": "Ethernet", "clli": "nodeC"})
107         self.assertEqual(response.status_code, requests.codes.ok)
108         res = response.json()
109         self.assertIn('Path is calculated',
110                       res['output']['configuration-response-common']['response-message'])
111         time.sleep(5)
112
113     # Path Computation success
114     def test_05_path_computation_rdm_bi(self):
115         response = test_utils.path_computation_request("request-1", "service-1",
116                                                        {"node-id": "ROADMA01", "service-rate": "100",
117                                                            "service-format": "Ethernet", "clli": "NodeA"},
118                                                        {"node-id": "ROADMC01", "service-rate": "100",
119                                                            "service-format": "Ethernet", "clli": "NodeC"})
120         self.assertEqual(response.status_code, requests.codes.ok)
121         res = response.json()
122         self.assertIn('Path is calculated',
123                       res['output']['configuration-response-common']['response-message'])
124         time.sleep(5)
125
126     # Delete topology
127     def test_06_delete_simple_topology_bi(self):
128         response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
129         self.assertEqual(response.status_code, requests.codes.ok)
130         time.sleep(2)
131
132     # Test deleted topology
133     def test_07_test_topology_simple_bi_deleted(self):
134         response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
135         self.assertEqual(response.status_code, requests.codes.conflict)
136         time.sleep(1)
137
    # Load simple unidirectional topology
139     def test_08_load_simple_topology_uni(self):
140         response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
141         self.assertEqual(response.status_code, 201)
142         time.sleep(2)
143
144     # Get existing nodeId
145     def test_09_get_nodeId(self):
146         response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
147         self.assertEqual(response.status_code, requests.codes.ok)
148         res = response.json()
149         self.assertEqual(
150             res['node'][0]['node-id'],
151             'XPONDER-1-2')
152         time.sleep(1)
153
154     # Get existing linkId
155     def test_10_get_linkId(self):
156         response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
157         self.assertEqual(response.status_code, requests.codes.ok)
158         res = response.json()
159         self.assertEqual(
160             res['ietf-network-topology:link'][0]['link-id'],
161             'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
162         time.sleep(1)
163
164     # Path Computation success
165     def test_11_path_computation_xpdr_uni(self):
166         response = test_utils.path_computation_request("request-1", "service-1",
167                                                        {"node-id": "XPONDER-1-2", "service-rate": "100",
168                                                            "service-format": "Ethernet", "clli": "ORANGE1"},
169                                                        {"node-id": "XPONDER-3-2", "service-rate": "100",
170                                                            "service-format": "Ethernet", "clli": "ORANGE3"})
171         self.assertEqual(response.status_code, requests.codes.ok)
172         res = response.json()
173         self.assertIn('Path is calculated',
174                       res['output']['configuration-response-common']['response-message'])
175         time.sleep(5)
176
177     # Path Computation success
178     def test_12_path_computation_rdm_uni(self):
179         response = test_utils.path_computation_request("request1", "service1",
180                                                        {"service-rate": "100", "service-format": "Ethernet",
181                                                            "clli": "cll21", "node-id": "OpenROADM-2-1"},
182                                                        {"service-rate": "100", "service-format": "Ethernet",
183                                                            "clli": "ncli22", "node-id": "OpenROADM-2-2"})
184         self.assertEqual(response.status_code, requests.codes.ok)
185         res = response.json()
186         self.assertIn('Path is calculated',
187                       res['output']['configuration-response-common']['response-message'])
188         # ZtoA path test
189         atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
190         ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
191         self.assertEqual(atozList, 15)
192         self.assertEqual(ztoaList, 15)
193         for i in range(0, 15):
194             atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
195             ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
196             if atoz['id'] == '14':
197                 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
198             if ztoa['id'] == '0':
199                 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
200         time.sleep(5)
201
202     # Delete topology
203     def test_13_delete_simple_topology(self):
204         response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
205         self.assertEqual(response.status_code, requests.codes.ok)
206         time.sleep(2)
207
208     # Test deleted topology
209     def test_14_test_topology_simple_deleted(self):
210         response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
211         self.assertEqual(response.status_code, requests.codes.conflict)
212         time.sleep(1)
213
214     # Load complex topology
215     def test_15_load_complex_topology(self):
216         response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
217         self.assertEqual(response.status_code, 201)
218         time.sleep(2)
219
220     # Get existing nodeId
221     def test_16_get_nodeId(self):
222         response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
223         self.assertEqual(response.status_code, requests.codes.ok)
224         res = response.json()
225         self.assertEqual(
226             res['node'][0]['node-id'],
227             'XPONDER-3-2')
228         time.sleep(1)
229
230     # Test failed path computation
231     def test_17_fail_path_computation(self):
232         response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
233                                            {"input": {"service-handler-header": {"request-id": "request-1"}}})
234         self.assertEqual(response.status_code, requests.codes.ok)
235         res = response.json()
236         self.assertIn('Service Name is not set',
237                       res['output']['configuration-response-common']['response-message'])
238         time.sleep(2)
239
240     # Test1 success path computation
241     def test_18_success1_path_computation(self):
242         response = test_utils.path_computation_request("request1", "service1",
243                                                        {"service-format": "Ethernet", "service-rate": "100",
244                                                         "clli": "ORANGE2", "node-id": "XPONDER-2-2",
245                                                         "tx-direction": {"port": {
246                                                             "port-device-name": "Some port-device-name",
247                                                             "port-type": "Some port-type",
248                                                             "port-name": "Some port-name",
249                                                             "port-rack": "Some port-rack",
250                                                             "port-shelf": "Some port-shelf",
251                                                             "port-slot": "Some port-slot",
252                                                             "port-sub-slot": "Some port-sub-slot"
253                                                         }},
254                                                            "rx-direction": {"port": {
255                                                                "port-device-name": "Some port-device-name",
256                                                                "port-type": "Some port-type",
257                                                                "port-name": "Some port-name",
258                                                                "port-rack": "Some port-rack",
259                                                                "port-shelf": "Some port-shelf",
260                                                                "port-slot": "Some port-slot",
261                                                                "port-sub-slot": "Some port-sub-slot"
262                                                            }}},
263                                                        {"service-format": "Ethernet", "service-rate": "100",
264                                                            "clli": "ORANGE1", "node-id": "XPONDER-1-2",
265                                                            "tx-direction": {"port": {
266                                                                "port-device-name": "Some port-device-name",
267                                                                "port-type": "Some port-type",
268                                                                "port-name": "Some port-name",
269                                                                "port-rack": "Some port-rack",
270                                                                "port-shelf": "Some port-shelf",
271                                                                "port-slot": "Some port-slot",
272                                                                "port-sub-slot": "Some port-sub-slot"
273                                                            }},
274                                                            "rx-direction": {"port": {
275                                                                "port-device-name": "Some port-device-name",
276                                                                "port-type": "Some port-type",
277                                                                "port-name": "Some port-name",
278                                                                "port-rack": "Some port-rack",
279                                                                "port-shelf": "Some port-shelf",
280                                                                "port-slot": "Some port-slot",
281                                                                "port-sub-slot": "Some port-sub-slot"
282                                                            }}},
283                                                        {"customer-code": ["Some customer-code"],
284                                                            "co-routing": {"existing-service": ["Some existing-service"]}
285                                                         },
286                                                        {"customer-code": ["Some customer-code"],
287                                                            "co-routing": {"existing-service": ["Some existing-service"]}
288                                                         },
289                                                        "hop-count", {"locally-protected-links": "true"})
290         self.assertEqual(response.status_code, requests.codes.ok)
291         res = response.json()
292         self.assertIn('Path is calculated',
293                       res['output']['configuration-response-common']['response-message'])
294         time.sleep(5)
295
296     # Test2 success path computation with path description
297     def test_19_success2_path_computation(self):
298         response = test_utils.path_computation_request("request 1", "service 1",
299                                                        {"service-rate": "100", "service-format": "Ethernet",
300                                                            "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
301                                                        {"service-rate": "100", "service-format": "Ethernet",
302                                                            "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
303         self.assertEqual(response.status_code, requests.codes.ok)
304         res = response.json()
305         self.assertIn('Path is calculated',
306                       res['output']['configuration-response-common']['response-message'])
307         self.assertEqual(5, res['output']['response-parameters']['path-description']
308                          ['aToZ-direction']['aToZ-wavelength-number'])
309         self.assertEqual(5, res['output']['response-parameters']['path-description']
310                          ['zToA-direction']['zToA-wavelength-number'])
311         time.sleep(5)
312
313     # Test3 success path computation with hard-constraints exclude
314     def test_20_success3_path_computation(self):
315         response = test_utils.path_computation_request("request 1", "service 1",
316                                                        {"service-rate": "100", "service-format": "Ethernet",
317                                                            "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
318                                                        {"service-rate": "100", "service-format": "Ethernet",
319                                                            "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
320                                                        {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
321         self.assertEqual(response.status_code, requests.codes.ok)
322         res = response.json()
323         self.assertIn('Path is calculated',
324                       res['output']['configuration-response-common']['response-message'])
325         self.assertEqual(9, res['output']['response-parameters']['path-description']
326                          ['aToZ-direction']['aToZ-wavelength-number'])
327         self.assertEqual(9, res['output']['response-parameters']['path-description']
328                          ['zToA-direction']['zToA-wavelength-number'])
329         time.sleep(5)
330
331     # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
332     def test_21_path_computation_before_oms_attribute_deletion(self):
333         response = test_utils.path_computation_request("request 1", "service 1",
334                                                        {"service-rate": "100", "service-format": "Ethernet",
335                                                            "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
336                                                        {"service-rate": "100", "service-format": "Ethernet",
337                                                            "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
338         self.assertEqual(response.status_code, requests.codes.ok)
339         res = response.json()
340         self.assertIn('Path is calculated',
341                       res['output']['configuration-response-common']['response-message'])
342         nbElmPath = len(res['output']['response-parameters']['path-description']
343                         ['aToZ-direction']['aToZ'])
344         self.assertEqual(31, nbElmPath)
345         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
346         find = False
347         for i in range(0, nbElmPath):
348             resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
349                              ['resource'])
350             if resource_i == link:
351                 find = True
352         self.assertEqual(find, True)
353         time.sleep(5)
354
355     # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
356     def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
357         response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
358         self.assertEqual(response.status_code, requests.codes.ok)
359         time.sleep(2)
360
361     # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
362     def test_23_path_computation_after_oms_attribute_deletion(self):
363         response = test_utils.path_computation_request("request 1", "service 1",
364                                                        {"service-rate": "100", "service-format": "Ethernet",
365                                                            "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
366                                                        {"service-rate": "100", "service-format": "Ethernet",
367                                                            "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
368         self.assertEqual(response.status_code, requests.codes.ok)
369         res = response.json()
370         self.assertIn('Path is calculated',
371                       res['output']['configuration-response-common']['response-message'])
372         nbElmPath = len(res['output']['response-parameters']['path-description']
373                         ['aToZ-direction']['aToZ'])
374         self.assertEqual(47, nbElmPath)
375         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
376         find = False
377         for i in range(0, nbElmPath):
378             resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
379                              ['resource'])
380             if resource_i == link:
381                 find = True
382         self.assertNotEqual(find, True)
383         time.sleep(5)
384
385     # Delete complex topology
386     def test_24_delete_complex_topology(self):
387         response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
388         self.assertEqual(response.status_code, requests.codes.ok)
389         time.sleep(2)
390
391     # Test deleted complex topology
392     def test_25_test_topology_complex_deleted(self):
393         response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
394         self.assertEqual(response.status_code, requests.codes.conflict)
395         time.sleep(1)
396
397
# Run the tests with verbose per-test output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)