aa49014e03a459d7edf7a4e5b4e373968eeb76df
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
16
17 import base64
18 import unittest
19 import time
20 import requests
21 # pylint: disable=wrong-import-order
22 import sys
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION_221 = '2.2.1'
34     NODE_VERSION_71 = '7.1'
35
36     cr_serv_sample_data = {"input": {
37         "sdnc-request-header": {
38             "request-id": "request-1",
39             "rpc-action": "service-create",
40             "request-system-id": "appname"
41         },
42         "service-name": "service1-OTUC4",
43         "common-id": "commonId",
44         "connection-type": "infrastructure",
45         "service-a-end": {
46             "service-rate": "400",
47             "node-id": "XPDR-A2",
48             "service-format": "OTU",
49             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
50             "clli": "NodeSA",
51             "subrate-eth-sla": {
52                     "subrate-eth-sla": {
53                         "committed-info-rate": "100000",
54                         "committed-burst-size": "64"
55                     }
56             },
57             "tx-direction": {
58                 "port": {
59                     "port-device-name": "XPDR-A2-XPDR2",
60                     "port-type": "fixed",
61                     "port-name": "XPDR2-NETWORK1",
62                     "port-rack": "000000.00",
63                     "port-shelf": "Chassis#1"
64                 },
65                 "lgx": {
66                     "lgx-device-name": "Some lgx-device-name",
67                     "lgx-port-name": "Some lgx-port-name",
68                     "lgx-port-rack": "000000.00",
69                     "lgx-port-shelf": "00"
70                 }
71             },
72             "rx-direction": {
73                 "port": {
74                     "port-device-name": "XPDR-A2-XPDR2",
75                     "port-type": "fixed",
76                     "port-name": "XPDR2-NETWORK1",
77                     "port-rack": "000000.00",
78                     "port-shelf": "Chassis#1"
79                 },
80                 "lgx": {
81                     "lgx-device-name": "Some lgx-device-name",
82                     "lgx-port-name": "Some lgx-port-name",
83                     "lgx-port-rack": "000000.00",
84                     "lgx-port-shelf": "00"
85                 }
86             },
87             "optic-type": "gray"
88         },
89         "service-z-end": {
90             "service-rate": "400",
91             "node-id": "XPDR-C2",
92             "service-format": "OTU",
93             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
94             "clli": "NodeSC",
95             "subrate-eth-sla": {
96                     "subrate-eth-sla": {
97                         "committed-info-rate": "100000",
98                         "committed-burst-size": "64"
99                     }
100             },
101             "tx-direction": {
102                 "port": {
103                     "port-device-name": "XPDR-C2-XPDR2",
104                     "port-type": "fixed",
105                     "port-name": "XPDR2-NETWORK1",
106                     "port-rack": "000000.00",
107                     "port-shelf": "Chassis#1"
108                 },
109                 "lgx": {
110                     "lgx-device-name": "Some lgx-device-name",
111                     "lgx-port-name": "Some lgx-port-name",
112                     "lgx-port-rack": "000000.00",
113                     "lgx-port-shelf": "00"
114                 }
115             },
116             "rx-direction": {
117                 "port": {
118                     "port-device-name": "XPDR-C2-XPDR2",
119                     "port-type": "fixed",
120                     "port-name": "XPDR2-NETWORK1",
121                     "port-rack": "000000.00",
122                     "port-shelf": "Chassis#1"
123                 },
124                 "lgx": {
125                     "lgx-device-name": "Some lgx-device-name",
126                     "lgx-port-name": "Some lgx-port-name",
127                     "lgx-port-rack": "000000.00",
128                     "lgx-port-shelf": "00"
129                 }
130             },
131             "optic-type": "gray"
132         },
133         "due-date": "2018-06-15T00:00:01Z",
134         "operator-contact": "pw1234"
135     }
136     }
137
138     @classmethod
139     def setUpClass(cls):
140         cls.processes = test_utils.start_tpce()
141         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142                                                ('roadma', cls.NODE_VERSION_221),
143                                                ('roadmc', cls.NODE_VERSION_221),
144                                                ('xpdrc2', cls.NODE_VERSION_71)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(2)
155
156     def test_01_connect_xpdra2(self):
157         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_xpdrc2(self):
162         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_rdma(self):
167         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmc(self):
172         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
179         self.assertEqual(response.status_code, requests.codes.ok)
180         res = response.json()
181         self.assertIn('Xponder Roadm Link created successfully',
182                       res["output"]["result"])
183         time.sleep(2)
184
185     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
188         self.assertEqual(response.status_code, requests.codes.ok)
189         res = response.json()
190         self.assertIn('Roadm Xponder links created successfully',
191                       res["output"]["result"])
192         time.sleep(2)
193
194     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
197         self.assertEqual(response.status_code, requests.codes.ok)
198         res = response.json()
199         self.assertIn('Xponder Roadm Link created successfully',
200                       res["output"]["result"])
201         time.sleep(2)
202
203     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
206         self.assertEqual(response.status_code, requests.codes.ok)
207         res = response.json()
208         self.assertIn('Roadm Xponder links created successfully',
209                       res["output"]["result"])
210         time.sleep(2)
211
212     def test_09_add_omsAttributes_roadma_roadmc(self):
213         # Config ROADMA-ROADMC oms-attributes
214         data = {"span": {
215             "auto-spanloss": "true",
216             "spanloss-base": 11.4,
217             "spanloss-current": 12,
218             "engineered-spanloss": 12.2,
219             "link-concatenation": [{
220                 "SRLG-Id": 0,
221                 "fiber-type": "smf",
222                 "SRLG-length": 100000,
223                 "pmd": 0.5}]}}
224         response = test_utils.add_oms_attr_request(
225             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226         self.assertEqual(response.status_code, requests.codes.created)
227
228     def test_10_add_omsAttributes_roadmc_roadma(self):
229         # Config ROADMC-ROADMA oms-attributes
230         data = {"span": {
231             "auto-spanloss": "true",
232             "spanloss-base": 11.4,
233             "spanloss-current": 12,
234             "engineered-spanloss": 12.2,
235             "link-concatenation": [{
236                 "SRLG-Id": 0,
237                 "fiber-type": "smf",
238                 "SRLG-length": 100000,
239                 "pmd": 0.5}]}}
240         response = test_utils.add_oms_attr_request(
241             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242         self.assertEqual(response.status_code, requests.codes.created)
243
244 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
245     def test_11_check_otn_topology(self):
246         response = test_utils.get_otn_topo_request()
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         nbNode = len(res['network'][0]['node'])
250         self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251         self.assertNotIn('ietf-network-topology:link', res['network'][0])
252
253     def test_12_create_OTUC4_service(self):
254         response = test_utils.service_create_request(self.cr_serv_sample_data)
255         self.assertEqual(response.status_code, requests.codes.ok)
256         res = response.json()
257         self.assertIn('PCE calculation in progress',
258                       res['output']['configuration-response-common']['response-message'])
259         time.sleep(self.WAITING)
260
261     def test_13_get_OTUC4_service1(self):
262         response = test_utils.get_service_list_request(
263             "services/service1-OTUC4")
264         self.assertEqual(response.status_code, requests.codes.ok)
265         res = response.json()
266         self.assertEqual(
267             res['services'][0]['administrative-state'], 'inService')
268         self.assertEqual(
269             res['services'][0]['service-name'], 'service1-OTUC4')
270         self.assertEqual(
271             res['services'][0]['connection-type'], 'infrastructure')
272         self.assertEqual(
273             res['services'][0]['lifecycle-state'], 'planned')
274         self.assertEqual(
275             res['services'][0]['service-layer'], 'wdm')
276         self.assertEqual(
277             res['services'][0]['service-a-end']['service-rate'], 400)
278         self.assertEqual(
279             res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
280         time.sleep(2)
281         self.assertEqual(
282             res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
283         time.sleep(2)
284
285     # Check correct configuration of devices
286     def test_14_check_interface_och_xpdra2(self):
287         response = test_utils.check_netconf_node_request(
288             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289         self.assertEqual(response.status_code, requests.codes.ok)
290         res = response.json()
291         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292                                    'administrative-state': 'inService',
293                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294                                    'type': 'org-openroadm-interfaces:otsi',
295                                    'supporting-port': 'L1'
296                                    }, **res['interface'][0]),
297                              res['interface'][0])
298
299         self.assertDictEqual(
300             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
304
305     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306         response = test_utils.check_netconf_node_request(
307             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308         self.assertEqual(response.status_code, requests.codes.ok)
309         res = response.json()
310         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311                         'administrative-state': 'inService',
312                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314                         'type': 'org-openroadm-interfaces:otsi-group',
315                         'supporting-port': 'L1'
316                         }
317         input_dict_2 = {"group-id": 1,
318                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
319                         }
320         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
321                              res['interface'][0])
322         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
324                              res['interface'][0]
325                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
326
327     def test_16_check_interface_OTUC4_xpdra2(self):
328         response = test_utils.check_netconf_node_request(
329             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330         self.assertEqual(response.status_code, requests.codes.ok)
331         res = response.json()
332         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333                         'administrative-state': 'inService',
334                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336                         'type': 'org-openroadm-interfaces:otnOtu',
337                         'supporting-port': 'L1'
338                         }
339         input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
340                         'expected-dapi': 'G54UFNImtOE=',
341                         'tx-dapi': 'J/FIUzQc+4M=',
342                         'expected-sapi': 'J/FIUzQc+4M=',
343                         'rate': 'org-openroadm-otn-common-types:OTUCn',
344                         'degthr-percentage': 100,
345                         'degm-intervals': 2,
346                         'otucn-n-rate': 4
347                         }
348         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
349                              res['interface'][0])
350         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
351                              res['interface'][0]
352                              ['org-openroadm-otn-otu-interfaces:otu'])
353
354     def test_17_check_interface_och_xpdrc2(self):
355         response = test_utils.check_netconf_node_request(
356             "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
357         self.assertEqual(response.status_code, requests.codes.ok)
358         res = response.json()
359         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
360                                    'administrative-state': 'inService',
361                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
362                                    'type': 'org-openroadm-interfaces:otsi',
363                                    'supporting-port': 'L1'
364                                    }, **res['interface'][0]),
365                              res['interface'][0])
366
367         self.assertDictEqual(
368             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
369                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
370                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
371             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
372
373     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
374         response = test_utils.check_netconf_node_request(
375             "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
376         self.assertEqual(response.status_code, requests.codes.ok)
377         res = response.json()
378         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
379                         'administrative-state': 'inService',
380                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
381                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
382                         'type': 'org-openroadm-interfaces:otsi-group',
383                         'supporting-port': 'L1'
384                         }
385         input_dict_2 = {"group-id": 1,
386                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
387                         }
388         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
389                              res['interface'][0])
390         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
391                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
392                              res['interface'][0]
393                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
394
395     def test_19_check_interface_OTUC4_xpdrc2(self):
396         response = test_utils.check_netconf_node_request(
397             "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
398         self.assertEqual(response.status_code, requests.codes.ok)
399         res = response.json()
400         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
401                         'administrative-state': 'inService',
402                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
403                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
404                         'type': 'org-openroadm-interfaces:otnOtu',
405                         'supporting-port': 'L1'
406                         }
407         input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
408                         'expected-sapi': 'G54UFNImtOE=',
409                         'tx-sapi': 'J/FIUzQc+4M=',
410                         'expected-dapi': 'J/FIUzQc+4M=',
411                         'rate': 'org-openroadm-otn-common-types:OTUCn',
412                         'degthr-percentage': 100,
413                         'degm-intervals': 2,
414                         'otucn-n-rate': 4
415                         }
416
417         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
418                              res['interface'][0])
419
420         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
421                              res['interface'][0]
422                              ['org-openroadm-otn-otu-interfaces:otu'])
423
424     def test_20_check_no_interface_ODUC4_xpdra2(self):
425         response = test_utils.check_netconf_node_request(
426             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
427         self.assertEqual(response.status_code, requests.codes.conflict)
428         res = response.json()
429         self.assertIn(
430             {"error-type": "application", "error-tag": "data-missing",
431              "error-message": "Request could not be completed because the relevant data model content does not exist"},
432             res['errors']['error'])
433
434     def test_21_check_openroadm_topo_xpdra2(self):
435         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
436         self.assertEqual(response.status_code, requests.codes.ok)
437         res = response.json()
438         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
439         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
440         self.assertEqual({'frequency': 196.08125,
441                           'width': 75},
442                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
443         time.sleep(3)
444
445     def test_22_check_otn_topo_OTUC4_links(self):
446         response = test_utils.get_otn_topo_request()
447         self.assertEqual(response.status_code, requests.codes.ok)
448         res = response.json()
449         nb_links = len(res['network'][0]['ietf-network-topology:link'])
450         self.assertEqual(nb_links, 2)
451         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
452                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
453         for link in res['network'][0]['ietf-network-topology:link']:
454             self.assertIn(link['link-id'], listLinkId)
455             self.assertEqual(
456                 link['transportpce-topology:otn-link-type'], 'OTUC4')
457             self.assertEqual(
458                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
459             self.assertEqual(
460                 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
461             self.assertEqual(
462                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
463             self.assertIn(
464                 link['org-openroadm-common-network:opposite-link'], listLinkId)
465
466 # test service-create for ODU4 service from xpdra2 to xpdrc2
467     def test_23_create_ODUC4_service(self):
468         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
469         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
470         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
471         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
472         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
473         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
474         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
475
476         response = test_utils.service_create_request(self.cr_serv_sample_data)
477         self.assertEqual(response.status_code, requests.codes.ok)
478         res = response.json()
479         self.assertIn('PCE calculation in progress',
480                       res['output']['configuration-response-common']['response-message'])
481         time.sleep(self.WAITING)
482
483     def test_24_get_ODUC4_service1(self):
484         response = test_utils.get_service_list_request(
485             "services/service1-ODUC4")
486         self.assertEqual(response.status_code, requests.codes.ok)
487         res = response.json()
488         self.assertEqual(
489             res['services'][0]['administrative-state'], 'inService')
490         self.assertEqual(
491             res['services'][0]['service-name'], 'service1-ODUC4')
492         self.assertEqual(
493             res['services'][0]['connection-type'], 'infrastructure')
494         self.assertEqual(
495             res['services'][0]['lifecycle-state'], 'planned')
496         self.assertEqual(
497             res['services'][0]['service-layer'], 'wdm')
498         self.assertEqual(
499             res['services'][0]['service-a-end']['service-rate'], 400)
500         self.assertEqual(
501             res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
502         self.assertEqual(
503             res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
504         time.sleep(2)
505
506     def test_25_check_interface_ODUC4_xpdra2(self):
507         response = test_utils.check_netconf_node_request(
508             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
509         self.assertEqual(response.status_code, requests.codes.ok)
510         res = response.json()
511         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
512                         'administrative-state': 'inService',
513                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
514                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
515                         'type': 'org-openroadm-interfaces:otnOdu',
516                         'supporting-port': 'L1'}
517         # SAPI/DAPI are added in the Otu4 renderer
518         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
519                         'rate': 'org-openroadm-otn-common-types:ODUCn',
520                         'expected-dapi': 'Nmbu2MNHvc4=',
521                         'expected-sapi': 'Nmbu2MNHvc4=',
522                         'tx-dapi': 'Nmbu2MNHvc4=',
523                         'tx-sapi': 'LY9PxYJqUbw='}
524
525         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
526                              res['interface'][0])
527         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
528                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
529         self.assertDictEqual(
530             {'payload-type': '22', 'exp-payload-type': '22'},
531             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
532
533     def test_26_check_interface_ODUC4_xpdrc2(self):
534         response = test_utils.check_netconf_node_request(
535             "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
536         self.assertEqual(response.status_code, requests.codes.ok)
537         res = response.json()
538         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
539                         'administrative-state': 'inService',
540                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
541                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
542                         'type': 'org-openroadm-interfaces:otnOdu',
543                         'supporting-port': 'L1'}
544         # SAPI/DAPI are added in the Otu4 renderer
545         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
546                         'rate': 'org-openroadm-otn-common-types:ODUCn',
547                         'tx-sapi': 'Nmbu2MNHvc4=',
548                         'tx-dapi': 'LY9PxYJqUbw=',
549                         'expected-sapi': 'LY9PxYJqUbw=',
550                         'expected-dapi': 'LY9PxYJqUbw='
551                         }
552         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
553                              res['interface'][0])
554         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
555                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
556         self.assertDictEqual(
557             {'payload-type': '22', 'exp-payload-type': '22'},
558             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
559
560     def test_27_check_otn_topo_links(self):
561         response = test_utils.get_otn_topo_request()
562         self.assertEqual(response.status_code, requests.codes.ok)
563         res = response.json()
564         nb_links = len(res['network'][0]['ietf-network-topology:link'])
565         self.assertEqual(nb_links, 4)
566         for link in res['network'][0]['ietf-network-topology:link']:
567             linkId = link['link-id']
568             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
569                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
570                 self.assertEqual(
571                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
572                 self.assertEqual(
573                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
574             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
575                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
576                 self.assertEqual(
577                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
578                 self.assertEqual(
579                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
580                 self.assertEqual(
581                     link['transportpce-topology:otn-link-type'], 'ODUC4')
582                 self.assertEqual(
583                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
584                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
585                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
586                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
587             else:
588                 self.fail("this link should not exist")
589
590     def test_28_check_otn_topo_tp(self):
591         response = test_utils.get_otn_topo_request()
592         res = response.json()
593         for node in res['network'][0]['node']:
594             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
595                 tpList = node['ietf-network-topology:termination-point']
596                 for tp in tpList:
597                     if tp['tp-id'] == 'XPDR2-NETWORK1':
598                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
599                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
600                         self.assertEqual(
601                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
602                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
603                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
604
605 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
606     def test_29_create_100GE_service_1(self):
607         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
608         self.cr_serv_sample_data["input"]["connection-type"] = "service"
609         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
610         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
611         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
612         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
613         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
614         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
615         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
616         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
617         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
618         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
619         response = test_utils.service_create_request(self.cr_serv_sample_data)
620         self.assertEqual(response.status_code, requests.codes.ok)
621         res = response.json()
622         self.assertIn('PCE calculation in progress',
623                       res['output']['configuration-response-common']['response-message'])
624         time.sleep(self.WAITING)
625
626     def test_30_get_100GE_service_1(self):
627         response = test_utils.get_service_list_request(
628             "services/service-100GE")
629         self.assertEqual(response.status_code, requests.codes.ok)
630         res = response.json()
631         self.assertEqual(
632             res['services'][0]['administrative-state'], 'inService')
633         self.assertEqual(
634             res['services'][0]['service-name'], 'service-100GE')
635         self.assertEqual(
636             res['services'][0]['connection-type'], 'service')
637         self.assertEqual(
638             res['services'][0]['lifecycle-state'], 'planned')
639         time.sleep(2)
640
641     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
642         response = test_utils.check_netconf_node_request(
643             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
644         self.assertEqual(response.status_code, requests.codes.ok)
645         res = response.json()
646         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
647                         'administrative-state': 'inService',
648                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
649                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
650                         'supporting-port': 'C1'
651                         }
652         input_dict_2 = {'speed': 100000}
653         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
654                              res['interface'][0])
655         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
656                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
657
658     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
659         response = test_utils.check_netconf_node_request(
660             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
661         self.assertEqual(response.status_code, requests.codes.ok)
662         res = response.json()
663         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
664                         'administrative-state': 'inService',
665                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
666                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
667                         'type': 'org-openroadm-interfaces:otnOdu',
668                         'supporting-port': 'C1'}
669         input_dict_2 = {
670             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
671             'rate': 'org-openroadm-otn-common-types:ODU4',
672             'monitoring-mode': 'terminated'}
673
674         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
675                              res['interface'][0])
676         self.assertDictEqual(dict(input_dict_2,
677                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
678                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
679         self.assertDictEqual(
680             {'payload-type': '07', 'exp-payload-type': '07'},
681             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
682
683     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
684         response = test_utils.check_netconf_node_request(
685             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
686         self.assertEqual(response.status_code, requests.codes.ok)
687         res = response.json()
688         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
689                         'administrative-state': 'inService',
690                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
691                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
692                         'type': 'org-openroadm-interfaces:otnOdu',
693                         'supporting-port': 'L1'}
694         input_dict_2 = {
695             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
696             'rate': 'org-openroadm-otn-common-types:ODU4',
697             'monitoring-mode': 'not-terminated'}
698         input_dict_3 = {'trib-port-number': 1}
699
700         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
701                              res['interface'][0])
702         self.assertDictEqual(dict(input_dict_2,
703                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
704                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
705         self.assertDictEqual(dict(input_dict_3,
706                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
707                                       'parent-odu-allocation']),
708                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
709         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
710                       ['opucn-trib-slots'])
711         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
712                       ['opucn-trib-slots'])
713
714     def test_34_check_ODU4_connection_xpdra2(self):
715         response = test_utils.check_netconf_node_request(
716             "XPDR-A2",
717             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
718         self.assertEqual(response.status_code, requests.codes.ok)
719         res = response.json()
720         input_dict_1 = {
721             'connection-name':
722             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
723             'direction': 'bidirectional'
724         }
725
726         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
727                              res['odu-connection'][0])
728         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
729                              res['odu-connection'][0]['destination'])
730         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
731                              res['odu-connection'][0]['source'])
732
733     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
734         response = test_utils.check_netconf_node_request(
735             "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
736         self.assertEqual(response.status_code, requests.codes.ok)
737         res = response.json()
738         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
739                         'administrative-state': 'inService',
740                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
741                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
742                         'supporting-port': 'C1'
743                         }
744         input_dict_2 = {'speed': 100000}
745         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
746                              res['interface'][0])
747         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
748                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
749
750     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
751         response = test_utils.check_netconf_node_request(
752             "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
753         self.assertEqual(response.status_code, requests.codes.ok)
754         res = response.json()
755         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
756                         'administrative-state': 'inService',
757                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
758                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
759                         'type': 'org-openroadm-interfaces:otnOdu',
760                         'supporting-port': 'C1'}
761         input_dict_2 = {
762             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
763             'rate': 'org-openroadm-otn-common-types:ODU4',
764             'monitoring-mode': 'terminated'}
765
766         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
767                              res['interface'][0])
768         self.assertDictEqual(dict(input_dict_2,
769                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
770                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
771         self.assertDictEqual(
772             {'payload-type': '07', 'exp-payload-type': '07'},
773             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
774
775     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
776         response = test_utils.check_netconf_node_request(
777             "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
778         self.assertEqual(response.status_code, requests.codes.ok)
779         res = response.json()
780         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
781                         'administrative-state': 'inService',
782                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
783                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
784                         'type': 'org-openroadm-interfaces:otnOdu',
785                         'supporting-port': 'C1'}
786         input_dict_2 = {
787             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
788             'rate': 'org-openroadm-otn-common-types:ODU4',
789             'monitoring-mode': 'not-terminated'}
790
791         input_dict_3 = {'trib-port-number': 1}
792
793         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
794                              res['interface'][0])
795         self.assertDictEqual(dict(input_dict_2,
796                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
797                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
798         self.assertDictEqual(dict(input_dict_3,
799                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
800                                       'parent-odu-allocation']),
801                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
802             'parent-odu-allocation'])
803         self.assertIn('1.1',
804                       res['interface'][0][
805                           'org-openroadm-otn-odu-interfaces:odu'][
806                           'parent-odu-allocation']['opucn-trib-slots'])
807         self.assertIn('1.20',
808                       res['interface'][0][
809                           'org-openroadm-otn-odu-interfaces:odu'][
810                           'parent-odu-allocation']['opucn-trib-slots'])
811
812     def test_38_check_ODU4_connection_xpdrc2(self):
813         response = test_utils.check_netconf_node_request(
814             "XPDR-C2",
815             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
816         self.assertEqual(response.status_code, requests.codes.ok)
817         res = response.json()
818         input_dict_1 = {
819             'connection-name':
820             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
821             'direction': 'bidirectional'
822         }
823
824         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
825                              res['odu-connection'][0])
826         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
827                              res['odu-connection'][0]['destination'])
828         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
829                              res['odu-connection'][0]['source'])
830
831     def test_39_check_otn_topo_links(self):
832         response = test_utils.get_otn_topo_request()
833         self.assertEqual(response.status_code, requests.codes.ok)
834         res = response.json()
835         nb_links = len(res['network'][0]['ietf-network-topology:link'])
836         self.assertEqual(nb_links, 4)
837         for link in res['network'][0]['ietf-network-topology:link']:
838             linkId = link['link-id']
839             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
840                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
841                 self.assertEqual(
842                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
843                 self.assertEqual(
844                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
845
846     def test_40_check_otn_topo_tp(self):
847         response = test_utils.get_otn_topo_request()
848         res = response.json()
849         for node in res['network'][0]['node']:
850             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
851                 tpList = node['ietf-network-topology:termination-point']
852                 for tp in tpList:
853                     if tp['tp-id'] == 'XPDR2-NETWORK1':
854                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
855                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
856                         tsPoolList = list(range(1, 20))
857                         self.assertNotIn(
858                             tsPoolList, xpdrTpPortConAt['ts-pool'])
859                         self.assertEqual(
860                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
861                         self.assertNotIn(
862                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
863
864 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
865     def test_41_create_100GE_service_2(self):
866         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
867         self.cr_serv_sample_data["input"]["connection-type"] = "service"
868         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
869         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
870         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
871         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
872         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
873         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
874         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
875         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
876         response = test_utils.service_create_request(self.cr_serv_sample_data)
877         self.assertEqual(response.status_code, requests.codes.ok)
878         res = response.json()
879         self.assertIn('PCE calculation in progress',
880                       res['output']['configuration-response-common']['response-message'])
881         time.sleep(self.WAITING)
882
883     def test_42_get_100GE_service_2(self):
884         response = test_utils.get_service_list_request(
885             "services/service-100GE2")
886         self.assertEqual(response.status_code, requests.codes.ok)
887         res = response.json()
888         self.assertEqual(
889             res['services'][0]['administrative-state'], 'inService')
890         self.assertEqual(
891             res['services'][0]['service-name'], 'service-100GE2')
892         self.assertEqual(
893             res['services'][0]['connection-type'], 'service')
894         self.assertEqual(
895             res['services'][0]['lifecycle-state'], 'planned')
896         time.sleep(2)
897
898     def test_43_check_service_list(self):
899         response = test_utils.get_service_list_request("")
900         self.assertEqual(response.status_code, requests.codes.ok)
901         res = response.json()
902         self.assertEqual(len(res['service-list']['services']), 4)
903         time.sleep(2)
904
905     def test_44_check_otn_topo_links(self):
906         response = test_utils.get_otn_topo_request()
907         self.assertEqual(response.status_code, requests.codes.ok)
908         res = response.json()
909         nb_links = len(res['network'][0]['ietf-network-topology:link'])
910         self.assertEqual(nb_links, 4)
911         for link in res['network'][0]['ietf-network-topology:link']:
912             linkId = link['link-id']
913             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
914                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
915                 self.assertEqual(
916                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
917                 self.assertEqual(
918                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
919
920     def test_45_check_otn_topo_tp(self):
921         response = test_utils.get_otn_topo_request()
922         res = response.json()
923         for node in res['network'][0]['node']:
924             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
925                 tpList = node['ietf-network-topology:termination-point']
926                 for tp in tpList:
927                     if tp['tp-id'] == 'XPDR2-NETWORK1':
928                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
929                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
930                         tsPoolList = list(range(1, 40))
931                         self.assertNotIn(
932                             tsPoolList, xpdrTpPortConAt['ts-pool'])
933                         self.assertEqual(
934                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
935                         self.assertNotIn(
936                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
937
938     def test_46_delete_100GE_service_2(self):
939         response = test_utils.service_delete_request("service-100GE2")
940         self.assertEqual(response.status_code, requests.codes.ok)
941         res = response.json()
942         self.assertIn('Renderer service delete in progress',
943                       res['output']['configuration-response-common']['response-message'])
944         time.sleep(self.WAITING)
945
946     def test_47_delete_100GE_service_1(self):
947         response = test_utils.service_delete_request("service-100GE")
948         self.assertEqual(response.status_code, requests.codes.ok)
949         res = response.json()
950         self.assertIn('Renderer service delete in progress',
951                       res['output']['configuration-response-common']['response-message'])
952         time.sleep(self.WAITING)
953
954     def test_48_check_service_list(self):
955         response = test_utils.get_service_list_request("")
956         self.assertEqual(response.status_code, requests.codes.ok)
957         res = response.json()
958         self.assertEqual(len(res['service-list']['services']), 2)
959         time.sleep(2)
960
961     def test_49_check_no_ODU4_connection_xpdra2(self):
962         response = test_utils.check_netconf_node_request("XPDR-A2", "")
963         self.assertEqual(response.status_code, requests.codes.ok)
964         res = response.json()
965         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
966         time.sleep(1)
967
968     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
969         response = test_utils.check_netconf_node_request(
970             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
971         self.assertEqual(response.status_code, requests.codes.conflict)
972
973     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
974         response = test_utils.check_netconf_node_request(
975             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
976         self.assertEqual(response.status_code, requests.codes.conflict)
977
978     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
979         response = test_utils.check_netconf_node_request(
980             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
981         self.assertEqual(response.status_code, requests.codes.conflict)
982
983     def test_53_check_otn_topo_links(self):
984         response = test_utils.get_otn_topo_request()
985         self.assertEqual(response.status_code, requests.codes.ok)
986         res = response.json()
987         nb_links = len(res['network'][0]['ietf-network-topology:link'])
988         self.assertEqual(nb_links, 4)
989         for link in res['network'][0]['ietf-network-topology:link']:
990             linkId = link['link-id']
991             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
992                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
993                 self.assertEqual(
994                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
995                 self.assertEqual(
996                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
997
998     def test_54_check_otn_topo_tp(self):
999         response = test_utils.get_otn_topo_request()
1000         res = response.json()
1001         for node in res['network'][0]['node']:
1002             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1003                 tpList = node['ietf-network-topology:termination-point']
1004                 for tp in tpList:
1005                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1006                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1007                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1008                         self.assertEqual(
1009                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1010
1011     def test_55_delete_ODUC4_service(self):
1012         response = test_utils.service_delete_request("service1-ODUC4")
1013         self.assertEqual(response.status_code, requests.codes.ok)
1014         res = response.json()
1015         self.assertIn('Renderer service delete in progress',
1016                       res['output']['configuration-response-common']['response-message'])
1017         time.sleep(self.WAITING)
1018
1019     def test_56_check_service_list(self):
1020         response = test_utils.get_service_list_request("")
1021         self.assertEqual(response.status_code, requests.codes.ok)
1022         res = response.json()
1023         self.assertEqual(len(res['service-list']['services']), 1)
1024         time.sleep(2)
1025
1026     def test_57_check_no_interface_ODU4_xpdra2(self):
1027         response = test_utils.check_netconf_node_request(
1028             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1029         self.assertEqual(response.status_code, requests.codes.conflict)
1030
1031     def test_58_check_otn_topo_links(self):
1032         self.test_22_check_otn_topo_OTUC4_links()
1033
1034     def test_59_check_otn_topo_tp(self):
1035         response = test_utils.get_otn_topo_request()
1036         res = response.json()
1037         for node in res['network'][0]['node']:
1038             if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1039                 tpList = node['ietf-network-topology:termination-point']
1040                 for tp in tpList:
1041                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1042                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1043                                          dict.keys(tp))
1044
1045     def test_60_delete_OTUC4_service(self):
1046         response = test_utils.service_delete_request("service1-OTUC4")
1047         self.assertEqual(response.status_code, requests.codes.ok)
1048         res = response.json()
1049         self.assertIn('Renderer service delete in progress',
1050                       res['output']['configuration-response-common']['response-message'])
1051         time.sleep(self.WAITING)
1052
1053     def test_61_get_no_service(self):
1054         response = test_utils.get_service_list_request("")
1055         self.assertEqual(response.status_code, requests.codes.conflict)
1056         res = response.json()
1057         self.assertIn(
1058             {"error-type": "application", "error-tag": "data-missing",
1059              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1060             res['errors']['error'])
1061         time.sleep(1)
1062
1063     def test_62_check_no_interface_OTUC4_xpdra2(self):
1064         response = test_utils.check_netconf_node_request(
1065             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1066         self.assertEqual(response.status_code, requests.codes.conflict)
1067
1068     def test_63_check_no_interface_OCH_xpdra2(self):
1069         response = test_utils.check_netconf_node_request(
1070             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1071         self.assertEqual(response.status_code, requests.codes.conflict)
1072
1073     def test_64_check_no_interface_OTSI_xpdra2(self):
1074         response = test_utils.check_netconf_node_request(
1075             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1076         self.assertEqual(response.status_code, requests.codes.conflict)
1077
1078     def test_65_getLinks_OtnTopology(self):
1079         response = test_utils.get_otn_topo_request()
1080         self.assertEqual(response.status_code, requests.codes.ok)
1081         res = response.json()
1082         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1083
1084     def test_66_check_openroadm_topo_xpdra2(self):
1085         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1086         self.assertEqual(response.status_code, requests.codes.ok)
1087         res = response.json()
1088         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1089         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1090         self.assertNotIn('wavelength', dict.keys(
1091             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1092         time.sleep(3)
1093
1094     def test_67_check_openroadm_topology(self):
1095         response = test_utils.get_ordm_topo_request("")
1096         self.assertEqual(response.status_code, requests.codes.ok)
1097         res = response.json()
1098         links = res['network'][0]['ietf-network-topology:link']
1099         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1100
1101     def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
1102         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1103                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1104         self.assertEqual(response.status_code, requests.codes.ok)
1105         res = response.json()
1106         self.assertIn('Xponder Roadm Link created successfully',
1107                       res["output"]["result"])
1108         time.sleep(2)
1109
1110     def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
1111         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1112                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1113         self.assertEqual(response.status_code, requests.codes.ok)
1114         res = response.json()
1115         self.assertIn('Roadm Xponder links created successfully',
1116                       res["output"]["result"])
1117         time.sleep(2)
1118
1119     def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
1120         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1121                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1122         self.assertEqual(response.status_code, requests.codes.ok)
1123         res = response.json()
1124         self.assertIn('Xponder Roadm Link created successfully',
1125                       res["output"]["result"])
1126         time.sleep(2)
1127
1128     def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1129         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1130                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1131         self.assertEqual(response.status_code, requests.codes.ok)
1132         res = response.json()
1133         self.assertIn('Roadm Xponder links created successfully',
1134                       res["output"]["result"])
1135         time.sleep(2)
1136
1137
1138 # test service-create for 400GE service from xpdra2 to xpdrc2
1139
1140     def test_72_create_400GE_service(self):
1141         self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1142         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1143         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1144         response = test_utils.service_create_request(self.cr_serv_sample_data)
1145         self.assertEqual(response.status_code, requests.codes.ok)
1146         res = response.json()
1147         self.assertIn('PCE calculation in progress',
1148                       res['output']['configuration-response-common']['response-message'])
1149         time.sleep(self.WAITING)
1150
1151     def test_73_get_400GE_service(self):
1152         response = test_utils.get_service_list_request(
1153             "services/service-400GE")
1154         self.assertEqual(response.status_code, requests.codes.ok)
1155         res = response.json()
1156         self.assertEqual(
1157             res['services'][0]['administrative-state'], 'inService')
1158         self.assertEqual(
1159             res['services'][0]['service-name'], 'service-400GE')
1160         self.assertEqual(
1161             res['services'][0]['connection-type'], 'service')
1162         self.assertEqual(
1163             res['services'][0]['lifecycle-state'], 'planned')
1164         time.sleep(2)
1165
1166     def test_74_check_xc1_roadma(self):
1167         response = test_utils.check_netconf_node_request(
1168             "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1169         self.assertEqual(response.status_code, requests.codes.ok)
1170         res = response.json()
1171         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1172         self.assertDictEqual(
1173             dict({
1174                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1175                 'opticalControlMode': 'gainLoss',
1176                 'target-output-power': -3.0
1177             }, **res['roadm-connections'][0]),
1178             res['roadm-connections'][0]
1179         )
1180         self.assertDictEqual(
1181             {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1182             res['roadm-connections'][0]['source'])
1183         self.assertDictEqual(
1184             {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1185             res['roadm-connections'][0]['destination'])
1186         time.sleep(5)
1187
1188     def test_75_check_topo_xpdra2(self):
1189         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1190         self.assertEqual(response.status_code, requests.codes.ok)
1191         res = response.json()
1192         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1193         for ele in liste_tp:
1194             if ele['tp-id'] == 'XPDR1-NETWORK1':
1195                 self.assertEqual({'frequency': 196.08125,
1196                                   'width': 75},
1197                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1198             if ele['tp-id'] == 'XPDR1-CLIENT1':
1199                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1200         time.sleep(3)
1201
1202     def test_76_check_topo_roadma_SRG1(self):
1203         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1204         self.assertEqual(response.status_code, requests.codes.ok)
1205         res = response.json()
1206         freq_map = base64.b64decode(
1207             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1208         freq_map_array = [int(x) for x in freq_map]
1209         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1210         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1211         for ele in liste_tp:
1212             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1213                 freq_map = base64.b64decode(
1214                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1215                 freq_map_array = [int(x) for x in freq_map]
1216                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1217             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1218                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1219         time.sleep(3)
1220
1221     def test_77_check_topo_roadma_DEG1(self):
1222         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1223         self.assertEqual(response.status_code, requests.codes.ok)
1224         res = response.json()
1225         freq_map = base64.b64decode(
1226             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1227         freq_map_array = [int(x) for x in freq_map]
1228         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1229         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1230         for ele in liste_tp:
1231             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1232                 freq_map = base64.b64decode(
1233                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1234                 freq_map_array = [int(x) for x in freq_map]
1235                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1236             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1237                 freq_map = base64.b64decode(
1238                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1239                 freq_map_array = [int(x) for x in freq_map]
1240                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1241         time.sleep(3)
1242
1243     def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1244         response = test_utils.check_netconf_node_request(
1245             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1246         self.assertEqual(response.status_code, requests.codes.ok)
1247         res = response.json()
1248         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1249                         'administrative-state': 'inService',
1250                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1251                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1252                         'supporting-port': 'C1'
1253                         }
1254         input_dict_2 = {'speed': 400000}
1255         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1256                              res['interface'][0])
1257         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1258                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1259
1260     def test_79_check_interface_OTSI_xpdra2(self):
1261         response = test_utils.check_netconf_node_request(
1262             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1263         self.assertEqual(response.status_code, requests.codes.ok)
1264         res = response.json()
1265         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1266                         'administrative-state': 'inService',
1267                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268                         'type': 'org-openroadm-interfaces:otsi',
1269                         'supporting-port': 'L1'}
1270         input_dict_2 = {
1271             "frequency": 196.0812,
1272             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1273             "fec": "org-openroadm-common-types:ofec",
1274             "transmit-power": -5,
1275             "provision-mode": "explicit",
1276             "modulation-format": "dp-qam16"}
1277
1278         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1279                              res['interface'][0])
1280         self.assertDictEqual(dict(input_dict_2,
1281                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1282                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1283         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1284                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1285
1286     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1287         response = test_utils.check_netconf_node_request(
1288             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1289         self.assertEqual(response.status_code, requests.codes.ok)
1290         res = response.json()
1291         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1292                         'administrative-state': 'inService',
1293                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1294                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1295                         'type': 'org-openroadm-interfaces:otsi-group',
1296                         'supporting-port': 'L1'}
1297         input_dict_2 = {"group-id": 1,
1298                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1299
1300         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1301                              res['interface'][0])
1302         self.assertDictEqual(dict(input_dict_2,
1303                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1304                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1305
1306     def test_81_check_interface_OTUC4_xpdra2(self):
1307         response = test_utils.check_netconf_node_request(
1308             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1309         self.assertEqual(response.status_code, requests.codes.ok)
1310         res = response.json()
1311         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1312                         'administrative-state': 'inService',
1313                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1314                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1315                         'type': 'org-openroadm-interfaces:otnOtu',
1316                         'supporting-port': 'L1'}
1317         input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1318                         "expected-dapi": "ANeUjNzWtDLV",
1319                         'tx-dapi': 'AKsqPmWceByv',
1320                         'expected-sapi': 'AKsqPmWceByv',
1321                         "rate": "org-openroadm-otn-common-types:OTUCn",
1322                         "degthr-percentage": 100,
1323                         "tim-detect-mode": "Disabled",
1324                         "otucn-n-rate": 4,
1325                         "degm-intervals": 2}
1326
1327         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1328                              res['interface'][0])
1329         self.assertDictEqual(dict(input_dict_2,
1330                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1331                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1332
1333     def test_82_check_interface_ODUC4_xpdra2(self):
1334         response = test_utils.check_netconf_node_request(
1335             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1336         self.assertEqual(response.status_code, requests.codes.ok)
1337         res = response.json()
1338         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1339                         'administrative-state': 'inService',
1340                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1341                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1342                         'type': 'org-openroadm-interfaces:otnOdu',
1343                         'supporting-port': 'L1',
1344                         'circuit-id': 'TBD',
1345                         'description': 'TBD'}
1346         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1347                         "tim-detect-mode": "Disabled",
1348                         "degm-intervals": 2,
1349                         "degthr-percentage": 100,
1350                         "monitoring-mode": "terminated",
1351                         "rate": "org-openroadm-otn-common-types:ODUCn",
1352                         "oducn-n-rate": 4}
1353         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1354
1355         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1356                              res['interface'][0])
1357         self.assertDictEqual(dict(input_dict_2,
1358                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1359                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1360         self.assertDictEqual(dict(input_dict_3,
1361                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1362                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1363         self.assertEqual('XPDR1-NETWORK1-OTUC4', res['interface'][0]['supporting-interface-list'][0])
1364
1365     def test_83_delete_400GE_service(self):
1366         response = test_utils.service_delete_request("service-400GE")
1367         self.assertEqual(response.status_code, requests.codes.ok)
1368         res = response.json()
1369         self.assertIn('Renderer service delete in progress',
1370                       res['output']['configuration-response-common']['response-message'])
1371         time.sleep(self.WAITING)
1372
1373     def test_84_get_no_service(self):
1374         response = test_utils.get_service_list_request("")
1375         self.assertEqual(response.status_code, requests.codes.conflict)
1376         res = response.json()
1377         self.assertIn(
1378             {"error-type": "application", "error-tag": "data-missing",
1379              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1380             res['errors']['error'])
1381         time.sleep(1)
1382
1383     def test_85_check_no_interface_ODUC4_xpdra2(self):
1384         response = test_utils.check_netconf_node_request(
1385             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1386         self.assertEqual(response.status_code, requests.codes.conflict)
1387
1388     def test_86_check_no_interface_OTUC4_xpdra2(self):
1389         response = test_utils.check_netconf_node_request(
1390             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1391         self.assertEqual(response.status_code, requests.codes.conflict)
1392
1393     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1394         response = test_utils.check_netconf_node_request(
1395             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1396         self.assertEqual(response.status_code, requests.codes.conflict)
1397
1398     def test_88_check_no_interface_OTSI_xpdra2(self):
1399         response = test_utils.check_netconf_node_request(
1400             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1401         self.assertEqual(response.status_code, requests.codes.conflict)
1402
1403     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1404         response = test_utils.check_netconf_node_request(
1405             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1406         self.assertEqual(response.status_code, requests.codes.conflict)
1407
1408     def test_90_disconnect_xponders_from_roadm(self):
1409         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1410         response = test_utils.get_ordm_topo_request("")
1411         self.assertEqual(response.status_code, requests.codes.ok)
1412         res = response.json()
1413         links = res['network'][0]['ietf-network-topology:link']
1414         for link in links:
1415             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1416                 link_name = link["link-id"]
1417                 response = test_utils.delete_request(url+link_name)
1418                 self.assertEqual(response.status_code, requests.codes.ok)
1419
1420     def test_91_disconnect_xpdra2(self):
1421         response = test_utils.unmount_device("XPDR-A2")
1422         self.assertEqual(response.status_code, requests.codes.ok,
1423                          test_utils.CODE_SHOULD_BE_200)
1424
1425     def test_92_disconnect_xpdrc2(self):
1426         response = test_utils.unmount_device("XPDR-C2")
1427         self.assertEqual(response.status_code, requests.codes.ok,
1428                          test_utils.CODE_SHOULD_BE_200)
1429
1430     def test_93_disconnect_roadmA(self):
1431         response = test_utils.unmount_device("ROADM-A1")
1432         self.assertEqual(response.status_code, requests.codes.ok,
1433                          test_utils.CODE_SHOULD_BE_200)
1434
1435     def test_94_disconnect_roadmC(self):
1436         response = test_utils.unmount_device("ROADM-C1")
1437         self.assertEqual(response.status_code, requests.codes.ok,
1438                          test_utils.CODE_SHOULD_BE_200)
1439
1440
1441 if __name__ == "__main__":
1442     unittest.main(verbosity=2)