988d4240802477832f749df31e76dadb2c2f42ce
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
# End-to-end scenario run against the processes started in setUpClass: two
# transponder simulators (xpdra2 / xpdrc2, version 7.1) and two ROADM
# simulators (roadma / roadmc, version 2.2.1).  The test methods are numbered
# and build on each other's state (mounted devices, created links, services).
class TransportPCEtesting(unittest.TestCase):

    # Set by setUpClass; iterated by tearDownClass to stop every process.
    processes = None
    # Seconds slept after each service-create request (time.sleep(self.WAITING)).
    WAITING = 20  # nominal value is 300
    # Version strings paired with the simulator names in start_sims() and
    # mount_device() calls.
    NODE_VERSION_221 = '2.2.1'
    NODE_VERSION_71 = '7.1'

    # Payload handed to test_utils.service_create_request().  It starts as the
    # OTUC4 infrastructure request between XPDR-A2 and XPDR-C2; later tests
    # (test_23, test_29) modify this same dict in place before reusing it, so
    # its content depends on which tests have already run.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OTUC4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        "service-a-end": {
            "service-rate": "400",
            "node-id": "XPDR-A2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "400",
            "node-id": "XPDR-C2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
137
138     @classmethod
139     def setUpClass(cls):
140         cls.processes = test_utils.start_tpce()
141         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142                                                ('roadma', cls.NODE_VERSION_221),
143                                                ('roadmc', cls.NODE_VERSION_221),
144                                                ('xpdrc2', cls.NODE_VERSION_71)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(5)
155
156     def test_01_connect_xpdra2(self):
157         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_xpdrc2(self):
162         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_rdma(self):
167         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmc(self):
172         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
179         self.assertEqual(response.status_code, requests.codes.ok)
180         res = response.json()
181         self.assertIn('Xponder Roadm Link created successfully',
182                       res["output"]["result"])
183         time.sleep(2)
184
185     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
188         self.assertEqual(response.status_code, requests.codes.ok)
189         res = response.json()
190         self.assertIn('Roadm Xponder links created successfully',
191                       res["output"]["result"])
192         time.sleep(2)
193
194     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
197         self.assertEqual(response.status_code, requests.codes.ok)
198         res = response.json()
199         self.assertIn('Xponder Roadm Link created successfully',
200                       res["output"]["result"])
201         time.sleep(2)
202
203     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
206         self.assertEqual(response.status_code, requests.codes.ok)
207         res = response.json()
208         self.assertIn('Roadm Xponder links created successfully',
209                       res["output"]["result"])
210         time.sleep(2)
211
212     def test_09_add_omsAttributes_roadma_roadmc(self):
213         # Config ROADMA-ROADMC oms-attributes
214         data = {"span": {
215             "auto-spanloss": "true",
216             "spanloss-base": 11.4,
217             "spanloss-current": 12,
218             "engineered-spanloss": 12.2,
219             "link-concatenation": [{
220                 "SRLG-Id": 0,
221                 "fiber-type": "smf",
222                 "SRLG-length": 100000,
223                 "pmd": 0.5}]}}
224         response = test_utils.add_oms_attr_request(
225             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226         self.assertEqual(response.status_code, requests.codes.created)
227
228     def test_10_add_omsAttributes_roadmc_roadma(self):
229         # Config ROADMC-ROADMA oms-attributes
230         data = {"span": {
231             "auto-spanloss": "true",
232             "spanloss-base": 11.4,
233             "spanloss-current": 12,
234             "engineered-spanloss": 12.2,
235             "link-concatenation": [{
236                 "SRLG-Id": 0,
237                 "fiber-type": "smf",
238                 "SRLG-length": 100000,
239                 "pmd": 0.5}]}}
240         response = test_utils.add_oms_attr_request(
241             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242         self.assertEqual(response.status_code, requests.codes.created)
243
# test service-create for OTUC4 service from xpdra2 to xpdrc2
245     def test_11_check_otn_topology(self):
246         response = test_utils.get_otn_topo_request()
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         nbNode = len(res['network'][0]['node'])
250         self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251         self.assertNotIn('ietf-network-topology:link', res['network'][0])
252
253     def test_12_create_OTUC4_service(self):
254         response = test_utils.service_create_request(self.cr_serv_sample_data)
255         self.assertEqual(response.status_code, requests.codes.ok)
256         res = response.json()
257         self.assertIn('PCE calculation in progress',
258                       res['output']['configuration-response-common']['response-message'])
259         time.sleep(self.WAITING)
260
261     def test_13_get_OTUC4_service1(self):
262         response = test_utils.get_service_list_request(
263             "services/service1-OTUC4")
264         self.assertEqual(response.status_code, requests.codes.ok)
265         res = response.json()
266         self.assertEqual(
267             res['services'][0]['administrative-state'], 'inService')
268         self.assertEqual(
269             res['services'][0]['service-name'], 'service1-OTUC4')
270         self.assertEqual(
271             res['services'][0]['connection-type'], 'infrastructure')
272         self.assertEqual(
273             res['services'][0]['lifecycle-state'], 'planned')
274         self.assertEqual(
275             res['services'][0]['service-layer'], 'wdm')
276         self.assertEqual(
277             res['services'][0]['service-a-end']['service-rate'], 400)
278         self.assertEqual(
279             res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
280         time.sleep(2)
281         self.assertEqual(
282             res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
283         time.sleep(2)
284
285     # Check correct configuration of devices
286     def test_14_check_interface_och_xpdra2(self):
287         response = test_utils.check_netconf_node_request(
288             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289         self.assertEqual(response.status_code, requests.codes.ok)
290         res = response.json()
291         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292                                    'administrative-state': 'inService',
293                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294                                    'type': 'org-openroadm-interfaces:otsi',
295                                    'supporting-port': 'L1'
296                                    }, **res['interface'][0]),
297                              res['interface'][0])
298
299         self.assertDictEqual(
300             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
304
305     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306         response = test_utils.check_netconf_node_request(
307             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308         self.assertEqual(response.status_code, requests.codes.ok)
309         res = response.json()
310         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311                         'administrative-state': 'inService',
312                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314                         'type': 'org-openroadm-interfaces:otsi-group',
315                         'supporting-port': 'L1'
316                         }
317         input_dict_2 = {"group-id": 1,
318                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
319                         }
320         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
321                              res['interface'][0])
322         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
324                              res['interface'][0]
325                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
326
327     def test_16_check_interface_OTUC4_xpdra2(self):
328         response = test_utils.check_netconf_node_request(
329             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330         self.assertEqual(response.status_code, requests.codes.ok)
331         res = response.json()
332         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333                         'administrative-state': 'inService',
334                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336                         'type': 'org-openroadm-interfaces:otnOtu',
337                         'supporting-port': 'L1'
338                         }
339         input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
340                         'expected-dapi': 'LY9PxYJqUbw=',
341                         'rate': 'org-openroadm-otn-common-types:OTUCn',
342                         'degthr-percentage': 100,
343                         'degm-intervals': 2,
344                         'otucn-n-rate': 4
345                         }
346         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
347                              res['interface'][0])
348         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
349                              res['interface'][0]
350                              ['org-openroadm-otn-otu-interfaces:otu'])
351
352     def test_17_check_interface_och_xpdrc2(self):
353         response = test_utils.check_netconf_node_request(
354             "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
355         self.assertEqual(response.status_code, requests.codes.ok)
356         res = response.json()
357         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
358                                    'administrative-state': 'inService',
359                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
360                                    'type': 'org-openroadm-interfaces:otsi',
361                                    'supporting-port': 'L1'
362                                    }, **res['interface'][0]),
363                              res['interface'][0])
364
365         self.assertDictEqual(
366             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
367                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
368                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
369             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
370
371     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
372         response = test_utils.check_netconf_node_request(
373             "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
374         self.assertEqual(response.status_code, requests.codes.ok)
375         res = response.json()
376         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
377                         'administrative-state': 'inService',
378                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
379                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
380                         'type': 'org-openroadm-interfaces:otsi-group',
381                         'supporting-port': 'L1'
382                         }
383         input_dict_2 = {"group-id": 1,
384                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
385                         }
386         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
387                              res['interface'][0])
388         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
389                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
390                              res['interface'][0]
391                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
392
393     def test_19_check_interface_OTUC4_xpdrc2(self):
394         response = test_utils.check_netconf_node_request(
395             "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
396         self.assertEqual(response.status_code, requests.codes.ok)
397         res = response.json()
398         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
399                         'administrative-state': 'inService',
400                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
401                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
402                         'type': 'org-openroadm-interfaces:otnOtu',
403                         'supporting-port': 'L1'
404                         }
405         input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
406                         'expected-sapi': 'LY9PxYJqUbw=',
407                         'tx-sapi': 'Nmbu2MNHvc4=',
408                         'expected-dapi': 'Nmbu2MNHvc4=',
409                         'rate': 'org-openroadm-otn-common-types:OTUCn',
410                         'degthr-percentage': 100,
411                         'degm-intervals': 2,
412                         'otucn-n-rate': 4
413                         }
414
415         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
416                              res['interface'][0])
417
418         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
419                              res['interface'][0]
420                              ['org-openroadm-otn-otu-interfaces:otu'])
421
422     def test_20_check_no_interface_ODUC4_xpdra2(self):
423         response = test_utils.check_netconf_node_request(
424             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
425         self.assertEqual(response.status_code, requests.codes.conflict)
426         res = response.json()
427         self.assertIn(
428             {"error-type": "application", "error-tag": "data-missing",
429              "error-message": "Request could not be completed because the relevant data model content does not exist"},
430             res['errors']['error'])
431
432     def test_21_check_openroadm_topo_xpdra2(self):
433         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
434         self.assertEqual(response.status_code, requests.codes.ok)
435         res = response.json()
436         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
437         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
438         self.assertEqual({'frequency': 196.08125,
439                           'width': 75},
440                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
441         time.sleep(3)
442
443     def test_22_check_otn_topo_OTUC4_links(self):
444         response = test_utils.get_otn_topo_request()
445         self.assertEqual(response.status_code, requests.codes.ok)
446         res = response.json()
447         nb_links = len(res['network'][0]['ietf-network-topology:link'])
448         self.assertEqual(nb_links, 2)
449         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
450                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
451         for link in res['network'][0]['ietf-network-topology:link']:
452             self.assertIn(link['link-id'], listLinkId)
453             self.assertEqual(
454                 link['transportpce-topology:otn-link-type'], 'OTUC4')
455             self.assertEqual(
456                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
457             self.assertEqual(
458                 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
459             self.assertEqual(
460                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
461             self.assertIn(
462                 link['org-openroadm-common-network:opposite-link'], listLinkId)
463
# test service-create for ODUC4 service from xpdra2 to xpdrc2
465     def test_23_create_ODUC4_service(self):
466         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
467         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
468         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
469         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
470         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
471         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
472         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
473
474         response = test_utils.service_create_request(self.cr_serv_sample_data)
475         self.assertEqual(response.status_code, requests.codes.ok)
476         res = response.json()
477         self.assertIn('PCE calculation in progress',
478                       res['output']['configuration-response-common']['response-message'])
479         time.sleep(self.WAITING)
480
481     def test_24_get_ODUC4_service1(self):
482         response = test_utils.get_service_list_request(
483             "services/service1-ODUC4")
484         self.assertEqual(response.status_code, requests.codes.ok)
485         res = response.json()
486         self.assertEqual(
487             res['services'][0]['administrative-state'], 'inService')
488         self.assertEqual(
489             res['services'][0]['service-name'], 'service1-ODUC4')
490         self.assertEqual(
491             res['services'][0]['connection-type'], 'infrastructure')
492         self.assertEqual(
493             res['services'][0]['lifecycle-state'], 'planned')
494         self.assertEqual(
495             res['services'][0]['service-layer'], 'wdm')
496         self.assertEqual(
497             res['services'][0]['service-a-end']['service-rate'], 400)
498         self.assertEqual(
499             res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
500         self.assertEqual(
501             res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
502         time.sleep(2)
503
504     def test_25_check_interface_ODUC4_xpdra2(self):
505         response = test_utils.check_netconf_node_request(
506             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
507         self.assertEqual(response.status_code, requests.codes.ok)
508         res = response.json()
509         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
510                         'administrative-state': 'inService',
511                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
512                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
513                         'type': 'org-openroadm-interfaces:otnOdu',
514                         'supporting-port': 'L1'}
515         # SAPI/DAPI are added in the Otu4 renderer
516         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
517                         'rate': 'org-openroadm-otn-common-types:ODUCn',
518                         'expected-dapi': 'Nmbu2MNHvc4=',
519                         'expected-sapi': 'Nmbu2MNHvc4=',
520                         'tx-dapi': 'Nmbu2MNHvc4=',
521                         'tx-sapi': 'LY9PxYJqUbw='}
522
523         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
524                              res['interface'][0])
525         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
526                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
527         self.assertDictEqual(
528             {'payload-type': '22', 'exp-payload-type': '22'},
529             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
530
531     def test_26_check_interface_ODUC4_xpdrc2(self):
532         response = test_utils.check_netconf_node_request(
533             "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
534         self.assertEqual(response.status_code, requests.codes.ok)
535         res = response.json()
536         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
537                         'administrative-state': 'inService',
538                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
539                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
540                         'type': 'org-openroadm-interfaces:otnOdu',
541                         'supporting-port': 'L1'}
542         # SAPI/DAPI are added in the Otu4 renderer
543         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
544                         'rate': 'org-openroadm-otn-common-types:ODUCn',
545                         'tx-sapi': 'Nmbu2MNHvc4=',
546                         'tx-dapi': 'LY9PxYJqUbw=',
547                         'expected-sapi': 'LY9PxYJqUbw=',
548                         'expected-dapi': 'LY9PxYJqUbw='
549                         }
550         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
551                              res['interface'][0])
552         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
553                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
554         self.assertDictEqual(
555             {'payload-type': '22', 'exp-payload-type': '22'},
556             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
557
558     def test_27_check_otn_topo_links(self):
559         response = test_utils.get_otn_topo_request()
560         self.assertEqual(response.status_code, requests.codes.ok)
561         res = response.json()
562         nb_links = len(res['network'][0]['ietf-network-topology:link'])
563         self.assertEqual(nb_links, 4)
564         for link in res['network'][0]['ietf-network-topology:link']:
565             linkId = link['link-id']
566             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
568                 self.assertEqual(
569                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
570                 self.assertEqual(
571                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
572             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
573                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
574                 self.assertEqual(
575                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
576                 self.assertEqual(
577                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
578                 self.assertEqual(
579                     link['transportpce-topology:otn-link-type'], 'ODUC4')
580                 self.assertEqual(
581                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
582                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
583                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
584                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
585             else:
586                 self.fail("this link should not exist")
587
588     def test_28_check_otn_topo_tp(self):
589         response = test_utils.get_otn_topo_request()
590         res = response.json()
591         for node in res['network'][0]['node']:
592             if node['node-id'] == 'XPDR-A2-XPDR2' or node['node-id'] == 'XPDR-C2-XPDR2':
593                 tpList = node['ietf-network-topology:termination-point']
594                 for tp in tpList:
595                     if tp['tp-id'] == 'XPDR2-NETWORK1':
596                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
597                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
598                         self.assertEqual(
599                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
600                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
601                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
602
603 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
604     def test_29_create_100GE_service_1(self):
605         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
606         self.cr_serv_sample_data["input"]["connection-type"] = "service"
607         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
608         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
609         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
610         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
612         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
613         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
614         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
615         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
616         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
617         response = test_utils.service_create_request(self.cr_serv_sample_data)
618         self.assertEqual(response.status_code, requests.codes.ok)
619         res = response.json()
620         self.assertIn('PCE calculation in progress',
621                       res['output']['configuration-response-common']['response-message'])
622         time.sleep(self.WAITING)
623
624     def test_30_get_100GE_service_1(self):
625         response = test_utils.get_service_list_request(
626             "services/service-100GE")
627         self.assertEqual(response.status_code, requests.codes.ok)
628         res = response.json()
629         self.assertEqual(
630             res['services'][0]['administrative-state'], 'inService')
631         self.assertEqual(
632             res['services'][0]['service-name'], 'service-100GE')
633         self.assertEqual(
634             res['services'][0]['connection-type'], 'service')
635         self.assertEqual(
636             res['services'][0]['lifecycle-state'], 'planned')
637         time.sleep(2)
638
639     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
640         response = test_utils.check_netconf_node_request(
641             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
642         self.assertEqual(response.status_code, requests.codes.ok)
643         res = response.json()
644         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
645                         'administrative-state': 'inService',
646                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
647                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
648                         'supporting-port': 'C1'
649                         }
650         input_dict_2 = {'speed': 100000}
651         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
652                              res['interface'][0])
653         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
654                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
655
656     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
657         response = test_utils.check_netconf_node_request(
658             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
659         self.assertEqual(response.status_code, requests.codes.ok)
660         res = response.json()
661         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
662                         'administrative-state': 'inService',
663                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
664                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
665                         'type': 'org-openroadm-interfaces:otnOdu',
666                         'supporting-port': 'C1'}
667         input_dict_2 = {
668             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
669             'rate': 'org-openroadm-otn-common-types:ODU4',
670             'monitoring-mode': 'terminated'}
671
672         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
673                              res['interface'][0])
674         self.assertDictEqual(dict(input_dict_2,
675                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
676                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
677         self.assertDictEqual(
678             {'payload-type': '07', 'exp-payload-type': '07'},
679             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
680
681     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
682         response = test_utils.check_netconf_node_request(
683             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
684         self.assertEqual(response.status_code, requests.codes.ok)
685         res = response.json()
686         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
687                         'administrative-state': 'inService',
688                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
689                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
690                         'type': 'org-openroadm-interfaces:otnOdu',
691                         'supporting-port': 'L1'}
692         input_dict_2 = {
693             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
694             'rate': 'org-openroadm-otn-common-types:ODU4',
695             'monitoring-mode': 'not-terminated'}
696         input_dict_3 = {'trib-port-number': 1}
697
698         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
699                              res['interface'][0])
700         self.assertDictEqual(dict(input_dict_2,
701                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
702                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
703         self.assertDictEqual(dict(input_dict_3,
704                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
705                                       'parent-odu-allocation']),
706                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
707         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
708                       ['opucn-trib-slots'])
709         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
710                       ['opucn-trib-slots'])
711
712     def test_34_check_ODU4_connection_xpdra2(self):
713         response = test_utils.check_netconf_node_request(
714             "XPDR-A2",
715             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
716         self.assertEqual(response.status_code, requests.codes.ok)
717         res = response.json()
718         input_dict_1 = {
719             'connection-name':
720             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
721             'direction': 'bidirectional'
722         }
723
724         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
725                              res['odu-connection'][0])
726         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
727                              res['odu-connection'][0]['destination'])
728         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
729                              res['odu-connection'][0]['source'])
730
731     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
732         response = test_utils.check_netconf_node_request(
733             "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
734         self.assertEqual(response.status_code, requests.codes.ok)
735         res = response.json()
736         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
737                         'administrative-state': 'inService',
738                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
739                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
740                         'supporting-port': 'C1'
741                         }
742         input_dict_2 = {'speed': 100000}
743         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
744                              res['interface'][0])
745         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
746                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
747
748     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
749         response = test_utils.check_netconf_node_request(
750             "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
751         self.assertEqual(response.status_code, requests.codes.ok)
752         res = response.json()
753         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
754                         'administrative-state': 'inService',
755                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
756                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
757                         'type': 'org-openroadm-interfaces:otnOdu',
758                         'supporting-port': 'C1'}
759         input_dict_2 = {
760             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
761             'rate': 'org-openroadm-otn-common-types:ODU4',
762             'monitoring-mode': 'terminated'}
763
764         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
765                              res['interface'][0])
766         self.assertDictEqual(dict(input_dict_2,
767                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
768                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
769         self.assertDictEqual(
770             {'payload-type': '07', 'exp-payload-type': '07'},
771             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
772
773     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
774         response = test_utils.check_netconf_node_request(
775             "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
776         self.assertEqual(response.status_code, requests.codes.ok)
777         res = response.json()
778         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
779                         'administrative-state': 'inService',
780                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
781                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
782                         'type': 'org-openroadm-interfaces:otnOdu',
783                         'supporting-port': 'C1'}
784         input_dict_2 = {
785             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
786             'rate': 'org-openroadm-otn-common-types:ODU4',
787             'monitoring-mode': 'not-terminated'}
788
789         input_dict_3 = {'trib-port-number': 1}
790
791         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
792                              res['interface'][0])
793         self.assertDictEqual(dict(input_dict_2,
794                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
795                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
796         self.assertDictEqual(dict(input_dict_3,
797                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
798                                       'parent-odu-allocation']),
799                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
800             'parent-odu-allocation'])
801         self.assertIn('1.1',
802                       res['interface'][0][
803                           'org-openroadm-otn-odu-interfaces:odu'][
804                           'parent-odu-allocation']['opucn-trib-slots'])
805         self.assertIn('1.20',
806                       res['interface'][0][
807                           'org-openroadm-otn-odu-interfaces:odu'][
808                           'parent-odu-allocation']['opucn-trib-slots'])
809
810     def test_38_check_ODU4_connection_xpdrc2(self):
811         response = test_utils.check_netconf_node_request(
812             "XPDR-C2",
813             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
814         self.assertEqual(response.status_code, requests.codes.ok)
815         res = response.json()
816         input_dict_1 = {
817             'connection-name':
818             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
819             'direction': 'bidirectional'
820         }
821
822         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
823                              res['odu-connection'][0])
824         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
825                              res['odu-connection'][0]['destination'])
826         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
827                              res['odu-connection'][0]['source'])
828
829     def test_39_check_otn_topo_links(self):
830         response = test_utils.get_otn_topo_request()
831         self.assertEqual(response.status_code, requests.codes.ok)
832         res = response.json()
833         nb_links = len(res['network'][0]['ietf-network-topology:link'])
834         self.assertEqual(nb_links, 4)
835         for link in res['network'][0]['ietf-network-topology:link']:
836             linkId = link['link-id']
837             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
838                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
839                 self.assertEqual(
840                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
841                 self.assertEqual(
842                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
843
844     def test_40_check_otn_topo_tp(self):
845         response = test_utils.get_otn_topo_request()
846         res = response.json()
847         for node in res['network'][0]['node']:
848             if node['node-id'] == 'XPDR-A2-XPDR2' or node['node-id'] == 'XPDR-C2-XPDR2':
849                 tpList = node['ietf-network-topology:termination-point']
850                 for tp in tpList:
851                     if tp['tp-id'] == 'XPDR2-NETWORK1':
852                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
853                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
854                         tsPoolList = list(range(1, 20))
855                         self.assertNotIn(
856                             tsPoolList, xpdrTpPortConAt['ts-pool'])
857                         self.assertEqual(
858                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
859                         self.assertNotIn(
860                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
861
862 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
863     def test_41_create_100GE_service_2(self):
864         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
865         self.cr_serv_sample_data["input"]["connection-type"] = "service"
866         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
867         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
868         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
869         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
870         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
871         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
872         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
873         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
874         response = test_utils.service_create_request(self.cr_serv_sample_data)
875         self.assertEqual(response.status_code, requests.codes.ok)
876         res = response.json()
877         self.assertIn('PCE calculation in progress',
878                       res['output']['configuration-response-common']['response-message'])
879         time.sleep(self.WAITING)
880
881     def test_42_get_100GE_service_2(self):
882         response = test_utils.get_service_list_request(
883             "services/service-100GE2")
884         self.assertEqual(response.status_code, requests.codes.ok)
885         res = response.json()
886         self.assertEqual(
887             res['services'][0]['administrative-state'], 'inService')
888         self.assertEqual(
889             res['services'][0]['service-name'], 'service-100GE2')
890         self.assertEqual(
891             res['services'][0]['connection-type'], 'service')
892         self.assertEqual(
893             res['services'][0]['lifecycle-state'], 'planned')
894         time.sleep(2)
895
896     def test_43_check_service_list(self):
897         response = test_utils.get_service_list_request("")
898         self.assertEqual(response.status_code, requests.codes.ok)
899         res = response.json()
900         self.assertEqual(len(res['service-list']['services']), 4)
901         time.sleep(2)
902
903     def test_44_check_otn_topo_links(self):
904         response = test_utils.get_otn_topo_request()
905         self.assertEqual(response.status_code, requests.codes.ok)
906         res = response.json()
907         nb_links = len(res['network'][0]['ietf-network-topology:link'])
908         self.assertEqual(nb_links, 4)
909         for link in res['network'][0]['ietf-network-topology:link']:
910             linkId = link['link-id']
911             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
912                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
913                 self.assertEqual(
914                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
915                 self.assertEqual(
916                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
917
918     def test_45_check_otn_topo_tp(self):
919         response = test_utils.get_otn_topo_request()
920         res = response.json()
921         for node in res['network'][0]['node']:
922             if node['node-id'] == 'XPDR-A2-XPDR2' or node['node-id'] == 'XPDR-C2-XPDR2':
923                 tpList = node['ietf-network-topology:termination-point']
924                 for tp in tpList:
925                     if tp['tp-id'] == 'XPDR2-NETWORK1':
926                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
927                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
928                         tsPoolList = list(range(1, 40))
929                         self.assertNotIn(
930                             tsPoolList, xpdrTpPortConAt['ts-pool'])
931                         self.assertEqual(
932                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
933                         self.assertNotIn(
934                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
935
936     def test_46_delete_100GE_service_2(self):
937         response = test_utils.service_delete_request("service-100GE2")
938         self.assertEqual(response.status_code, requests.codes.ok)
939         res = response.json()
940         self.assertIn('Renderer service delete in progress',
941                       res['output']['configuration-response-common']['response-message'])
942         time.sleep(self.WAITING)
943
944     def test_47_delete_100GE_service_1(self):
945         response = test_utils.service_delete_request("service-100GE")
946         self.assertEqual(response.status_code, requests.codes.ok)
947         res = response.json()
948         self.assertIn('Renderer service delete in progress',
949                       res['output']['configuration-response-common']['response-message'])
950         time.sleep(self.WAITING)
951
952     def test_48_check_service_list(self):
953         response = test_utils.get_service_list_request("")
954         self.assertEqual(response.status_code, requests.codes.ok)
955         res = response.json()
956         self.assertEqual(len(res['service-list']['services']), 2)
957         time.sleep(2)
958
959     def test_49_check_no_ODU4_connection_xpdra2(self):
960         response = test_utils.check_netconf_node_request("XPDR-A2", "")
961         self.assertEqual(response.status_code, requests.codes.ok)
962         res = response.json()
963         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
964         time.sleep(1)
965
966     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
967         response = test_utils.check_netconf_node_request(
968             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
969         self.assertEqual(response.status_code, requests.codes.conflict)
970
971     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
972         response = test_utils.check_netconf_node_request(
973             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
974         self.assertEqual(response.status_code, requests.codes.conflict)
975
976     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
977         response = test_utils.check_netconf_node_request(
978             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
979         self.assertEqual(response.status_code, requests.codes.conflict)
980
981     def test_53_check_otn_topo_links(self):
982         response = test_utils.get_otn_topo_request()
983         self.assertEqual(response.status_code, requests.codes.ok)
984         res = response.json()
985         nb_links = len(res['network'][0]['ietf-network-topology:link'])
986         self.assertEqual(nb_links, 4)
987         for link in res['network'][0]['ietf-network-topology:link']:
988             linkId = link['link-id']
989             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
990                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
991                 self.assertEqual(
992                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
993                 self.assertEqual(
994                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
995
996     def test_54_check_otn_topo_tp(self):
997         response = test_utils.get_otn_topo_request()
998         res = response.json()
999         for node in res['network'][0]['node']:
1000             if node['node-id'] == 'XPDR-A2-XPDR2' or node['node-id'] == 'XPDR-C2-XPDR2':
1001                 tpList = node['ietf-network-topology:termination-point']
1002                 for tp in tpList:
1003                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1004                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1005                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1006                         self.assertEqual(
1007                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1008
1009     def test_55_delete_ODUC4_service(self):
1010         response = test_utils.service_delete_request("service1-ODUC4")
1011         self.assertEqual(response.status_code, requests.codes.ok)
1012         res = response.json()
1013         self.assertIn('Renderer service delete in progress',
1014                       res['output']['configuration-response-common']['response-message'])
1015         time.sleep(self.WAITING)
1016
1017     def test_56_check_service_list(self):
1018         response = test_utils.get_service_list_request("")
1019         self.assertEqual(response.status_code, requests.codes.ok)
1020         res = response.json()
1021         self.assertEqual(len(res['service-list']['services']), 1)
1022         time.sleep(2)
1023
1024     def test_57_check_no_interface_ODU4_xpdra2(self):
1025         response = test_utils.check_netconf_node_request(
1026             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1027         self.assertEqual(response.status_code, requests.codes.conflict)
1028
1029     def test_58_check_otn_topo_links(self):
1030         self.test_22_check_otn_topo_OTUC4_links()
1031
1032     def test_59_check_otn_topo_tp(self):
1033         response = test_utils.get_otn_topo_request()
1034         res = response.json()
1035         for node in res['network'][0]['node']:
1036             if node['node-id'] == 'XPDR-A2-XPDR2' or node['node-id'] == 'XPDR-C2-XPDR2':
1037                 tpList = node['ietf-network-topology:termination-point']
1038                 for tp in tpList:
1039                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1040                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1041                                          dict.keys(tp))
1042
1043     def test_60_delete_OTUC4_service(self):
1044         response = test_utils.service_delete_request("service1-OTUC4")
1045         self.assertEqual(response.status_code, requests.codes.ok)
1046         res = response.json()
1047         self.assertIn('Renderer service delete in progress',
1048                       res['output']['configuration-response-common']['response-message'])
1049         time.sleep(self.WAITING)
1050
1051     def test_61_get_no_service(self):
1052         response = test_utils.get_service_list_request("")
1053         self.assertEqual(response.status_code, requests.codes.conflict)
1054         res = response.json()
1055         self.assertIn(
1056             {"error-type": "application", "error-tag": "data-missing",
1057              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1058             res['errors']['error'])
1059         time.sleep(1)
1060
1061     def test_62_check_no_interface_OTUC4_xpdra2(self):
1062         response = test_utils.check_netconf_node_request(
1063             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1064         self.assertEqual(response.status_code, requests.codes.conflict)
1065
1066     def test_63_check_no_interface_OCH_xpdra2(self):
1067         response = test_utils.check_netconf_node_request(
1068             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1069         self.assertEqual(response.status_code, requests.codes.conflict)
1070
1071     def test_64_check_no_interface_OTSI_xpdra2(self):
1072         response = test_utils.check_netconf_node_request(
1073             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1074         self.assertEqual(response.status_code, requests.codes.conflict)
1075
1076     def test_65_getLinks_OtnTopology(self):
1077         response = test_utils.get_otn_topo_request()
1078         self.assertEqual(response.status_code, requests.codes.ok)
1079         res = response.json()
1080         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1081
1082     def test_66_check_openroadm_topo_xpdra2(self):
1083         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1084         self.assertEqual(response.status_code, requests.codes.ok)
1085         res = response.json()
1086         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1087         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1088         self.assertNotIn('wavelength', dict.keys(
1089             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1090         time.sleep(3)
1091
1092     def test_67_check_openroadm_topology(self):
1093         response = test_utils.get_ordm_topo_request("")
1094         self.assertEqual(response.status_code, requests.codes.ok)
1095         res = response.json()
1096         links = res['network'][0]['ietf-network-topology:link']
1097         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1098
1099     def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1100         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1101                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1102         self.assertEqual(response.status_code, requests.codes.ok)
1103         res = response.json()
1104         self.assertIn('Xponder Roadm Link created successfully',
1105                       res["output"]["result"])
1106         time.sleep(2)
1107
1108     def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1109         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1110                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1111         self.assertEqual(response.status_code, requests.codes.ok)
1112         res = response.json()
1113         self.assertIn('Roadm Xponder links created successfully',
1114                       res["output"]["result"])
1115         time.sleep(2)
1116
1117     def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1118         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1119                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1120         self.assertEqual(response.status_code, requests.codes.ok)
1121         res = response.json()
1122         self.assertIn('Xponder Roadm Link created successfully',
1123                       res["output"]["result"])
1124         time.sleep(2)
1125
1126     def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1127         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1128                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1129         self.assertEqual(response.status_code, requests.codes.ok)
1130         res = response.json()
1131         self.assertIn('Roadm Xponder links created successfully',
1132                       res["output"]["result"])
1133         time.sleep(2)
1134
1135 # test service-create for 400GE service from xpdra2 to xpdrc2
1136     def test_72_create_400GE_service(self):
1137         self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1138         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1139         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1140         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1141         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1142         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1143         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1144         response = test_utils.service_create_request(self.cr_serv_sample_data)
1145         self.assertEqual(response.status_code, requests.codes.ok)
1146         res = response.json()
1147         self.assertIn('PCE calculation in progress',
1148                       res['output']['configuration-response-common']['response-message'])
1149         time.sleep(self.WAITING)
1150
1151     def test_73_get_400GE_service(self):
1152         response = test_utils.get_service_list_request(
1153             "services/service-400GE")
1154         self.assertEqual(response.status_code, requests.codes.ok)
1155         res = response.json()
1156         self.assertEqual(
1157             res['services'][0]['administrative-state'], 'inService')
1158         self.assertEqual(
1159             res['services'][0]['service-name'], 'service-400GE')
1160         self.assertEqual(
1161             res['services'][0]['connection-type'], 'service')
1162         self.assertEqual(
1163             res['services'][0]['lifecycle-state'], 'planned')
1164         time.sleep(2)
1165
1166     def test_74_check_xc1_roadma(self):
1167         response = test_utils.check_netconf_node_request(
1168             "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1169         self.assertEqual(response.status_code, requests.codes.ok)
1170         res = response.json()
1171         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1172         self.assertDictEqual(
1173             dict({
1174                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1175                 'opticalControlMode': 'gainLoss',
1176                 'target-output-power': -3.0
1177             }, **res['roadm-connections'][0]),
1178             res['roadm-connections'][0]
1179         )
1180         self.assertDictEqual(
1181             {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1182             res['roadm-connections'][0]['source'])
1183         self.assertDictEqual(
1184             {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1185             res['roadm-connections'][0]['destination'])
1186         time.sleep(5)
1187
1188     def test_75_check_topo_xpdra2(self):
1189         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1190         self.assertEqual(response.status_code, requests.codes.ok)
1191         res = response.json()
1192         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1193         for ele in liste_tp:
1194             if ele['tp-id'] == 'XPDR1-NETWORK1':
1195                 self.assertEqual({'frequency': 196.08125,
1196                                   'width': 75},
1197                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1198             if ele['tp-id'] == 'XPDR1-CLIENT1':
1199                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1200         time.sleep(3)
1201
1202     def test_76_check_topo_roadma_SRG1(self):
1203         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1204         self.assertEqual(response.status_code, requests.codes.ok)
1205         res = response.json()
1206         freq_map = base64.b64decode(
1207             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1208         freq_map_array = [int(x) for x in freq_map]
1209         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1210         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1211         for ele in liste_tp:
1212             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1213                 freq_map = base64.b64decode(
1214                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1215                 freq_map_array = [int(x) for x in freq_map]
1216                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1217             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1218                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1219         time.sleep(3)
1220
1221     def test_77_check_topo_roadma_DEG1(self):
1222         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1223         self.assertEqual(response.status_code, requests.codes.ok)
1224         res = response.json()
1225         freq_map = base64.b64decode(
1226             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1227         freq_map_array = [int(x) for x in freq_map]
1228         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1229         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1230         for ele in liste_tp:
1231             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1232                 freq_map = base64.b64decode(
1233                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1234                 freq_map_array = [int(x) for x in freq_map]
1235                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1236             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1237                 freq_map = base64.b64decode(
1238                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1239                 freq_map_array = [int(x) for x in freq_map]
1240                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1241         time.sleep(3)
1242
1243     def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1244         response = test_utils.check_netconf_node_request(
1245             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1246         self.assertEqual(response.status_code, requests.codes.ok)
1247         res = response.json()
1248         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1249                         'administrative-state': 'inService',
1250                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1251                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1252                         'supporting-port': 'C1'
1253                         }
1254         input_dict_2 = {'speed': 400000}
1255         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1256                              res['interface'][0])
1257         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1258                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1259
1260     def test_79_check_interface_OTSI_xpdra2(self):
1261         response = test_utils.check_netconf_node_request(
1262             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1263         self.assertEqual(response.status_code, requests.codes.ok)
1264         res = response.json()
1265         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1266                         'administrative-state': 'inService',
1267                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268                         'type': 'org-openroadm-interfaces:otsi',
1269                         'supporting-port': 'L1'}
1270         input_dict_2 = {
1271             "frequency": 196.0812,
1272             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1273             "fec": "org-openroadm-common-types:ofec",
1274             "transmit-power": -5,
1275             "provision-mode": "explicit",
1276             "modulation-format": "dp-qam16"}
1277
1278         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1279                              res['interface'][0])
1280         self.assertDictEqual(dict(input_dict_2,
1281                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1282                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1283         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1284                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1285
1286     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1287         response = test_utils.check_netconf_node_request(
1288             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1289         self.assertEqual(response.status_code, requests.codes.ok)
1290         res = response.json()
1291         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1292                         'administrative-state': 'inService',
1293                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1294                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1295                         'type': 'org-openroadm-interfaces:otsi-group',
1296                         'supporting-port': 'L1'}
1297         input_dict_2 = {"group-id": 1,
1298                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1299
1300         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1301                              res['interface'][0])
1302         self.assertDictEqual(dict(input_dict_2,
1303                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1304                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1305
1306     def test_81_check_interface_OTUC4_xpdra2(self):
1307         response = test_utils.check_netconf_node_request(
1308             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1309         self.assertEqual(response.status_code, requests.codes.ok)
1310         res = response.json()
1311         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1312                         'administrative-state': 'inService',
1313                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1314                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1315                         'type': 'org-openroadm-interfaces:otnOtu',
1316                         'supporting-port': 'L1'}
1317         input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1318                         "degthr-percentage": 100,
1319                         "tim-detect-mode": "Disabled",
1320                         "otucn-n-rate": 4,
1321                         "degm-intervals": 2,
1322                         "expected-dapi": "AIGiVAQ4gDil"}
1323
1324         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1325                              res['interface'][0])
1326         self.assertDictEqual(dict(input_dict_2,
1327                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1328                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1329
1330     def test_82_check_interface_ODUC4_xpdra2(self):
1331         response = test_utils.check_netconf_node_request(
1332             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1333         self.assertEqual(response.status_code, requests.codes.ok)
1334         res = response.json()
1335         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1336                         'administrative-state': 'inService',
1337                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1338                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1339                         'type': 'org-openroadm-interfaces:otnOdu',
1340                         'supporting-port': 'L1'}
1341         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1342                         "tim-detect-mode": "Disabled",
1343                         "degm-intervals": 2,
1344                         "degthr-percentage": 100,
1345                         "monitoring-mode": "terminated",
1346                         "rate": "org-openroadm-otn-common-types:ODUCn",
1347                         "oducn-n-rate": 4}
1348         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1349
1350         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1351                              res['interface'][0])
1352         self.assertDictEqual(dict(input_dict_2,
1353                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1354                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1355         self.assertDictEqual(dict(input_dict_3,
1356                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1357                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1358
1359     def test_83_delete_400GE_service(self):
1360         response = test_utils.service_delete_request("service-400GE")
1361         self.assertEqual(response.status_code, requests.codes.ok)
1362         res = response.json()
1363         self.assertIn('Renderer service delete in progress',
1364                       res['output']['configuration-response-common']['response-message'])
1365         time.sleep(self.WAITING)
1366
1367     def test_84_get_no_service(self):
1368         response = test_utils.get_service_list_request("")
1369         self.assertEqual(response.status_code, requests.codes.conflict)
1370         res = response.json()
1371         self.assertIn(
1372             {"error-type": "application", "error-tag": "data-missing",
1373              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1374             res['errors']['error'])
1375         time.sleep(1)
1376
1377     def test_85_check_no_interface_ODUC4_xpdra2(self):
1378         response = test_utils.check_netconf_node_request(
1379             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1380         self.assertEqual(response.status_code, requests.codes.conflict)
1381
1382     def test_86_check_no_interface_OTUC4_xpdra2(self):
1383         response = test_utils.check_netconf_node_request(
1384             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1385         self.assertEqual(response.status_code, requests.codes.conflict)
1386
1387     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1388         response = test_utils.check_netconf_node_request(
1389             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1390         self.assertEqual(response.status_code, requests.codes.conflict)
1391
1392     def test_88_check_no_interface_OTSI_xpdra2(self):
1393         response = test_utils.check_netconf_node_request(
1394             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1395         self.assertEqual(response.status_code, requests.codes.conflict)
1396
1397     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1398         response = test_utils.check_netconf_node_request(
1399             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1400         self.assertEqual(response.status_code, requests.codes.conflict)
1401
1402     def test_90_disconnect_xponders_from_roadm(self):
1403         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1404         response = test_utils.get_ordm_topo_request("")
1405         self.assertEqual(response.status_code, requests.codes.ok)
1406         res = response.json()
1407         links = res['network'][0]['ietf-network-topology:link']
1408         for link in links:
1409             if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1410                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1411                 link_name = link["link-id"]
1412                 response = test_utils.delete_request(url+link_name)
1413                 self.assertEqual(response.status_code, requests.codes.ok)
1414
1415     def test_91_disconnect_xpdra2(self):
1416         response = test_utils.unmount_device("XPDR-A2")
1417         self.assertEqual(response.status_code, requests.codes.ok,
1418                          test_utils.CODE_SHOULD_BE_200)
1419
1420     def test_92_disconnect_xpdrc2(self):
1421         response = test_utils.unmount_device("XPDR-C2")
1422         self.assertEqual(response.status_code, requests.codes.ok,
1423                          test_utils.CODE_SHOULD_BE_200)
1424
1425     def test_93_disconnect_roadmA(self):
1426         response = test_utils.unmount_device("ROADM-A1")
1427         self.assertEqual(response.status_code, requests.codes.ok,
1428                          test_utils.CODE_SHOULD_BE_200)
1429
1430     def test_94_disconnect_roadmC(self):
1431         response = test_utils.unmount_device("ROADM-C1")
1432         self.assertEqual(response.status_code, requests.codes.ok,
1433                          test_utils.CODE_SHOULD_BE_200)
1434
1435
if __name__ == "__main__":
    # Script entry point: run the test case with verbose (level 2) output.
    unittest.main(verbosity=2)