Replace tpce-topology yang by existing ordmodels
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION_221 = '2.2.1'
34     NODE_VERSION_71 = '7.1'
35
36     cr_serv_sample_data = {"input": {
37         "sdnc-request-header": {
38             "request-id": "request-1",
39             "rpc-action": "service-create",
40             "request-system-id": "appname"
41         },
42         "service-name": "service1-OTUC4",
43         "common-id": "commonId",
44         "connection-type": "infrastructure",
45         "service-a-end": {
46             "service-rate": "400",
47             "node-id": "XPDR-A2",
48             "service-format": "OTU",
49             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
50             "clli": "NodeSA",
51             "tx-direction": [{
52                 "port": {
53                     "port-device-name": "XPDR-A2-XPDR2",
54                     "port-type": "fixed",
55                     "port-name": "XPDR2-NETWORK1",
56                     "port-rack": "000000.00",
57                     "port-shelf": "Chassis#1"
58                 },
59                 "lgx": {
60                     "lgx-device-name": "Some lgx-device-name",
61                     "lgx-port-name": "Some lgx-port-name",
62                     "lgx-port-rack": "000000.00",
63                     "lgx-port-shelf": "00"
64                 },
65                 "index": 0
66             }],
67             "rx-direction": [{
68                 "port": {
69                     "port-device-name": "XPDR-A2-XPDR2",
70                     "port-type": "fixed",
71                     "port-name": "XPDR2-NETWORK1",
72                     "port-rack": "000000.00",
73                     "port-shelf": "Chassis#1"
74                 },
75                 "lgx": {
76                     "lgx-device-name": "Some lgx-device-name",
77                     "lgx-port-name": "Some lgx-port-name",
78                     "lgx-port-rack": "000000.00",
79                     "lgx-port-shelf": "00"
80                 },
81                 "index": 0
82             }],
83             "optic-type": "gray"
84         },
85         "service-z-end": {
86             "service-rate": "400",
87             "node-id": "XPDR-C2",
88             "service-format": "OTU",
89             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
90             "clli": "NodeSC",
91             "tx-direction": [{
92                 "port": {
93                     "port-device-name": "XPDR-C2-XPDR2",
94                     "port-type": "fixed",
95                     "port-name": "XPDR2-NETWORK1",
96                     "port-rack": "000000.00",
97                     "port-shelf": "Chassis#1"
98                 },
99                 "lgx": {
100                     "lgx-device-name": "Some lgx-device-name",
101                     "lgx-port-name": "Some lgx-port-name",
102                     "lgx-port-rack": "000000.00",
103                     "lgx-port-shelf": "00"
104                 },
105                 "index": 0
106             }],
107             "rx-direction": [{
108                 "port": {
109                     "port-device-name": "XPDR-C2-XPDR2",
110                     "port-type": "fixed",
111                     "port-name": "XPDR2-NETWORK1",
112                     "port-rack": "000000.00",
113                     "port-shelf": "Chassis#1"
114                 },
115                 "lgx": {
116                     "lgx-device-name": "Some lgx-device-name",
117                     "lgx-port-name": "Some lgx-port-name",
118                     "lgx-port-rack": "000000.00",
119                     "lgx-port-shelf": "00"
120                 },
121                 "index": 0
122             }],
123             "optic-type": "gray"
124         },
125         "due-date": "2018-06-15T00:00:01Z",
126         "operator-contact": "pw1234"
127     }
128     }
129
130     @classmethod
131     def setUpClass(cls):
132         cls.processes = test_utils.start_tpce()
133         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
134                                                ('roadma', cls.NODE_VERSION_221),
135                                                ('roadmc', cls.NODE_VERSION_221),
136                                                ('xpdrc2', cls.NODE_VERSION_71)])
137
138     @classmethod
139     def tearDownClass(cls):
140         # pylint: disable=not-an-iterable
141         for process in cls.processes:
142             test_utils.shutdown_process(process)
143         print("all processes killed")
144
145     def setUp(self):
146         time.sleep(2)
147
148     def test_01_connect_xpdra2(self):
149         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
150         self.assertEqual(response.status_code,
151                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152
153     def test_02_connect_xpdrc2(self):
154         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
155         self.assertEqual(response.status_code,
156                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157
158     def test_03_connect_rdma(self):
159         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
160         self.assertEqual(response.status_code,
161                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162
163     def test_04_connect_rdmc(self):
164         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
165         self.assertEqual(response.status_code,
166                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167
168     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
169         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
170                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
171         self.assertEqual(response.status_code, requests.codes.ok)
172         res = response.json()
173         self.assertIn('Xponder Roadm Link created successfully',
174                       res["output"]["result"])
175         time.sleep(2)
176
177     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
178         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
179                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
180         self.assertEqual(response.status_code, requests.codes.ok)
181         res = response.json()
182         self.assertIn('Roadm Xponder links created successfully',
183                       res["output"]["result"])
184         time.sleep(2)
185
186     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
187         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
188                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn('Xponder Roadm Link created successfully',
192                       res["output"]["result"])
193         time.sleep(2)
194
195     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
196         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
197                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
198         self.assertEqual(response.status_code, requests.codes.ok)
199         res = response.json()
200         self.assertIn('Roadm Xponder links created successfully',
201                       res["output"]["result"])
202         time.sleep(2)
203
204     def test_09_add_omsAttributes_roadma_roadmc(self):
205         # Config ROADMA-ROADMC oms-attributes
206         data = {"span": {
207             "auto-spanloss": "true",
208             "spanloss-base": 11.4,
209             "spanloss-current": 12,
210             "engineered-spanloss": 12.2,
211             "link-concatenation": [{
212                 "SRLG-Id": 0,
213                 "fiber-type": "smf",
214                 "SRLG-length": 100000,
215                 "pmd": 0.5}]}}
216         response = test_utils.add_oms_attr_request(
217             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
218         self.assertEqual(response.status_code, requests.codes.created)
219
220     def test_10_add_omsAttributes_roadmc_roadma(self):
221         # Config ROADMC-ROADMA oms-attributes
222         data = {"span": {
223             "auto-spanloss": "true",
224             "spanloss-base": 11.4,
225             "spanloss-current": 12,
226             "engineered-spanloss": 12.2,
227             "link-concatenation": [{
228                 "SRLG-Id": 0,
229                 "fiber-type": "smf",
230                 "SRLG-length": 100000,
231                 "pmd": 0.5}]}}
232         response = test_utils.add_oms_attr_request(
233             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
234         self.assertEqual(response.status_code, requests.codes.created)
235
236 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
237     def test_11_check_otn_topology(self):
238         response = test_utils.get_otn_topo_request()
239         self.assertEqual(response.status_code, requests.codes.ok)
240         res = response.json()
241         nbNode = len(res['network'][0]['node'])
242         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
243         self.assertNotIn('ietf-network-topology:link', res['network'][0])
244
245     def test_12_create_OTUC4_service(self):
246         response = test_utils.service_create_request(self.cr_serv_sample_data)
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         self.assertIn('PCE calculation in progress',
250                       res['output']['configuration-response-common']['response-message'])
251         time.sleep(self.WAITING)
252
253     def test_13_get_OTUC4_service1(self):
254         response = test_utils.get_service_list_request(
255             "services/service1-OTUC4")
256         self.assertEqual(response.status_code, requests.codes.ok)
257         res = response.json()
258         self.assertEqual(
259             res['services'][0]['administrative-state'], 'inService')
260         self.assertEqual(
261             res['services'][0]['service-name'], 'service1-OTUC4')
262         self.assertEqual(
263             res['services'][0]['connection-type'], 'infrastructure')
264         self.assertEqual(
265             res['services'][0]['lifecycle-state'], 'planned')
266         self.assertEqual(
267             res['services'][0]['service-layer'], 'wdm')
268         self.assertEqual(
269             res['services'][0]['service-a-end']['service-rate'], 400)
270         self.assertEqual(
271             res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
272         time.sleep(2)
273         self.assertEqual(
274             res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
275         time.sleep(2)
276
277     # Check correct configuration of devices
278     def test_14_check_interface_otsi_xpdra2(self):
279         response = test_utils.check_netconf_node_request(
280             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
281         self.assertEqual(response.status_code, requests.codes.ok)
282         res = response.json()
283         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
284                                    'administrative-state': 'inService',
285                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
286                                    'type': 'org-openroadm-interfaces:otsi',
287                                    'supporting-port': 'L1'
288                                    }, **res['interface'][0]),
289                              res['interface'][0])
290
291         self.assertDictEqual(
292             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
293                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
294                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
295             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
296
297     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
298         response = test_utils.check_netconf_node_request(
299             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
300         self.assertEqual(response.status_code, requests.codes.ok)
301         res = response.json()
302         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
303                         'administrative-state': 'inService',
304                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
305                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
306                         'type': 'org-openroadm-interfaces:otsi-group',
307                         'supporting-port': 'L1'
308                         }
309         input_dict_2 = {"group-id": 1,
310                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
311                         }
312         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
313                              res['interface'][0])
314         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
315                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
316                              res['interface'][0]
317                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
318
319     def test_16_check_interface_OTUC4_xpdra2(self):
320         response = test_utils.check_netconf_node_request(
321             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
322         self.assertEqual(response.status_code, requests.codes.ok)
323         res = response.json()
324         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
325                         'administrative-state': 'inService',
326                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
327                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
328                         'type': 'org-openroadm-interfaces:otnOtu',
329                         'supporting-port': 'L1'
330                         }
331         input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
332                         'expected-dapi': 'G54UFNImtOE=',
333                         'tx-dapi': 'J/FIUzQc+4M=',
334                         'expected-sapi': 'J/FIUzQc+4M=',
335                         'rate': 'org-openroadm-otn-common-types:OTUCn',
336                         'degthr-percentage': 100,
337                         'degm-intervals': 2,
338                         'otucn-n-rate': 4
339                         }
340         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
341                              res['interface'][0])
342         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
343                              res['interface'][0]
344                              ['org-openroadm-otn-otu-interfaces:otu'])
345
346     def test_17_check_interface_otsi_xpdrc2(self):
347         response = test_utils.check_netconf_node_request(
348             "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
349         self.assertEqual(response.status_code, requests.codes.ok)
350         res = response.json()
351         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
352                                    'administrative-state': 'inService',
353                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
354                                    'type': 'org-openroadm-interfaces:otsi',
355                                    'supporting-port': 'L1'
356                                    }, **res['interface'][0]),
357                              res['interface'][0])
358
359         self.assertDictEqual(
360             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
361                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
362                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
363             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
364
365     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
366         response = test_utils.check_netconf_node_request(
367             "XPDR-C2", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
368         self.assertEqual(response.status_code, requests.codes.ok)
369         res = response.json()
370         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
371                         'administrative-state': 'inService',
372                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
373                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
374                         'type': 'org-openroadm-interfaces:otsi-group',
375                         'supporting-port': 'L1'
376                         }
377         input_dict_2 = {"group-id": 1,
378                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
379                         }
380         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
381                              res['interface'][0])
382         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
383                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
384                              res['interface'][0]
385                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
386
387     def test_19_check_interface_OTUC4_xpdrc2(self):
388         response = test_utils.check_netconf_node_request(
389             "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
390         self.assertEqual(response.status_code, requests.codes.ok)
391         res = response.json()
392         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
393                         'administrative-state': 'inService',
394                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
395                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
396                         'type': 'org-openroadm-interfaces:otnOtu',
397                         'supporting-port': 'L1'
398                         }
399         input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
400                         'expected-sapi': 'G54UFNImtOE=',
401                         'tx-sapi': 'J/FIUzQc+4M=',
402                         'expected-dapi': 'J/FIUzQc+4M=',
403                         'rate': 'org-openroadm-otn-common-types:OTUCn',
404                         'degthr-percentage': 100,
405                         'degm-intervals': 2,
406                         'otucn-n-rate': 4
407                         }
408
409         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
410                              res['interface'][0])
411
412         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
413                              res['interface'][0]
414                              ['org-openroadm-otn-otu-interfaces:otu'])
415
416     def test_20_check_no_interface_ODUC4_xpdra2(self):
417         response = test_utils.check_netconf_node_request(
418             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
419         self.assertEqual(response.status_code, requests.codes.conflict)
420         res = response.json()
421         self.assertIn(
422             {"error-type": "application", "error-tag": "data-missing",
423              "error-message": "Request could not be completed because the relevant data model content does not exist"},
424             res['errors']['error'])
425
426     def test_21_check_openroadm_topo_xpdra2(self):
427         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
428         self.assertEqual(response.status_code, requests.codes.ok)
429         res = response.json()
430         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
431         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
432         self.assertEqual({'frequency': 196.08125,
433                           'width': 75},
434                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
435         time.sleep(3)
436
437     def test_22_check_otn_topo_OTUC4_links(self):
438         response = test_utils.get_otn_topo_request()
439         self.assertEqual(response.status_code, requests.codes.ok)
440         res = response.json()
441         nb_links = len(res['network'][0]['ietf-network-topology:link'])
442         self.assertEqual(nb_links, 2)
443         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
444                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
445         for link in res['network'][0]['ietf-network-topology:link']:
446             self.assertIn(link['link-id'], listLinkId)
447             self.assertEqual(
448                 link['transportpce-networkutils:otn-link-type'], 'OTUC4')
449             self.assertEqual(
450                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
451             self.assertEqual(
452                 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
453             self.assertEqual(
454                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
455             self.assertIn(
456                 link['org-openroadm-common-network:opposite-link'], listLinkId)
457
458 # test service-create for ODU4 service from xpdra2 to xpdrc2
459     def test_23_create_ODUC4_service(self):
460         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
461         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
462         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
463         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
464         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
465         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
466         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
467
468         response = test_utils.service_create_request(self.cr_serv_sample_data)
469         self.assertEqual(response.status_code, requests.codes.ok)
470         res = response.json()
471         self.assertIn('PCE calculation in progress',
472                       res['output']['configuration-response-common']['response-message'])
473         time.sleep(self.WAITING)
474
475     def test_24_get_ODUC4_service1(self):
476         response = test_utils.get_service_list_request(
477             "services/service1-ODUC4")
478         self.assertEqual(response.status_code, requests.codes.ok)
479         res = response.json()
480         self.assertEqual(
481             res['services'][0]['administrative-state'], 'inService')
482         self.assertEqual(
483             res['services'][0]['service-name'], 'service1-ODUC4')
484         self.assertEqual(
485             res['services'][0]['connection-type'], 'infrastructure')
486         self.assertEqual(
487             res['services'][0]['lifecycle-state'], 'planned')
488         self.assertEqual(
489             res['services'][0]['service-layer'], 'wdm')
490         self.assertEqual(
491             res['services'][0]['service-a-end']['service-rate'], 400)
492         self.assertEqual(
493             res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
494         self.assertEqual(
495             res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
496         time.sleep(2)
497
498     def test_25_check_interface_ODUC4_xpdra2(self):
499         response = test_utils.check_netconf_node_request(
500             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
501         self.assertEqual(response.status_code, requests.codes.ok)
502         res = response.json()
503         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
504                         'administrative-state': 'inService',
505                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
506                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
507                         'type': 'org-openroadm-interfaces:otnOdu',
508                         'supporting-port': 'L1'}
509         # SAPI/DAPI are added in the Otu4 renderer
510         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
511                         'rate': 'org-openroadm-otn-common-types:ODUCn',
512                         'expected-dapi': 'Nmbu2MNHvc4=',
513                         'expected-sapi': 'Nmbu2MNHvc4=',
514                         'tx-dapi': 'Nmbu2MNHvc4=',
515                         'tx-sapi': 'LY9PxYJqUbw='}
516
517         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
518                              res['interface'][0])
519         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
520                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
521         self.assertDictEqual(
522             {'payload-type': '22', 'exp-payload-type': '22'},
523             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
524
525     def test_26_check_interface_ODUC4_xpdrc2(self):
526         response = test_utils.check_netconf_node_request(
527             "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
528         self.assertEqual(response.status_code, requests.codes.ok)
529         res = response.json()
530         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
531                         'administrative-state': 'inService',
532                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
533                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
534                         'type': 'org-openroadm-interfaces:otnOdu',
535                         'supporting-port': 'L1'}
536         # SAPI/DAPI are added in the Otu4 renderer
537         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
538                         'rate': 'org-openroadm-otn-common-types:ODUCn',
539                         'tx-sapi': 'Nmbu2MNHvc4=',
540                         'tx-dapi': 'LY9PxYJqUbw=',
541                         'expected-sapi': 'LY9PxYJqUbw=',
542                         'expected-dapi': 'LY9PxYJqUbw='
543                         }
544         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
545                              res['interface'][0])
546         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
547                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
548         self.assertDictEqual(
549             {'payload-type': '22', 'exp-payload-type': '22'},
550             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
551
552     def test_27_check_otn_topo_links(self):
553         response = test_utils.get_otn_topo_request()
554         self.assertEqual(response.status_code, requests.codes.ok)
555         res = response.json()
556         nb_links = len(res['network'][0]['ietf-network-topology:link'])
557         self.assertEqual(nb_links, 4)
558         for link in res['network'][0]['ietf-network-topology:link']:
559             linkId = link['link-id']
560             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
561                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
562                 self.assertEqual(
563                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
564                 self.assertEqual(
565                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
566             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
568                 self.assertEqual(
569                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
570                 self.assertEqual(
571                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
572                 self.assertEqual(
573                     link['transportpce-networkutils:otn-link-type'], 'ODUC4')
574                 self.assertEqual(
575                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
576                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
577                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
578                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
579             else:
580                 self.fail("this link should not exist")
581
582     def test_28_check_otn_topo_tp(self):
583         response = test_utils.get_otn_topo_request()
584         res = response.json()
585         for node in res['network'][0]['node']:
586             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
587                 tpList = node['ietf-network-topology:termination-point']
588                 for tp in tpList:
589                     if tp['tp-id'] == 'XPDR2-NETWORK1':
590                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
591                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
592                         self.assertEqual(
593                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
594                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
595                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
596
597 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
598     def test_29_create_100GE_service_1(self):
599         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
600         self.cr_serv_sample_data["input"]["connection-type"] = "service"
601         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
602         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
603         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
604         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
605         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
606         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
607         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
608         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
609         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
610         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
611         response = test_utils.service_create_request(self.cr_serv_sample_data)
612         self.assertEqual(response.status_code, requests.codes.ok)
613         res = response.json()
614         self.assertIn('PCE calculation in progress',
615                       res['output']['configuration-response-common']['response-message'])
616         time.sleep(self.WAITING)
617
618     def test_30_get_100GE_service_1(self):
619         response = test_utils.get_service_list_request(
620             "services/service-100GE")
621         self.assertEqual(response.status_code, requests.codes.ok)
622         res = response.json()
623         self.assertEqual(
624             res['services'][0]['administrative-state'], 'inService')
625         self.assertEqual(
626             res['services'][0]['service-name'], 'service-100GE')
627         self.assertEqual(
628             res['services'][0]['connection-type'], 'service')
629         self.assertEqual(
630             res['services'][0]['lifecycle-state'], 'planned')
631         time.sleep(2)
632
633     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
634         response = test_utils.check_netconf_node_request(
635             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
636         self.assertEqual(response.status_code, requests.codes.ok)
637         res = response.json()
638         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
639                         'administrative-state': 'inService',
640                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
641                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
642                         'supporting-port': 'C1'
643                         }
644         input_dict_2 = {'speed': 100000}
645         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
646                              res['interface'][0])
647         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
648                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
649
650     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
651         response = test_utils.check_netconf_node_request(
652             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4")
653         self.assertEqual(response.status_code, requests.codes.ok)
654         res = response.json()
655         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
656                         'administrative-state': 'inService',
657                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
658                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
659                         'type': 'org-openroadm-interfaces:otnOdu',
660                         'supporting-port': 'C1'}
661         input_dict_2 = {
662             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
663             'rate': 'org-openroadm-otn-common-types:ODU4',
664             'monitoring-mode': 'terminated'}
665
666         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
667                              res['interface'][0])
668         self.assertDictEqual(dict(input_dict_2,
669                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
670                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
671         self.assertDictEqual(
672             {'payload-type': '07', 'exp-payload-type': '07'},
673             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
674
675     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
676         response = test_utils.check_netconf_node_request(
677             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4")
678         self.assertEqual(response.status_code, requests.codes.ok)
679         res = response.json()
680         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
681                         'administrative-state': 'inService',
682                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
683                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
684                         'type': 'org-openroadm-interfaces:otnOdu',
685                         'supporting-port': 'L1'}
686         input_dict_2 = {
687             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
688             'rate': 'org-openroadm-otn-common-types:ODU4',
689             'monitoring-mode': 'not-terminated'}
690         input_dict_3 = {'trib-port-number': 1}
691
692         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
693                              res['interface'][0])
694         self.assertDictEqual(dict(input_dict_2,
695                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
696                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
697         self.assertDictEqual(dict(input_dict_3,
698                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
699                                       'parent-odu-allocation']),
700                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
701         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
702                       ['opucn-trib-slots'])
703         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
704                       ['opucn-trib-slots'])
705
706     def test_34_check_ODU4_connection_xpdra2(self):
707         response = test_utils.check_netconf_node_request(
708             "XPDR-A2",
709             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
710         self.assertEqual(response.status_code, requests.codes.ok)
711         res = response.json()
712         input_dict_1 = {
713             'connection-name':
714             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
715             'direction': 'bidirectional'
716         }
717
718         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
719                              res['odu-connection'][0])
720         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
721                              res['odu-connection'][0]['destination'])
722         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
723                              res['odu-connection'][0]['source'])
724
725     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
726         response = test_utils.check_netconf_node_request(
727             "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
728         self.assertEqual(response.status_code, requests.codes.ok)
729         res = response.json()
730         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
731                         'administrative-state': 'inService',
732                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
733                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
734                         'supporting-port': 'C1'
735                         }
736         input_dict_2 = {'speed': 100000}
737         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
738                              res['interface'][0])
739         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
740                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
741
742     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
743         response = test_utils.check_netconf_node_request(
744             "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4")
745         self.assertEqual(response.status_code, requests.codes.ok)
746         res = response.json()
747         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
748                         'administrative-state': 'inService',
749                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
750                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
751                         'type': 'org-openroadm-interfaces:otnOdu',
752                         'supporting-port': 'C1'}
753         input_dict_2 = {
754             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
755             'rate': 'org-openroadm-otn-common-types:ODU4',
756             'monitoring-mode': 'terminated'}
757
758         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
759                              res['interface'][0])
760         self.assertDictEqual(dict(input_dict_2,
761                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
762                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
763         self.assertDictEqual(
764             {'payload-type': '07', 'exp-payload-type': '07'},
765             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
766
767     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
768         response = test_utils.check_netconf_node_request(
769             "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4")
770         self.assertEqual(response.status_code, requests.codes.ok)
771         res = response.json()
772         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
773                         'administrative-state': 'inService',
774                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
775                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
776                         'type': 'org-openroadm-interfaces:otnOdu',
777                         'supporting-port': 'C1'}
778         input_dict_2 = {
779             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
780             'rate': 'org-openroadm-otn-common-types:ODU4',
781             'monitoring-mode': 'not-terminated'}
782
783         input_dict_3 = {'trib-port-number': 1}
784
785         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
786                              res['interface'][0])
787         self.assertDictEqual(dict(input_dict_2,
788                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
789                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
790         self.assertDictEqual(dict(input_dict_3,
791                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
792                                       'parent-odu-allocation']),
793                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
794             'parent-odu-allocation'])
795         self.assertIn('1.1',
796                       res['interface'][0][
797                           'org-openroadm-otn-odu-interfaces:odu'][
798                           'parent-odu-allocation']['opucn-trib-slots'])
799         self.assertIn('1.20',
800                       res['interface'][0][
801                           'org-openroadm-otn-odu-interfaces:odu'][
802                           'parent-odu-allocation']['opucn-trib-slots'])
803
804     def test_38_check_ODU4_connection_xpdrc2(self):
805         response = test_utils.check_netconf_node_request(
806             "XPDR-C2",
807             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
808         self.assertEqual(response.status_code, requests.codes.ok)
809         res = response.json()
810         input_dict_1 = {
811             'connection-name':
812             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
813             'direction': 'bidirectional'
814         }
815
816         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
817                              res['odu-connection'][0])
818         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
819                              res['odu-connection'][0]['destination'])
820         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
821                              res['odu-connection'][0]['source'])
822
823     def test_39_check_otn_topo_links(self):
824         response = test_utils.get_otn_topo_request()
825         self.assertEqual(response.status_code, requests.codes.ok)
826         res = response.json()
827         nb_links = len(res['network'][0]['ietf-network-topology:link'])
828         self.assertEqual(nb_links, 4)
829         for link in res['network'][0]['ietf-network-topology:link']:
830             linkId = link['link-id']
831             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
832                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
833                 self.assertEqual(
834                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
835                 self.assertEqual(
836                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
837
838     def test_40_check_otn_topo_tp(self):
839         response = test_utils.get_otn_topo_request()
840         res = response.json()
841         for node in res['network'][0]['node']:
842             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
843                 tpList = node['ietf-network-topology:termination-point']
844                 for tp in tpList:
845                     if tp['tp-id'] == 'XPDR2-NETWORK1':
846                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
847                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
848                         tsPoolList = list(range(1, 20))
849                         self.assertNotIn(
850                             tsPoolList, xpdrTpPortConAt['ts-pool'])
851                         self.assertEqual(
852                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
853                         self.assertNotIn(
854                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
855
856 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
857     def test_41_create_100GE_service_2(self):
858         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
859         self.cr_serv_sample_data["input"]["connection-type"] = "service"
860         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
861         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
862         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
863         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
864         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
865         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
866         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
867         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
868         response = test_utils.service_create_request(self.cr_serv_sample_data)
869         self.assertEqual(response.status_code, requests.codes.ok)
870         res = response.json()
871         self.assertIn('PCE calculation in progress',
872                       res['output']['configuration-response-common']['response-message'])
873         time.sleep(self.WAITING)
874
875     def test_42_get_100GE_service_2(self):
876         response = test_utils.get_service_list_request(
877             "services/service-100GE2")
878         self.assertEqual(response.status_code, requests.codes.ok)
879         res = response.json()
880         self.assertEqual(
881             res['services'][0]['administrative-state'], 'inService')
882         self.assertEqual(
883             res['services'][0]['service-name'], 'service-100GE2')
884         self.assertEqual(
885             res['services'][0]['connection-type'], 'service')
886         self.assertEqual(
887             res['services'][0]['lifecycle-state'], 'planned')
888         time.sleep(2)
889
890     def test_43_check_service_list(self):
891         response = test_utils.get_service_list_request("")
892         self.assertEqual(response.status_code, requests.codes.ok)
893         res = response.json()
894         self.assertEqual(len(res['service-list']['services']), 4)
895         time.sleep(2)
896
897     def test_44_check_otn_topo_links(self):
898         response = test_utils.get_otn_topo_request()
899         self.assertEqual(response.status_code, requests.codes.ok)
900         res = response.json()
901         nb_links = len(res['network'][0]['ietf-network-topology:link'])
902         self.assertEqual(nb_links, 4)
903         for link in res['network'][0]['ietf-network-topology:link']:
904             linkId = link['link-id']
905             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
906                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
907                 self.assertEqual(
908                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
909                 self.assertEqual(
910                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
911
912     def test_45_check_otn_topo_tp(self):
913         response = test_utils.get_otn_topo_request()
914         res = response.json()
915         for node in res['network'][0]['node']:
916             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
917                 tpList = node['ietf-network-topology:termination-point']
918                 for tp in tpList:
919                     if tp['tp-id'] == 'XPDR2-NETWORK1':
920                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
921                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
922                         tsPoolList = list(range(1, 40))
923                         self.assertNotIn(
924                             tsPoolList, xpdrTpPortConAt['ts-pool'])
925                         self.assertEqual(
926                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
927                         self.assertNotIn(
928                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
929
930     def test_46_delete_100GE_service_2(self):
931         response = test_utils.service_delete_request("service-100GE2")
932         self.assertEqual(response.status_code, requests.codes.ok)
933         res = response.json()
934         self.assertIn('Renderer service delete in progress',
935                       res['output']['configuration-response-common']['response-message'])
936         time.sleep(self.WAITING)
937
938     def test_47_delete_100GE_service_1(self):
939         response = test_utils.service_delete_request("service-100GE")
940         self.assertEqual(response.status_code, requests.codes.ok)
941         res = response.json()
942         self.assertIn('Renderer service delete in progress',
943                       res['output']['configuration-response-common']['response-message'])
944         time.sleep(self.WAITING)
945
946     def test_48_check_service_list(self):
947         response = test_utils.get_service_list_request("")
948         self.assertEqual(response.status_code, requests.codes.ok)
949         res = response.json()
950         self.assertEqual(len(res['service-list']['services']), 2)
951         time.sleep(2)
952
953     def test_49_check_no_ODU4_connection_xpdra2(self):
954         response = test_utils.check_netconf_node_request("XPDR-A2", "")
955         self.assertEqual(response.status_code, requests.codes.ok)
956         res = response.json()
957         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
958         time.sleep(1)
959
960     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
961         response = test_utils.check_netconf_node_request(
962             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4")
963         self.assertEqual(response.status_code, requests.codes.conflict)
964
965     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
966         response = test_utils.check_netconf_node_request(
967             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4")
968         self.assertEqual(response.status_code, requests.codes.conflict)
969
970     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
971         response = test_utils.check_netconf_node_request(
972             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
973         self.assertEqual(response.status_code, requests.codes.conflict)
974
975     def test_53_check_otn_topo_links(self):
976         response = test_utils.get_otn_topo_request()
977         self.assertEqual(response.status_code, requests.codes.ok)
978         res = response.json()
979         nb_links = len(res['network'][0]['ietf-network-topology:link'])
980         self.assertEqual(nb_links, 4)
981         for link in res['network'][0]['ietf-network-topology:link']:
982             linkId = link['link-id']
983             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
984                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
985                 self.assertEqual(
986                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
987                 self.assertEqual(
988                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
989
990     def test_54_check_otn_topo_tp(self):
991         response = test_utils.get_otn_topo_request()
992         res = response.json()
993         for node in res['network'][0]['node']:
994             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
995                 tpList = node['ietf-network-topology:termination-point']
996                 for tp in tpList:
997                     if tp['tp-id'] == 'XPDR2-NETWORK1':
998                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
999                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1000                         self.assertEqual(
1001                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1002
1003     def test_55_delete_ODUC4_service(self):
1004         response = test_utils.service_delete_request("service1-ODUC4")
1005         self.assertEqual(response.status_code, requests.codes.ok)
1006         res = response.json()
1007         self.assertIn('Renderer service delete in progress',
1008                       res['output']['configuration-response-common']['response-message'])
1009         time.sleep(self.WAITING)
1010
1011     def test_56_check_service_list(self):
1012         response = test_utils.get_service_list_request("")
1013         self.assertEqual(response.status_code, requests.codes.ok)
1014         res = response.json()
1015         self.assertEqual(len(res['service-list']['services']), 1)
1016         time.sleep(2)
1017
1018     def test_57_check_no_interface_ODU4_xpdra2(self):
1019         response = test_utils.check_netconf_node_request(
1020             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1021         self.assertEqual(response.status_code, requests.codes.conflict)
1022
1023     def test_58_check_otn_topo_links(self):
1024         self.test_22_check_otn_topo_OTUC4_links()
1025
1026     def test_59_check_otn_topo_tp(self):
1027         response = test_utils.get_otn_topo_request()
1028         res = response.json()
1029         for node in res['network'][0]['node']:
1030             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1031                 tpList = node['ietf-network-topology:termination-point']
1032                 for tp in tpList:
1033                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1034                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1035                                          dict.keys(tp))
1036
1037     def test_60_delete_OTUC4_service(self):
1038         response = test_utils.service_delete_request("service1-OTUC4")
1039         self.assertEqual(response.status_code, requests.codes.ok)
1040         res = response.json()
1041         self.assertIn('Renderer service delete in progress',
1042                       res['output']['configuration-response-common']['response-message'])
1043         time.sleep(self.WAITING)
1044
1045     def test_61_get_no_service(self):
1046         response = test_utils.get_service_list_request("")
1047         self.assertEqual(response.status_code, requests.codes.conflict)
1048         res = response.json()
1049         self.assertIn(
1050             {"error-type": "application", "error-tag": "data-missing",
1051              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1052             res['errors']['error'])
1053         time.sleep(1)
1054
1055     def test_62_check_no_interface_OTUC4_xpdra2(self):
1056         response = test_utils.check_netconf_node_request(
1057             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1058         self.assertEqual(response.status_code, requests.codes.conflict)
1059
1060     def test_63_check_no_interface_OTSI_xpdra2(self):
1061         response = test_utils.check_netconf_node_request(
1062             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1063         self.assertEqual(response.status_code, requests.codes.conflict)
1064
1065     def test_64_check_no_interface_OTSIG_xpdra2(self):
1066         response = test_utils.check_netconf_node_request(
1067             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
1068         self.assertEqual(response.status_code, requests.codes.conflict)
1069
1070     def test_65_getLinks_OtnTopology(self):
1071         response = test_utils.get_otn_topo_request()
1072         self.assertEqual(response.status_code, requests.codes.ok)
1073         res = response.json()
1074         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1075
1076     def test_66_check_openroadm_topo_xpdra2(self):
1077         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1078         self.assertEqual(response.status_code, requests.codes.ok)
1079         res = response.json()
1080         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1081         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1082         self.assertNotIn('wavelength', dict.keys(
1083             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1084         time.sleep(3)
1085
1086     def test_67_check_openroadm_topology(self):
1087         response = test_utils.get_ordm_topo_request("")
1088         self.assertEqual(response.status_code, requests.codes.ok)
1089         res = response.json()
1090         links = res['network'][0]['ietf-network-topology:link']
1091         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1092
1093     def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
1094         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1095                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1096         self.assertEqual(response.status_code, requests.codes.ok)
1097         res = response.json()
1098         self.assertIn('Xponder Roadm Link created successfully',
1099                       res["output"]["result"])
1100         time.sleep(2)
1101
1102     def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
1103         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1104                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1105         self.assertEqual(response.status_code, requests.codes.ok)
1106         res = response.json()
1107         self.assertIn('Roadm Xponder links created successfully',
1108                       res["output"]["result"])
1109         time.sleep(2)
1110
1111     def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
1112         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1113                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1114         self.assertEqual(response.status_code, requests.codes.ok)
1115         res = response.json()
1116         self.assertIn('Xponder Roadm Link created successfully',
1117                       res["output"]["result"])
1118         time.sleep(2)
1119
1120     def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1121         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1122                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1123         self.assertEqual(response.status_code, requests.codes.ok)
1124         res = response.json()
1125         self.assertIn('Roadm Xponder links created successfully',
1126                       res["output"]["result"])
1127         time.sleep(2)
1128
1129
1130 # test service-create for 400GE service from xpdra2 to xpdrc2
1131
1132
1133     def test_72_create_400GE_service(self):
1134         self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1135         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1136         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1137         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1138         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1139         del self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"]
1140         del self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
1141         del self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"]
1142         del self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
1143         del self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"]
1144         del self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
1145         del self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"]
1146         del self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
1147         response = test_utils.service_create_request(self.cr_serv_sample_data)
1148         self.assertEqual(response.status_code, requests.codes.ok)
1149         res = response.json()
1150         self.assertIn('PCE calculation in progress',
1151                       res['output']['configuration-response-common']['response-message'])
1152         time.sleep(self.WAITING)
1153
1154     def test_73_get_400GE_service(self):
1155         response = test_utils.get_service_list_request(
1156             "services/service-400GE")
1157         self.assertEqual(response.status_code, requests.codes.ok)
1158         res = response.json()
1159         self.assertEqual(
1160             res['services'][0]['administrative-state'], 'inService')
1161         self.assertEqual(
1162             res['services'][0]['service-name'], 'service-400GE')
1163         self.assertEqual(
1164             res['services'][0]['connection-type'], 'service')
1165         self.assertEqual(
1166             res['services'][0]['lifecycle-state'], 'planned')
1167         time.sleep(2)
1168
1169     def test_74_check_xc1_roadma(self):
1170         response = test_utils.check_netconf_node_request(
1171             "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1172         self.assertEqual(response.status_code, requests.codes.ok)
1173         res = response.json()
1174         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1175         self.assertDictEqual(
1176             dict({
1177                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1178                 'opticalControlMode': 'gainLoss',
1179                 'target-output-power': -3.0
1180             }, **res['roadm-connections'][0]),
1181             res['roadm-connections'][0]
1182         )
1183         self.assertDictEqual(
1184             {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1185             res['roadm-connections'][0]['source'])
1186         self.assertDictEqual(
1187             {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1188             res['roadm-connections'][0]['destination'])
1189         time.sleep(5)
1190
1191     def test_75_check_topo_xpdra2(self):
1192         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1193         self.assertEqual(response.status_code, requests.codes.ok)
1194         res = response.json()
1195         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1196         for ele in liste_tp:
1197             if ele['tp-id'] == 'XPDR1-NETWORK1':
1198                 self.assertEqual({'frequency': 196.08125,
1199                                   'width': 75},
1200                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1201             if ele['tp-id'] == 'XPDR1-CLIENT1':
1202                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1203         time.sleep(3)
1204
1205     def test_76_check_topo_roadma_SRG1(self):
1206         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1207         self.assertEqual(response.status_code, requests.codes.ok)
1208         res = response.json()
1209         freq_map = base64.b64decode(
1210             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1211         freq_map_array = [int(x) for x in freq_map]
1212         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1213         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1214         for ele in liste_tp:
1215             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1216                 freq_map = base64.b64decode(
1217                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1218                 freq_map_array = [int(x) for x in freq_map]
1219                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1220             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1221                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1222         time.sleep(3)
1223
1224     def test_77_check_topo_roadma_DEG1(self):
1225         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1226         self.assertEqual(response.status_code, requests.codes.ok)
1227         res = response.json()
1228         freq_map = base64.b64decode(
1229             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1230         freq_map_array = [int(x) for x in freq_map]
1231         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1232         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1233         for ele in liste_tp:
1234             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1235                 freq_map = base64.b64decode(
1236                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1237                 freq_map_array = [int(x) for x in freq_map]
1238                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1239             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1240                 freq_map = base64.b64decode(
1241                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1242                 freq_map_array = [int(x) for x in freq_map]
1243                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1244         time.sleep(3)
1245
1246     def test_78_check_interface_400GE_CLIENT_xpdra2(self):
1247         response = test_utils.check_netconf_node_request(
1248             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1249         self.assertEqual(response.status_code, requests.codes.ok)
1250         res = response.json()
1251         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1252                         'administrative-state': 'inService',
1253                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1254                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1255                         'supporting-port': 'C1'
1256                         }
1257         input_dict_2 = {'speed': 400000}
1258         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1259                              res['interface'][0])
1260         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1261                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1262
1263     def test_79_check_interface_OTSI_xpdra2(self):
1264         response = test_utils.check_netconf_node_request(
1265             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1266         self.assertEqual(response.status_code, requests.codes.ok)
1267         res = response.json()
1268         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1269                         'administrative-state': 'inService',
1270                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1271                         'type': 'org-openroadm-interfaces:otsi',
1272                         'supporting-port': 'L1'}
1273         input_dict_2 = {
1274             "frequency": 196.0812,
1275             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1276             "fec": "org-openroadm-common-types:ofec",
1277             "transmit-power": -5,
1278             "provision-mode": "explicit",
1279             "modulation-format": "dp-qam16"}
1280
1281         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1282                              res['interface'][0])
1283         self.assertDictEqual(dict(input_dict_2,
1284                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1285                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1286         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1287                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1288
1289     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1290         response = test_utils.check_netconf_node_request(
1291             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSIGROUP-400G")
1292         self.assertEqual(response.status_code, requests.codes.ok)
1293         res = response.json()
1294         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
1295                         'administrative-state': 'inService',
1296                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1297                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1298                         'type': 'org-openroadm-interfaces:otsi-group',
1299                         'supporting-port': 'L1'}
1300         input_dict_2 = {"group-id": 1,
1301                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1302
1303         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1304                              res['interface'][0])
1305         self.assertDictEqual(dict(input_dict_2,
1306                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1307                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1308
1309     def test_81_check_interface_OTUC4_xpdra2(self):
1310         response = test_utils.check_netconf_node_request(
1311             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1312         self.assertEqual(response.status_code, requests.codes.ok)
1313         res = response.json()
1314         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1315                         'administrative-state': 'inService',
1316                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1317                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSIGROUP-400G',
1318                         'type': 'org-openroadm-interfaces:otnOtu',
1319                         'supporting-port': 'L1'}
1320         input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1321                         "expected-dapi": "ANeUjNzWtDLV",
1322                         'tx-dapi': 'AKsqPmWceByv',
1323                         'expected-sapi': 'AKsqPmWceByv',
1324                         "rate": "org-openroadm-otn-common-types:OTUCn",
1325                         "degthr-percentage": 100,
1326                         "tim-detect-mode": "Disabled",
1327                         "otucn-n-rate": 4,
1328                         "degm-intervals": 2}
1329
1330         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1331                              res['interface'][0])
1332         self.assertDictEqual(dict(input_dict_2,
1333                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1334                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1335
1336     def test_82_check_interface_ODUC4_xpdra2(self):
1337         response = test_utils.check_netconf_node_request(
1338             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1339         self.assertEqual(response.status_code, requests.codes.ok)
1340         res = response.json()
1341         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1342                         'administrative-state': 'inService',
1343                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1344                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1345                         'type': 'org-openroadm-interfaces:otnOdu',
1346                         'supporting-port': 'L1',
1347                         'circuit-id': 'TBD',
1348                         'description': 'TBD'}
1349         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1350                         "tim-detect-mode": "Disabled",
1351                         "degm-intervals": 2,
1352                         "degthr-percentage": 100,
1353                         "monitoring-mode": "terminated",
1354                         "rate": "org-openroadm-otn-common-types:ODUCn",
1355                         "oducn-n-rate": 4}
1356         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1357
1358         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1359                              res['interface'][0])
1360         self.assertDictEqual(dict(input_dict_2,
1361                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1362                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1363         self.assertDictEqual(dict(input_dict_3,
1364                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1365                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1366         self.assertEqual('XPDR1-NETWORK1-OTUC4', res['interface'][0]['supporting-interface-list'][0])
1367
1368     def test_82a_check_interface_ODUFLEX_xpdra2(self):
1369         response = test_utils.check_netconf_node_request(
1370             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUFLEX")
1371         self.assertEqual(response.status_code, requests.codes.ok)
1372         res = response.json()
1373         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
1374                         'administrative-state': 'inService',
1375                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1376                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-ODUC4',
1377                         'type': 'org-openroadm-interfaces:otnOdu',
1378                         'supporting-port': 'L1',
1379                         'circuit-id': 'TBD',
1380                         'description': 'TBD'}
1381         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
1382                         "tim-detect-mode": "Disabled",
1383                         "degm-intervals": 2,
1384                         "degthr-percentage": 100,
1385                         "monitoring-mode": "terminated",
1386                         "rate": "org-openroadm-otn-common-types:ODUflex-cbr",
1387                         "oduflex-cbr-service": "org-openroadm-otn-common-types:ODUflex-cbr-400G"
1388                         }
1389         input_dict_3 = {"exp-payload-type": "32", "payload-type": "32"}
1390         input_dict_4 = {'trib-port-number': 1}
1391
1392         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1393                              res['interface'][0])
1394         self.assertDictEqual(dict(input_dict_2,
1395                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1396                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1397         self.assertDictEqual(dict(input_dict_3,
1398                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1399                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1400         self.assertDictEqual(dict(input_dict_4,
1401                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1402                                       'parent-odu-allocation']),
1403                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1404         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1405                       ['opucn-trib-slots'])
1406         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1407                       ['opucn-trib-slots'])
1408         self.assertIn('2.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1409                       ['opucn-trib-slots'])
1410         self.assertIn('2.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1411                       ['opucn-trib-slots'])
1412         self.assertIn('3.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1413                       ['opucn-trib-slots'])
1414         self.assertIn('3.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1415                       ['opucn-trib-slots'])
1416         self.assertIn('4.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1417                       ['opucn-trib-slots'])
1418         self.assertIn('4.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1419                       ['opucn-trib-slots'])
1420         self.assertEqual('XPDR1-NETWORK1-ODUC4', res['interface'][0]['supporting-interface-list'][0])
1421
1422     def test_83_delete_400GE_service(self):
1423         response = test_utils.service_delete_request("service-400GE")
1424         self.assertEqual(response.status_code, requests.codes.ok)
1425         res = response.json()
1426         self.assertIn('Renderer service delete in progress',
1427                       res['output']['configuration-response-common']['response-message'])
1428         time.sleep(self.WAITING)
1429
1430     def test_84_get_no_service(self):
1431         response = test_utils.get_service_list_request("")
1432         self.assertEqual(response.status_code, requests.codes.conflict)
1433         res = response.json()
1434         self.assertIn(
1435             {"error-type": "application", "error-tag": "data-missing",
1436              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1437             res['errors']['error'])
1438         time.sleep(1)
1439
1440     def test_85_check_no_interface_ODUC4_xpdra2(self):
1441         response = test_utils.check_netconf_node_request(
1442             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1443         self.assertEqual(response.status_code, requests.codes.conflict)
1444
1445     def test_86_check_no_interface_OTUC4_xpdra2(self):
1446         response = test_utils.check_netconf_node_request(
1447             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1448         self.assertEqual(response.status_code, requests.codes.conflict)
1449
1450     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1451         response = test_utils.check_netconf_node_request(
1452             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSIGROUP-400G")
1453         self.assertEqual(response.status_code, requests.codes.conflict)
1454
1455     def test_88_check_no_interface_OTSI_xpdra2(self):
1456         response = test_utils.check_netconf_node_request(
1457             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1458         self.assertEqual(response.status_code, requests.codes.conflict)
1459
1460     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1461         response = test_utils.check_netconf_node_request(
1462             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1463         self.assertEqual(response.status_code, requests.codes.conflict)
1464
1465     def test_90_disconnect_xponders_from_roadm(self):
1466         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1467         response = test_utils.get_ordm_topo_request("")
1468         self.assertEqual(response.status_code, requests.codes.ok)
1469         res = response.json()
1470         links = res['network'][0]['ietf-network-topology:link']
1471         for link in links:
1472             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1473                 link_name = link["link-id"]
1474                 response = test_utils.delete_request(url+link_name)
1475                 self.assertEqual(response.status_code, requests.codes.ok)
1476
1477     def test_91_disconnect_xpdra2(self):
1478         response = test_utils.unmount_device("XPDR-A2")
1479         self.assertEqual(response.status_code, requests.codes.ok,
1480                          test_utils.CODE_SHOULD_BE_200)
1481
1482     def test_92_disconnect_xpdrc2(self):
1483         response = test_utils.unmount_device("XPDR-C2")
1484         self.assertEqual(response.status_code, requests.codes.ok,
1485                          test_utils.CODE_SHOULD_BE_200)
1486
1487     def test_93_disconnect_roadmA(self):
1488         response = test_utils.unmount_device("ROADM-A1")
1489         self.assertEqual(response.status_code, requests.codes.ok,
1490                          test_utils.CODE_SHOULD_BE_200)
1491
1492     def test_94_disconnect_roadmC(self):
1493         response = test_utils.unmount_device("ROADM-C1")
1494         self.assertEqual(response.status_code, requests.codes.ok,
1495                          test_utils.CODE_SHOULD_BE_200)
1496
1497
1498 if __name__ == "__main__":
1499     unittest.main(verbosity=2)