Align 1.2.1 tests sims/tpce management to 2.2.1
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_pce.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 import json
12 import os
13 import psutil
14 import requests
15 import signal
16 import shutil
17 import subprocess
18 import time
19 import unittest
20 from common import test_utils
21
22
23 class TransportPCEtesting(unittest.TestCase):
24
25     simple_topo_bi_dir_data = None
26     simple_topo_uni_dir_data = None
27     complex_topo_uni_dir_data = None
28
29     @classmethod
30     def _get_file(cls):
31         topo_bi_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
32                                         "..", "..", "sample_configs", "honeynode-topo.xml")
33         if os.path.isfile(topo_bi_dir_file):
34             with open(topo_bi_dir_file, 'r') as topo_bi_dir:
35                 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
36         topo_uni_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
37                                          "..", "..", "sample_configs", "NW-simple-topology.xml")
38         if os.path.isfile(topo_uni_dir_file):
39             with open(topo_uni_dir_file, 'r') as topo_uni_dir:
40                 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
41         topo_uni_dir_complex_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
42                                                  "..", "..", "sample_configs", "NW-for-test-5-4.xml")
43         if os.path.isfile(topo_uni_dir_complex_file):
44             with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
45                 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
46
47     processes = None
48     restconf_baseurl = "http://localhost:8181/restconf"
49
50     @classmethod
51     def setUpClass(cls):
52         cls._get_file()
53         cls.processes = test_utils.start_tpce()
54
55     @classmethod
56     def tearDownClass(cls):
57         for process in cls.processes:
58             test_utils.shutdown_process(process)
59         print("all processes killed")
60
61     def setUp(self):  # instruction executed before each test method
62         time.sleep(1)
63
64      # Load simple bidirectional topology
65     def test_01_load_simple_topology_bi(self):
66         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
67                .format(self.restconf_baseurl))
68         body = self.simple_topo_bi_dir_data
69         headers = {'content-type': 'application/xml',
70                    "Accept": "application/xml"}
71         response = requests.request(
72             "PUT", url, data=body, headers=headers,
73             auth=('admin', 'admin'))
74         self.assertEqual(response.status_code, requests.codes.ok)
75         time.sleep(2)
76
77     # Get existing nodeId
78     def test_02_get_nodeId(self):
79         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
80                .format(self.restconf_baseurl))
81         headers = {'content-type': 'application/json',
82                    "Accept": "application/json"}
83         response = requests.request(
84             "GET", url, headers=headers, auth=('admin', 'admin'))
85         self.assertEqual(response.status_code, requests.codes.ok)
86         res = response.json()
87         self.assertEqual(
88             res['node'][0]['node-id'], 'ROADMA01-SRG1')
89         time.sleep(1)
90
91     # Get existing linkId
92     def test_03_get_linkId(self):
93         url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
94                .format(self.restconf_baseurl))
95         headers = {'content-type': 'application/json',
96                    "Accept": "application/json"}
97         response = requests.request(
98             "GET", url, headers=headers, auth=('admin', 'admin'))
99         self.assertEqual(response.status_code, requests.codes.ok)
100         res = response.json()
101         self.assertEqual(
102             res['ietf-network-topology:link'][0]['link-id'],
103             'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
104         time.sleep(1)
105
106     # Path Computation success
107     def test_04_path_computation_xpdr_bi(self):
108         url = ("{}/operations/transportpce-pce:path-computation-request"
109                .format(self.restconf_baseurl))
110         body = {"input": {
111                 "service-name": "service-1",
112                 "resource-reserve": "true",
113                 "pce-metric": "hop-count",
114                 "service-handler-header": {
115                     "request-id": "request-1"
116                 },
117                 "service-a-end": {
118                     "node-id": "XPDRA01",
119                     "service-rate": "100",
120                     "service-format": "Ethernet",
121                     "clli": "nodeA"
122                 },
123                 "service-z-end": {
124                     "node-id": "XPDRC01",
125                     "service-rate": "100",
126                     "service-format": "Ethernet",
127                     "clli": "nodeC"
128                 }
129                 }
130                 }
131         headers = {'content-type': 'application/json',
132                    "Accept": "application/json"}
133         response = requests.request(
134             "POST", url, data=json.dumps(body), headers=headers,
135             auth=('admin', 'admin'))
136         self.assertEqual(response.status_code, requests.codes.ok)
137         res = response.json()
138         self.assertIn('Path is calculated',
139                       res['output']['configuration-response-common']['response-message'])
140         time.sleep(5)
141
142     # Path Computation success
143     def test_05_path_computation_rdm_bi(self):
144         url = ("{}/operations/transportpce-pce:path-computation-request"
145                .format(self.restconf_baseurl))
146         body = {"input": {
147                 "service-name": "service-1",
148                 "resource-reserve": "true",
149                 "pce-metric": "hop-count",
150                 "service-handler-header": {
151                     "request-id": "request-1"
152                 },
153                 "service-a-end": {
154                     "node-id": "ROADMA01",
155                     "service-rate": "100",
156                     "service-format": "Ethernet",
157                     "clli": "NodeA"
158                 },
159                 "service-z-end": {
160                     "node-id": "ROADMC01",
161                     "service-rate": "100",
162                     "service-format": "Ethernet",
163                     "clli": "NodeC"
164                 }
165                 }
166                 }
167         headers = {'content-type': 'application/json',
168                    "Accept": "application/json"}
169         response = requests.request(
170             "POST", url, data=json.dumps(body), headers=headers,
171             auth=('admin', 'admin'))
172         self.assertEqual(response.status_code, requests.codes.ok)
173         res = response.json()
174         self.assertIn('Path is calculated',
175                       res['output']['configuration-response-common']['response-message'])
176         time.sleep(5)
177
178     # Delete topology
179     def test_06_delete_simple_topology_bi(self):
180         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
181                .format(self.restconf_baseurl))
182         headers = {'content-type': 'application/xml',
183                    "Accept": "application/json"}
184         response = requests.request(
185             "DELETE", url, headers=headers, auth=('admin', 'admin'))
186         self.assertEqual(response.status_code, requests.codes.ok)
187         time.sleep(2)
188
189     # Test deleted topology
190     def test_07_test_topology_simple_bi_deleted(self):
191         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
192                .format(self.restconf_baseurl))
193         headers = {'content-type': 'application/json',
194                    "Accept": "application/json"}
195         response = requests.request(
196             "GET", url, headers=headers, auth=('admin', 'admin'))
197         self.assertEqual(response.status_code, 404)
198         time.sleep(1)
199
200     # Load simple bidirectional topology
201     def test_08_load_simple_topology_uni(self):
202         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
203                .format(self.restconf_baseurl))
204         body = self.simple_topo_uni_dir_data
205         headers = {'content-type': 'application/xml',
206                    "Accept": "application/xml"}
207         response = requests.request(
208             "PUT", url, data=body, headers=headers,
209             auth=('admin', 'admin'))
210         self.assertEqual(response.status_code, 201)
211         time.sleep(2)
212
213     # Get existing nodeId
214     def test_09_get_nodeId(self):
215         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
216                .format(self.restconf_baseurl))
217         headers = {'content-type': 'application/json',
218                    "Accept": "application/json"}
219         response = requests.request(
220             "GET", url, headers=headers, auth=('admin', 'admin'))
221         self.assertEqual(response.status_code, requests.codes.ok)
222         res = response.json()
223         self.assertEqual(
224             res['node'][0]['node-id'],
225             'XPONDER-1-2')
226         time.sleep(1)
227
228     # Get existing linkId
229     def test_10_get_linkId(self):
230         url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
231                .format(self.restconf_baseurl))
232         headers = {'content-type': 'application/json',
233                    "Accept": "application/json"}
234         response = requests.request(
235             "GET", url, headers=headers, auth=('admin', 'admin'))
236         self.assertEqual(response.status_code, requests.codes.ok)
237         res = response.json()
238         self.assertEqual(
239             res['ietf-network-topology:link'][0]['link-id'],
240             'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
241         time.sleep(1)
242
243     # Path Computation success
244     def test_11_path_computation_xpdr_uni(self):
245         url = ("{}/operations/transportpce-pce:path-computation-request"
246                .format(self.restconf_baseurl))
247         body = {"input": {
248                 "service-name": "service-1",
249                 "resource-reserve": "true",
250                 "pce-metric": "hop-count",
251                 "service-handler-header": {
252                     "request-id": "request-1"
253                 },
254                 "service-a-end": {
255                     "node-id": "XPONDER-1-2",
256                     "service-rate": "100",
257                     "service-format": "Ethernet",
258                     "clli": "ORANGE1"
259                 },
260                 "service-z-end": {
261                     "node-id": "XPONDER-3-2",
262                     "service-rate": "100",
263                     "service-format": "Ethernet",
264                     "clli": "ORANGE3"
265                 }
266                 }
267                 }
268         headers = {'content-type': 'application/json',
269                    "Accept": "application/json"}
270         response = requests.request(
271             "POST", url, data=json.dumps(body), headers=headers,
272             auth=('admin', 'admin'))
273         self.assertEqual(response.status_code, requests.codes.ok)
274         res = response.json()
275         self.assertIn('Path is calculated',
276                       res['output']['configuration-response-common']['response-message'])
277         time.sleep(5)
278
279     # Path Computation success
280     def test_12_path_computation_rdm_uni(self):
281         url = ("{}/operations/transportpce-pce:path-computation-request"
282                .format(self.restconf_baseurl))
283         body = {"input": {
284                 "service-name": "service1",
285                 "resource-reserve": "true",
286                 "service-handler-header": {
287                     "request-id": "request1"
288                 },
289                 "service-a-end": {
290                     "service-rate": "100",
291                     "service-format": "Ethernet",
292                     "clli": "cll21",
293                     "node-id": "OpenROADM-2-1"
294                 },
295                 "service-z-end": {
296                     "service-rate": "100",
297                     "service-format": "Ethernet",
298                     "clli": "ncli22",
299                     "node-id": "OpenROADM-2-2"
300                 },
301                 "pce-metric": "hop-count"
302                 }
303                 }
304         headers = {'content-type': 'application/json',
305                    "Accept": "application/json"}
306         response = requests.request(
307             "POST", url, data=json.dumps(body), headers=headers,
308             auth=('admin', 'admin'))
309         self.assertEqual(response.status_code, requests.codes.ok)
310         res = response.json()
311         self.assertIn('Path is calculated',
312                       res['output']['configuration-response-common']['response-message'])
313         # ZtoA path test
314         atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
315         ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
316         self.assertEqual(atozList, 15)
317         self.assertEqual(ztoaList, 15)
318         for i in range(0, 15):
319             atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
320             ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
321             if (atoz['id'] == '14'):
322                 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
323             if (ztoa['id'] == '0'):
324                 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
325         time.sleep(5)
326
327     # Delete topology
328     def test_13_delete_simple_topology(self):
329         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
330                .format(self.restconf_baseurl))
331         headers = {'content-type': 'application/xml',
332                    "Accept": "application/json"}
333         response = requests.request(
334             "DELETE", url, headers=headers, auth=('admin', 'admin'))
335         self.assertEqual(response.status_code, requests.codes.ok)
336         time.sleep(2)
337
338     # Test deleted topology
339     def test_14_test_topology_simple_deleted(self):
340         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
341                .format(self.restconf_baseurl))
342         headers = {'content-type': 'application/json',
343                    "Accept": "application/json"}
344         response = requests.request(
345             "GET", url, headers=headers, auth=('admin', 'admin'))
346         self.assertEqual(response.status_code, 404)
347         time.sleep(1)
348
349     # Load complex topology
350     def test_15_load_complex_topology(self):
351         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
352                .format(self.restconf_baseurl))
353         body = self.complex_topo_uni_dir_data
354         headers = {'content-type': 'application/xml',
355                    "Accept": "application/json"}
356         response = requests.request(
357             "PUT", url, data=body, headers=headers,
358             auth=('admin', 'admin'))
359         self.assertEqual(response.status_code, 201)
360         time.sleep(2)
361
362     # Get existing nodeId
363     def test_16_get_nodeId(self):
364         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
365                .format(self.restconf_baseurl))
366         headers = {'content-type': 'application/json',
367                    "Accept": "application/json"}
368         response = requests.request(
369             "GET", url, headers=headers, auth=('admin', 'admin'))
370         self.assertEqual(response.status_code, requests.codes.ok)
371         res = response.json()
372         self.assertEqual(
373             res['node'][0]['node-id'],
374             'XPONDER-3-2')
375         time.sleep(1)
376
377     # Test failed path computation
378     def test_17_fail_path_computation(self):
379         url = ("{}/operations/transportpce-pce:path-computation-request"
380                .format(self.restconf_baseurl))
381         body = {"input": {
382                 "service-handler-header": {
383                     "request-id": "request-1"
384                 }
385                 }
386                 }
387         headers = {'content-type': 'application/json',
388                    "Accept": "application/json"}
389         response = requests.request(
390             "POST", url, data=json.dumps(body), headers=headers,
391             auth=('admin', 'admin'))
392         self.assertEqual(response.status_code, requests.codes.ok)
393         res = response.json()
394         self.assertIn('Service Name is not set',
395                       res['output']['configuration-response-common']['response-message'])
396         time.sleep(2)
397
398     # Test1 success path computation
399     def test_18_success1_path_computation(self):
400         url = ("{}/operations/transportpce-pce:path-computation-request"
401                .format(self.restconf_baseurl))
402         body = {"input": {
403                 "service-name": "service1",
404                 "resource-reserve": "true",
405                 "service-handler-header": {
406                     "request-id": "request1"
407                 },
408                 "service-a-end": {
409                     "service-format": "Ethernet",
410                     "service-rate": "100",
411                     "clli": "ORANGE2",
412                     "node-id": "XPONDER-2-2",
413                     "tx-direction": {
414                         "port": {
415                             "port-device-name": "Some port-device-name",
416                             "port-type": "Some port-type",
417                             "port-name": "Some port-name",
418                             "port-rack": "Some port-rack",
419                             "port-shelf": "Some port-shelf",
420                             "port-slot": "Some port-slot",
421                             "port-sub-slot": "Some port-sub-slot"
422                         }
423                     },
424                     "rx-direction": {
425                         "port": {
426                             "port-device-name": "Some port-device-name",
427                             "port-type": "Some port-type",
428                             "port-name": "Some port-name",
429                             "port-rack": "Some port-rack",
430                             "port-shelf": "Some port-shelf",
431                             "port-slot": "Some port-slot",
432                             "port-sub-slot": "Some port-sub-slot"
433                         }
434                     }
435                 },
436                 "service-z-end": {
437                     "service-format": "Ethernet",
438                     "service-rate": "100",
439                     "clli": "ORANGE1",
440                     "node-id": "XPONDER-1-2",
441                     "tx-direction": {
442                         "port": {
443                             "port-device-name": "Some port-device-name",
444                             "port-type": "Some port-type",
445                             "port-name": "Some port-name",
446                             "port-rack": "Some port-rack",
447                             "port-shelf": "Some port-shelf",
448                             "port-slot": "Some port-slot",
449                             "port-sub-slot": "Some port-sub-slot"
450                         }
451                     },
452                     "rx-direction": {
453                         "port": {
454                             "port-device-name": "Some port-device-name",
455                             "port-type": "Some port-type",
456                             "port-name": "Some port-name",
457                             "port-rack": "Some port-rack",
458                             "port-shelf": "Some port-shelf",
459                             "port-slot": "Some port-slot",
460                             "port-sub-slot": "Some port-sub-slot"
461                         }
462                     }
463                 },
464                 "hard-constraints": {
465                     "customer-code": [
466                         "Some customer-code"
467                     ],
468                     "co-routing": {
469                         "existing-service": [
470                             "Some existing-service"
471                         ]
472                     }
473                 },
474                 "soft-constraints": {
475                     "customer-code": [
476                         "Some customer-code"
477                     ],
478                     "co-routing": {
479                         "existing-service": [
480                             "Some existing-service"
481                         ]
482                     }
483                 },
484                 "pce-metric": "hop-count",
485                 "locally-protected-links": "true"
486                 }
487                 }
488         headers = {'content-type': 'application/json',
489                    "Accept": "application/json"}
490         response = requests.request(
491             "POST", url, data=json.dumps(body), headers=headers,
492             auth=('admin', 'admin'))
493         self.assertEqual(response.status_code, requests.codes.ok)
494         res = response.json()
495         self.assertIn('Path is calculated',
496                       res['output']['configuration-response-common']['response-message'])
497         time.sleep(5)
498
499     # Test2 success path computation with path description
500     def test_19_success2_path_computation(self):
501         url = ("{}/operations/transportpce-pce:path-computation-request"
502                .format(self.restconf_baseurl))
503         body = {"input": {
504                 "service-name": "service 1",
505                 "resource-reserve": "true",
506                 "service-handler-header": {
507                     "request-id": "request 1"
508                 },
509                 "service-a-end": {
510                     "service-rate": "100",
511                     "service-format": "Ethernet",
512                     "node-id": "XPONDER-1-2",
513                     "clli": "ORANGE1"
514                 },
515                 "service-z-end": {
516                     "service-rate": "100",
517                     "service-format": "Ethernet",
518                     "node-id": "XPONDER-3-2",
519                     "clli": "ORANGE3"
520                 },
521                 "pce-metric": "hop-count"
522                 }
523                 }
524         headers = {'content-type': 'application/json',
525                    "Accept": "application/json"}
526         response = requests.request(
527             "POST", url, data=json.dumps(body), headers=headers,
528             auth=('admin', 'admin'))
529         self.assertEqual(response.status_code, requests.codes.ok)
530         res = response.json()
531         self.assertIn('Path is calculated',
532                       res['output']['configuration-response-common']['response-message'])
533         self.assertEqual(5, res['output']['response-parameters']['path-description']
534                          ['aToZ-direction']['aToZ-wavelength-number'])
535         self.assertEqual(5, res['output']['response-parameters']['path-description']
536                          ['zToA-direction']['zToA-wavelength-number'])
537         time.sleep(5)
538
539     # Test3 success path computation with hard-constraints exclude
540     def test_20_success3_path_computation(self):
541         url = ("{}/operations/transportpce-pce:path-computation-request"
542                .format(self.restconf_baseurl))
543         body = {"input": {
544                 "service-name": "service 1",
545                 "resource-reserve": "true",
546                 "service-handler-header": {
547                     "request-id": "request 1"
548                 },
549                 "service-a-end": {
550                     "service-rate": "100",
551                     "service-format": "Ethernet",
552                     "node-id": "XPONDER-1-2",
553                     "clli": "ORANGE1"
554                 },
555                 "service-z-end": {
556                     "service-rate": "100",
557                     "service-format": "Ethernet",
558                     "node-id": "XPONDER-3-2",
559                     "clli": "ORANGE3"
560                 },
561                 "hard-constraints": {
562                     "exclude_": {
563                         "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
564                     }
565                 },
566                 "pce-metric": "hop-count"
567                 }
568                 }
569         headers = {'content-type': 'application/json',
570                    "Accept": "application/json"}
571         response = requests.request(
572             "POST", url, data=json.dumps(body), headers=headers,
573             auth=('admin', 'admin'))
574         self.assertEqual(response.status_code, requests.codes.ok)
575         res = response.json()
576         self.assertIn('Path is calculated',
577                       res['output']['configuration-response-common']['response-message'])
578         self.assertEqual(9, res['output']['response-parameters']['path-description']
579                          ['aToZ-direction']['aToZ-wavelength-number'])
580         self.assertEqual(9, res['output']['response-parameters']['path-description']
581                          ['zToA-direction']['zToA-wavelength-number'])
582         time.sleep(5)
583
584     # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
585     def test_21_path_computation_before_oms_attribute_deletion(self):
586         url = ("{}/operations/transportpce-pce:path-computation-request"
587                .format(self.restconf_baseurl))
588         body = {"input": {
589                 "service-name": "service 1",
590                 "resource-reserve": "true",
591                 "service-handler-header": {
592                     "request-id": "request 1"
593                 },
594                 "service-a-end": {
595                     "service-rate": "100",
596                     "service-format": "Ethernet",
597                     "node-id": "XPONDER-2-2",
598                     "clli": "ORANGE2"
599                 },
600                 "service-z-end": {
601                     "service-rate": "100",
602                     "service-format": "Ethernet",
603                     "node-id": "XPONDER-1-2",
604                     "clli": "ORANGE1"
605                 },
606                 "pce-metric": "hop-count"
607                 }
608                 }
609         headers = {'content-type': 'application/json',
610                    "Accept": "application/json"}
611         response = requests.request(
612             "POST", url, data=json.dumps(body), headers=headers,
613             auth=('admin', 'admin'))
614         self.assertEqual(response.status_code, requests.codes.ok)
615         res = response.json()
616         self.assertIn('Path is calculated',
617                       res['output']['configuration-response-common']['response-message'])
618         nbElmPath = len(res['output']['response-parameters']['path-description']
619                         ['aToZ-direction']['aToZ'])
620         self.assertEqual(31, nbElmPath)
621         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
622         find = False
623         for i in range(0, nbElmPath):
624             resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
625             if(resource_i == link):
626                 find = True
627         self.assertEqual(find, True)
628         time.sleep(5)
629
630     # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
631     def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
632         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
633                "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/org-openroadm-network-topology:OMS-attributes/span"
634                .format(self.restconf_baseurl))
635         headers = {'content-type': 'application/xml',
636                    "Accept": "application/json"}
637         response = requests.request(
638             "DELETE", url, headers=headers, auth=('admin', 'admin'))
639         self.assertEqual(response.status_code, requests.codes.ok)
640         time.sleep(2)
641
642     # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
643     def test_23_path_computation_after_oms_attribute_deletion(self):
644         url = ("{}/operations/transportpce-pce:path-computation-request"
645                .format(self.restconf_baseurl))
646         body = {"input": {
647                 "service-name": "service 1",
648                 "resource-reserve": "true",
649                 "service-handler-header": {
650                     "request-id": "request 1"
651                 },
652                 "service-a-end": {
653                     "service-rate": "100",
654                     "service-format": "Ethernet",
655                     "node-id": "XPONDER-2-2",
656                     "clli": "ORANGE2"
657                 },
658                 "service-z-end": {
659                     "service-rate": "100",
660                     "service-format": "Ethernet",
661                     "node-id": "XPONDER-1-2",
662                     "clli": "ORANGE1"
663                 },
664                 "pce-metric": "hop-count"
665                 }
666                 }
667         headers = {'content-type': 'application/json',
668                    "Accept": "application/json"}
669         response = requests.request(
670             "POST", url, data=json.dumps(body), headers=headers,
671             auth=('admin', 'admin'))
672         self.assertEqual(response.status_code, requests.codes.ok)
673         res = response.json()
674         self.assertIn('Path is calculated',
675                       res['output']['configuration-response-common']['response-message'])
676         nbElmPath = len(res['output']['response-parameters']['path-description']
677                         ['aToZ-direction']['aToZ'])
678         self.assertEqual(47, nbElmPath)
679         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
680         find = False
681         for i in range(0, nbElmPath):
682             resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
683             if (resource_i == link):
684                 find = True
685         self.assertNotEqual(find, True)
686         time.sleep(5)
687
688     # Delete complex topology
689     def test_24_delete_complex_topology(self):
690         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
691                .format(self.restconf_baseurl))
692         headers = {'content-type': 'application/xml',
693                    "Accept": "application/json"}
694         response = requests.request(
695             "DELETE", url, headers=headers, auth=('admin', 'admin'))
696         self.assertEqual(response.status_code, requests.codes.ok)
697         time.sleep(2)
698
699     # Test deleted complex topology
700     def test_25_test_topology_complex_deleted(self):
701         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
702                .format(self.restconf_baseurl))
703         headers = {'content-type': 'application/json',
704                    "Accept": "application/json"}
705         response = requests.request(
706             "GET", url, headers=headers, auth=('admin', 'admin'))
707         self.assertEqual(response.status_code, 404)
708         time.sleep(1)
709
710
711 if __name__ == "__main__":
712     unittest.main(verbosity=2)