5375dabc7e75c216f305ef20e29e6b7808627842
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_pce.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 import json
12 import os
13 import psutil
14 import requests
15 import signal
16 import shutil
17 import subprocess
18 import time
19 import unittest
20 import test_utils
21
22
23 class TransportPCEtesting(unittest.TestCase):
24
25     odl_process = None
26     simple_topo_bi_dir_data = None
27     simple_topo_uni_dir_data = None
28     complex_topo_uni_dir_data = None
29     restconf_baseurl = "http://localhost:8181/restconf"
30
31     @classmethod
32     def _get_file(cls):
33         topo_bi_dir_file = "sample_configs/honeynode-topo.xml"
34         if os.path.isfile(topo_bi_dir_file):
35             with open(topo_bi_dir_file, 'r') as topo_bi_dir:
36                 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
37         topo_uni_dir_file = "sample_configs/NW-simple-topology.xml"
38         if os.path.isfile(topo_uni_dir_file):
39             with open(topo_uni_dir_file, 'r') as topo_uni_dir:
40                 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
41         topo_uni_dir_complex_file = "sample_configs/NW-for-test-5-4.xml"
42         if os.path.isfile(topo_uni_dir_complex_file):
43             with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
44                 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
45
46     @classmethod
47     def setUpClass(cls):  # a class method called before tests in an individual class run.
48         cls._get_file()
49         cls.odl_process = test_utils.start_tpce()
50         time.sleep(90)
51         print("opendaylight started")
52
53     @classmethod
54     def tearDownClass(cls):
55         for child in psutil.Process(cls.odl_process.pid).children():
56             child.send_signal(signal.SIGINT)
57             child.wait()
58         cls.odl_process.send_signal(signal.SIGINT)
59         cls.odl_process.wait()
60
61     def setUp(self):  # instruction executed before each test method
62         time.sleep(1)
63
64      # Load simple bidirectional topology
65     def test_01_load_simple_topology_bi(self):
66         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
67                .format(self.restconf_baseurl))
68         body = self.simple_topo_bi_dir_data
69         headers = {'content-type': 'application/xml',
70                    "Accept": "application/xml"}
71         response = requests.request(
72             "PUT", url, data=body, headers=headers,
73             auth=('admin', 'admin'))
74         self.assertEqual(response.status_code, requests.codes.ok)
75         time.sleep(2)
76
77     # Get existing nodeId
78     def test_02_get_nodeId(self):
79         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
80                .format(self.restconf_baseurl))
81         headers = {'content-type': 'application/json',
82                    "Accept": "application/json"}
83         response = requests.request(
84             "GET", url, headers=headers, auth=('admin', 'admin'))
85         self.assertEqual(response.status_code, requests.codes.ok)
86         res = response.json()
87         self.assertEqual(
88             res['node'][0]['node-id'], 'ROADMA01-SRG1')
89         time.sleep(1)
90
91     # Get existing linkId
92     def test_03_get_linkId(self):
93         url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
94                .format(self.restconf_baseurl))
95         headers = {'content-type': 'application/json',
96                    "Accept": "application/json"}
97         response = requests.request(
98             "GET", url, headers=headers, auth=('admin', 'admin'))
99         self.assertEqual(response.status_code, requests.codes.ok)
100         res = response.json()
101         self.assertEqual(
102             res['ietf-network-topology:link'][0]['link-id'],
103             'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
104         time.sleep(1)
105
106     # Path Computation success
107     def test_04_path_computation_xpdr_bi(self):
108         url = ("{}/operations/transportpce-pce:path-computation-request"
109                .format(self.restconf_baseurl))
110         body = {"input": {
111                 "service-name": "service-1",
112                 "resource-reserve": "true",
113                 "pce-metric": "hop-count",
114                 "service-handler-header": {
115                     "request-id": "request-1"
116                 },
117                 "service-a-end": {
118                     "node-id": "XPDRA01",
119                     "service-rate": "100",
120                     "service-format": "Ethernet",
121                     "clli": "nodeA"
122                 },
123                 "service-z-end": {
124                     "node-id": "XPDRC01",
125                     "service-rate": "100",
126                     "service-format": "Ethernet",
127                     "clli": "nodeC"
128                 }
129                 }
130                 }
131         headers = {'content-type': 'application/json',
132                    "Accept": "application/json"}
133         response = requests.request(
134             "POST", url, data=json.dumps(body), headers=headers,
135             auth=('admin', 'admin'))
136         self.assertEqual(response.status_code, requests.codes.ok)
137         res = response.json()
138         self.assertIn('Path is calculated',
139                       res['output']['configuration-response-common']['response-message'])
140         time.sleep(5)
141
142     # Path Computation success
143     def test_05_path_computation_rdm_bi(self):
144         url = ("{}/operations/transportpce-pce:path-computation-request"
145                .format(self.restconf_baseurl))
146         body = {"input": {
147                 "service-name": "service-1",
148                 "resource-reserve": "true",
149                 "pce-metric": "hop-count",
150                 "service-handler-header": {
151                     "request-id": "request-1"
152                 },
153                 "service-a-end": {
154                     "node-id": "ROADMA01",
155                     "service-rate": "100",
156                     "service-format": "Ethernet",
157                     "clli": "NodeA"
158                 },
159                 "service-z-end": {
160                     "node-id": "ROADMC01",
161                     "service-rate": "100",
162                     "service-format": "Ethernet",
163                     "clli": "NodeC"
164                 }
165                 }
166                 }
167         headers = {'content-type': 'application/json',
168                    "Accept": "application/json"}
169         response = requests.request(
170             "POST", url, data=json.dumps(body), headers=headers,
171             auth=('admin', 'admin'))
172         self.assertEqual(response.status_code, requests.codes.ok)
173         res = response.json()
174         self.assertIn('Path is calculated',
175                       res['output']['configuration-response-common']['response-message'])
176         time.sleep(5)
177
178     # Delete topology
179     def test_06_delete_simple_topology_bi(self):
180         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
181                .format(self.restconf_baseurl))
182         headers = {'content-type': 'application/xml',
183                    "Accept": "application/json"}
184         response = requests.request(
185             "DELETE", url, headers=headers, auth=('admin', 'admin'))
186         self.assertEqual(response.status_code, requests.codes.ok)
187         time.sleep(2)
188
189     # Test deleted topology
190     def test_07_test_topology_simple_bi_deleted(self):
191         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
192                .format(self.restconf_baseurl))
193         headers = {'content-type': 'application/json',
194                    "Accept": "application/json"}
195         response = requests.request(
196             "GET", url, headers=headers, auth=('admin', 'admin'))
197         self.assertEqual(response.status_code, 404)
198         time.sleep(1)
199
200     # Load simple bidirectional topology
201     def test_08_load_simple_topology_uni(self):
202         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
203                .format(self.restconf_baseurl))
204         body = self.simple_topo_uni_dir_data
205         headers = {'content-type': 'application/xml',
206                    "Accept": "application/xml"}
207         response = requests.request(
208             "PUT", url, data=body, headers=headers,
209             auth=('admin', 'admin'))
210         self.assertEqual(response.status_code, 201)
211         time.sleep(2)
212
213     # Get existing nodeId
214     def test_09_get_nodeId(self):
215         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
216                .format(self.restconf_baseurl))
217         headers = {'content-type': 'application/json',
218                    "Accept": "application/json"}
219         response = requests.request(
220             "GET", url, headers=headers, auth=('admin', 'admin'))
221         self.assertEqual(response.status_code, requests.codes.ok)
222         res = response.json()
223         self.assertEqual(
224             res['node'][0]['node-id'],
225             'XPONDER-1-2')
226         time.sleep(1)
227
228     # Get existing linkId
229     def test_10_get_linkId(self):
230         url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
231                .format(self.restconf_baseurl))
232         headers = {'content-type': 'application/json',
233                    "Accept": "application/json"}
234         response = requests.request(
235             "GET", url, headers=headers, auth=('admin', 'admin'))
236         self.assertEqual(response.status_code, requests.codes.ok)
237         res = response.json()
238         self.assertEqual(
239             res['ietf-network-topology:link'][0]['link-id'],
240             'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
241         time.sleep(1)
242
243     # Path Computation success
244     def test_11_path_computation_xpdr_uni(self):
245         url = ("{}/operations/transportpce-pce:path-computation-request"
246                .format(self.restconf_baseurl))
247         body = {"input": {
248                 "service-name": "service-1",
249                 "resource-reserve": "true",
250                 "pce-metric": "hop-count",
251                 "service-handler-header": {
252                     "request-id": "request-1"
253                 },
254                 "service-a-end": {
255                     "node-id": "XPONDER-1-2",
256                     "service-rate": "100",
257                     "service-format": "Ethernet",
258                     "clli": "ORANGE1"
259                 },
260                 "service-z-end": {
261                     "node-id": "XPONDER-3-2",
262                     "service-rate": "100",
263                     "service-format": "Ethernet",
264                     "clli": "ORANGE3"
265                 }
266                 }
267                 }
268         headers = {'content-type': 'application/json',
269                    "Accept": "application/json"}
270         response = requests.request(
271             "POST", url, data=json.dumps(body), headers=headers,
272             auth=('admin', 'admin'))
273         self.assertEqual(response.status_code, requests.codes.ok)
274         res = response.json()
275         self.assertIn('Path is calculated',
276                       res['output']['configuration-response-common']['response-message'])
277         time.sleep(5)
278
279     # Path Computation success
280     def test_12_path_computation_rdm_uni(self):
281         url = ("{}/operations/transportpce-pce:path-computation-request"
282                .format(self.restconf_baseurl))
283         body = {"input": {
284                 "service-name": "service1",
285                 "resource-reserve": "true",
286                 "service-handler-header": {
287                     "request-id": "request1"
288                 },
289                 "service-a-end": {
290                     "service-rate": "100",
291                     "service-format": "Ethernet",
292                     "clli": "cll21",
293                     "node-id": "OpenROADM-2-1"
294                 },
295                 "service-z-end": {
296                     "service-rate": "100",
297                     "service-format": "Ethernet",
298                     "clli": "ncli22",
299                     "node-id": "OpenROADM-2-2"
300                 },
301                 "pce-metric": "hop-count"
302                 }
303                 }
304         headers = {'content-type': 'application/json',
305                    "Accept": "application/json"}
306         response = requests.request(
307             "POST", url, data=json.dumps(body), headers=headers,
308             auth=('admin', 'admin'))
309         self.assertEqual(response.status_code, requests.codes.ok)
310         res = response.json()
311         self.assertIn('Path is calculated',
312                       res['output']['configuration-response-common']['response-message'])
313         # ZtoA path test
314         atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
315         ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
316         self.assertEqual(atozList, 15)
317         self.assertEqual(ztoaList, 15)
318         for i in range(0, 15):
319             atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
320             ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
321             if (atoz['id'] == '14'):
322                 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
323             if (ztoa['id'] == '0'):
324                 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
325         time.sleep(5)
326
327     # Delete topology
328     def test_13_delete_simple_topology(self):
329         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
330                .format(self.restconf_baseurl))
331         headers = {'content-type': 'application/xml',
332                    "Accept": "application/json"}
333         response = requests.request(
334             "DELETE", url, headers=headers, auth=('admin', 'admin'))
335         self.assertEqual(response.status_code, requests.codes.ok)
336         time.sleep(2)
337
338     # Test deleted topology
339     def test_14_test_topology_simple_deleted(self):
340         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
341                .format(self.restconf_baseurl))
342         headers = {'content-type': 'application/json',
343                    "Accept": "application/json"}
344         response = requests.request(
345             "GET", url, headers=headers, auth=('admin', 'admin'))
346         self.assertEqual(response.status_code, 404)
347         time.sleep(1)
348
349     # Load complex topology
350     def test_15_load_complex_topology(self):
351         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
352                .format(self.restconf_baseurl))
353         body = self.complex_topo_uni_dir_data
354         headers = {'content-type': 'application/xml',
355                    "Accept": "application/json"}
356         response = requests.request(
357             "PUT", url, data=body, headers=headers,
358             auth=('admin', 'admin'))
359         self.assertEqual(response.status_code, 201)
360         time.sleep(2)
361
362     # Get existing nodeId
363     def test_16_get_nodeId(self):
364         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
365                .format(self.restconf_baseurl))
366         headers = {'content-type': 'application/json',
367                    "Accept": "application/json"}
368         response = requests.request(
369             "GET", url, headers=headers, auth=('admin', 'admin'))
370         self.assertEqual(response.status_code, requests.codes.ok)
371         res = response.json()
372         self.assertEqual(
373             res['node'][0]['node-id'],
374             'XPONDER-3-2')
375         time.sleep(1)
376
377     # Test failed path computation
378     def test_17_fail_path_computation(self):
379         url = ("{}/operations/transportpce-pce:path-computation-request"
380                .format(self.restconf_baseurl))
381         body = {"input": {
382                 "service-handler-header": {
383                     "request-id": "request-1"
384                 }
385                 }
386                 }
387         headers = {'content-type': 'application/json',
388                    "Accept": "application/json"}
389         response = requests.request(
390             "POST", url, data=json.dumps(body), headers=headers,
391             auth=('admin', 'admin'))
392         self.assertEqual(response.status_code, requests.codes.ok)
393         res = response.json()
394         self.assertIn('Service Name is not set',
395                       res['output']['configuration-response-common']['response-message'])
396         time.sleep(2)
397
398     # Test1 success path computation
399     def test_18_success1_path_computation(self):
400         url = ("{}/operations/transportpce-pce:path-computation-request"
401                .format(self.restconf_baseurl))
402         body = {"input": {
403                 "service-name": "service1",
404                 "resource-reserve": "true",
405                 "service-handler-header": {
406                     "request-id": "request1"
407                 },
408                 "service-a-end": {
409                     "service-format": "Ethernet",
410                     "service-rate": "100",
411                     "clli": "ORANGE2",
412                     "node-id": "XPONDER-2-2",
413                     "tx-direction": {
414                         "port": {
415                             "port-device-name": "Some port-device-name",
416                             "port-type": "Some port-type",
417                             "port-name": "Some port-name",
418                             "port-rack": "Some port-rack",
419                             "port-shelf": "Some port-shelf",
420                             "port-slot": "Some port-slot",
421                             "port-sub-slot": "Some port-sub-slot"
422                         }
423                     },
424                     "rx-direction": {
425                         "port": {
426                             "port-device-name": "Some port-device-name",
427                             "port-type": "Some port-type",
428                             "port-name": "Some port-name",
429                             "port-rack": "Some port-rack",
430                             "port-shelf": "Some port-shelf",
431                             "port-slot": "Some port-slot",
432                             "port-sub-slot": "Some port-sub-slot"
433                         }
434                     }
435                 },
436                 "service-z-end": {
437                     "service-format": "Ethernet",
438                     "service-rate": "100",
439                     "clli": "ORANGE1",
440                     "node-id": "XPONDER-1-2",
441                     "tx-direction": {
442                         "port": {
443                             "port-device-name": "Some port-device-name",
444                             "port-type": "Some port-type",
445                             "port-name": "Some port-name",
446                             "port-rack": "Some port-rack",
447                             "port-shelf": "Some port-shelf",
448                             "port-slot": "Some port-slot",
449                             "port-sub-slot": "Some port-sub-slot"
450                         }
451                     },
452                     "rx-direction": {
453                         "port": {
454                             "port-device-name": "Some port-device-name",
455                             "port-type": "Some port-type",
456                             "port-name": "Some port-name",
457                             "port-rack": "Some port-rack",
458                             "port-shelf": "Some port-shelf",
459                             "port-slot": "Some port-slot",
460                             "port-sub-slot": "Some port-sub-slot"
461                         }
462                     }
463                 },
464                 "hard-constraints": {
465                     "customer-code": [
466                         "Some customer-code"
467                     ],
468                     "co-routing": {
469                         "existing-service": [
470                             "Some existing-service"
471                         ]
472                     }
473                 },
474                 "soft-constraints": {
475                     "customer-code": [
476                         "Some customer-code"
477                     ],
478                     "co-routing": {
479                         "existing-service": [
480                             "Some existing-service"
481                         ]
482                     }
483                 },
484                 "pce-metric": "hop-count",
485                 "locally-protected-links": "true"
486                 }
487                 }
488         headers = {'content-type': 'application/json',
489                    "Accept": "application/json"}
490         response = requests.request(
491             "POST", url, data=json.dumps(body), headers=headers,
492             auth=('admin', 'admin'))
493         self.assertEqual(response.status_code, requests.codes.ok)
494         res = response.json()
495         self.assertIn('Path is calculated',
496                       res['output']['configuration-response-common']['response-message'])
497         time.sleep(5)
498
499     # Test2 success path computation with path description
500     def test_19_success2_path_computation(self):
501         url = ("{}/operations/transportpce-pce:path-computation-request"
502                .format(self.restconf_baseurl))
503         body = {"input": {
504                 "service-name": "service 1",
505                 "resource-reserve": "true",
506                 "service-handler-header": {
507                     "request-id": "request 1"
508                 },
509                 "service-a-end": {
510                     "service-rate": "100",
511                     "service-format": "Ethernet",
512                     "node-id": "XPONDER-1-2",
513                     "clli": "ORANGE1"
514                 },
515                 "service-z-end": {
516                     "service-rate": "100",
517                     "service-format": "Ethernet",
518                     "node-id": "XPONDER-3-2",
519                     "clli": "ORANGE3"
520                 },
521                 "pce-metric": "hop-count"
522                 }
523                 }
524         headers = {'content-type': 'application/json',
525                    "Accept": "application/json"}
526         response = requests.request(
527             "POST", url, data=json.dumps(body), headers=headers,
528             auth=('admin', 'admin'))
529         self.assertEqual(response.status_code, requests.codes.ok)
530         res = response.json()
531         self.assertIn('Path is calculated',
532                       res['output']['configuration-response-common']['response-message'])
533         self.assertEqual(5, res['output']['response-parameters']['path-description']
534                          ['aToZ-direction']['aToZ-wavelength-number'])
535         self.assertEqual(5, res['output']['response-parameters']['path-description']
536                          ['zToA-direction']['zToA-wavelength-number'])
537         time.sleep(5)
538
539     # Test3 success path computation with hard-constraints exclude
540     def test_20_success3_path_computation(self):
541         url = ("{}/operations/transportpce-pce:path-computation-request"
542                .format(self.restconf_baseurl))
543         body = {"input": {
544                 "service-name": "service 1",
545                 "resource-reserve": "true",
546                 "service-handler-header": {
547                     "request-id": "request 1"
548                 },
549                 "service-a-end": {
550                     "service-rate": "100",
551                     "service-format": "Ethernet",
552                     "node-id": "XPONDER-1-2",
553                     "clli": "ORANGE1"
554                 },
555                 "service-z-end": {
556                     "service-rate": "100",
557                     "service-format": "Ethernet",
558                     "node-id": "XPONDER-3-2",
559                     "clli": "ORANGE3"
560                 },
561                 "hard-constraints": {
562                     "exclude_": {
563                         "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
564                     }
565                 },
566                 "pce-metric": "hop-count"
567                 }
568                 }
569         headers = {'content-type': 'application/json',
570                    "Accept": "application/json"}
571         response = requests.request(
572             "POST", url, data=json.dumps(body), headers=headers,
573             auth=('admin', 'admin'))
574         self.assertEqual(response.status_code, requests.codes.ok)
575         res = response.json()
576         self.assertIn('Path is calculated',
577                       res['output']['configuration-response-common']['response-message'])
578         self.assertEqual(9, res['output']['response-parameters']['path-description']
579                          ['aToZ-direction']['aToZ-wavelength-number'])
580         self.assertEqual(9, res['output']['response-parameters']['path-description']
581                          ['zToA-direction']['zToA-wavelength-number'])
582         time.sleep(5)
583
584     # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
585     def test_21_path_computation_before_oms_attribute_deletion(self):
586         url = ("{}/operations/transportpce-pce:path-computation-request"
587                .format(self.restconf_baseurl))
588         body = {"input": {
589                 "service-name": "service 1",
590                 "resource-reserve": "true",
591                 "service-handler-header": {
592                     "request-id": "request 1"
593                 },
594                 "service-a-end": {
595                     "service-rate": "100",
596                     "service-format": "Ethernet",
597                     "node-id": "XPONDER-2-2",
598                     "clli": "ORANGE2"
599                 },
600                 "service-z-end": {
601                     "service-rate": "100",
602                     "service-format": "Ethernet",
603                     "node-id": "XPONDER-1-2",
604                     "clli": "ORANGE1"
605                 },
606                 "pce-metric": "hop-count"
607                 }
608                 }
609         headers = {'content-type': 'application/json',
610                    "Accept": "application/json"}
611         response = requests.request(
612             "POST", url, data=json.dumps(body), headers=headers,
613             auth=('admin', 'admin'))
614         self.assertEqual(response.status_code, requests.codes.ok)
615         res = response.json()
616         self.assertIn('Path is calculated',
617                       res['output']['configuration-response-common']['response-message'])
618         nbElmPath = len(res['output']['response-parameters']['path-description']
619                         ['aToZ-direction']['aToZ'])
620         self.assertEqual(31, nbElmPath)
621         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
622         find = False
623         for i in range(0, nbElmPath):
624             resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
625             if(resource_i == link):
626                 find = True
627         self.assertEqual(find, True)
628         time.sleep(5)
629
630     # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
631     def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
632         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
633                "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/org-openroadm-network-topology:OMS-attributes/span"
634                .format(self.restconf_baseurl))
635         headers = {'content-type': 'application/xml',
636                    "Accept": "application/json"}
637         response = requests.request(
638             "DELETE", url, headers=headers, auth=('admin', 'admin'))
639         self.assertEqual(response.status_code, requests.codes.ok)
640         time.sleep(2)
641
642     # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
643     def test_23_path_computation_after_oms_attribute_deletion(self):
644         url = ("{}/operations/transportpce-pce:path-computation-request"
645                .format(self.restconf_baseurl))
646         body = {"input": {
647                 "service-name": "service 1",
648                 "resource-reserve": "true",
649                 "service-handler-header": {
650                     "request-id": "request 1"
651                 },
652                 "service-a-end": {
653                     "service-rate": "100",
654                     "service-format": "Ethernet",
655                     "node-id": "XPONDER-2-2",
656                     "clli": "ORANGE2"
657                 },
658                 "service-z-end": {
659                     "service-rate": "100",
660                     "service-format": "Ethernet",
661                     "node-id": "XPONDER-1-2",
662                     "clli": "ORANGE1"
663                 },
664                 "pce-metric": "hop-count"
665                 }
666                 }
667         headers = {'content-type': 'application/json',
668                    "Accept": "application/json"}
669         response = requests.request(
670             "POST", url, data=json.dumps(body), headers=headers,
671             auth=('admin', 'admin'))
672         self.assertEqual(response.status_code, requests.codes.ok)
673         res = response.json()
674         self.assertIn('Path is calculated',
675                       res['output']['configuration-response-common']['response-message'])
676         nbElmPath = len(res['output']['response-parameters']['path-description']
677                         ['aToZ-direction']['aToZ'])
678         self.assertEqual(47, nbElmPath)
679         link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
680         find = False
681         for i in range(0, nbElmPath):
682             resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
683             if (resource_i == link):
684                 find = True
685         self.assertNotEqual(find, True)
686         time.sleep(5)
687
688     # Delete complex topology
689     def test_24_delete_complex_topology(self):
690         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
691                .format(self.restconf_baseurl))
692         headers = {'content-type': 'application/xml',
693                    "Accept": "application/json"}
694         response = requests.request(
695             "DELETE", url, headers=headers, auth=('admin', 'admin'))
696         self.assertEqual(response.status_code, requests.codes.ok)
697         time.sleep(2)
698
699     # Test deleted complex topology
700     def test_25_test_topology_complex_deleted(self):
701         url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
702                .format(self.restconf_baseurl))
703         headers = {'content-type': 'application/json',
704                    "Accept": "application/json"}
705         response = requests.request(
706             "GET", url, headers=headers, auth=('admin', 'admin'))
707         self.assertEqual(response.status_code, 404)
708         time.sleep(1)
709
710
711 if __name__ == "__main__":
712     unittest.main(verbosity=2)