e6b55c27ccac0a6c39a3c1222785852ce299a0d8
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION_221 = '2.2.1'
34     NODE_VERSION_71 = '7.1'
35
36     cr_serv_input_data = {
37         "sdnc-request-header": {
38             "request-id": "request-1",
39             "rpc-action": "service-create",
40             "request-system-id": "appname"
41         },
42         "service-name": "service1-OTUC4",
43         "common-id": "commonId",
44         "connection-type": "infrastructure",
45         "service-a-end": {
46             "service-rate": "400",
47             "node-id": "XPDR-A2",
48             "service-format": "OTU",
49             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
50             "clli": "NodeSA",
51             "tx-direction": [{
52                 "port": {
53                     "port-device-name": "XPDR-A2-XPDR2",
54                     "port-type": "fixed",
55                     "port-name": "XPDR2-NETWORK1",
56                     "port-rack": "000000.00",
57                     "port-shelf": "Chassis#1"
58                 },
59                 "lgx": {
60                     "lgx-device-name": "Some lgx-device-name",
61                     "lgx-port-name": "Some lgx-port-name",
62                     "lgx-port-rack": "000000.00",
63                     "lgx-port-shelf": "00"
64                 },
65                 "index": 0
66             }],
67             "rx-direction": [{
68                 "port": {
69                     "port-device-name": "XPDR-A2-XPDR2",
70                     "port-type": "fixed",
71                     "port-name": "XPDR2-NETWORK1",
72                     "port-rack": "000000.00",
73                     "port-shelf": "Chassis#1"
74                 },
75                 "lgx": {
76                     "lgx-device-name": "Some lgx-device-name",
77                     "lgx-port-name": "Some lgx-port-name",
78                     "lgx-port-rack": "000000.00",
79                     "lgx-port-shelf": "00"
80                 },
81                 "index": 0
82             }],
83             "optic-type": "gray"
84         },
85         "service-z-end": {
86             "service-rate": "400",
87             "node-id": "XPDR-C2",
88             "service-format": "OTU",
89             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
90             "clli": "NodeSC",
91             "tx-direction": [{
92                 "port": {
93                     "port-device-name": "XPDR-C2-XPDR2",
94                     "port-type": "fixed",
95                     "port-name": "XPDR2-NETWORK1",
96                     "port-rack": "000000.00",
97                     "port-shelf": "Chassis#1"
98                 },
99                 "lgx": {
100                     "lgx-device-name": "Some lgx-device-name",
101                     "lgx-port-name": "Some lgx-port-name",
102                     "lgx-port-rack": "000000.00",
103                     "lgx-port-shelf": "00"
104                 },
105                 "index": 0
106             }],
107             "rx-direction": [{
108                 "port": {
109                     "port-device-name": "XPDR-C2-XPDR2",
110                     "port-type": "fixed",
111                     "port-name": "XPDR2-NETWORK1",
112                     "port-rack": "000000.00",
113                     "port-shelf": "Chassis#1"
114                 },
115                 "lgx": {
116                     "lgx-device-name": "Some lgx-device-name",
117                     "lgx-port-name": "Some lgx-port-name",
118                     "lgx-port-rack": "000000.00",
119                     "lgx-port-shelf": "00"
120                 },
121                 "index": 0
122             }],
123             "optic-type": "gray"
124         },
125         "due-date": "2018-06-15T00:00:01Z",
126         "operator-contact": "pw1234"
127     }
128
129     del_serv_input_data = {
130         "sdnc-request-header": {
131             "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
132             "rpc-action": "service-delete",
133             "request-system-id": "appname",
134             "notification-url": "http://localhost:8585/NotificationServer/notify"},
135         "service-delete-req-info": {
136             "service-name": "TBD",
137             "tail-retention": "no"}
138     }
139
140     @classmethod
141     def setUpClass(cls):
142         cls.processes = test_utils.start_tpce()
143         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
144                                                ('roadma', cls.NODE_VERSION_221),
145                                                ('roadmc', cls.NODE_VERSION_221),
146                                                ('xpdrc2', cls.NODE_VERSION_71)])
147
148     @classmethod
149     def tearDownClass(cls):
150         # pylint: disable=not-an-iterable
151         for process in cls.processes:
152             test_utils.shutdown_process(process)
153         print("all processes killed")
154
155     def setUp(self):
156         time.sleep(1)
157
158     def test_01_connect_xpdra2(self):
159         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
160         self.assertEqual(response.status_code,
161                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162
163     def test_02_connect_xpdrc2(self):
164         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
165         self.assertEqual(response.status_code,
166                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167
168     def test_03_connect_rdma(self):
169         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
170         self.assertEqual(response.status_code,
171                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
172
173     def test_04_connect_rdmc(self):
174         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
175         self.assertEqual(response.status_code,
176                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
177
178     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
179         response = test_utils.transportpce_api_rpc_request(
180             'transportpce-networkutils', 'init-xpdr-rdm-links',
181             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
182                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
183         self.assertEqual(response['status_code'], requests.codes.ok)
184         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
185
186     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
187         response = test_utils.transportpce_api_rpc_request(
188             'transportpce-networkutils', 'init-rdm-xpdr-links',
189             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
190                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
191         self.assertEqual(response['status_code'], requests.codes.ok)
192         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
193
194     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195         response = test_utils.transportpce_api_rpc_request(
196             'transportpce-networkutils', 'init-xpdr-rdm-links',
197             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
198                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
199         self.assertEqual(response['status_code'], requests.codes.ok)
200         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
201
202     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203         response = test_utils.transportpce_api_rpc_request(
204             'transportpce-networkutils', 'init-rdm-xpdr-links',
205             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
206                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
207         self.assertEqual(response['status_code'], requests.codes.ok)
208         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
209
210     def test_09_add_omsAttributes_roadma_roadmc(self):
211         # Config ROADMA-ROADMC oms-attributes
212         data = {"span": {
213             "auto-spanloss": "true",
214             "spanloss-base": 11.4,
215             "spanloss-current": 12,
216             "engineered-spanloss": 12.2,
217             "link-concatenation": [{
218                 "SRLG-Id": 0,
219                 "fiber-type": "smf",
220                 "SRLG-length": 100000,
221                 "pmd": 0.5}]}}
222         response = test_utils.add_oms_attr_request(
223             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224         self.assertEqual(response.status_code, requests.codes.created)
225
226     def test_10_add_omsAttributes_roadmc_roadma(self):
227         # Config ROADMC-ROADMA oms-attributes
228         data = {"span": {
229             "auto-spanloss": "true",
230             "spanloss-base": 11.4,
231             "spanloss-current": 12,
232             "engineered-spanloss": 12.2,
233             "link-concatenation": [{
234                 "SRLG-Id": 0,
235                 "fiber-type": "smf",
236                 "SRLG-length": 100000,
237                 "pmd": 0.5}]}}
238         response = test_utils.add_oms_attr_request(
239             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240         self.assertEqual(response.status_code, requests.codes.created)
241
242     # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
243     def test_11_check_otn_topology(self):
244         response = test_utils.get_ietf_network_request('otn-topology', 'config')
245         self.assertEqual(response['status_code'], requests.codes.ok)
246         self.assertEqual(len(response['network'][0]['node']), 7, 'There should be 7 nodes')
247         self.assertNotIn('ietf-network-topology:link', response['network'][0])
248
249     def test_12_create_OTUC4_service(self):
250         response = test_utils.transportpce_api_rpc_request(
251             'org-openroadm-service', 'service-create',
252             self.cr_serv_input_data)
253         self.assertEqual(response['status_code'], requests.codes.ok)
254         self.assertIn('PCE calculation in progress',
255                       response['output']['configuration-response-common']['response-message'])
256         time.sleep(self.WAITING)
257
258     def test_13_get_OTUC4_service1(self):
259         response = test_utils.get_ordm_serv_list_attr_request(
260             "services", "service1-OTUC4")
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
263         self.assertEqual(response['services'][0]['service-name'], 'service1-OTUC4')
264         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
265         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
266         self.assertEqual(response['services'][0]['service-layer'], 'wdm')
267         self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
268         self.assertEqual(response['services'][0]['service-a-end']['otu-service-rate'],
269                          'org-openroadm-otn-common-types:OTUCn')
270         self.assertEqual(response['services'][0]['service-z-end']['otu-service-rate'],
271                          'org-openroadm-otn-common-types:OTUCn')
272
273     # Check correct configuration of devices
274     def test_14_check_interface_otsi_xpdra2(self):
275         response = test_utils.check_node_attribute_request(
276             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
277         self.assertEqual(response['status_code'], requests.codes.ok)
278         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
279                                    'administrative-state': 'inService',
280                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
281                                    'type': 'org-openroadm-interfaces:otsi',
282                                    'supporting-port': 'L1'
283                                    }, **response['interface'][0]),
284                              response['interface'][0])
285         self.assertDictEqual(
286             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
287                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
288                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
289             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
290
291     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
292         response = test_utils.check_node_attribute_request(
293             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
294         self.assertEqual(response['status_code'], requests.codes.ok)
295         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
296                         'administrative-state': 'inService',
297                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
298                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
299                         'type': 'org-openroadm-interfaces:otsi-group',
300                         'supporting-port': 'L1'
301                         }
302         input_dict_2 = {"group-id": 1,
303                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
304                         }
305         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
306                              response['interface'][0])
307         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
308                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
309                              response['interface'][0]
310                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
311
312     def test_16_check_interface_OTUC4_xpdra2(self):
313         response = test_utils.check_node_attribute_request(
314             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
315         self.assertEqual(response['status_code'], requests.codes.ok)
316         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
317                         'administrative-state': 'inService',
318                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
319                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
320                         'type': 'org-openroadm-interfaces:otnOtu',
321                         'supporting-port': 'L1'
322                         }
323         input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
324                         'expected-dapi': 'G54UFNImtOE=',
325                         'tx-dapi': 'J/FIUzQc+4M=',
326                         'expected-sapi': 'J/FIUzQc+4M=',
327                         'rate': 'org-openroadm-otn-common-types:OTUCn',
328                         'degthr-percentage': 100,
329                         'degm-intervals': 2,
330                         'otucn-n-rate': 4
331                         }
332         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
333                              response['interface'][0])
334         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
335                              response['interface'][0]
336                              ['org-openroadm-otn-otu-interfaces:otu'])
337
338     def test_17_check_interface_otsi_xpdrc2(self):
339         response = test_utils.check_node_attribute_request(
340             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-755:768')
341         self.assertEqual(response['status_code'], requests.codes.ok)
342         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
343                                    'administrative-state': 'inService',
344                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
345                                    'type': 'org-openroadm-interfaces:otsi',
346                                    'supporting-port': 'L1'
347                                    }, **response['interface'][0]),
348                              response['interface'][0])
349
350         self.assertDictEqual(
351             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
352                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
353                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
354             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
355
356     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
357         response = test_utils.check_node_attribute_request(
358             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
359         self.assertEqual(response['status_code'], requests.codes.ok)
360         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
361                         'administrative-state': 'inService',
362                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
363                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
364                         'type': 'org-openroadm-interfaces:otsi-group',
365                         'supporting-port': 'L1'
366                         }
367         input_dict_2 = {"group-id": 1,
368                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
369                         }
370         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
371                              response['interface'][0])
372         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
373                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
374                              response['interface'][0]
375                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
376
377     def test_19_check_interface_OTUC4_xpdrc2(self):
378         response = test_utils.check_node_attribute_request(
379             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTUC4')
380         self.assertEqual(response['status_code'], requests.codes.ok)
381         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
382                         'administrative-state': 'inService',
383                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
384                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
385                         'type': 'org-openroadm-interfaces:otnOtu',
386                         'supporting-port': 'L1'
387                         }
388         input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
389                         'expected-sapi': 'G54UFNImtOE=',
390                         'tx-sapi': 'J/FIUzQc+4M=',
391                         'expected-dapi': 'J/FIUzQc+4M=',
392                         'rate': 'org-openroadm-otn-common-types:OTUCn',
393                         'degthr-percentage': 100,
394                         'degm-intervals': 2,
395                         'otucn-n-rate': 4
396                         }
397
398         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
399                              response['interface'][0])
400
401         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
402                              response['interface'][0]
403                              ['org-openroadm-otn-otu-interfaces:otu'])
404
405     def test_20_check_no_interface_ODUC4_xpdra2(self):
406         response = test_utils.check_node_attribute_request(
407             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
408         self.assertEqual(response['status_code'], requests.codes.conflict)
409
410     def test_21_check_openroadm_topo_xpdra2(self):
411         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
412         self.assertEqual(response['status_code'], requests.codes.ok)
413         ele = response['node']['ietf-network-topology:termination-point'][0]
414         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
415         self.assertEqual(
416             196.08125,
417             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
418         self.assertEqual(
419             75.0,
420             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
421
422     def test_22_check_otn_topo_OTUC4_links(self):
423         response = test_utils.get_ietf_network_request('otn-topology', 'config')
424         self.assertEqual(response['status_code'], requests.codes.ok)
425         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
426         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
427                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
428         for link in response['network'][0]['ietf-network-topology:link']:
429             self.assertIn(link['link-id'], listLinkId)
430             self.assertEqual(link['transportpce-networkutils:otn-link-type'], 'OTUC4')
431             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
432             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
433             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
434             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
435
436     # test service-create for ODU4 service from xpdra2 to xpdrc2
437     def test_23_create_ODUC4_service(self):
438         self.cr_serv_input_data["service-name"] = "service1-ODUC4"
439         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
440         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
441         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
442         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
443         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
444         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
445         response = test_utils.transportpce_api_rpc_request(
446             'org-openroadm-service', 'service-create',
447             self.cr_serv_input_data)
448         self.assertEqual(response['status_code'], requests.codes.ok)
449         self.assertIn('PCE calculation in progress',
450                       response['output']['configuration-response-common']['response-message'])
451         time.sleep(self.WAITING)
452
453     def test_24_get_ODUC4_service1(self):
454         response = test_utils.get_ordm_serv_list_attr_request(
455             "services", "service1-ODUC4")
456         self.assertEqual(response['status_code'], requests.codes.ok)
457         self.assertEqual(response['services'][0]['service-name'], 'service1-ODUC4')
458         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
459         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
460         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
461         self.assertEqual(response['services'][0]['service-layer'], 'wdm')
462         self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
463         self.assertEqual(response['services'][0]['service-a-end']['odu-service-rate'],
464                          'org-openroadm-otn-common-types:ODUCn')
465         self.assertEqual(response['services'][0]['service-z-end']['odu-service-rate'],
466                          'org-openroadm-otn-common-types:ODUCn')
467
468     def test_25_check_interface_ODUC4_xpdra2(self):
469         response = test_utils.check_node_attribute_request(
470             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
471         self.assertEqual(response['status_code'], requests.codes.ok)
472         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
473                         'administrative-state': 'inService',
474                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
475                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
476                         'type': 'org-openroadm-interfaces:otnOdu',
477                         'supporting-port': 'L1'}
478         # SAPI/DAPI are added in the Otu4 renderer
479         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
480                         'rate': 'org-openroadm-otn-common-types:ODUCn',
481                         'expected-dapi': 'Nmbu2MNHvc4=',
482                         'expected-sapi': 'Nmbu2MNHvc4=',
483                         'tx-dapi': 'Nmbu2MNHvc4=',
484                         'tx-sapi': 'LY9PxYJqUbw='}
485
486         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487                              response['interface'][0])
488         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
489                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
490         self.assertDictEqual(
491             {'payload-type': '22', 'exp-payload-type': '22'},
492             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
493
494     def test_26_check_interface_ODUC4_xpdrc2(self):
495         response = test_utils.check_node_attribute_request(
496             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODUC4')
497         self.assertEqual(response['status_code'], requests.codes.ok)
498         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
499                         'administrative-state': 'inService',
500                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
501                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
502                         'type': 'org-openroadm-interfaces:otnOdu',
503                         'supporting-port': 'L1'}
504         # SAPI/DAPI are added in the Otu4 renderer
505         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
506                         'rate': 'org-openroadm-otn-common-types:ODUCn',
507                         'tx-sapi': 'Nmbu2MNHvc4=',
508                         'tx-dapi': 'LY9PxYJqUbw=',
509                         'expected-sapi': 'LY9PxYJqUbw=',
510                         'expected-dapi': 'LY9PxYJqUbw='
511                         }
512         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
513                              response['interface'][0])
514         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
515                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
516         self.assertDictEqual(
517             {'payload-type': '22', 'exp-payload-type': '22'},
518             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
519
520     def test_27_check_otn_topo_links(self):
521         response = test_utils.get_ietf_network_request('otn-topology', 'config')
522         self.assertEqual(response['status_code'], requests.codes.ok)
523         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
524         for link in response['network'][0]['ietf-network-topology:link']:
525             linkId = link['link-id']
526             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
527                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
528                 self.assertEqual(
529                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
530                 self.assertEqual(
531                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
532             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
533                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
534                 self.assertEqual(
535                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
536                 self.assertEqual(
537                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
538                 self.assertEqual(
539                     link['transportpce-networkutils:otn-link-type'], 'ODUC4')
540                 self.assertEqual(
541                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
542                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
543                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
544                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
545             else:
546                 self.fail("this link should not exist")
547
548     def test_28_check_otn_topo_tp(self):
549         response = test_utils.get_ietf_network_request('otn-topology', 'config')
550         self.assertEqual(response['status_code'], requests.codes.ok)
551         for node in response['network'][0]['node']:
552             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
553                 tpList = node['ietf-network-topology:termination-point']
554                 for tp in tpList:
555                     if tp['tp-id'] == 'XPDR2-NETWORK1':
556                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
557                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
558                         self.assertEqual(
559                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
560                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
561                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
562
563     # test service-create for 100GE service 1 from xpdra2 to xpdrc2
564     def test_29_create_100GE_service_1(self):
565         self.cr_serv_input_data["service-name"] = "service-100GE"
566         self.cr_serv_input_data["connection-type"] = "service"
567         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
568         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
569         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
570         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
571         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
572         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
573         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
574         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
575         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
576         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
577         response = test_utils.transportpce_api_rpc_request(
578             'org-openroadm-service', 'service-create',
579             self.cr_serv_input_data)
580         self.assertEqual(response['status_code'], requests.codes.ok)
581         self.assertIn('PCE calculation in progress',
582                       response['output']['configuration-response-common']['response-message'])
583         time.sleep(self.WAITING)
584
585     def test_30_get_100GE_service_1(self):
586         response = test_utils.get_ordm_serv_list_attr_request(
587             "services", "service-100GE")
588         self.assertEqual(response['status_code'], requests.codes.ok)
589         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
590         self.assertEqual(response['services'][0]['service-name'], 'service-100GE')
591         self.assertEqual(response['services'][0]['connection-type'], 'service')
592         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
593
594     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
595         response = test_utils.check_node_attribute_request(
596             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
597         self.assertEqual(response['status_code'], requests.codes.ok)
598         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
599                         'administrative-state': 'inService',
600                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
601                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
602                         'supporting-port': 'C1'
603                         }
604         input_dict_2 = {'speed': 100000}
605         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
606                              response['interface'][0])
607         self.assertDictEqual(
608             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
609             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
610
611     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
612         response = test_utils.check_node_attribute_request(
613             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
614         self.assertEqual(response['status_code'], requests.codes.ok)
615         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
616                         'administrative-state': 'inService',
617                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
618                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
619                         'type': 'org-openroadm-interfaces:otnOdu',
620                         'supporting-port': 'C1'}
621         input_dict_2 = {
622             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623             'rate': 'org-openroadm-otn-common-types:ODU4',
624             'monitoring-mode': 'terminated'}
625
626         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627                              response['interface'][0])
628         self.assertDictEqual(dict(input_dict_2,
629                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631         self.assertDictEqual(
632             {'payload-type': '07', 'exp-payload-type': '07'},
633             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
634
635     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
636         response = test_utils.check_node_attribute_request(
637             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
638         self.assertEqual(response['status_code'], requests.codes.ok)
639         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
640                         'administrative-state': 'inService',
641                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
642                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
643                         'type': 'org-openroadm-interfaces:otnOdu',
644                         'supporting-port': 'L1'}
645         input_dict_2 = {
646             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647             'rate': 'org-openroadm-otn-common-types:ODU4',
648             'monitoring-mode': 'not-terminated'}
649         input_dict_3 = {'trib-port-number': 1}
650
651         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652                              response['interface'][0])
653         self.assertDictEqual(dict(input_dict_2,
654                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656         self.assertDictEqual(dict(input_dict_3,
657                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658                                       'parent-odu-allocation']),
659                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
661                       ['opucn-trib-slots'])
662         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
663                       ['opucn-trib-slots'])
664
665     def test_34_check_ODU4_connection_xpdra2(self):
666         response = test_utils.check_node_attribute_request(
667             'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
668         self.assertEqual(response['status_code'], requests.codes.ok)
669         input_dict_1 = {
670             'connection-name':
671             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
672             'direction': 'bidirectional'
673         }
674
675         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
676                              response['odu-connection'][0])
677         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
678                              response['odu-connection'][0]['destination'])
679         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
680                              response['odu-connection'][0]['source'])
681
682     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
683         response = test_utils.check_node_attribute_request(
684             'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
685         self.assertEqual(response['status_code'], requests.codes.ok)
686         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
687                         'administrative-state': 'inService',
688                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
689                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
690                         'supporting-port': 'C1'
691                         }
692         input_dict_2 = {'speed': 100000}
693         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
694                              response['interface'][0])
695         self.assertDictEqual(
696             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
697             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
698
699     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
700         response = test_utils.check_node_attribute_request(
701             'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
702         self.assertEqual(response['status_code'], requests.codes.ok)
703         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
704                         'administrative-state': 'inService',
705                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
706                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
707                         'type': 'org-openroadm-interfaces:otnOdu',
708                         'supporting-port': 'C1'}
709         input_dict_2 = {
710             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
711             'rate': 'org-openroadm-otn-common-types:ODU4',
712             'monitoring-mode': 'terminated'}
713
714         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715                              response['interface'][0])
716         self.assertDictEqual(dict(input_dict_2,
717                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
718                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
719         self.assertDictEqual(
720             {'payload-type': '07', 'exp-payload-type': '07'},
721             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
722
723     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
724         response = test_utils.check_node_attribute_request(
725             'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
726         self.assertEqual(response['status_code'], requests.codes.ok)
727         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
728                         'administrative-state': 'inService',
729                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
730                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
731                         'type': 'org-openroadm-interfaces:otnOdu',
732                         'supporting-port': 'C1'}
733         input_dict_2 = {
734             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
735             'rate': 'org-openroadm-otn-common-types:ODU4',
736             'monitoring-mode': 'not-terminated'}
737
738         input_dict_3 = {'trib-port-number': 1}
739
740         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
741                              response['interface'][0])
742         self.assertDictEqual(dict(input_dict_2,
743                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
744                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
745         self.assertDictEqual(dict(input_dict_3,
746                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
747                                       'parent-odu-allocation']),
748                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749             'parent-odu-allocation'])
750         self.assertIn('1.1',
751                       response['interface'][0][
752                           'org-openroadm-otn-odu-interfaces:odu'][
753                           'parent-odu-allocation']['opucn-trib-slots'])
754         self.assertIn('1.20',
755                       response['interface'][0][
756                           'org-openroadm-otn-odu-interfaces:odu'][
757                           'parent-odu-allocation']['opucn-trib-slots'])
758
759     def test_38_check_ODU4_connection_xpdrc2(self):
760         response = test_utils.check_node_attribute_request(
761             'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
762         self.assertEqual(response['status_code'], requests.codes.ok)
763         input_dict_1 = {
764             'connection-name':
765             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
766             'direction': 'bidirectional'
767         }
768
769         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
770                              response['odu-connection'][0])
771         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
772                              response['odu-connection'][0]['destination'])
773         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
774                              response['odu-connection'][0]['source'])
775
776     def test_39_check_otn_topo_links(self):
777         response = test_utils.get_ietf_network_request('otn-topology', 'config')
778         self.assertEqual(response['status_code'], requests.codes.ok)
779         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
780         for link in response['network'][0]['ietf-network-topology:link']:
781             linkId = link['link-id']
782             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
783                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
784                 self.assertEqual(
785                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
786                 self.assertEqual(
787                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
788
789     def test_40_check_otn_topo_tp(self):
790         response = test_utils.get_ietf_network_request('otn-topology', 'config')
791         self.assertEqual(response['status_code'], requests.codes.ok)
792         for node in response['network'][0]['node']:
793             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
794                 tpList = node['ietf-network-topology:termination-point']
795                 for tp in tpList:
796                     if tp['tp-id'] == 'XPDR2-NETWORK1':
797                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
799                         tsPoolList = list(range(1, 20))
800                         self.assertNotIn(
801                             tsPoolList, xpdrTpPortConAt['ts-pool'])
802                         self.assertEqual(
803                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
804                         self.assertNotIn(
805                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
806
807     # test service-create for 100GE service 2 from xpdra2 to xpdrc2
808     def test_41_create_100GE_service_2(self):
809         self.cr_serv_input_data["service-name"] = "service-100GE2"
810         self.cr_serv_input_data["connection-type"] = "service"
811         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
812         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
813         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
814         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
815         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
816         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
817         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
818         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
819         response = test_utils.transportpce_api_rpc_request(
820             'org-openroadm-service', 'service-create',
821             self.cr_serv_input_data)
822         self.assertEqual(response['status_code'], requests.codes.ok)
823         self.assertIn('PCE calculation in progress',
824                       response['output']['configuration-response-common']['response-message'])
825         time.sleep(self.WAITING)
826
827     def test_42_get_100GE_service_2(self):
828         response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE2")
829         self.assertEqual(response['status_code'], requests.codes.ok)
830         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
831         self.assertEqual(response['services'][0]['service-name'], 'service-100GE2')
832         self.assertEqual(response['services'][0]['connection-type'], 'service')
833         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
834         time.sleep(1)
835
836     def test_43_check_service_list(self):
837         response = test_utils.get_ordm_serv_list_request()
838         self.assertEqual(response['status_code'], requests.codes.ok)
839         self.assertEqual(len(response['service-list']['services']), 4)
840
841     def test_44_check_otn_topo_links(self):
842         response = test_utils.get_ietf_network_request('otn-topology', 'config')
843         self.assertEqual(response['status_code'], requests.codes.ok)
844         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
845         for link in response['network'][0]['ietf-network-topology:link']:
846             linkId = link['link-id']
847             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
848                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
849                 self.assertEqual(
850                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
851                 self.assertEqual(
852                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
853
854     def test_45_check_otn_topo_tp(self):
855         response = test_utils.get_ietf_network_request('otn-topology', 'config')
856         self.assertEqual(response['status_code'], requests.codes.ok)
857         for node in response['network'][0]['node']:
858             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
859                 tpList = node['ietf-network-topology:termination-point']
860                 for tp in tpList:
861                     if tp['tp-id'] == 'XPDR2-NETWORK1':
862                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
863                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
864                         tsPoolList = list(range(1, 40))
865                         self.assertNotIn(
866                             tsPoolList, xpdrTpPortConAt['ts-pool'])
867                         self.assertEqual(
868                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
869                         self.assertNotIn(
870                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
871
872     def test_46_delete_100GE_service_2(self):
873         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE2"
874         response = test_utils.transportpce_api_rpc_request(
875             'org-openroadm-service', 'service-delete',
876             self.del_serv_input_data)
877         self.assertEqual(response['status_code'], requests.codes.ok)
878         self.assertIn('Renderer service delete in progress',
879                       response['output']['configuration-response-common']['response-message'])
880         time.sleep(self.WAITING)
881
882     def test_47_delete_100GE_service_1(self):
883         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE"
884         response = test_utils.transportpce_api_rpc_request(
885             'org-openroadm-service', 'service-delete',
886             self.del_serv_input_data)
887         self.assertEqual(response['status_code'], requests.codes.ok)
888         self.assertIn('Renderer service delete in progress',
889                       response['output']['configuration-response-common']['response-message'])
890         time.sleep(self.WAITING)
891
892     def test_48_check_service_list(self):
893         response = test_utils.get_ordm_serv_list_request()
894         self.assertEqual(response['status_code'], requests.codes.ok)
895         self.assertEqual(len(response['service-list']['services']), 2)
896
897     def test_49_check_no_ODU4_connection_xpdra2(self):
898         response = test_utils.check_node_request("XPDR-A2")
899         self.assertEqual(response['status_code'], requests.codes.ok)
900         self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
901
902     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
903         response = test_utils.check_node_attribute_request(
904             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
905         self.assertEqual(response['status_code'], requests.codes.conflict)
906
907     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
908         response = test_utils.check_node_attribute_request(
909             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
910         self.assertEqual(response['status_code'], requests.codes.conflict)
911
912     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
913         response = test_utils.check_node_attribute_request(
914             'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
915         self.assertEqual(response['status_code'], requests.codes.conflict)
916
917     def test_53_check_otn_topo_links(self):
918         response = test_utils.get_ietf_network_request('otn-topology', 'config')
919         self.assertEqual(response['status_code'], requests.codes.ok)
920         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
921         for link in response['network'][0]['ietf-network-topology:link']:
922             linkId = link['link-id']
923             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
924                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
925                 self.assertEqual(
926                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
927                 self.assertEqual(
928                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
929
930     def test_54_check_otn_topo_tp(self):
931         response = test_utils.get_ietf_network_request('otn-topology', 'config')
932         self.assertEqual(response['status_code'], requests.codes.ok)
933         for node in response['network'][0]['node']:
934             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
935                 tpList = node['ietf-network-topology:termination-point']
936                 for tp in tpList:
937                     if tp['tp-id'] == 'XPDR2-NETWORK1':
938                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
939                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
940                         self.assertEqual(
941                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
942
943     def test_55_delete_ODUC4_service(self):
944         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODUC4"
945         response = test_utils.transportpce_api_rpc_request(
946             'org-openroadm-service', 'service-delete',
947             self.del_serv_input_data)
948         self.assertEqual(response['status_code'], requests.codes.ok)
949         self.assertIn('Renderer service delete in progress',
950                       response['output']['configuration-response-common']['response-message'])
951         time.sleep(self.WAITING)
952
953     def test_56_check_service_list(self):
954         response = test_utils.get_ordm_serv_list_request()
955         self.assertEqual(response['status_code'], requests.codes.ok)
956         self.assertEqual(len(response['service-list']['services']), 1)
957
958     def test_57_check_no_interface_ODU4_xpdra2(self):
959         response = test_utils.check_node_attribute_request(
960             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
961         self.assertEqual(response['status_code'], requests.codes.conflict)
962
963     def test_58_check_otn_topo_links(self):
964         self.test_22_check_otn_topo_OTUC4_links()
965
966     def test_59_check_otn_topo_tp(self):
967         response = test_utils.get_ietf_network_request('otn-topology', 'config')
968         self.assertEqual(response['status_code'], requests.codes.ok)
969         for node in response['network'][0]['node']:
970             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
971                 tpList = node['ietf-network-topology:termination-point']
972                 for tp in tpList:
973                     if tp['tp-id'] == 'XPDR2-NETWORK1':
974                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
975                                          dict.keys(tp))
976
977     def test_60_delete_OTUC4_service(self):
978         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OTUC4"
979         response = test_utils.transportpce_api_rpc_request(
980             'org-openroadm-service', 'service-delete',
981             self.del_serv_input_data)
982         self.assertEqual(response['status_code'], requests.codes.ok)
983         self.assertIn('Renderer service delete in progress',
984                       response['output']['configuration-response-common']['response-message'])
985         time.sleep(self.WAITING)
986
987     def test_61_get_no_service(self):
988         response = test_utils.get_ordm_serv_list_request()
989         self.assertEqual(response['status_code'], requests.codes.conflict)
990         self.assertIn(response['service-list'], (
991             {
992                 "error-type": "protocol",
993                 "error-tag": "data-missing",
994                 "error-message":
995                     "Request could not be completed because the relevant data "
996                     "model content does not exist"
997             }, {
998                 "error-type": "application",
999                 "error-tag": "data-missing",
1000                 "error-message":
1001                     "Request could not be completed because the relevant data "
1002                     "model content does not exist"
1003             }))
1004
1005     def test_62_check_no_interface_OTUC4_xpdra2(self):
1006         response = test_utils.check_node_attribute_request(
1007             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
1008         self.assertEqual(response['status_code'], requests.codes.conflict)
1009
1010     def test_63_check_no_interface_OTSI_xpdra2(self):
1011         response = test_utils.check_node_attribute_request(
1012             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
1013         self.assertEqual(response['status_code'], requests.codes.conflict)
1014
1015     def test_64_check_no_interface_OTSIG_xpdra2(self):
1016         response = test_utils.check_node_attribute_request(
1017             'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
1018         self.assertEqual(response['status_code'], requests.codes.conflict)
1019
1020     def test_65_getLinks_OtnTopology(self):
1021         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1022         self.assertEqual(response['status_code'], requests.codes.ok)
1023         self.assertNotIn('ietf-network-topology:link', response['network'][0])
1024
1025     def test_66_check_openroadm_topo_xpdra2(self):
1026         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
1027         self.assertEqual(response['status_code'], requests.codes.ok)
1028         tp = response['node']['ietf-network-topology:termination-point'][0]
1029         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1030         self.assertNotIn('wavelength', dict.keys(
1031             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1032
1033     def test_67_check_openroadm_topology(self):
1034         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1035         self.assertEqual(response['status_code'], requests.codes.ok)
1036         links = response['network'][0]['ietf-network-topology:link']
1037         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1038
1039     def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
1040         response = test_utils.transportpce_api_rpc_request(
1041             'transportpce-networkutils', 'init-xpdr-rdm-links',
1042             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1043                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1044         self.assertEqual(response['status_code'], requests.codes.ok)
1045         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1046
1047     def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
1048         response = test_utils.transportpce_api_rpc_request(
1049             'transportpce-networkutils', 'init-rdm-xpdr-links',
1050             {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1051                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1052         self.assertEqual(response['status_code'], requests.codes.ok)
1053         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1054
1055     def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
1056         response = test_utils.transportpce_api_rpc_request(
1057             'transportpce-networkutils', 'init-xpdr-rdm-links',
1058             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1059                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1060         self.assertEqual(response['status_code'], requests.codes.ok)
1061         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1062
1063     def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1064         response = test_utils.transportpce_api_rpc_request(
1065             'transportpce-networkutils', 'init-rdm-xpdr-links',
1066             {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1067                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1068         self.assertEqual(response['status_code'], requests.codes.ok)
1069         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1070
1071     # test service-create for 400GE service from xpdra2 to xpdrc2
1072     def test_72_create_400GE_service(self):
1073         self.cr_serv_input_data["service-name"] = "service-400GE"
1074         self.cr_serv_input_data["service-a-end"]["service-rate"] = "400"
1075         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1076         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1077         self.cr_serv_input_data["service-z-end"]["service-rate"] = "400"
1078         del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"]
1079         del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
1080         del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"]
1081         del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
1082         del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"]
1083         del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
1084         del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"]
1085         del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
1086         response = test_utils.transportpce_api_rpc_request(
1087             'org-openroadm-service', 'service-create',
1088             self.cr_serv_input_data)
1089         self.assertEqual(response['status_code'], requests.codes.ok)
1090         time.sleep(self.WAITING)
1091         self.assertIn('PCE calculation in progress',
1092                       response['output']['configuration-response-common']['response-message'])
1093         time.sleep(self.WAITING)
1094
1095     def test_73_get_400GE_service(self):
1096         response = test_utils.get_ordm_serv_list_attr_request("services", "service-400GE")
1097         self.assertEqual(response['status_code'], requests.codes.ok)
1098         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1099         self.assertEqual(response['services'][0]['service-name'], 'service-400GE')
1100         self.assertEqual(response['services'][0]['connection-type'], 'service')
1101         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1102         time.sleep(1)
1103
1104     def test_74_check_xc1_roadma(self):
1105         response = test_utils.check_node_attribute_request(
1106             "ROADM-A1", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1107         self.assertEqual(response['status_code'], requests.codes.ok)
1108         self.assertDictEqual(
1109             dict({
1110                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1111                 'opticalControlMode': 'gainLoss',
1112                 'target-output-power': -3.0
1113             }, **response['roadm-connections'][0]), response['roadm-connections'][0])
1114         self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-nmc-755:768'}, response['roadm-connections'][0]['source'])
1115         self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'}, response['roadm-connections'][0]['destination'])
1116         time.sleep(1)
1117
1118     def test_75_check_topo_xpdra2(self):
1119         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
1120         self.assertEqual(response['status_code'], requests.codes.ok)
1121         liste_tp = response['node']['ietf-network-topology:termination-point']
1122         for ele in liste_tp:
1123             if ele['tp-id'] == 'XPDR1-NETWORK1':
1124                 self.assertEqual(
1125                     196.08125,
1126                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
1127                 self.assertEqual(
1128                     75.0,
1129                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
1130             if ele['tp-id'] == 'XPDR1-CLIENT1':
1131                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1132         time.sleep(1)
1133
1134     def test_76_check_topo_roadma_SRG1(self):
1135         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
1136         self.assertEqual(response['status_code'], requests.codes.ok)
1137         freq_map = base64.b64decode(
1138             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1139         freq_map_array = [int(x) for x in freq_map]
1140         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1141         liste_tp = response['node']['ietf-network-topology:termination-point']
1142         for ele in liste_tp:
1143             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1144                 freq_map = base64.b64decode(
1145                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1146                 freq_map_array = [int(x) for x in freq_map]
1147                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1148             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1149                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1150         time.sleep(1)
1151
1152     def test_77_check_topo_roadma_DEG1(self):
1153         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
1154         self.assertEqual(response['status_code'], requests.codes.ok)
1155         freq_map = base64.b64decode(
1156             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1157         freq_map_array = [int(x) for x in freq_map]
1158         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1159         liste_tp = response['node']['ietf-network-topology:termination-point']
1160         for ele in liste_tp:
1161             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1162                 freq_map = base64.b64decode(
1163                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1164                 freq_map_array = [int(x) for x in freq_map]
1165                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1166             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1167                 freq_map = base64.b64decode(
1168                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1169                 freq_map_array = [int(x) for x in freq_map]
1170                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1171         time.sleep(1)
1172
1173     def test_78_check_interface_400GE_CLIENT_xpdra2(self):
1174         response = test_utils.check_node_attribute_request(
1175             'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1176         self.assertEqual(response['status_code'], requests.codes.ok)
1177         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1178                         'administrative-state': 'inService',
1179                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1180                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1181                         'supporting-port': 'C1'
1182                         }
1183         input_dict_2 = {'speed': 400000}
1184         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1185                              response['interface'][0])
1186         self.assertDictEqual(
1187             dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1188             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1189
1190     def test_79_check_interface_OTSI_xpdra2(self):
1191         response = test_utils.check_node_attribute_request(
1192             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1193         self.assertEqual(response['status_code'], requests.codes.ok)
1194         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1195                         'administrative-state': 'inService',
1196                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1197                         'type': 'org-openroadm-interfaces:otsi',
1198                         'supporting-port': 'L1'}
1199         input_dict_2 = {
1200             "frequency": 196.0812,
1201             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1202             "fec": "org-openroadm-common-types:ofec",
1203             "transmit-power": -5,
1204             "provision-mode": "explicit",
1205             "modulation-format": "dp-qam16"}
1206
1207         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1208                              response['interface'][0])
1209         self.assertDictEqual(
1210             dict(input_dict_2, **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1211             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1212         self.assertDictEqual(
1213             {"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1214             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1215
1216     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1217         response = test_utils.check_node_attribute_request(
1218             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1219         self.assertEqual(response['status_code'], requests.codes.ok)
1220         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
1221                         'administrative-state': 'inService',
1222                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1223                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1224                         'type': 'org-openroadm-interfaces:otsi-group',
1225                         'supporting-port': 'L1'}
1226         input_dict_2 = {"group-id": 1,
1227                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1228
1229         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1230                              response['interface'][0])
1231         self.assertDictEqual(dict(input_dict_2,
1232                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1233                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1234
1235     def test_81_check_interface_OTUC4_xpdra2(self):
1236         response = test_utils.check_node_attribute_request(
1237             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1238         self.assertEqual(response['status_code'], requests.codes.ok)
1239         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1240                         'administrative-state': 'inService',
1241                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1242                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSIGROUP-400G',
1243                         'type': 'org-openroadm-interfaces:otnOtu',
1244                         'supporting-port': 'L1'}
1245         input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1246                         "expected-dapi": "ANeUjNzWtDLV",
1247                         'tx-dapi': 'AKsqPmWceByv',
1248                         'expected-sapi': 'AKsqPmWceByv',
1249                         "rate": "org-openroadm-otn-common-types:OTUCn",
1250                         "degthr-percentage": 100,
1251                         "tim-detect-mode": "Disabled",
1252                         "otucn-n-rate": 4,
1253                         "degm-intervals": 2}
1254
1255         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1256                              response['interface'][0])
1257         self.assertDictEqual(dict(input_dict_2,
1258                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1259                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1260
1261     def test_82_check_interface_ODUC4_xpdra2(self):
1262         response = test_utils.check_node_attribute_request(
1263             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1264         self.assertEqual(response['status_code'], requests.codes.ok)
1265         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1266                         'administrative-state': 'inService',
1267                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1269                         'type': 'org-openroadm-interfaces:otnOdu',
1270                         'supporting-port': 'L1',
1271                         'circuit-id': 'TBD',
1272                         'description': 'TBD'}
1273         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1274                         "tim-detect-mode": "Disabled",
1275                         "degm-intervals": 2,
1276                         "degthr-percentage": 100,
1277                         "monitoring-mode": "terminated",
1278                         "rate": "org-openroadm-otn-common-types:ODUCn",
1279                         "oducn-n-rate": 4}
1280         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1281
1282         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1283                              response['interface'][0])
1284         self.assertDictEqual(dict(input_dict_2,
1285                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1286                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1287         self.assertDictEqual(dict(input_dict_3,
1288                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1289                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1290         self.assertEqual('XPDR1-NETWORK1-OTUC4', response['interface'][0]['supporting-interface-list'][0])
1291
1292     def test_82a_check_interface_ODUFLEX_xpdra2(self):
1293         response = test_utils.check_node_attribute_request(
1294             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUFLEX')
1295         self.assertEqual(response['status_code'], requests.codes.ok)
1296         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
1297                         'administrative-state': 'inService',
1298                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1299                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-ODUC4',
1300                         'type': 'org-openroadm-interfaces:otnOdu',
1301                         'supporting-port': 'L1',
1302                         'circuit-id': 'TBD',
1303                         'description': 'TBD'}
1304         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
1305                         "tim-detect-mode": "Disabled",
1306                         "degm-intervals": 2,
1307                         "degthr-percentage": 100,
1308                         "monitoring-mode": "terminated",
1309                         "rate": "org-openroadm-otn-common-types:ODUflex-cbr",
1310                         "oduflex-cbr-service": "org-openroadm-otn-common-types:ODUflex-cbr-400G"
1311                         }
1312         input_dict_3 = {"exp-payload-type": "32", "payload-type": "32"}
1313         input_dict_4 = {'trib-port-number': 1}
1314
1315         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1316                              response['interface'][0])
1317         self.assertDictEqual(dict(input_dict_2,
1318                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1319                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1320         self.assertDictEqual(dict(input_dict_3,
1321                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1322                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1323         self.assertDictEqual(dict(input_dict_4,
1324                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1325                                       'parent-odu-allocation']),
1326                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1327         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1328                       ['opucn-trib-slots'])
1329         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1330                       ['opucn-trib-slots'])
1331         self.assertIn('2.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1332                       ['opucn-trib-slots'])
1333         self.assertIn('2.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1334                       ['opucn-trib-slots'])
1335         self.assertIn('3.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1336                       ['opucn-trib-slots'])
1337         self.assertIn('3.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1338                       ['opucn-trib-slots'])
1339         self.assertIn('4.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1340                       ['opucn-trib-slots'])
1341         self.assertIn('4.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1342                       ['opucn-trib-slots'])
1343         self.assertEqual('XPDR1-NETWORK1-ODUC4', response['interface'][0]['supporting-interface-list'][0])
1344
1345     def test_83_delete_400GE_service(self):
1346         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-400GE"
1347         response = test_utils.transportpce_api_rpc_request(
1348             'org-openroadm-service', 'service-delete',
1349             self.del_serv_input_data)
1350         self.assertEqual(response['status_code'], requests.codes.ok)
1351         self.assertIn('Renderer service delete in progress',
1352                       response['output']['configuration-response-common']['response-message'])
1353         time.sleep(self.WAITING)
1354
1355     def test_84_get_no_service(self):
1356         response = test_utils.get_ordm_serv_list_request()
1357         self.assertEqual(response['status_code'], requests.codes.conflict)
1358         self.assertIn(response['service-list'], (
1359             {
1360                 "error-type": "protocol",
1361                 "error-tag": "data-missing",
1362                 "error-message":
1363                     "Request could not be completed because the relevant data "
1364                     "model content does not exist"
1365             }, {
1366                 "error-type": "application",
1367                 "error-tag": "data-missing",
1368                 "error-message":
1369                     "Request could not be completed because the relevant data "
1370                     "model content does not exist"
1371             }))
1372         time.sleep(1)
1373
1374     def test_85_check_no_interface_ODUC4_xpdra2(self):
1375         response = test_utils.check_node_attribute_request(
1376             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1377         self.assertEqual(response['status_code'], requests.codes.conflict)
1378
1379     def test_86_check_no_interface_OTUC4_xpdra2(self):
1380         response = test_utils.check_node_attribute_request(
1381             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1382         self.assertEqual(response['status_code'], requests.codes.conflict)
1383
1384     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1385         response = test_utils.check_node_attribute_request(
1386             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1387         self.assertEqual(response['status_code'], requests.codes.conflict)
1388
1389     def test_88_check_no_interface_OTSI_xpdra2(self):
1390         response = test_utils.check_node_attribute_request(
1391             'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1392         self.assertEqual(response['status_code'], requests.codes.conflict)
1393
1394     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1395         response = test_utils.check_node_attribute_request(
1396             'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1397         self.assertEqual(response['status_code'], requests.codes.conflict)
1398
1399     def test_90_disconnect_xponders_from_roadm(self):
1400         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1401         self.assertEqual(response['status_code'], requests.codes.ok)
1402         links = response['network'][0]['ietf-network-topology:link']
1403         for link in links:
1404             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1405                 response = test_utils.del_ietf_network_link_request(
1406                     'openroadm-topology', link['link-id'], 'config')
1407                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1408
1409     def test_91_disconnect_xpdra2(self):
1410         response = test_utils.unmount_device("XPDR-A2")
1411         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1412
1413     def test_92_disconnect_xpdrc2(self):
1414         response = test_utils.unmount_device("XPDR-C2")
1415         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1416
1417     def test_93_disconnect_roadmA(self):
1418         response = test_utils.unmount_device("ROADM-A1")
1419         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1420
1421     def test_94_disconnect_roadmC(self):
1422         response = test_utils.unmount_device("ROADM-C1")
1423         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1424
1425
1426 if __name__ == "__main__":
1427     unittest.main(verbosity=2)