Complement B100G_end2end functional test
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
class TransportPCEtesting(unittest.TestCase):
    """End-to-end functional test for services above 100G (OTUC4/ODUC4/100GE).

    The visible tests mount two version-7.1 transponders (XPDR-A2, XPDR-C2)
    and two version-2.2.1 ROADMs (ROADM-A1, ROADM-C1), link them, then create
    services and inspect the resulting device interfaces and topologies.

    The test methods are numbered because they depend on one another:
    several of them edit the shared ``cr_serv_input_data`` payload in place
    (see test_023 and test_029), so later tests see earlier modifications.
    """

    # Handles of the processes started in setUpClass, stopped in tearDownClass.
    processes = None
    # Seconds slept after each service-create request before checking results.
    WAITING = 20  # nominal value is 300
    # Simulator versions passed to test_utils when starting/mounting nodes.
    NODE_VERSION_221 = '2.2.1'
    NODE_VERSION_71 = '7.1'

    # service-create RPC input. Starts out describing the 400G OTUC4
    # infrastructure service between XPDR-A2 and XPDR-C2; later tests mutate
    # this very dict (it is a class attribute shared by every test instance).
    cr_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OTUC4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end: XPDR-A2, network port XPDR2-NETWORK1 in both directions.
        "service-a-end": {
            "service-rate": "400",
            "node-id": "XPDR-A2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSA",
            "tx-direction": [{
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "XPDR-A2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        # Z end: XPDR-C2, network port XPDR2-NETWORK1 in both directions.
        "service-z-end": {
            "service-rate": "400",
            "node-id": "XPDR-C2",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
            "clli": "NodeSC",
            "tx-direction": [{
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "XPDR-C2-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }

    # service-delete RPC input. "service-name" holds the placeholder "TBD";
    # NOTE(review): presumably overwritten by the delete tests that lie
    # outside this excerpt - confirm against the rest of the file.
    del_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"},
        "service-delete-req-info": {
            "service-name": "TBD",
            "tail-retention": "no"}
    }
139
140     @classmethod
141     def setUpClass(cls):
142         cls.processes = test_utils.start_tpce()
143         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
144                                                ('roadma', cls.NODE_VERSION_221),
145                                                ('roadmc', cls.NODE_VERSION_221),
146                                                ('xpdrc2', cls.NODE_VERSION_71)])
147
148     @classmethod
149     def tearDownClass(cls):
150         # pylint: disable=not-an-iterable
151         for process in cls.processes:
152             test_utils.shutdown_process(process)
153         print("all processes killed")
154
155     def setUp(self):
156         time.sleep(1)
157
158     def test_001_connect_xpdra2(self):
159         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
160         self.assertEqual(response.status_code,
161                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162
163     def test_002_connect_xpdrc2(self):
164         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
165         self.assertEqual(response.status_code,
166                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167
168     def test_003_connect_rdma(self):
169         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
170         self.assertEqual(response.status_code,
171                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
172
173     def test_004_connect_rdmc(self):
174         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
175         self.assertEqual(response.status_code,
176                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
177
178     def test_005_connect_xpdra2_2_N1_to_roadma_PP2(self):
179         response = test_utils.transportpce_api_rpc_request(
180             'transportpce-networkutils', 'init-xpdr-rdm-links',
181             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
182                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
183         self.assertEqual(response['status_code'], requests.codes.ok)
184         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
185
186     def test_006_connect_roadma_PP2_to_xpdra2_2_N1(self):
187         response = test_utils.transportpce_api_rpc_request(
188             'transportpce-networkutils', 'init-rdm-xpdr-links',
189             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
190                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
191         self.assertEqual(response['status_code'], requests.codes.ok)
192         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
193
194     def test_007_connect_xpdrc2_2_N1_to_roadmc_PP2(self):
195         response = test_utils.transportpce_api_rpc_request(
196             'transportpce-networkutils', 'init-xpdr-rdm-links',
197             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
198                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
199         self.assertEqual(response['status_code'], requests.codes.ok)
200         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
201
202     def test_008_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203         response = test_utils.transportpce_api_rpc_request(
204             'transportpce-networkutils', 'init-rdm-xpdr-links',
205             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
206                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
207         self.assertEqual(response['status_code'], requests.codes.ok)
208         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
209
210     def test_009_add_omsAttributes_roadma_roadmc(self):
211         # Config ROADMA-ROADMC oms-attributes
212         data = {"span": {
213             "auto-spanloss": "true",
214             "spanloss-base": 11.4,
215             "spanloss-current": 12,
216             "engineered-spanloss": 12.2,
217             "link-concatenation": [{
218                 "SRLG-Id": 0,
219                 "fiber-type": "smf",
220                 "SRLG-length": 100000,
221                 "pmd": 0.5}]}}
222         response = test_utils.add_oms_attr_request(
223             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224         self.assertEqual(response.status_code, requests.codes.created)
225
226     def test_010_add_omsAttributes_roadmc_roadma(self):
227         # Config ROADMC-ROADMA oms-attributes
228         data = {"span": {
229             "auto-spanloss": "true",
230             "spanloss-base": 11.4,
231             "spanloss-current": 12,
232             "engineered-spanloss": 12.2,
233             "link-concatenation": [{
234                 "SRLG-Id": 0,
235                 "fiber-type": "smf",
236                 "SRLG-length": 100000,
237                 "pmd": 0.5}]}}
238         response = test_utils.add_oms_attr_request(
239             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240         self.assertEqual(response.status_code, requests.codes.created)
241
    # test service-create for OTUC4 service from xpdra2 to xpdrc2
243     def test_011_check_otn_topology(self):
244         response = test_utils.get_ietf_network_request('otn-topology', 'config')
245         self.assertEqual(response['status_code'], requests.codes.ok)
246         self.assertEqual(len(response['network'][0]['node']), 7, 'There should be 7 nodes')
247         self.assertNotIn('ietf-network-topology:link', response['network'][0])
248
249     def test_012_create_OTUC4_service(self):
250         response = test_utils.transportpce_api_rpc_request(
251             'org-openroadm-service', 'service-create',
252             self.cr_serv_input_data)
253         self.assertEqual(response['status_code'], requests.codes.ok)
254         self.assertIn('PCE calculation in progress',
255                       response['output']['configuration-response-common']['response-message'])
256         time.sleep(self.WAITING)
257
258     def test_013_get_OTUC4_service1(self):
259         response = test_utils.get_ordm_serv_list_attr_request(
260             "services", "service1-OTUC4")
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
263         self.assertEqual(response['services'][0]['service-name'], 'service1-OTUC4')
264         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
265         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
266         self.assertEqual(response['services'][0]['service-layer'], 'wdm')
267         self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
268         self.assertEqual(response['services'][0]['service-a-end']['otu-service-rate'],
269                          'org-openroadm-otn-common-types:OTUCn')
270         self.assertEqual(response['services'][0]['service-z-end']['otu-service-rate'],
271                          'org-openroadm-otn-common-types:OTUCn')
272
273     # Check correct configuration of devices
274     def test_014_check_interface_otsi_xpdra2(self):
275         response = test_utils.check_node_attribute_request(
276             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
277         self.assertEqual(response['status_code'], requests.codes.ok)
278         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
279                                    'administrative-state': 'inService',
280                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
281                                    'type': 'org-openroadm-interfaces:otsi',
282                                    'supporting-port': 'L1'
283                                    }, **response['interface'][0]),
284                              response['interface'][0])
285         self.assertDictEqual(
286             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
287                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
288                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
289             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
290
291     def test_015_check_interface_OTSI_GROUP_xpdra2(self):
292         response = test_utils.check_node_attribute_request(
293             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
294         self.assertEqual(response['status_code'], requests.codes.ok)
295         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
296                         'administrative-state': 'inService',
297                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
298                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
299                         'type': 'org-openroadm-interfaces:otsi-group',
300                         'supporting-port': 'L1'
301                         }
302         input_dict_2 = {"group-id": 1,
303                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
304                         }
305         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
306                              response['interface'][0])
307         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
308                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
309                              response['interface'][0]
310                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
311
312     def test_016_check_interface_OTUC4_xpdra2(self):
313         response = test_utils.check_node_attribute_request(
314             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
315         self.assertEqual(response['status_code'], requests.codes.ok)
316         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
317                         'administrative-state': 'inService',
318                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
319                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
320                         'type': 'org-openroadm-interfaces:otnOtu',
321                         'supporting-port': 'L1'
322                         }
323         input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
324                         'expected-dapi': 'G54UFNImtOE=',
325                         'tx-dapi': 'J/FIUzQc+4M=',
326                         'expected-sapi': 'J/FIUzQc+4M=',
327                         'rate': 'org-openroadm-otn-common-types:OTUCn',
328                         'degthr-percentage': 100,
329                         'degm-intervals': 2,
330                         'otucn-n-rate': 4
331                         }
332         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
333                              response['interface'][0])
334         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
335                              response['interface'][0]
336                              ['org-openroadm-otn-otu-interfaces:otu'])
337
338     def test_017_check_interface_otsi_xpdrc2(self):
339         response = test_utils.check_node_attribute_request(
340             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-755:768')
341         self.assertEqual(response['status_code'], requests.codes.ok)
342         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
343                                    'administrative-state': 'inService',
344                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
345                                    'type': 'org-openroadm-interfaces:otsi',
346                                    'supporting-port': 'L1'
347                                    }, **response['interface'][0]),
348                              response['interface'][0])
349
350         self.assertDictEqual(
351             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
352                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
353                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
354             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
355
356     def test_018_check_interface_OTSI_GROUP_xpdrc2(self):
357         response = test_utils.check_node_attribute_request(
358             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
359         self.assertEqual(response['status_code'], requests.codes.ok)
360         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
361                         'administrative-state': 'inService',
362                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
363                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
364                         'type': 'org-openroadm-interfaces:otsi-group',
365                         'supporting-port': 'L1'
366                         }
367         input_dict_2 = {"group-id": 1,
368                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
369                         }
370         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
371                              response['interface'][0])
372         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
373                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
374                              response['interface'][0]
375                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
376
377     def test_019_check_interface_OTUC4_xpdrc2(self):
378         response = test_utils.check_node_attribute_request(
379             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTUC4')
380         self.assertEqual(response['status_code'], requests.codes.ok)
381         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
382                         'administrative-state': 'inService',
383                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
384                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
385                         'type': 'org-openroadm-interfaces:otnOtu',
386                         'supporting-port': 'L1'
387                         }
388         input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
389                         'expected-sapi': 'G54UFNImtOE=',
390                         'tx-sapi': 'J/FIUzQc+4M=',
391                         'expected-dapi': 'J/FIUzQc+4M=',
392                         'rate': 'org-openroadm-otn-common-types:OTUCn',
393                         'degthr-percentage': 100,
394                         'degm-intervals': 2,
395                         'otucn-n-rate': 4
396                         }
397
398         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
399                              response['interface'][0])
400
401         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
402                              response['interface'][0]
403                              ['org-openroadm-otn-otu-interfaces:otu'])
404
405     def test_020_check_no_interface_ODUC4_xpdra2(self):
406         response = test_utils.check_node_attribute_request(
407             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
408         self.assertEqual(response['status_code'], requests.codes.conflict)
409
410     def test_021_check_openroadm_topo_xpdra2(self):
411         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
412         self.assertEqual(response['status_code'], requests.codes.ok)
413         ele = response['node']['ietf-network-topology:termination-point'][0]
414         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
415         self.assertEqual(
416             196.08125,
417             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
418         self.assertEqual(
419             75.0,
420             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
421
422     def test_022_check_otn_topo_OTUC4_links(self):
423         response = test_utils.get_ietf_network_request('otn-topology', 'config')
424         self.assertEqual(response['status_code'], requests.codes.ok)
425         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
426         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
427                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
428         for link in response['network'][0]['ietf-network-topology:link']:
429             self.assertIn(link['link-id'], listLinkId)
430             self.assertEqual(link['transportpce-networkutils:otn-link-type'], 'OTUC4')
431             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
432             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
433             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
434             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
435
    # test service-create for ODUC4 service from xpdra2 to xpdrc2
437     def test_023_create_ODUC4_service(self):
438         self.cr_serv_input_data["service-name"] = "service1-ODUC4"
439         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
440         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
441         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
442         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
443         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
444         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
445         response = test_utils.transportpce_api_rpc_request(
446             'org-openroadm-service', 'service-create',
447             self.cr_serv_input_data)
448         self.assertEqual(response['status_code'], requests.codes.ok)
449         self.assertIn('PCE calculation in progress',
450                       response['output']['configuration-response-common']['response-message'])
451         time.sleep(self.WAITING)
452
453     def test_024_get_ODUC4_service1(self):
454         response = test_utils.get_ordm_serv_list_attr_request(
455             "services", "service1-ODUC4")
456         self.assertEqual(response['status_code'], requests.codes.ok)
457         self.assertEqual(response['services'][0]['service-name'], 'service1-ODUC4')
458         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
459         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
460         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
461         self.assertEqual(response['services'][0]['service-layer'], 'wdm')
462         self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
463         self.assertEqual(response['services'][0]['service-a-end']['odu-service-rate'],
464                          'org-openroadm-otn-common-types:ODUCn')
465         self.assertEqual(response['services'][0]['service-z-end']['odu-service-rate'],
466                          'org-openroadm-otn-common-types:ODUCn')
467
468     def test_025_check_interface_ODUC4_xpdra2(self):
469         response = test_utils.check_node_attribute_request(
470             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
471         self.assertEqual(response['status_code'], requests.codes.ok)
472         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
473                         'administrative-state': 'inService',
474                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
475                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
476                         'type': 'org-openroadm-interfaces:otnOdu',
477                         'supporting-port': 'L1'}
478         # SAPI/DAPI are added in the Otu4 renderer
479         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
480                         'rate': 'org-openroadm-otn-common-types:ODUCn',
481                         'expected-dapi': 'Nmbu2MNHvc4=',
482                         'expected-sapi': 'Nmbu2MNHvc4=',
483                         'tx-dapi': 'Nmbu2MNHvc4=',
484                         'tx-sapi': 'LY9PxYJqUbw='}
485
486         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487                              response['interface'][0])
488         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
489                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
490         self.assertDictEqual(
491             {'payload-type': '22', 'exp-payload-type': '22'},
492             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
493
494     def test_026_check_interface_ODUC4_xpdrc2(self):
495         response = test_utils.check_node_attribute_request(
496             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODUC4')
497         self.assertEqual(response['status_code'], requests.codes.ok)
498         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
499                         'administrative-state': 'inService',
500                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
501                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
502                         'type': 'org-openroadm-interfaces:otnOdu',
503                         'supporting-port': 'L1'}
504         # SAPI/DAPI are added in the Otu4 renderer
505         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
506                         'rate': 'org-openroadm-otn-common-types:ODUCn',
507                         'tx-sapi': 'Nmbu2MNHvc4=',
508                         'tx-dapi': 'LY9PxYJqUbw=',
509                         'expected-sapi': 'LY9PxYJqUbw=',
510                         'expected-dapi': 'LY9PxYJqUbw='
511                         }
512         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
513                              response['interface'][0])
514         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
515                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
516         self.assertDictEqual(
517             {'payload-type': '22', 'exp-payload-type': '22'},
518             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
519
520     def test_027_check_otn_topo_links(self):
521         response = test_utils.get_ietf_network_request('otn-topology', 'config')
522         self.assertEqual(response['status_code'], requests.codes.ok)
523         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
524         for link in response['network'][0]['ietf-network-topology:link']:
525             linkId = link['link-id']
526             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
527                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
528                 self.assertEqual(
529                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
530                 self.assertEqual(
531                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
532             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
533                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
534                 self.assertEqual(
535                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
536                 self.assertEqual(
537                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
538                 self.assertEqual(
539                     link['transportpce-networkutils:otn-link-type'], 'ODUC4')
540                 self.assertEqual(
541                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
542                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
543                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
544                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
545             else:
546                 self.fail("this link should not exist")
547
548     def test_028_check_otn_topo_tp(self):
549         response = test_utils.get_ietf_network_request('otn-topology', 'config')
550         self.assertEqual(response['status_code'], requests.codes.ok)
551         for node in response['network'][0]['node']:
552             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
553                 tpList = node['ietf-network-topology:termination-point']
554                 for tp in tpList:
555                     if tp['tp-id'] == 'XPDR2-NETWORK1':
556                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
557                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
558                         self.assertEqual(
559                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
560                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
561                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
562
563     # test service-create for 100GE service 1 from xpdra2 to xpdrc2
564     def test_029_create_100GE_service_1(self):
565         self.cr_serv_input_data["service-name"] = "service-100GE"
566         self.cr_serv_input_data["connection-type"] = "service"
567         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
568         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
569         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
570         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
571         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
572         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
573         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
574         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
575         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
576         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
577         response = test_utils.transportpce_api_rpc_request(
578             'org-openroadm-service', 'service-create',
579             self.cr_serv_input_data)
580         self.assertEqual(response['status_code'], requests.codes.ok)
581         self.assertIn('PCE calculation in progress',
582                       response['output']['configuration-response-common']['response-message'])
583         time.sleep(self.WAITING)
584
585     def test_030_get_100GE_service_1(self):
586         response = test_utils.get_ordm_serv_list_attr_request(
587             "services", "service-100GE")
588         self.assertEqual(response['status_code'], requests.codes.ok)
589         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
590         self.assertEqual(response['services'][0]['service-name'], 'service-100GE')
591         self.assertEqual(response['services'][0]['connection-type'], 'service')
592         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
593
594     def test_031_check_interface_100GE_CLIENT_xpdra2(self):
595         response = test_utils.check_node_attribute_request(
596             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
597         self.assertEqual(response['status_code'], requests.codes.ok)
598         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
599                         'administrative-state': 'inService',
600                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
601                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
602                         'supporting-port': 'C1'
603                         }
604         input_dict_2 = {'speed': 100000}
605         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
606                              response['interface'][0])
607         self.assertDictEqual(
608             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
609             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
610
611     def test_032_check_interface_ODU4_CLIENT_xpdra2(self):
612         response = test_utils.check_node_attribute_request(
613             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
614         self.assertEqual(response['status_code'], requests.codes.ok)
615         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
616                         'administrative-state': 'inService',
617                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
618                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
619                         'type': 'org-openroadm-interfaces:otnOdu',
620                         'supporting-port': 'C1'}
621         input_dict_2 = {
622             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623             'rate': 'org-openroadm-otn-common-types:ODU4',
624             'monitoring-mode': 'terminated'}
625
626         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627                              response['interface'][0])
628         self.assertDictEqual(dict(input_dict_2,
629                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631         self.assertDictEqual(
632             {'payload-type': '07', 'exp-payload-type': '07'},
633             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
634
635     def test_033_check_interface_ODU4_NETWORK_xpdra2(self):
636         response = test_utils.check_node_attribute_request(
637             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
638         self.assertEqual(response['status_code'], requests.codes.ok)
639         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
640                         'administrative-state': 'inService',
641                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
642                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
643                         'type': 'org-openroadm-interfaces:otnOdu',
644                         'supporting-port': 'L1'}
645         input_dict_2 = {
646             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647             'rate': 'org-openroadm-otn-common-types:ODU4',
648             'monitoring-mode': 'not-terminated'}
649         input_dict_3 = {'trib-port-number': 1}
650
651         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652                              response['interface'][0])
653         self.assertDictEqual(dict(input_dict_2,
654                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656         self.assertDictEqual(dict(input_dict_3,
657                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658                                       'parent-odu-allocation']),
659                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
661                       ['opucn-trib-slots'])
662         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
663                       ['opucn-trib-slots'])
664
665     def test_034_check_ODU4_connection_xpdra2(self):
666         response = test_utils.check_node_attribute_request(
667             'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
668         self.assertEqual(response['status_code'], requests.codes.ok)
669         input_dict_1 = {
670             'connection-name':
671             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
672             'direction': 'bidirectional'
673         }
674
675         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
676                              response['odu-connection'][0])
677         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
678                              response['odu-connection'][0]['destination'])
679         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
680                              response['odu-connection'][0]['source'])
681
682     def test_035_check_interface_100GE_CLIENT_xpdrc2(self):
683         response = test_utils.check_node_attribute_request(
684             'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
685         self.assertEqual(response['status_code'], requests.codes.ok)
686         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
687                         'administrative-state': 'inService',
688                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
689                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
690                         'supporting-port': 'C1'
691                         }
692         input_dict_2 = {'speed': 100000}
693         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
694                              response['interface'][0])
695         self.assertDictEqual(
696             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
697             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
698
699     def test_036_check_interface_ODU4_CLIENT_xpdrc2(self):
700         response = test_utils.check_node_attribute_request(
701             'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
702         self.assertEqual(response['status_code'], requests.codes.ok)
703         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
704                         'administrative-state': 'inService',
705                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
706                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
707                         'type': 'org-openroadm-interfaces:otnOdu',
708                         'supporting-port': 'C1'}
709         input_dict_2 = {
710             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
711             'rate': 'org-openroadm-otn-common-types:ODU4',
712             'monitoring-mode': 'terminated'}
713
714         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715                              response['interface'][0])
716         self.assertDictEqual(dict(input_dict_2,
717                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
718                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
719         self.assertDictEqual(
720             {'payload-type': '07', 'exp-payload-type': '07'},
721             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
722
723     def test_037_check_interface_ODU4_NETWORK_xpdrc2(self):
724         response = test_utils.check_node_attribute_request(
725             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
726         self.assertEqual(response['status_code'], requests.codes.ok)
727         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
728                         'administrative-state': 'inService',
729                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
730                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
731                         'type': 'org-openroadm-interfaces:otnOdu',
732                         'supporting-port': 'C1'}
733         input_dict_2 = {
734             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
735             'rate': 'org-openroadm-otn-common-types:ODU4',
736             'monitoring-mode': 'not-terminated'}
737
738         input_dict_3 = {'trib-port-number': 1}
739
740         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
741                              response['interface'][0])
742         self.assertDictEqual(dict(input_dict_2,
743                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
744                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
745         self.assertDictEqual(dict(input_dict_3,
746                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
747                                       'parent-odu-allocation']),
748                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749             'parent-odu-allocation'])
750         self.assertIn('1.1',
751                       response['interface'][0][
752                           'org-openroadm-otn-odu-interfaces:odu'][
753                           'parent-odu-allocation']['opucn-trib-slots'])
754         self.assertIn('1.20',
755                       response['interface'][0][
756                           'org-openroadm-otn-odu-interfaces:odu'][
757                           'parent-odu-allocation']['opucn-trib-slots'])
758
759     def test_038_check_ODU4_connection_xpdrc2(self):
760         response = test_utils.check_node_attribute_request(
761             'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
762         self.assertEqual(response['status_code'], requests.codes.ok)
763         input_dict_1 = {
764             'connection-name':
765             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
766             'direction': 'bidirectional'
767         }
768
769         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
770                              response['odu-connection'][0])
771         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
772                              response['odu-connection'][0]['destination'])
773         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
774                              response['odu-connection'][0]['source'])
775
776     def test_039_check_otn_topo_links(self):
777         response = test_utils.get_ietf_network_request('otn-topology', 'config')
778         self.assertEqual(response['status_code'], requests.codes.ok)
779         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
780         for link in response['network'][0]['ietf-network-topology:link']:
781             linkId = link['link-id']
782             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
783                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
784                 self.assertEqual(
785                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
786                 self.assertEqual(
787                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
788
789     def test_040_check_otn_topo_tp(self):
790         response = test_utils.get_ietf_network_request('otn-topology', 'config')
791         self.assertEqual(response['status_code'], requests.codes.ok)
792         for node in response['network'][0]['node']:
793             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
794                 tpList = node['ietf-network-topology:termination-point']
795                 for tp in tpList:
796                     if tp['tp-id'] == 'XPDR2-NETWORK1':
797                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
799                         tsPoolList = list(range(1, 20))
800                         self.assertNotIn(
801                             tsPoolList, xpdrTpPortConAt['ts-pool'])
802                         self.assertEqual(
803                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
804                         self.assertNotIn(
805                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
806
807     # test service-create for 100GE service 2 from xpdra2 to xpdrc2
808     def test_041_create_100GE_service_2(self):
809         self.cr_serv_input_data["service-name"] = "service-100GE2"
810         self.cr_serv_input_data["connection-type"] = "service"
811         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
812         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
813         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
814         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
815         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
816         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
817         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
818         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
819         response = test_utils.transportpce_api_rpc_request(
820             'org-openroadm-service', 'service-create',
821             self.cr_serv_input_data)
822         self.assertEqual(response['status_code'], requests.codes.ok)
823         self.assertIn('PCE calculation in progress',
824                       response['output']['configuration-response-common']['response-message'])
825         time.sleep(self.WAITING)
826
827     def test_042_get_100GE_service_2(self):
828         response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE2")
829         self.assertEqual(response['status_code'], requests.codes.ok)
830         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
831         self.assertEqual(response['services'][0]['service-name'], 'service-100GE2')
832         self.assertEqual(response['services'][0]['connection-type'], 'service')
833         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
834         time.sleep(1)
835
836     def test_043_check_service_list(self):
837         response = test_utils.get_ordm_serv_list_request()
838         self.assertEqual(response['status_code'], requests.codes.ok)
839         self.assertEqual(len(response['service-list']['services']), 4)
840
841     def test_044_check_otn_topo_links(self):
842         response = test_utils.get_ietf_network_request('otn-topology', 'config')
843         self.assertEqual(response['status_code'], requests.codes.ok)
844         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
845         for link in response['network'][0]['ietf-network-topology:link']:
846             linkId = link['link-id']
847             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
848                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
849                 self.assertEqual(
850                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
851                 self.assertEqual(
852                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
853
854     def test_045_check_otn_topo_tp(self):
855         response = test_utils.get_ietf_network_request('otn-topology', 'config')
856         self.assertEqual(response['status_code'], requests.codes.ok)
857         for node in response['network'][0]['node']:
858             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
859                 tpList = node['ietf-network-topology:termination-point']
860                 for tp in tpList:
861                     if tp['tp-id'] == 'XPDR2-NETWORK1':
862                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
863                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
864                         tsPoolList = list(range(1, 40))
865                         self.assertNotIn(
866                             tsPoolList, xpdrTpPortConAt['ts-pool'])
867                         self.assertEqual(
868                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
869                         self.assertNotIn(
870                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
871
872     def test_046_delete_100GE_service_2(self):
873         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE2"
874         response = test_utils.transportpce_api_rpc_request(
875             'org-openroadm-service', 'service-delete',
876             self.del_serv_input_data)
877         self.assertEqual(response['status_code'], requests.codes.ok)
878         self.assertIn('Renderer service delete in progress',
879                       response['output']['configuration-response-common']['response-message'])
880         time.sleep(self.WAITING)
881
882     def test_047_delete_100GE_service_1(self):
883         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE"
884         response = test_utils.transportpce_api_rpc_request(
885             'org-openroadm-service', 'service-delete',
886             self.del_serv_input_data)
887         self.assertEqual(response['status_code'], requests.codes.ok)
888         self.assertIn('Renderer service delete in progress',
889                       response['output']['configuration-response-common']['response-message'])
890         time.sleep(self.WAITING)
891
892     def test_048_check_service_list(self):
893         response = test_utils.get_ordm_serv_list_request()
894         self.assertEqual(response['status_code'], requests.codes.ok)
895         self.assertEqual(len(response['service-list']['services']), 2)
896
897     def test_049_check_no_ODU4_connection_xpdra2(self):
898         response = test_utils.check_node_request("XPDR-A2")
899         self.assertEqual(response['status_code'], requests.codes.ok)
900         self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
901
902     def test_050_check_no_interface_ODU4_NETWORK_xpdra2(self):
903         response = test_utils.check_node_attribute_request(
904             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
905         self.assertEqual(response['status_code'], requests.codes.conflict)
906
907     def test_051_check_no_interface_ODU4_CLIENT_xpdra2(self):
908         response = test_utils.check_node_attribute_request(
909             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
910         self.assertEqual(response['status_code'], requests.codes.conflict)
911
912     def test_052_check_no_interface_100GE_CLIENT_xpdra2(self):
913         response = test_utils.check_node_attribute_request(
914             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
915         self.assertEqual(response['status_code'], requests.codes.conflict)
916
917     def test_053_check_otn_topo_links(self):
918         response = test_utils.get_ietf_network_request('otn-topology', 'config')
919         self.assertEqual(response['status_code'], requests.codes.ok)
920         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
921         for link in response['network'][0]['ietf-network-topology:link']:
922             linkId = link['link-id']
923             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
924                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
925                 self.assertEqual(
926                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
927                 self.assertEqual(
928                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
929
930     def test_054_check_otn_topo_tp(self):
931         response = test_utils.get_ietf_network_request('otn-topology', 'config')
932         self.assertEqual(response['status_code'], requests.codes.ok)
933         for node in response['network'][0]['node']:
934             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
935                 tpList = node['ietf-network-topology:termination-point']
936                 for tp in tpList:
937                     if tp['tp-id'] == 'XPDR2-NETWORK1':
938                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
939                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
940                         self.assertEqual(
941                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
942
943     # test service-create for 100GE service 3 from xpdra2 client1 to xpdrc2 client2
944     def test_055_create_100GE_service_3(self):
945         self.cr_serv_input_data["service-name"] = "service-100GE3"
946         self.cr_serv_input_data["connection-type"] = "service"
947         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
948         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
949         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
950         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
951         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
952         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
953         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
954         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
955         response = test_utils.transportpce_api_rpc_request(
956             'org-openroadm-service', 'service-create',
957             self.cr_serv_input_data)
958         self.assertEqual(response['status_code'], requests.codes.ok)
959         self.assertIn('PCE calculation in progress',
960                       response['output']['configuration-response-common']['response-message'])
961         time.sleep(self.WAITING)
962
963     def test_056_get_100GE_service_3(self):
964         response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE3")
965         self.assertEqual(response['status_code'], requests.codes.ok)
966         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
967         self.assertEqual(response['services'][0]['service-name'], 'service-100GE3')
968         self.assertEqual(response['services'][0]['connection-type'], 'service')
969         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
970         time.sleep(1)
971
972     def test_057_check_service_list(self):
973         response = test_utils.get_ordm_serv_list_request()
974         self.assertEqual(response['status_code'], requests.codes.ok)
975         self.assertEqual(len(response['service-list']['services']), 3)
976
977     def test_058_check_interface_100GE_CLIENT_xpdra2(self):
978         response = test_utils.check_node_attribute_request(
979             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
980         self.assertEqual(response['status_code'], requests.codes.ok)
981         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
982                         'administrative-state': 'inService',
983                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
984                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
985                         'supporting-port': 'C1'
986                         }
987         input_dict_2 = {'speed': 100000}
988         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
989                              response['interface'][0])
990         self.assertDictEqual(
991             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
992             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
993
994     def test_059_check_interface_ODU4_CLIENT_xpdra2(self):
995         response = test_utils.check_node_attribute_request(
996             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE3')
997         self.assertEqual(response['status_code'], requests.codes.ok)
998         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
999                         'administrative-state': 'inService',
1000                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
1001                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100GE3',
1002                         'type': 'org-openroadm-interfaces:otnOdu',
1003                         'supporting-port': 'C1'}
1004         input_dict_2 = {
1005             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1006             'rate': 'org-openroadm-otn-common-types:ODU4',
1007             'monitoring-mode': 'terminated'}
1008
1009         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1010                              response['interface'][0])
1011         self.assertDictEqual(dict(input_dict_2,
1012                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1013                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1014         self.assertDictEqual(
1015             {'payload-type': '07', 'exp-payload-type': '07'},
1016             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1017
1018     def test_060_check_interface_ODU4_NETWORK_xpdra2(self):
1019         response = test_utils.check_node_attribute_request(
1020             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
1021         self.assertEqual(response['status_code'], requests.codes.ok)
1022         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
1023                         'administrative-state': 'inService',
1024                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
1025                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
1026                         'type': 'org-openroadm-interfaces:otnOdu',
1027                         'supporting-port': 'L1'}
1028         input_dict_2 = {
1029             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1030             'rate': 'org-openroadm-otn-common-types:ODU4',
1031             'monitoring-mode': 'not-terminated'}
1032         input_dict_3 = {'trib-port-number': 1}
1033
1034         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1035                              response['interface'][0])
1036         self.assertDictEqual(dict(input_dict_2,
1037                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1038                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1039         self.assertDictEqual(dict(input_dict_3,
1040                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1041                                       'parent-odu-allocation']),
1042                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1043         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1044                       ['opucn-trib-slots'])
1045         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1046                       ['opucn-trib-slots'])
1047
1048     def test_061_check_ODU4_connection_xpdra2(self):
1049         response = test_utils.check_node_attribute_request(
1050             'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1051         self.assertEqual(response['status_code'], requests.codes.ok)
1052         input_dict_1 = {
1053             'connection-name':
1054             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1055             'direction': 'bidirectional'
1056         }
1057
1058         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1059                              response['odu-connection'][0])
1060         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
1061                              response['odu-connection'][0]['destination'])
1062         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE3'},
1063                              response['odu-connection'][0]['source'])
1064
1065     def test_062_check_interface_100GE_CLIENT_xpdrc2(self):
1066         response = test_utils.check_node_attribute_request(
1067             'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ETHERNET-100G')
1068         self.assertEqual(response['status_code'], requests.codes.ok)
1069         input_dict_1 = {'name': 'XPDR2-CLIENT2-ETHERNET-100G',
1070                         'administrative-state': 'inService',
1071                         'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
1072                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1073                         'supporting-port': 'C1'
1074                         }
1075         input_dict_2 = {'speed': 100000}
1076         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1077                              response['interface'][0])
1078         self.assertDictEqual(
1079             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1080             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1081
1082     def test_063_check_interface_ODU4_CLIENT_xpdrc2(self):
1083         response = test_utils.check_node_attribute_request(
1084             'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ODU4:service-100GE3')
1085         self.assertEqual(response['status_code'], requests.codes.ok)
1086         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
1087                         'administrative-state': 'inService',
1088                         'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
1089                         'supporting-interface-list': 'XPDR2-CLIENT2-ETHERNET-100GE3',
1090                         'type': 'org-openroadm-interfaces:otnOdu',
1091                         'supporting-port': 'C1'}
1092         input_dict_2 = {
1093             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1094             'rate': 'org-openroadm-otn-common-types:ODU4',
1095             'monitoring-mode': 'terminated'}
1096
1097         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1098                              response['interface'][0])
1099         self.assertDictEqual(dict(input_dict_2,
1100                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1101                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1102         self.assertDictEqual(
1103             {'payload-type': '07', 'exp-payload-type': '07'},
1104             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1105
1106     def test_064_check_interface_ODU4_NETWORK_xpdrc2(self):
1107         response = test_utils.check_node_attribute_request(
1108             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
1109         self.assertEqual(response['status_code'], requests.codes.ok)
1110         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
1111                         'administrative-state': 'inService',
1112                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
1113                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
1114                         'type': 'org-openroadm-interfaces:otnOdu',
1115                         'supporting-port': 'C1'}
1116         input_dict_2 = {
1117             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1118             'rate': 'org-openroadm-otn-common-types:ODU4',
1119             'monitoring-mode': 'not-terminated'}
1120
1121         input_dict_3 = {'trib-port-number': 1}
1122
1123         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1124                              response['interface'][0])
1125         self.assertDictEqual(dict(input_dict_2,
1126                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1127                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1128         self.assertDictEqual(dict(input_dict_3,
1129                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1130                                       'parent-odu-allocation']),
1131                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1132             'parent-odu-allocation'])
1133         self.assertIn('1.1',
1134                       response['interface'][0][
1135                           'org-openroadm-otn-odu-interfaces:odu'][
1136                           'parent-odu-allocation']['opucn-trib-slots'])
1137         self.assertIn('1.20',
1138                       response['interface'][0][
1139                           'org-openroadm-otn-odu-interfaces:odu'][
1140                           'parent-odu-allocation']['opucn-trib-slots'])
1141
1142     def test_065_check_ODU4_connection_xpdrc2(self):
1143         response = test_utils.check_node_attribute_request(
1144             'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4')
1145         self.assertEqual(response['status_code'], requests.codes.ok)
1146         input_dict_1 = {
1147             'connection-name':
1148             'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4',
1149             'direction': 'bidirectional'
1150         }
1151
1152         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1153                              response['odu-connection'][0])
1154         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
1155                              response['odu-connection'][0]['destination'])
1156         self.assertDictEqual({'src-if': 'XPDR2-CLIENT2-ODU4:service-100GE3'},
1157                              response['odu-connection'][0]['source'])
1158
1159     def test_066_check_otn_topo_links(self):
1160         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1161         self.assertEqual(response['status_code'], requests.codes.ok)
1162         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1163         for link in response['network'][0]['ietf-network-topology:link']:
1164             linkId = link['link-id']
1165             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
1166                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
1167                 self.assertEqual(
1168                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
1169                 self.assertEqual(
1170                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1171
1172     def test_067_check_otn_topo_tp(self):
1173         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1174         self.assertEqual(response['status_code'], requests.codes.ok)
1175         for node in response['network'][0]['node']:
1176             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1177                 tpList = node['ietf-network-topology:termination-point']
1178                 for tp in tpList:
1179                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1180                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1181                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
1182                         tsPoolList = list(range(1, 20))
1183                         self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
1184                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
1185                         self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1186
1187     def test_068_delete_100GE_service_3(self):
1188         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE3"
1189         response = test_utils.transportpce_api_rpc_request(
1190             'org-openroadm-service', 'service-delete',
1191             self.del_serv_input_data)
1192         self.assertEqual(response['status_code'], requests.codes.ok)
1193         self.assertIn('Renderer service delete in progress',
1194                       response['output']['configuration-response-common']['response-message'])
1195         time.sleep(self.WAITING)
1196
1197     def test_069_delete_ODUC4_service(self):
1198         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODUC4"
1199         response = test_utils.transportpce_api_rpc_request(
1200             'org-openroadm-service', 'service-delete',
1201             self.del_serv_input_data)
1202         self.assertEqual(response['status_code'], requests.codes.ok)
1203         self.assertIn('Renderer service delete in progress',
1204                       response['output']['configuration-response-common']['response-message'])
1205         time.sleep(self.WAITING)
1206
1207     def test_070_check_service_list(self):
1208         response = test_utils.get_ordm_serv_list_request()
1209         self.assertEqual(response['status_code'], requests.codes.ok)
1210         self.assertEqual(len(response['service-list']['services']), 1)
1211
1212     def test_071_check_no_interface_ODU4_xpdra2(self):
1213         response = test_utils.check_node_attribute_request(
1214             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
1215         self.assertEqual(response['status_code'], requests.codes.conflict)
1216
1217     def test_072_check_otn_topo_links(self):
1218         self.test_022_check_otn_topo_OTUC4_links()
1219
1220     def test_073_check_otn_topo_tp(self):
1221         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1222         self.assertEqual(response['status_code'], requests.codes.ok)
1223         for node in response['network'][0]['node']:
1224             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1225                 tpList = node['ietf-network-topology:termination-point']
1226                 for tp in tpList:
1227                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1228                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1229                                          dict.keys(tp))
1230
1231     def test_074_delete_OTUC4_service(self):
1232         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OTUC4"
1233         response = test_utils.transportpce_api_rpc_request(
1234             'org-openroadm-service', 'service-delete',
1235             self.del_serv_input_data)
1236         self.assertEqual(response['status_code'], requests.codes.ok)
1237         self.assertIn('Renderer service delete in progress',
1238                       response['output']['configuration-response-common']['response-message'])
1239         time.sleep(self.WAITING)
1240
1241     def test_075_get_no_service(self):
1242         response = test_utils.get_ordm_serv_list_request()
1243         self.assertEqual(response['status_code'], requests.codes.conflict)
1244         self.assertIn(response['service-list'], (
1245             {
1246                 "error-type": "protocol",
1247                 "error-tag": "data-missing",
1248                 "error-message":
1249                     "Request could not be completed because the relevant data "
1250                     "model content does not exist"
1251             }, {
1252                 "error-type": "application",
1253                 "error-tag": "data-missing",
1254                 "error-message":
1255                     "Request could not be completed because the relevant data "
1256                     "model content does not exist"
1257             }))
1258
1259     def test_076_check_no_interface_OTUC4_xpdra2(self):
1260         response = test_utils.check_node_attribute_request(
1261             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
1262         self.assertEqual(response['status_code'], requests.codes.conflict)
1263
1264     def test_077_check_no_interface_OTSI_xpdra2(self):
1265         response = test_utils.check_node_attribute_request(
1266             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
1267         self.assertEqual(response['status_code'], requests.codes.conflict)
1268
1269     def test_078_check_no_interface_OTSIG_xpdra2(self):
1270         response = test_utils.check_node_attribute_request(
1271             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
1272         self.assertEqual(response['status_code'], requests.codes.conflict)
1273
1274     def test_079_getLinks_OtnTopology(self):
1275         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1276         self.assertEqual(response['status_code'], requests.codes.ok)
1277         self.assertNotIn('ietf-network-topology:link', response['network'][0])
1278
1279     def test_080_check_openroadm_topo_xpdra2(self):
1280         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
1281         self.assertEqual(response['status_code'], requests.codes.ok)
1282         tp = response['node']['ietf-network-topology:termination-point'][0]
1283         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1284         self.assertNotIn('wavelength', dict.keys(
1285             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1286
1287     def test_081_check_openroadm_topology(self):
1288         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1289         self.assertEqual(response['status_code'], requests.codes.ok)
1290         links = response['network'][0]['ietf-network-topology:link']
1291         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1292
1293     def test_082_connect_xpdra2_1_N1_to_roadma_PP2(self):
1294         response = test_utils.transportpce_api_rpc_request(
1295             'transportpce-networkutils', 'init-xpdr-rdm-links',
1296             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1297                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1298         self.assertEqual(response['status_code'], requests.codes.ok)
1299         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1300
1301     def test_083_connect_roadma_PP2_to_xpdra2_1_N1(self):
1302         response = test_utils.transportpce_api_rpc_request(
1303             'transportpce-networkutils', 'init-rdm-xpdr-links',
1304             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1305                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1306         self.assertEqual(response['status_code'], requests.codes.ok)
1307         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1308
1309     def test_084_connect_xpdrc2_1_N1_to_roadmc_PP2(self):
1310         response = test_utils.transportpce_api_rpc_request(
1311             'transportpce-networkutils', 'init-xpdr-rdm-links',
1312             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1313                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1314         self.assertEqual(response['status_code'], requests.codes.ok)
1315         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1316
1317     def test_085_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1318         response = test_utils.transportpce_api_rpc_request(
1319             'transportpce-networkutils', 'init-rdm-xpdr-links',
1320             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1321                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1322         self.assertEqual(response['status_code'], requests.codes.ok)
1323         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1324
1325     # test service-create for 400GE service from xpdra2 to xpdrc2
1326     def test_086_create_400GE_service(self):
1327         self.cr_serv_input_data["service-name"] = "service-400GE"
1328         self.cr_serv_input_data["service-a-end"]["service-rate"] = "400"
1329         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1330         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1331         self.cr_serv_input_data["service-z-end"]["service-rate"] = "400"
1332         del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"]
1333         del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
1334         del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"]
1335         del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
1336         del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"]
1337         del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
1338         del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"]
1339         del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
1340         response = test_utils.transportpce_api_rpc_request(
1341             'org-openroadm-service', 'service-create',
1342             self.cr_serv_input_data)
1343         self.assertEqual(response['status_code'], requests.codes.ok)
1344         time.sleep(self.WAITING)
1345         self.assertIn('PCE calculation in progress',
1346                       response['output']['configuration-response-common']['response-message'])
1347         time.sleep(self.WAITING)
1348
1349     def test_087_get_400GE_service(self):
1350         response = test_utils.get_ordm_serv_list_attr_request("services", "service-400GE")
1351         self.assertEqual(response['status_code'], requests.codes.ok)
1352         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1353         self.assertEqual(response['services'][0]['service-name'], 'service-400GE')
1354         self.assertEqual(response['services'][0]['connection-type'], 'service')
1355         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1356         time.sleep(1)
1357
1358     def test_088_check_xc1_roadma(self):
1359         response = test_utils.check_node_attribute_request(
1360             "ROADM-A1", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1361         self.assertEqual(response['status_code'], requests.codes.ok)
1362         self.assertDictEqual(
1363             dict({
1364                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1365                 'opticalControlMode': 'gainLoss',
1366                 'target-output-power': -3.0
1367             }, **response['roadm-connections'][0]), response['roadm-connections'][0])
1368         self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-nmc-755:768'}, response['roadm-connections'][0]['source'])
1369         self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'}, response['roadm-connections'][0]['destination'])
1370         time.sleep(1)
1371
1372     def test_089_check_topo_xpdra2(self):
1373         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
1374         self.assertEqual(response['status_code'], requests.codes.ok)
1375         liste_tp = response['node']['ietf-network-topology:termination-point']
1376         for ele in liste_tp:
1377             if ele['tp-id'] == 'XPDR1-NETWORK1':
1378                 self.assertEqual(
1379                     196.08125,
1380                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
1381                 self.assertEqual(
1382                     75.0,
1383                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
1384             if ele['tp-id'] == 'XPDR1-CLIENT1':
1385                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1386         time.sleep(1)
1387
1388     def test_090_check_topo_roadma_SRG1(self):
1389         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
1390         self.assertEqual(response['status_code'], requests.codes.ok)
1391         freq_map = base64.b64decode(
1392             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1393         freq_map_array = [int(x) for x in freq_map]
1394         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1395         liste_tp = response['node']['ietf-network-topology:termination-point']
1396         for ele in liste_tp:
1397             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1398                 freq_map = base64.b64decode(
1399                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1400                 freq_map_array = [int(x) for x in freq_map]
1401                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1402             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1403                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1404         time.sleep(1)
1405
1406     def test_091_check_topo_roadma_DEG1(self):
1407         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
1408         self.assertEqual(response['status_code'], requests.codes.ok)
1409         freq_map = base64.b64decode(
1410             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1411         freq_map_array = [int(x) for x in freq_map]
1412         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1413         liste_tp = response['node']['ietf-network-topology:termination-point']
1414         for ele in liste_tp:
1415             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1416                 freq_map = base64.b64decode(
1417                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1418                 freq_map_array = [int(x) for x in freq_map]
1419                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1420             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1421                 freq_map = base64.b64decode(
1422                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1423                 freq_map_array = [int(x) for x in freq_map]
1424                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1425         time.sleep(1)
1426
1427     def test_092_check_interface_400GE_CLIENT_xpdra2(self):
1428         response = test_utils.check_node_attribute_request(
1429             'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1430         self.assertEqual(response['status_code'], requests.codes.ok)
1431         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1432                         'administrative-state': 'inService',
1433                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1434                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1435                         'supporting-port': 'C1'
1436                         }
1437         input_dict_2 = {'speed': 400000}
1438         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1439                              response['interface'][0])
1440         self.assertDictEqual(
1441             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1442             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1443
1444     def test_093_check_interface_OTSI_xpdra2(self):
1445         response = test_utils.check_node_attribute_request(
1446             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1447         self.assertEqual(response['status_code'], requests.codes.ok)
1448         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1449                         'administrative-state': 'inService',
1450                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1451                         'type': 'org-openroadm-interfaces:otsi',
1452                         'supporting-port': 'L1'}
1453         input_dict_2 = {
1454             "frequency": 196.0812,
1455             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1456             "fec": "org-openroadm-common-types:ofec",
1457             "transmit-power": -5,
1458             "provision-mode": "explicit",
1459             "modulation-format": "dp-qam16"}
1460
1461         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1462                              response['interface'][0])
1463         self.assertDictEqual(
1464             dict(input_dict_2, **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1465             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1466         self.assertDictEqual(
1467             {"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1468             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1469
1470     def test_094_check_interface_OTSI_GROUP_xpdra2(self):
1471         response = test_utils.check_node_attribute_request(
1472             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1473         self.assertEqual(response['status_code'], requests.codes.ok)
1474         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
1475                         'administrative-state': 'inService',
1476                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1477                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1478                         'type': 'org-openroadm-interfaces:otsi-group',
1479                         'supporting-port': 'L1'}
1480         input_dict_2 = {"group-id": 1,
1481                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1482
1483         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1484                              response['interface'][0])
1485         self.assertDictEqual(dict(input_dict_2,
1486                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1487                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1488
1489     def test_095_check_interface_OTUC4_xpdra2(self):
1490         response = test_utils.check_node_attribute_request(
1491             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1492         self.assertEqual(response['status_code'], requests.codes.ok)
1493         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1494                         'administrative-state': 'inService',
1495                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1496                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSIGROUP-400G',
1497                         'type': 'org-openroadm-interfaces:otnOtu',
1498                         'supporting-port': 'L1'}
1499         input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1500                         "expected-dapi": "ANeUjNzWtDLV",
1501                         'tx-dapi': 'AKsqPmWceByv',
1502                         'expected-sapi': 'AKsqPmWceByv',
1503                         "rate": "org-openroadm-otn-common-types:OTUCn",
1504                         "degthr-percentage": 100,
1505                         "tim-detect-mode": "Disabled",
1506                         "otucn-n-rate": 4,
1507                         "degm-intervals": 2}
1508
1509         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1510                              response['interface'][0])
1511         self.assertDictEqual(dict(input_dict_2,
1512                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1513                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1514
1515     def test_096_check_interface_ODUC4_xpdra2(self):
1516         response = test_utils.check_node_attribute_request(
1517             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1518         self.assertEqual(response['status_code'], requests.codes.ok)
1519         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1520                         'administrative-state': 'inService',
1521                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1522                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1523                         'type': 'org-openroadm-interfaces:otnOdu',
1524                         'supporting-port': 'L1',
1525                         'circuit-id': 'TBD',
1526                         'description': 'TBD'}
1527         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1528                         "tim-detect-mode": "Disabled",
1529                         "degm-intervals": 2,
1530                         "degthr-percentage": 100,
1531                         "monitoring-mode": "terminated",
1532                         "rate": "org-openroadm-otn-common-types:ODUCn",
1533                         "oducn-n-rate": 4}
1534         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1535
1536         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1537                              response['interface'][0])
1538         self.assertDictEqual(dict(input_dict_2,
1539                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1540                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1541         self.assertDictEqual(dict(input_dict_3,
1542                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1543                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1544         self.assertEqual('XPDR1-NETWORK1-OTUC4', response['interface'][0]['supporting-interface-list'][0])
1545
1546     def test_097_check_interface_ODUFLEX_xpdra2(self):
1547         response = test_utils.check_node_attribute_request(
1548             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUFLEX')
1549         self.assertEqual(response['status_code'], requests.codes.ok)
1550         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
1551                         'administrative-state': 'inService',
1552                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1553                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-ODUC4',
1554                         'type': 'org-openroadm-interfaces:otnOdu',
1555                         'supporting-port': 'L1',
1556                         'circuit-id': 'TBD',
1557                         'description': 'TBD'}
1558         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
1559                         "tim-detect-mode": "Disabled",
1560                         "degm-intervals": 2,
1561                         "degthr-percentage": 100,
1562                         "monitoring-mode": "terminated",
1563                         "rate": "org-openroadm-otn-common-types:ODUflex-cbr",
1564                         "oduflex-cbr-service": "org-openroadm-otn-common-types:ODUflex-cbr-400G"
1565                         }
1566         input_dict_3 = {"exp-payload-type": "32", "payload-type": "32"}
1567         input_dict_4 = {'trib-port-number': 1}
1568
1569         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1570                              response['interface'][0])
1571         self.assertDictEqual(dict(input_dict_2,
1572                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1573                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1574         self.assertDictEqual(dict(input_dict_3,
1575                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1576                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1577         self.assertDictEqual(dict(input_dict_4,
1578                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1579                                       'parent-odu-allocation']),
1580                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1581         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1582                       ['opucn-trib-slots'])
1583         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1584                       ['opucn-trib-slots'])
1585         self.assertIn('2.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1586                       ['opucn-trib-slots'])
1587         self.assertIn('2.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1588                       ['opucn-trib-slots'])
1589         self.assertIn('3.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1590                       ['opucn-trib-slots'])
1591         self.assertIn('3.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1592                       ['opucn-trib-slots'])
1593         self.assertIn('4.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1594                       ['opucn-trib-slots'])
1595         self.assertIn('4.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1596                       ['opucn-trib-slots'])
1597         self.assertEqual('XPDR1-NETWORK1-ODUC4', response['interface'][0]['supporting-interface-list'][0])
1598
1599     def test_098_delete_400GE_service(self):
1600         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-400GE"
1601         response = test_utils.transportpce_api_rpc_request(
1602             'org-openroadm-service', 'service-delete',
1603             self.del_serv_input_data)
1604         self.assertEqual(response['status_code'], requests.codes.ok)
1605         self.assertIn('Renderer service delete in progress',
1606                       response['output']['configuration-response-common']['response-message'])
1607         time.sleep(self.WAITING)
1608
1609     def test_099_get_no_service(self):
1610         response = test_utils.get_ordm_serv_list_request()
1611         self.assertEqual(response['status_code'], requests.codes.conflict)
1612         self.assertIn(response['service-list'], (
1613             {
1614                 "error-type": "protocol",
1615                 "error-tag": "data-missing",
1616                 "error-message":
1617                     "Request could not be completed because the relevant data "
1618                     "model content does not exist"
1619             }, {
1620                 "error-type": "application",
1621                 "error-tag": "data-missing",
1622                 "error-message":
1623                     "Request could not be completed because the relevant data "
1624                     "model content does not exist"
1625             }))
1626         time.sleep(1)
1627
1628     def test_100_check_no_interface_ODUC4_xpdra2(self):
1629         response = test_utils.check_node_attribute_request(
1630             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1631         self.assertEqual(response['status_code'], requests.codes.conflict)
1632
1633     def test_101_check_no_interface_OTUC4_xpdra2(self):
1634         response = test_utils.check_node_attribute_request(
1635             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1636         self.assertEqual(response['status_code'], requests.codes.conflict)
1637
1638     def test_102_check_no_interface_OTSI_GROUP_xpdra2(self):
1639         response = test_utils.check_node_attribute_request(
1640             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1641         self.assertEqual(response['status_code'], requests.codes.conflict)
1642
1643     def test_103_check_no_interface_OTSI_xpdra2(self):
1644         response = test_utils.check_node_attribute_request(
1645             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1646         self.assertEqual(response['status_code'], requests.codes.conflict)
1647
1648     def test_104_check_no_interface_400GE_CLIENT_xpdra2(self):
1649         response = test_utils.check_node_attribute_request(
1650             'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1651         self.assertEqual(response['status_code'], requests.codes.conflict)
1652
1653     def test_105_disconnect_xponders_from_roadm(self):
1654         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1655         self.assertEqual(response['status_code'], requests.codes.ok)
1656         links = response['network'][0]['ietf-network-topology:link']
1657         for link in links:
1658             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1659                 response = test_utils.del_ietf_network_link_request(
1660                     'openroadm-topology', link['link-id'], 'config')
1661                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1662
1663     def test_106_disconnect_xpdra2(self):
1664         response = test_utils.unmount_device("XPDR-A2")
1665         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1666
1667     def test_107_disconnect_xpdrc2(self):
1668         response = test_utils.unmount_device("XPDR-C2")
1669         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1670
1671     def test_108_disconnect_roadmA(self):
1672         response = test_utils.unmount_device("ROADM-A1")
1673         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1674
1675     def test_109_disconnect_roadmC(self):
1676         response = test_utils.unmount_device("ROADM-C1")
1677         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1678
1679
if __name__ == "__main__":
    # Script entry point: run the test cases of this module through the
    # unittest runner, with verbosity level 2.
    unittest.main(verbosity=2)