f97dda23adb6159908643770b7da8f828808a17a
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_pce.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 import json
12 import os
13 import psutil
14 import requests
15 import signal
16 import shutil
17 import subprocess
18 import time
19 import unittest
20 import test_utils
21
22
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of the path-computation RPC over RESTCONF.

    The test methods are numbered because each one relies on the controller
    state left by the previous ones (topology loaded, then queried, then
    deleted, ...); unittest runs test methods in alphabetical order.
    """

    odl_process = None
    simple_topo_bi_dir_data = None
    simple_topo_uni_dir_data = None
    complex_topo_uni_dir_data = None
    restconf_baseurl = "http://localhost:8181/restconf"

    # RESTCONF resources, relative to restconf_baseurl.
    _TOPOLOGY_PATH = "config/ietf-network:networks/network/openroadm-topology"
    _PCE_RPC_PATH = "operations/transportpce-pce:path-computation-request"

    _JSON = "application/json"
    _XML = "application/xml"
    _CREDENTIALS = ('admin', 'admin')

    # (class attribute receiving the file content, sample file path)
    _TOPOLOGY_FILES = (
        ("simple_topo_bi_dir_data", "sample_configs/honeynode-topo.xml"),
        ("simple_topo_uni_dir_data", "sample_configs/NW-simple-topology.xml"),
        ("complex_topo_uni_dir_data", "sample_configs/NW-for-test-5-4.xml"),
    )

    # Leaves of the placeholder "port" container used by test_18.
    _PORT_LEAVES = ("port-device-name", "port-type", "port-name", "port-rack",
                    "port-shelf", "port-slot", "port-sub-slot")

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @classmethod
    def _get_file(cls):
        """Read every sample topology file that exists into its class attribute.

        A missing file leaves the matching attribute at None; the test that
        needs it then fails explicitly in _load_topology.
        """
        for attribute, path in cls._TOPOLOGY_FILES:
            if os.path.isfile(path):
                with open(path, 'r') as topology_file:
                    setattr(cls, attribute, topology_file.read())

    @classmethod
    def setUpClass(cls):
        """Load the sample topologies and start the controller once per class."""
        cls._get_file()
        print("starting opendaylight...")
        cls.odl_process = test_utils.start_tpce()
        # Fixed delay to let the controller boot before the first request.
        time.sleep(90)
        print("opendaylight started")

    @classmethod
    def tearDownClass(cls):
        """Interrupt the controller's direct children, then the controller."""
        for child in psutil.Process(cls.odl_process.pid).children():
            child.send_signal(signal.SIGINT)
            child.wait()
        cls.odl_process.send_signal(signal.SIGINT)
        cls.odl_process.wait()

    def setUp(self):
        """Pause one second before each test method."""
        time.sleep(1)

    # ------------------------------------------------------------------
    # REST helpers
    # ------------------------------------------------------------------

    def _send(self, method, path, content_type, accept, data=None):
        """Send an authenticated request to `path` under restconf_baseurl.

        Returns the `requests` response object, unchecked.
        """
        headers = {'content-type': content_type, "Accept": accept}
        url = "{}/{}".format(self.restconf_baseurl, path)
        return requests.request(method, url, data=data, headers=headers,
                                auth=self._CREDENTIALS)

    def _load_topology(self, topology, accept, expected_status):
        """PUT `topology` (XML text) as openroadm-topology and check the status."""
        # Fail with a clear message instead of sending an empty PUT.
        self.assertIsNotNone(
            topology, "topology sample file was not found, nothing to load")
        response = self._send("PUT", self._TOPOLOGY_PATH, self._XML, accept,
                              data=topology)
        self.assertEqual(response.status_code, expected_status)
        time.sleep(2)

    def _delete_config(self, path):
        """DELETE the config resource at `path` and expect a 200 answer."""
        response = self._send("DELETE", path, self._XML, self._JSON)
        self.assertEqual(response.status_code, requests.codes.ok)
        time.sleep(2)

    def _get_topology_element(self, list_name, element_id):
        """GET one entry of the `list_name` list of openroadm-topology."""
        path = "{}/{}/{}".format(self._TOPOLOGY_PATH, list_name, element_id)
        return self._send("GET", path, self._JSON, self._JSON)

    def _check_node_exists(self, node_id):
        """Check that `node_id` can be read back from the topology."""
        response = self._get_topology_element("node", node_id)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(res['node'][0]['node-id'], node_id)
        time.sleep(1)

    def _check_link_exists(self, link_id):
        """Check that `link_id` can be read back from the topology."""
        response = self._get_topology_element("link", link_id)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['ietf-network-topology:link'][0]['link-id'], link_id)
        time.sleep(1)

    def _check_node_deleted(self, node_id):
        """Check that reading `node_id` now answers 404."""
        response = self._get_topology_element("node", node_id)
        self.assertEqual(response.status_code, 404)
        time.sleep(1)

    def _compute_path(self, pce_input):
        """POST a path-computation-request and return the decoded JSON answer.

        `pce_input` is the content of the RPC "input" container. The HTTP
        status is asserted to be 200 before the body is decoded.
        """
        response = self._send("POST", self._PCE_RPC_PATH, self._JSON,
                              self._JSON,
                              data=json.dumps({"input": pce_input}))
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _assert_response_message(self, res, expected):
        """Assert that `expected` appears in the RPC response message."""
        self.assertIn(
            expected,
            res['output']['configuration-response-common']['response-message'])

    def _assert_path_calculated(self, res):
        """Assert that the RPC answer reports a computed path."""
        self._assert_response_message(res, 'Path is calculated')

    # ------------------------------------------------------------------
    # Payload / answer helpers (no I/O)
    # ------------------------------------------------------------------

    @classmethod
    def _service_end(cls, node_id, clli, with_ports=False):
        """Build a service-a-end / service-z-end container.

        With `with_ports`, placeholder tx-direction and rx-direction port
        containers are added. JSON object members are unordered, so the key
        order used here is not significant.
        """
        end = {
            "service-rate": "100",
            "service-format": "Ethernet",
            "node-id": node_id,
            "clli": clli,
        }
        if with_ports:
            for direction in ("tx-direction", "rx-direction"):
                end[direction] = {
                    "port": {leaf: "Some " + leaf for leaf in cls._PORT_LEAVES}
                }
        return end

    @staticmethod
    def _pce_input(service_name, request_id, a_end, z_end, extra=None):
        """Build the "input" container of a path-computation-request.

        `extra` is an optional dict of additional top-level members
        (constraints, ...) merged into the result.
        """
        pce_input = {
            "service-name": service_name,
            "resource-reserve": "true",
            "service-handler-header": {"request-id": request_id},
            "service-a-end": a_end,
            "service-z-end": z_end,
            "pce-metric": "hop-count",
        }
        if extra:
            pce_input.update(extra)
        return pce_input

    @staticmethod
    def _path_description(res):
        """Return the path-description container of an RPC answer."""
        return res['output']['response-parameters']['path-description']

    @classmethod
    def _path_elements(cls, res, direction):
        """Return the element list of one direction ('aToZ' or 'zToA')."""
        return cls._path_description(res)[direction + '-direction'][direction]

    def _assert_wavelength(self, res, expected):
        """Assert the wavelength number reported for both directions."""
        description = self._path_description(res)
        self.assertEqual(
            expected, description['aToZ-direction']['aToZ-wavelength-number'])
        self.assertEqual(
            expected, description['zToA-direction']['zToA-wavelength-number'])

    @staticmethod
    def _link_in_path(path_elements, link_id):
        """Tell whether a path element's resource is exactly the given link."""
        link = {"link-id": link_id}
        return any(element['resource'] == link for element in path_elements)

    # ------------------------------------------------------------------
    # Simple bidirectional topology
    # ------------------------------------------------------------------

    def test_01_load_simple_topology_bi(self):
        """Load the simple bidirectional topology."""
        # 200 is expected here whereas the later loads expect 201; presumably
        # a topology already exists at controller start-up -- TODO confirm.
        self._load_topology(self.simple_topo_bi_dir_data, self._XML,
                            requests.codes.ok)

    def test_02_get_nodeId(self):
        """Get an existing node of the bidirectional topology."""
        self._check_node_exists('ROADMA01-SRG1')

    def test_03_get_linkId(self):
        """Get an existing link of the bidirectional topology."""
        self._check_link_exists(
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')

    def test_04_path_computation_xpdr_bi(self):
        """Compute a path between two transponders (bidirectional topology)."""
        res = self._compute_path(self._pce_input(
            "service-1", "request-1",
            self._service_end("XPDRA01", "nodeA"),
            self._service_end("XPDRC01", "nodeC")))
        self._assert_path_calculated(res)
        time.sleep(5)

    def test_05_path_computation_rdm_bi(self):
        """Compute a path between two ROADMs (bidirectional topology)."""
        res = self._compute_path(self._pce_input(
            "service-1", "request-1",
            self._service_end("ROADMA01", "NodeA"),
            self._service_end("ROADMC01", "NodeC")))
        self._assert_path_calculated(res)
        time.sleep(5)

    def test_06_delete_simple_topology_bi(self):
        """Delete the bidirectional topology."""
        self._delete_config(self._TOPOLOGY_PATH)

    def test_07_test_topology_simple_bi_deleted(self):
        """Check that the bidirectional topology is gone."""
        self._check_node_deleted('ROADMA01-SRG1')

    # ------------------------------------------------------------------
    # Simple unidirectional topology
    # ------------------------------------------------------------------

    def test_08_load_simple_topology_uni(self):
        """Load the simple unidirectional topology."""
        self._load_topology(self.simple_topo_uni_dir_data, self._XML, 201)

    def test_09_get_nodeId(self):
        """Get an existing node of the unidirectional topology."""
        self._check_node_exists('XPONDER-1-2')

    def test_10_get_linkId(self):
        """Get an existing link of the unidirectional topology."""
        self._check_link_exists(
            'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')

    def test_11_path_computation_xpdr_uni(self):
        """Compute a path between two transponders (unidirectional topology)."""
        res = self._compute_path(self._pce_input(
            "service-1", "request-1",
            self._service_end("XPONDER-1-2", "ORANGE1"),
            self._service_end("XPONDER-3-2", "ORANGE3")))
        self._assert_path_calculated(res)
        time.sleep(5)

    def test_12_path_computation_rdm_uni(self):
        """Compute a path between two ROADMs and check both path ends."""
        res = self._compute_path(self._pce_input(
            "service1", "request1",
            self._service_end("OpenROADM-2-1", "cll21"),
            self._service_end("OpenROADM-2-2", "ncli22")))
        self._assert_path_calculated(res)
        # ZtoA path test
        atoz_elements = self._path_elements(res, 'aToZ')
        ztoa_elements = self._path_elements(res, 'zToA')
        self.assertEqual(len(atoz_elements), 15)
        self.assertEqual(len(ztoa_elements), 15)
        for atoz, ztoa in zip(atoz_elements, ztoa_elements):
            if atoz['id'] == '14':
                self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
            if ztoa['id'] == '0':
                self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
        time.sleep(5)

    def test_13_delete_simple_topology(self):
        """Delete the unidirectional topology."""
        self._delete_config(self._TOPOLOGY_PATH)

    def test_14_test_topology_simple_deleted(self):
        """Check that the unidirectional topology is gone."""
        self._check_node_deleted('XPONDER-1-2')

    # ------------------------------------------------------------------
    # Complex topology
    # ------------------------------------------------------------------

    def test_15_load_complex_topology(self):
        """Load the complex topology."""
        self._load_topology(self.complex_topo_uni_dir_data, self._JSON, 201)

    def test_16_get_nodeId(self):
        """Get an existing node of the complex topology."""
        self._check_node_exists('XPONDER-3-2')

    def test_17_fail_path_computation(self):
        """A request without service name must be rejected in the answer."""
        res = self._compute_path(
            {"service-handler-header": {"request-id": "request-1"}})
        self._assert_response_message(res, 'Service Name is not set')
        time.sleep(2)

    def test_18_success1_path_computation(self):
        """Compute a path with port details and hard/soft constraints."""
        # The same placeholder constraints are sent as hard and as soft ones.
        constraints = {
            "customer-code": ["Some customer-code"],
            "co-routing": {"existing-service": ["Some existing-service"]},
        }
        res = self._compute_path(self._pce_input(
            "service1", "request1",
            self._service_end("XPONDER-2-2", "ORANGE2", with_ports=True),
            self._service_end("XPONDER-1-2", "ORANGE1", with_ports=True),
            extra={
                "hard-constraints": constraints,
                "soft-constraints": constraints,
                "locally-protected-links": "true",
            }))
        self._assert_path_calculated(res)
        time.sleep(5)

    def test_19_success2_path_computation(self):
        """Compute a path and check the wavelength of its description."""
        res = self._compute_path(self._pce_input(
            "service 1", "request 1",
            self._service_end("XPONDER-1-2", "ORANGE1"),
            self._service_end("XPONDER-3-2", "ORANGE3")))
        self._assert_path_calculated(res)
        self._assert_wavelength(res, 5)
        time.sleep(5)

    def test_20_success3_path_computation(self):
        """Compute a path with a hard constraint excluding two nodes."""
        res = self._compute_path(self._pce_input(
            "service 1", "request 1",
            self._service_end("XPONDER-1-2", "ORANGE1"),
            self._service_end("XPONDER-3-2", "ORANGE3"),
            extra={
                "hard-constraints": {
                    "exclude_": {
                        "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
                    }
                }
            }))
        self._assert_path_calculated(res)
        self._assert_wavelength(res, 9)
        time.sleep(5)

    def test_21_path_computation_before_oms_attribute_deletion(self):
        """Before the OMS deletion the path uses OpenROADM-1-3 -> 1-2."""
        res = self._compute_path(self._pce_input(
            "service 1", "request 1",
            self._service_end("XPONDER-2-2", "ORANGE2"),
            self._service_end("XPONDER-1-2", "ORANGE1")))
        self._assert_path_calculated(res)
        atoz_elements = self._path_elements(res, 'aToZ')
        self.assertEqual(31, len(atoz_elements))
        self.assertTrue(self._link_in_path(
            atoz_elements, "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"))
        time.sleep(5)

    def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
        """Delete the OMS-attributes span of the OpenROADM-1-3 -> 1-2 link."""
        self._delete_config(
            "{}/ietf-network-topology:link/"
            "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/"
            "org-openroadm-network-topology:OMS-attributes/span"
            .format(self._TOPOLOGY_PATH))

    def test_23_path_computation_after_oms_attribute_deletion(self):
        """After the OMS deletion the path avoids OpenROADM-1-3 -> 1-2."""
        res = self._compute_path(self._pce_input(
            "service 1", "request 1",
            self._service_end("XPONDER-2-2", "ORANGE2"),
            self._service_end("XPONDER-1-2", "ORANGE1")))
        self._assert_path_calculated(res)
        atoz_elements = self._path_elements(res, 'aToZ')
        self.assertEqual(47, len(atoz_elements))
        self.assertFalse(self._link_in_path(
            atoz_elements, "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"))
        time.sleep(5)

    def test_24_delete_complex_topology(self):
        """Delete the complex topology."""
        self._delete_config(self._TOPOLOGY_PATH)

    def test_25_test_topology_complex_deleted(self):
        """Check that the complex topology is gone."""
        self._check_node_deleted('XPONDER-3-2')
710
711
# Run the whole suite with per-test verbose output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)