2d7232eb572c7418cb2eee937c4de7984e181808
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
    # Process handles assigned in setUpClass and iterated in tearDownClass.
    processes = None
    # Seconds slept after each service-create request issued by this class.
    WAITING = 20  # nominal value is 300
    # Version strings paired with simulator names in start_sims()/mount_device().
    NODE_VERSION_221 = '2.2.1'
    NODE_VERSION_71 = '7.1'

    # Service-create request template, initially describing the OTUC4
    # infrastructure service "service1-OTUC4" between XPDR-A2 and XPDR-C2.
    # NOTE: this is a single class-level dict; test_23 and test_29 edit it in
    # place (service name, formats, rates, port names), so each of those tests
    # starts from the state left by the previous one.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OTUC4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        "service-a-end": {
            "service-rate": "400",
            "node-id": "XPDR-A2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "400",
            "node-id": "XPDR-C2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
137
138     @classmethod
139     def setUpClass(cls):
140         cls.processes = test_utils.start_tpce()
141         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142                                                ('roadma', cls.NODE_VERSION_221),
143                                                ('roadmc', cls.NODE_VERSION_221),
144                                                ('xpdrc2', cls.NODE_VERSION_71)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(2)
155
156     def test_01_connect_xpdra2(self):
157         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_xpdrc2(self):
162         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_rdma(self):
167         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmc(self):
172         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
179         self.assertEqual(response.status_code, requests.codes.ok)
180         res = response.json()
181         self.assertIn('Xponder Roadm Link created successfully',
182                       res["output"]["result"])
183         time.sleep(2)
184
185     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
188         self.assertEqual(response.status_code, requests.codes.ok)
189         res = response.json()
190         self.assertIn('Roadm Xponder links created successfully',
191                       res["output"]["result"])
192         time.sleep(2)
193
194     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
197         self.assertEqual(response.status_code, requests.codes.ok)
198         res = response.json()
199         self.assertIn('Xponder Roadm Link created successfully',
200                       res["output"]["result"])
201         time.sleep(2)
202
203     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
206         self.assertEqual(response.status_code, requests.codes.ok)
207         res = response.json()
208         self.assertIn('Roadm Xponder links created successfully',
209                       res["output"]["result"])
210         time.sleep(2)
211
212     def test_09_add_omsAttributes_roadma_roadmc(self):
213         # Config ROADMA-ROADMC oms-attributes
214         data = {"span": {
215             "auto-spanloss": "true",
216             "spanloss-base": 11.4,
217             "spanloss-current": 12,
218             "engineered-spanloss": 12.2,
219             "link-concatenation": [{
220                 "SRLG-Id": 0,
221                 "fiber-type": "smf",
222                 "SRLG-length": 100000,
223                 "pmd": 0.5}]}}
224         response = test_utils.add_oms_attr_request(
225             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226         self.assertEqual(response.status_code, requests.codes.created)
227
228     def test_10_add_omsAttributes_roadmc_roadma(self):
229         # Config ROADMC-ROADMA oms-attributes
230         data = {"span": {
231             "auto-spanloss": "true",
232             "spanloss-base": 11.4,
233             "spanloss-current": 12,
234             "engineered-spanloss": 12.2,
235             "link-concatenation": [{
236                 "SRLG-Id": 0,
237                 "fiber-type": "smf",
238                 "SRLG-length": 100000,
239                 "pmd": 0.5}]}}
240         response = test_utils.add_oms_attr_request(
241             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242         self.assertEqual(response.status_code, requests.codes.created)
243
# test service-create for OTUC4 service from xpdra2 to xpdrc2
245     def test_11_check_otn_topology(self):
246         response = test_utils.get_otn_topo_request()
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         nbNode = len(res['network'][0]['node'])
250         self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251         self.assertNotIn('ietf-network-topology:link', res['network'][0])
252
253     def test_12_create_OTUC4_service(self):
254         response = test_utils.service_create_request(self.cr_serv_sample_data)
255         self.assertEqual(response.status_code, requests.codes.ok)
256         res = response.json()
257         self.assertIn('PCE calculation in progress',
258                       res['output']['configuration-response-common']['response-message'])
259         time.sleep(self.WAITING)
260
261     def test_13_get_OTUC4_service1(self):
262         response = test_utils.get_service_list_request(
263             "services/service1-OTUC4")
264         self.assertEqual(response.status_code, requests.codes.ok)
265         res = response.json()
266         self.assertEqual(
267             res['services'][0]['administrative-state'], 'inService')
268         self.assertEqual(
269             res['services'][0]['service-name'], 'service1-OTUC4')
270         self.assertEqual(
271             res['services'][0]['connection-type'], 'infrastructure')
272         self.assertEqual(
273             res['services'][0]['lifecycle-state'], 'planned')
274         self.assertEqual(
275             res['services'][0]['service-layer'], 'wdm')
276         self.assertEqual(
277             res['services'][0]['service-a-end']['service-rate'], 400)
278         self.assertEqual(
279             res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
280         time.sleep(2)
281         self.assertEqual(
282             res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
283         time.sleep(2)
284
285     # Check correct configuration of devices
286     def test_14_check_interface_och_xpdra2(self):
287         response = test_utils.check_netconf_node_request(
288             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289         self.assertEqual(response.status_code, requests.codes.ok)
290         res = response.json()
291         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292                                    'administrative-state': 'inService',
293                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294                                    'type': 'org-openroadm-interfaces:otsi',
295                                    'supporting-port': 'L1'
296                                    }, **res['interface'][0]),
297                              res['interface'][0])
298
299         self.assertDictEqual(
300             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
304
305     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306         response = test_utils.check_netconf_node_request(
307             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308         self.assertEqual(response.status_code, requests.codes.ok)
309         res = response.json()
310         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311                         'administrative-state': 'inService',
312                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314                         'type': 'org-openroadm-interfaces:otsi-group',
315                         'supporting-port': 'L1'
316                         }
317         input_dict_2 = {"group-id": 1,
318                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
319                         }
320         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
321                              res['interface'][0])
322         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
324                              res['interface'][0]
325                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
326
327     def test_16_check_interface_OTUC4_xpdra2(self):
328         response = test_utils.check_netconf_node_request(
329             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330         self.assertEqual(response.status_code, requests.codes.ok)
331         res = response.json()
332         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333                         'administrative-state': 'inService',
334                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336                         'type': 'org-openroadm-interfaces:otnOtu',
337                         'supporting-port': 'L1'
338                         }
339         input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTUCn',
340                         'degthr-percentage': 100,
341                         'degm-intervals': 2,
342                         'otucn-n-rate': 4
343                         }
344         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
345                              res['interface'][0])
346         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
347                              res['interface'][0]
348                              ['org-openroadm-otn-otu-interfaces:otu'])
349
350     def test_17_check_interface_och_xpdrc2(self):
351         response = test_utils.check_netconf_node_request(
352             "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
353         self.assertEqual(response.status_code, requests.codes.ok)
354         res = response.json()
355         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
356                                    'administrative-state': 'inService',
357                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
358                                    'type': 'org-openroadm-interfaces:otsi',
359                                    'supporting-port': 'L1'
360                                    }, **res['interface'][0]),
361                              res['interface'][0])
362
363         self.assertDictEqual(
364             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
365                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
366                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
367             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
368
369     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
370         response = test_utils.check_netconf_node_request(
371             "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
372         self.assertEqual(response.status_code, requests.codes.ok)
373         res = response.json()
374         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
375                         'administrative-state': 'inService',
376                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
377                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
378                         'type': 'org-openroadm-interfaces:otsi-group',
379                         'supporting-port': 'L1'
380                         }
381         input_dict_2 = {"group-id": 1,
382                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
383                         }
384         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
385                              res['interface'][0])
386         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
387                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
388                              res['interface'][0]
389                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
390
391     def test_19_check_interface_OTUC4_xpdrc2(self):
392         response = test_utils.check_netconf_node_request(
393             "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
394         self.assertEqual(response.status_code, requests.codes.ok)
395         res = response.json()
396         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
397                         'administrative-state': 'inService',
398                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
399                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
400                         'type': 'org-openroadm-interfaces:otnOtu',
401                         'supporting-port': 'L1'
402                         }
403         input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTUCn',
404                         'degthr-percentage': 100,
405                         'degm-intervals': 2,
406                         'otucn-n-rate': 4
407                         }
408
409         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
410                              res['interface'][0])
411
412         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
413                              res['interface'][0]
414                              ['org-openroadm-otn-otu-interfaces:otu'])
415
416     def test_20_check_no_interface_ODUC4_xpdra2(self):
417         response = test_utils.check_netconf_node_request(
418             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
419         self.assertEqual(response.status_code, requests.codes.conflict)
420         res = response.json()
421         self.assertIn(
422             {"error-type": "application", "error-tag": "data-missing",
423              "error-message": "Request could not be completed because the relevant data model content does not exist"},
424             res['errors']['error'])
425
426     def test_21_check_openroadm_topo_xpdra2(self):
427         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
428         self.assertEqual(response.status_code, requests.codes.ok)
429         res = response.json()
430         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
431         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
432         self.assertEqual({'frequency': 196.08125,
433                           'width': 75},
434                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
435         time.sleep(3)
436
437     def test_22_check_otn_topo_OTUC4_links(self):
438         response = test_utils.get_otn_topo_request()
439         self.assertEqual(response.status_code, requests.codes.ok)
440         res = response.json()
441         nb_links = len(res['network'][0]['ietf-network-topology:link'])
442         self.assertEqual(nb_links, 2)
443         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
444                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
445         for link in res['network'][0]['ietf-network-topology:link']:
446             self.assertIn(link['link-id'], listLinkId)
447             self.assertEqual(
448                 link['transportpce-topology:otn-link-type'], 'OTUC4')
449             self.assertEqual(
450                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
451             self.assertEqual(
452                 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
453             self.assertEqual(
454                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
455             self.assertIn(
456                 link['org-openroadm-common-network:opposite-link'], listLinkId)
457
# test service-create for ODUC4 service from xpdra2 to xpdrc2
459     def test_23_create_ODUC4_service(self):
460         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
461         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
462         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
463         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
464         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
465         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
466         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
467
468         response = test_utils.service_create_request(self.cr_serv_sample_data)
469         self.assertEqual(response.status_code, requests.codes.ok)
470         res = response.json()
471         self.assertIn('PCE calculation in progress',
472                       res['output']['configuration-response-common']['response-message'])
473         time.sleep(self.WAITING)
474
475     def test_24_get_ODUC4_service1(self):
476         response = test_utils.get_service_list_request(
477             "services/service1-ODUC4")
478         self.assertEqual(response.status_code, requests.codes.ok)
479         res = response.json()
480         self.assertEqual(
481             res['services'][0]['administrative-state'], 'inService')
482         self.assertEqual(
483             res['services'][0]['service-name'], 'service1-ODUC4')
484         self.assertEqual(
485             res['services'][0]['connection-type'], 'infrastructure')
486         self.assertEqual(
487             res['services'][0]['lifecycle-state'], 'planned')
488         self.assertEqual(
489             res['services'][0]['service-layer'], 'wdm')
490         self.assertEqual(
491             res['services'][0]['service-a-end']['service-rate'], 400)
492         self.assertEqual(
493             res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
494         self.assertEqual(
495             res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
496         time.sleep(2)
497
498     def test_25_check_interface_ODUC4_xpdra2(self):
499         response = test_utils.check_netconf_node_request(
500             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
501         self.assertEqual(response.status_code, requests.codes.ok)
502         res = response.json()
503         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
504                         'administrative-state': 'inService',
505                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
506                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
507                         'type': 'org-openroadm-interfaces:otnOdu',
508                         'supporting-port': 'L1'}
509         # SAPI/DAPI are added in the Otu4 renderer
510         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
511                         'rate': 'org-openroadm-otn-common-types:ODUCn',
512                         'expected-dapi': 'Nmbu2MNHvc4=',
513                         'expected-sapi': 'Nmbu2MNHvc4=',
514                         'tx-dapi': 'Nmbu2MNHvc4=',
515                         'tx-sapi': 'LY9PxYJqUbw='}
516
517         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
518                              res['interface'][0])
519         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
520                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
521         self.assertDictEqual(
522             {'payload-type': '22', 'exp-payload-type': '22'},
523             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
524
525     def test_26_check_interface_ODUC4_xpdrc2(self):
526         response = test_utils.check_netconf_node_request(
527             "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
528         self.assertEqual(response.status_code, requests.codes.ok)
529         res = response.json()
530         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
531                         'administrative-state': 'inService',
532                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
533                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
534                         'type': 'org-openroadm-interfaces:otnOdu',
535                         'supporting-port': 'L1'}
536         # SAPI/DAPI are added in the Otu4 renderer
537         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
538                         'rate': 'org-openroadm-otn-common-types:ODUCn',
539                         'tx-sapi': 'Nmbu2MNHvc4=',
540                         'tx-dapi': 'LY9PxYJqUbw=',
541                         'expected-sapi': 'LY9PxYJqUbw=',
542                         'expected-dapi': 'LY9PxYJqUbw='
543                         }
544         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
545                              res['interface'][0])
546         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
547                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
548         self.assertDictEqual(
549             {'payload-type': '22', 'exp-payload-type': '22'},
550             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
551
552     def test_27_check_otn_topo_links(self):
553         response = test_utils.get_otn_topo_request()
554         self.assertEqual(response.status_code, requests.codes.ok)
555         res = response.json()
556         nb_links = len(res['network'][0]['ietf-network-topology:link'])
557         self.assertEqual(nb_links, 4)
558         for link in res['network'][0]['ietf-network-topology:link']:
559             linkId = link['link-id']
560             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
561                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
562                 self.assertEqual(
563                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
564                 self.assertEqual(
565                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
566             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
568                 self.assertEqual(
569                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
570                 self.assertEqual(
571                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
572                 self.assertEqual(
573                     link['transportpce-topology:otn-link-type'], 'ODUC4')
574                 self.assertEqual(
575                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
576                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
577                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
578                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
579             else:
580                 self.fail("this link should not exist")
581
582     def test_28_check_otn_topo_tp(self):
583         response = test_utils.get_otn_topo_request()
584         res = response.json()
585         for node in res['network'][0]['node']:
586             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
587                 tpList = node['ietf-network-topology:termination-point']
588                 for tp in tpList:
589                     if tp['tp-id'] == 'XPDR2-NETWORK1':
590                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
591                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
592                         self.assertEqual(
593                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
594                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
595                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
596
597 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
598     def test_29_create_100GE_service_1(self):
599         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
600         self.cr_serv_sample_data["input"]["connection-type"] = "service"
601         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
602         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
603         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
604         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
605         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
606         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
607         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
608         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
609         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
610         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611         response = test_utils.service_create_request(self.cr_serv_sample_data)
612         self.assertEqual(response.status_code, requests.codes.ok)
613         res = response.json()
614         self.assertIn('PCE calculation in progress',
615                       res['output']['configuration-response-common']['response-message'])
616         time.sleep(self.WAITING)
617
618     def test_30_get_100GE_service_1(self):
619         response = test_utils.get_service_list_request(
620             "services/service-100GE")
621         self.assertEqual(response.status_code, requests.codes.ok)
622         res = response.json()
623         self.assertEqual(
624             res['services'][0]['administrative-state'], 'inService')
625         self.assertEqual(
626             res['services'][0]['service-name'], 'service-100GE')
627         self.assertEqual(
628             res['services'][0]['connection-type'], 'service')
629         self.assertEqual(
630             res['services'][0]['lifecycle-state'], 'planned')
631         time.sleep(2)
632
633     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
634         response = test_utils.check_netconf_node_request(
635             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
636         self.assertEqual(response.status_code, requests.codes.ok)
637         res = response.json()
638         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
639                         'administrative-state': 'inService',
640                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
641                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
642                         'supporting-port': 'C1'
643                         }
644         input_dict_2 = {'speed': 100000}
645         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
646                              res['interface'][0])
647         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
648                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
649
650     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
651         response = test_utils.check_netconf_node_request(
652             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
653         self.assertEqual(response.status_code, requests.codes.ok)
654         res = response.json()
655         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
656                         'administrative-state': 'inService',
657                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
658                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
659                         'type': 'org-openroadm-interfaces:otnOdu',
660                         'supporting-port': 'C1'}
661         input_dict_2 = {
662             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
663             'rate': 'org-openroadm-otn-common-types:ODU4',
664             'monitoring-mode': 'terminated'}
665
666         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
667                              res['interface'][0])
668         self.assertDictEqual(dict(input_dict_2,
669                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
670                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
671         self.assertDictEqual(
672             {'payload-type': '07', 'exp-payload-type': '07'},
673             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
674
675     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
676         response = test_utils.check_netconf_node_request(
677             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
678         self.assertEqual(response.status_code, requests.codes.ok)
679         res = response.json()
680         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
681                         'administrative-state': 'inService',
682                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
683                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
684                         'type': 'org-openroadm-interfaces:otnOdu',
685                         'supporting-port': 'L1'}
686         input_dict_2 = {
687             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
688             'rate': 'org-openroadm-otn-common-types:ODU4',
689             'monitoring-mode': 'not-terminated'}
690         input_dict_3 = {'trib-port-number': 1}
691
692         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
693                              res['interface'][0])
694         self.assertDictEqual(dict(input_dict_2,
695                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
696                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
697         self.assertDictEqual(dict(input_dict_3,
698                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
699                                       'parent-odu-allocation']),
700                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
701         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
702                       ['opucn-trib-slots'])
703         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
704                       ['opucn-trib-slots'])
705
706     def test_34_check_ODU4_connection_xpdra2(self):
707         response = test_utils.check_netconf_node_request(
708             "XPDR-A2",
709             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
710         self.assertEqual(response.status_code, requests.codes.ok)
711         res = response.json()
712         input_dict_1 = {
713             'connection-name':
714             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
715             'direction': 'bidirectional'
716         }
717
718         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
719                              res['odu-connection'][0])
720         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
721                              res['odu-connection'][0]['destination'])
722         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
723                              res['odu-connection'][0]['source'])
724
725     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
726         response = test_utils.check_netconf_node_request(
727             "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
728         self.assertEqual(response.status_code, requests.codes.ok)
729         res = response.json()
730         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
731                         'administrative-state': 'inService',
732                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
733                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
734                         'supporting-port': 'C1'
735                         }
736         input_dict_2 = {'speed': 100000}
737         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
738                              res['interface'][0])
739         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
740                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
741
742     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
743         response = test_utils.check_netconf_node_request(
744             "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
745         self.assertEqual(response.status_code, requests.codes.ok)
746         res = response.json()
747         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
748                         'administrative-state': 'inService',
749                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
750                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
751                         'type': 'org-openroadm-interfaces:otnOdu',
752                         'supporting-port': 'C1'}
753         input_dict_2 = {
754             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
755             'rate': 'org-openroadm-otn-common-types:ODU4',
756             'monitoring-mode': 'terminated'}
757
758         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
759                              res['interface'][0])
760         self.assertDictEqual(dict(input_dict_2,
761                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
762                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
763         self.assertDictEqual(
764             {'payload-type': '07', 'exp-payload-type': '07'},
765             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
766
767     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
768         response = test_utils.check_netconf_node_request(
769             "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
770         self.assertEqual(response.status_code, requests.codes.ok)
771         res = response.json()
772         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
773                         'administrative-state': 'inService',
774                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
775                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
776                         'type': 'org-openroadm-interfaces:otnOdu',
777                         'supporting-port': 'C1'}
778         input_dict_2 = {
779             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
780             'rate': 'org-openroadm-otn-common-types:ODU4',
781             'monitoring-mode': 'not-terminated'}
782
783         input_dict_3 = {'trib-port-number': 1}
784
785         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
786                              res['interface'][0])
787         self.assertDictEqual(dict(input_dict_2,
788                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
789                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
790         self.assertDictEqual(dict(input_dict_3,
791                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
792                                       'parent-odu-allocation']),
793                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
794             'parent-odu-allocation'])
795         self.assertIn('1.1',
796                       res['interface'][0][
797                           'org-openroadm-otn-odu-interfaces:odu'][
798                           'parent-odu-allocation']['opucn-trib-slots'])
799         self.assertIn('1.20',
800                       res['interface'][0][
801                           'org-openroadm-otn-odu-interfaces:odu'][
802                           'parent-odu-allocation']['opucn-trib-slots'])
803
804     def test_38_check_ODU4_connection_xpdrc2(self):
805         response = test_utils.check_netconf_node_request(
806             "XPDR-C2",
807             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
808         self.assertEqual(response.status_code, requests.codes.ok)
809         res = response.json()
810         input_dict_1 = {
811             'connection-name':
812             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
813             'direction': 'bidirectional'
814         }
815
816         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
817                              res['odu-connection'][0])
818         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
819                              res['odu-connection'][0]['destination'])
820         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
821                              res['odu-connection'][0]['source'])
822
823     def test_39_check_otn_topo_links(self):
824         response = test_utils.get_otn_topo_request()
825         self.assertEqual(response.status_code, requests.codes.ok)
826         res = response.json()
827         nb_links = len(res['network'][0]['ietf-network-topology:link'])
828         self.assertEqual(nb_links, 4)
829         for link in res['network'][0]['ietf-network-topology:link']:
830             linkId = link['link-id']
831             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
832                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
833                 self.assertEqual(
834                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
835                 self.assertEqual(
836                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
837
838     def test_40_check_otn_topo_tp(self):
839         response = test_utils.get_otn_topo_request()
840         res = response.json()
841         for node in res['network'][0]['node']:
842             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
843                 tpList = node['ietf-network-topology:termination-point']
844                 for tp in tpList:
845                     if tp['tp-id'] == 'XPDR2-NETWORK1':
846                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
847                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
848                         tsPoolList = list(range(1, 20))
849                         self.assertNotIn(
850                             tsPoolList, xpdrTpPortConAt['ts-pool'])
851                         self.assertEqual(
852                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
853                         self.assertNotIn(
854                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
855
856 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
857     def test_41_create_100GE_service_2(self):
858         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
859         self.cr_serv_sample_data["input"]["connection-type"] = "service"
860         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
861         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
862         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
863         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
864         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
865         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
866         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
867         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
868         response = test_utils.service_create_request(self.cr_serv_sample_data)
869         self.assertEqual(response.status_code, requests.codes.ok)
870         res = response.json()
871         self.assertIn('PCE calculation in progress',
872                       res['output']['configuration-response-common']['response-message'])
873         time.sleep(self.WAITING)
874
875     def test_42_get_100GE_service_2(self):
876         response = test_utils.get_service_list_request(
877             "services/service-100GE2")
878         self.assertEqual(response.status_code, requests.codes.ok)
879         res = response.json()
880         self.assertEqual(
881             res['services'][0]['administrative-state'], 'inService')
882         self.assertEqual(
883             res['services'][0]['service-name'], 'service-100GE2')
884         self.assertEqual(
885             res['services'][0]['connection-type'], 'service')
886         self.assertEqual(
887             res['services'][0]['lifecycle-state'], 'planned')
888         time.sleep(2)
889
890     def test_43_check_service_list(self):
891         response = test_utils.get_service_list_request("")
892         self.assertEqual(response.status_code, requests.codes.ok)
893         res = response.json()
894         self.assertEqual(len(res['service-list']['services']), 4)
895         time.sleep(2)
896
897     def test_44_check_otn_topo_links(self):
898         response = test_utils.get_otn_topo_request()
899         self.assertEqual(response.status_code, requests.codes.ok)
900         res = response.json()
901         nb_links = len(res['network'][0]['ietf-network-topology:link'])
902         self.assertEqual(nb_links, 4)
903         for link in res['network'][0]['ietf-network-topology:link']:
904             linkId = link['link-id']
905             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
906                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
907                 self.assertEqual(
908                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
909                 self.assertEqual(
910                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
911
912     def test_45_check_otn_topo_tp(self):
913         response = test_utils.get_otn_topo_request()
914         res = response.json()
915         for node in res['network'][0]['node']:
916             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
917                 tpList = node['ietf-network-topology:termination-point']
918                 for tp in tpList:
919                     if tp['tp-id'] == 'XPDR2-NETWORK1':
920                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
921                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
922                         tsPoolList = list(range(1, 40))
923                         self.assertNotIn(
924                             tsPoolList, xpdrTpPortConAt['ts-pool'])
925                         self.assertEqual(
926                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
927                         self.assertNotIn(
928                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
929
930     def test_46_delete_100GE_service_2(self):
931         response = test_utils.service_delete_request("service-100GE2")
932         self.assertEqual(response.status_code, requests.codes.ok)
933         res = response.json()
934         self.assertIn('Renderer service delete in progress',
935                       res['output']['configuration-response-common']['response-message'])
936         time.sleep(self.WAITING)
937
938     def test_47_delete_100GE_service_1(self):
939         response = test_utils.service_delete_request("service-100GE")
940         self.assertEqual(response.status_code, requests.codes.ok)
941         res = response.json()
942         self.assertIn('Renderer service delete in progress',
943                       res['output']['configuration-response-common']['response-message'])
944         time.sleep(self.WAITING)
945
946     def test_48_check_service_list(self):
947         response = test_utils.get_service_list_request("")
948         self.assertEqual(response.status_code, requests.codes.ok)
949         res = response.json()
950         self.assertEqual(len(res['service-list']['services']), 2)
951         time.sleep(2)
952
953     def test_49_check_no_ODU4_connection_xpdra2(self):
954         response = test_utils.check_netconf_node_request("XPDR-A2", "")
955         self.assertEqual(response.status_code, requests.codes.ok)
956         res = response.json()
957         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
958         time.sleep(1)
959
960     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
961         response = test_utils.check_netconf_node_request(
962             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
963         self.assertEqual(response.status_code, requests.codes.conflict)
964
965     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
966         response = test_utils.check_netconf_node_request(
967             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
968         self.assertEqual(response.status_code, requests.codes.conflict)
969
970     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
971         response = test_utils.check_netconf_node_request(
972             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
973         self.assertEqual(response.status_code, requests.codes.conflict)
974
975     def test_53_check_otn_topo_links(self):
976         response = test_utils.get_otn_topo_request()
977         self.assertEqual(response.status_code, requests.codes.ok)
978         res = response.json()
979         nb_links = len(res['network'][0]['ietf-network-topology:link'])
980         self.assertEqual(nb_links, 4)
981         for link in res['network'][0]['ietf-network-topology:link']:
982             linkId = link['link-id']
983             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
984                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
985                 self.assertEqual(
986                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
987                 self.assertEqual(
988                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
989
990     def test_54_check_otn_topo_tp(self):
991         response = test_utils.get_otn_topo_request()
992         res = response.json()
993         for node in res['network'][0]['node']:
994             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
995                 tpList = node['ietf-network-topology:termination-point']
996                 for tp in tpList:
997                     if tp['tp-id'] == 'XPDR2-NETWORK1':
998                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
999                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1000                         self.assertEqual(
1001                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1002
1003     def test_55_delete_ODUC4_service(self):
1004         response = test_utils.service_delete_request("service1-ODUC4")
1005         self.assertEqual(response.status_code, requests.codes.ok)
1006         res = response.json()
1007         self.assertIn('Renderer service delete in progress',
1008                       res['output']['configuration-response-common']['response-message'])
1009         time.sleep(self.WAITING)
1010
1011     def test_56_check_service_list(self):
1012         response = test_utils.get_service_list_request("")
1013         self.assertEqual(response.status_code, requests.codes.ok)
1014         res = response.json()
1015         self.assertEqual(len(res['service-list']['services']), 1)
1016         time.sleep(2)
1017
1018     def test_57_check_no_interface_ODU4_xpdra2(self):
1019         response = test_utils.check_netconf_node_request(
1020             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1021         self.assertEqual(response.status_code, requests.codes.conflict)
1022
    def test_58_check_otn_topo_links(self):
        # Re-run the OTUC4 link checks of test_22 as-is at this point of the
        # sequence (after the ODUC4 service deletion requested in test_55).
        self.test_22_check_otn_topo_OTUC4_links()
1025
1026     def test_59_check_otn_topo_tp(self):
1027         response = test_utils.get_otn_topo_request()
1028         res = response.json()
1029         for node in res['network'][0]['node']:
1030             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1031                 tpList = node['ietf-network-topology:termination-point']
1032                 for tp in tpList:
1033                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1034                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1035                                          dict.keys(tp))
1036
1037     def test_60_delete_OTUC4_service(self):
1038         response = test_utils.service_delete_request("service1-OTUC4")
1039         self.assertEqual(response.status_code, requests.codes.ok)
1040         res = response.json()
1041         self.assertIn('Renderer service delete in progress',
1042                       res['output']['configuration-response-common']['response-message'])
1043         time.sleep(self.WAITING)
1044
1045     def test_61_get_no_service(self):
1046         response = test_utils.get_service_list_request("")
1047         self.assertEqual(response.status_code, requests.codes.conflict)
1048         res = response.json()
1049         self.assertIn(
1050             {"error-type": "application", "error-tag": "data-missing",
1051              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1052             res['errors']['error'])
1053         time.sleep(1)
1054
1055     def test_62_check_no_interface_OTUC4_xpdra2(self):
1056         response = test_utils.check_netconf_node_request(
1057             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1058         self.assertEqual(response.status_code, requests.codes.conflict)
1059
1060     def test_63_check_no_interface_OCH_xpdra2(self):
1061         response = test_utils.check_netconf_node_request(
1062             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1063         self.assertEqual(response.status_code, requests.codes.conflict)
1064
1065     def test_64_check_no_interface_OTSI_xpdra2(self):
1066         response = test_utils.check_netconf_node_request(
1067             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1068         self.assertEqual(response.status_code, requests.codes.conflict)
1069
1070     def test_65_getLinks_OtnTopology(self):
1071         response = test_utils.get_otn_topo_request()
1072         self.assertEqual(response.status_code, requests.codes.ok)
1073         res = response.json()
1074         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1075
1076     def test_66_check_openroadm_topo_xpdra2(self):
1077         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1078         self.assertEqual(response.status_code, requests.codes.ok)
1079         res = response.json()
1080         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1081         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1082         self.assertNotIn('wavelength', dict.keys(
1083             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1084         time.sleep(3)
1085
1086     def test_67_check_openroadm_topology(self):
1087         response = test_utils.get_ordm_topo_request("")
1088         self.assertEqual(response.status_code, requests.codes.ok)
1089         res = response.json()
1090         links = res['network'][0]['ietf-network-topology:link']
1091         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1092
1093     def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1094         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1095                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1096         self.assertEqual(response.status_code, requests.codes.ok)
1097         res = response.json()
1098         self.assertIn('Xponder Roadm Link created successfully',
1099                       res["output"]["result"])
1100         time.sleep(2)
1101
1102     def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1103         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1104                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1105         self.assertEqual(response.status_code, requests.codes.ok)
1106         res = response.json()
1107         self.assertIn('Roadm Xponder links created successfully',
1108                       res["output"]["result"])
1109         time.sleep(2)
1110
1111     def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1112         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1113                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1114         self.assertEqual(response.status_code, requests.codes.ok)
1115         res = response.json()
1116         self.assertIn('Xponder Roadm Link created successfully',
1117                       res["output"]["result"])
1118         time.sleep(2)
1119
1120     def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1121         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1122                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1123         self.assertEqual(response.status_code, requests.codes.ok)
1124         res = response.json()
1125         self.assertIn('Roadm Xponder links created successfully',
1126                       res["output"]["result"])
1127         time.sleep(2)
1128
1129 # test service-create for 400GE service from xpdra2 to xpdrc2
1130     def test_72_create_400GE_service(self):
1131         self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1132         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1133         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1134         response = test_utils.service_create_request(self.cr_serv_sample_data)
1135         self.assertEqual(response.status_code, requests.codes.ok)
1136         res = response.json()
1137         self.assertIn('PCE calculation in progress',
1138                       res['output']['configuration-response-common']['response-message'])
1139         time.sleep(self.WAITING)
1140
1141     def test_73_get_400GE_service(self):
1142         response = test_utils.get_service_list_request(
1143             "services/service-400GE")
1144         self.assertEqual(response.status_code, requests.codes.ok)
1145         res = response.json()
1146         self.assertEqual(
1147             res['services'][0]['administrative-state'], 'inService')
1148         self.assertEqual(
1149             res['services'][0]['service-name'], 'service-400GE')
1150         self.assertEqual(
1151             res['services'][0]['connection-type'], 'service')
1152         self.assertEqual(
1153             res['services'][0]['lifecycle-state'], 'planned')
1154         time.sleep(2)
1155
1156     def test_74_check_xc1_roadma(self):
1157         response = test_utils.check_netconf_node_request(
1158             "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1159         self.assertEqual(response.status_code, requests.codes.ok)
1160         res = response.json()
1161         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1162         self.assertDictEqual(
1163             dict({
1164                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1165                 'opticalControlMode': 'gainLoss',
1166                 'target-output-power': -3.0
1167             }, **res['roadm-connections'][0]),
1168             res['roadm-connections'][0]
1169         )
1170         self.assertDictEqual(
1171             {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1172             res['roadm-connections'][0]['source'])
1173         self.assertDictEqual(
1174             {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1175             res['roadm-connections'][0]['destination'])
1176         time.sleep(5)
1177
1178     def test_75_check_topo_xpdra2(self):
1179         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1180         self.assertEqual(response.status_code, requests.codes.ok)
1181         res = response.json()
1182         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1183         for ele in liste_tp:
1184             if ele['tp-id'] == 'XPDR1-NETWORK1':
1185                 self.assertEqual({'frequency': 196.08125,
1186                                   'width': 75},
1187                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1188             if ele['tp-id'] == 'XPDR1-CLIENT1':
1189                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1190         time.sleep(3)
1191
1192     def test_76_check_topo_roadma_SRG1(self):
1193         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1194         self.assertEqual(response.status_code, requests.codes.ok)
1195         res = response.json()
1196         freq_map = base64.b64decode(
1197             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1198         freq_map_array = [int(x) for x in freq_map]
1199         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1200         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1201         for ele in liste_tp:
1202             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1203                 freq_map = base64.b64decode(
1204                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1205                 freq_map_array = [int(x) for x in freq_map]
1206                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1207             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1208                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1209         time.sleep(3)
1210
1211     def test_77_check_topo_roadma_DEG1(self):
1212         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1213         self.assertEqual(response.status_code, requests.codes.ok)
1214         res = response.json()
1215         freq_map = base64.b64decode(
1216             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1217         freq_map_array = [int(x) for x in freq_map]
1218         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1219         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1220         for ele in liste_tp:
1221             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1222                 freq_map = base64.b64decode(
1223                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1224                 freq_map_array = [int(x) for x in freq_map]
1225                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1226             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1227                 freq_map = base64.b64decode(
1228                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1229                 freq_map_array = [int(x) for x in freq_map]
1230                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1231         time.sleep(3)
1232
1233     def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1234         response = test_utils.check_netconf_node_request(
1235             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1236         self.assertEqual(response.status_code, requests.codes.ok)
1237         res = response.json()
1238         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1239                         'administrative-state': 'inService',
1240                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1241                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1242                         'supporting-port': 'C1'
1243                         }
1244         input_dict_2 = {'speed': 400000}
1245         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1246                              res['interface'][0])
1247         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1248                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1249
1250     def test_79_check_interface_OTSI_xpdra2(self):
1251         response = test_utils.check_netconf_node_request(
1252             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1253         self.assertEqual(response.status_code, requests.codes.ok)
1254         res = response.json()
1255         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1256                         'administrative-state': 'inService',
1257                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1258                         'type': 'org-openroadm-interfaces:otsi',
1259                         'supporting-port': 'L1'}
1260         input_dict_2 = {
1261             "frequency": 196.0812,
1262             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1263             "fec": "org-openroadm-common-types:ofec",
1264             "transmit-power": -5,
1265             "provision-mode": "explicit",
1266             "modulation-format": "dp-qam16"}
1267
1268         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1269                              res['interface'][0])
1270         self.assertDictEqual(dict(input_dict_2,
1271                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1272                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1273         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1274                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1275
1276     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1277         response = test_utils.check_netconf_node_request(
1278             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1279         self.assertEqual(response.status_code, requests.codes.ok)
1280         res = response.json()
1281         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1282                         'administrative-state': 'inService',
1283                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1284                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1285                         'type': 'org-openroadm-interfaces:otsi-group',
1286                         'supporting-port': 'L1'}
1287         input_dict_2 = {"group-id": 1,
1288                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1289
1290         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1291                              res['interface'][0])
1292         self.assertDictEqual(dict(input_dict_2,
1293                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1294                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1295
1296     def test_81_check_interface_OTUC4_xpdra2(self):
1297         response = test_utils.check_netconf_node_request(
1298             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1299         self.assertEqual(response.status_code, requests.codes.ok)
1300         res = response.json()
1301         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1302                         'administrative-state': 'inService',
1303                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1304                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1305                         'type': 'org-openroadm-interfaces:otnOtu',
1306                         'supporting-port': 'L1'}
1307         input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1308                         "degthr-percentage": 100,
1309                         "tim-detect-mode": "Disabled",
1310                         "otucn-n-rate": 4,
1311                         "degm-intervals": 2,
1312                         "expected-dapi": "AIGiVAQ4gDil"}
1313
1314         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1315                              res['interface'][0])
1316         self.assertDictEqual(dict(input_dict_2,
1317                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1318                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1319
1320     def test_82_check_interface_ODUC4_xpdra2(self):
1321         response = test_utils.check_netconf_node_request(
1322             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1323         self.assertEqual(response.status_code, requests.codes.ok)
1324         res = response.json()
1325         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1326                         'administrative-state': 'inService',
1327                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1328                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1329                         'type': 'org-openroadm-interfaces:otnOdu',
1330                         'supporting-port': 'L1'}
1331         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1332                         "tim-detect-mode": "Disabled",
1333                         "degm-intervals": 2,
1334                         "degthr-percentage": 100,
1335                         "monitoring-mode": "terminated",
1336                         "rate": "org-openroadm-otn-common-types:ODUCn",
1337                         "oducn-n-rate": 4}
1338         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1339
1340         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1341                              res['interface'][0])
1342         self.assertDictEqual(dict(input_dict_2,
1343                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1344                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1345         self.assertDictEqual(dict(input_dict_3,
1346                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1347                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1348
1349     def test_83_delete_400GE_service(self):
1350         response = test_utils.service_delete_request("service-400GE")
1351         self.assertEqual(response.status_code, requests.codes.ok)
1352         res = response.json()
1353         self.assertIn('Renderer service delete in progress',
1354                       res['output']['configuration-response-common']['response-message'])
1355         time.sleep(self.WAITING)
1356
1357     def test_84_get_no_service(self):
1358         response = test_utils.get_service_list_request("")
1359         self.assertEqual(response.status_code, requests.codes.conflict)
1360         res = response.json()
1361         self.assertIn(
1362             {"error-type": "application", "error-tag": "data-missing",
1363              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1364             res['errors']['error'])
1365         time.sleep(1)
1366
1367     def test_85_check_no_interface_ODUC4_xpdra2(self):
1368         response = test_utils.check_netconf_node_request(
1369             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1370         self.assertEqual(response.status_code, requests.codes.conflict)
1371
1372     def test_86_check_no_interface_OTUC4_xpdra2(self):
1373         response = test_utils.check_netconf_node_request(
1374             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1375         self.assertEqual(response.status_code, requests.codes.conflict)
1376
1377     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1378         response = test_utils.check_netconf_node_request(
1379             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1380         self.assertEqual(response.status_code, requests.codes.conflict)
1381
1382     def test_88_check_no_interface_OTSI_xpdra2(self):
1383         response = test_utils.check_netconf_node_request(
1384             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1385         self.assertEqual(response.status_code, requests.codes.conflict)
1386
1387     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1388         response = test_utils.check_netconf_node_request(
1389             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1390         self.assertEqual(response.status_code, requests.codes.conflict)
1391
1392     def test_90_disconnect_xponders_from_roadm(self):
1393         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1394         response = test_utils.get_ordm_topo_request("")
1395         self.assertEqual(response.status_code, requests.codes.ok)
1396         res = response.json()
1397         links = res['network'][0]['ietf-network-topology:link']
1398         for link in links:
1399             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1400                 link_name = link["link-id"]
1401                 response = test_utils.delete_request(url+link_name)
1402                 self.assertEqual(response.status_code, requests.codes.ok)
1403
1404     def test_91_disconnect_xpdra2(self):
1405         response = test_utils.unmount_device("XPDR-A2")
1406         self.assertEqual(response.status_code, requests.codes.ok,
1407                          test_utils.CODE_SHOULD_BE_200)
1408
1409     def test_92_disconnect_xpdrc2(self):
1410         response = test_utils.unmount_device("XPDR-C2")
1411         self.assertEqual(response.status_code, requests.codes.ok,
1412                          test_utils.CODE_SHOULD_BE_200)
1413
1414     def test_93_disconnect_roadmA(self):
1415         response = test_utils.unmount_device("ROADM-A1")
1416         self.assertEqual(response.status_code, requests.codes.ok,
1417                          test_utils.CODE_SHOULD_BE_200)
1418
1419     def test_94_disconnect_roadmC(self):
1420         response = test_utils.unmount_device("ROADM-C1")
1421         self.assertEqual(response.status_code, requests.codes.ok,
1422                          test_utils.CODE_SHOULD_BE_200)
1423
1424
if __name__ == "__main__":
    # Run the test cases of this module with per-test (verbose) reporting
    # when the file is executed as a script.
    unittest.main(verbosity=2)