Fix some pylint warnings
[transportpce.git] / tests / transportpce_tests / hybrid / test02_B100G_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
30     processes = None
31     WAITING = 20  # nominal value is 300
32     NODE_VERSION_221 = '2.2.1'
33     NODE_VERSION_71 = '7.1'
34
35     cr_serv_sample_data = {"input": {
36         "sdnc-request-header": {
37             "request-id": "request-1",
38             "rpc-action": "service-create",
39             "request-system-id": "appname"
40         },
41         "service-name": "service1-OTUC4",
42         "common-id": "commonId",
43         "connection-type": "infrastructure",
44         "service-a-end": {
45             "service-rate": "400",
46             "node-id": "XPDR-A2",
47             "service-format": "OTU",
48             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
49             "clli": "NodeSA",
50             "subrate-eth-sla": {
51                     "subrate-eth-sla": {
52                         "committed-info-rate": "100000",
53                         "committed-burst-size": "64"
54                     }
55             },
56             "tx-direction": {
57                 "port": {
58                     "port-device-name": "XPDR-A2-XPDR2",
59                     "port-type": "fixed",
60                     "port-name": "XPDR2-NETWORK1",
61                     "port-rack": "000000.00",
62                     "port-shelf": "Chassis#1"
63                 },
64                 "lgx": {
65                     "lgx-device-name": "Some lgx-device-name",
66                     "lgx-port-name": "Some lgx-port-name",
67                     "lgx-port-rack": "000000.00",
68                     "lgx-port-shelf": "00"
69                 }
70             },
71             "rx-direction": {
72                 "port": {
73                     "port-device-name": "XPDR-A2-XPDR2",
74                     "port-type": "fixed",
75                     "port-name": "XPDR2-NETWORK1",
76                     "port-rack": "000000.00",
77                     "port-shelf": "Chassis#1"
78                 },
79                 "lgx": {
80                     "lgx-device-name": "Some lgx-device-name",
81                     "lgx-port-name": "Some lgx-port-name",
82                     "lgx-port-rack": "000000.00",
83                     "lgx-port-shelf": "00"
84                 }
85             },
86             "optic-type": "gray"
87         },
88         "service-z-end": {
89             "service-rate": "400",
90             "node-id": "XPDR-C2",
91             "service-format": "OTU",
92             "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
93             "clli": "NodeSC",
94             "subrate-eth-sla": {
95                     "subrate-eth-sla": {
96                         "committed-info-rate": "100000",
97                         "committed-burst-size": "64"
98                     }
99             },
100             "tx-direction": {
101                 "port": {
102                     "port-device-name": "XPDR-C2-XPDR2",
103                     "port-type": "fixed",
104                     "port-name": "XPDR2-NETWORK1",
105                     "port-rack": "000000.00",
106                     "port-shelf": "Chassis#1"
107                 },
108                 "lgx": {
109                     "lgx-device-name": "Some lgx-device-name",
110                     "lgx-port-name": "Some lgx-port-name",
111                     "lgx-port-rack": "000000.00",
112                     "lgx-port-shelf": "00"
113                 }
114             },
115             "rx-direction": {
116                 "port": {
117                     "port-device-name": "XPDR-C2-XPDR2",
118                     "port-type": "fixed",
119                     "port-name": "XPDR2-NETWORK1",
120                     "port-rack": "000000.00",
121                     "port-shelf": "Chassis#1"
122                 },
123                 "lgx": {
124                     "lgx-device-name": "Some lgx-device-name",
125                     "lgx-port-name": "Some lgx-port-name",
126                     "lgx-port-rack": "000000.00",
127                     "lgx-port-shelf": "00"
128                 }
129             },
130             "optic-type": "gray"
131         },
132         "due-date": "2018-06-15T00:00:01Z",
133         "operator-contact": "pw1234"
134     }
135     }
136
137     @classmethod
138     def setUpClass(cls):
139         cls.processes = test_utils.start_tpce()
140         cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
141                                                ('roadma', cls.NODE_VERSION_221),
142                                                ('roadmc', cls.NODE_VERSION_221),
143                                                ('xpdrc2', cls.NODE_VERSION_71)])
144
145     @classmethod
146     def tearDownClass(cls):
147         # pylint: disable=not-an-iterable
148         for process in cls.processes:
149             test_utils.shutdown_process(process)
150         print("all processes killed")
151
152     def setUp(self):
153         time.sleep(5)
154
155     def test_01_connect_xpdra2(self):
156         response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
157         self.assertEqual(response.status_code,
158                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
159
160     def test_02_connect_xpdrc2(self):
161         response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
162         self.assertEqual(response.status_code,
163                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
164
165     def test_03_connect_rdma(self):
166         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
167         self.assertEqual(response.status_code,
168                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
169
170     def test_04_connect_rdmc(self):
171         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
172         self.assertEqual(response.status_code,
173                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
174
175     def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
176         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
177                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
178         self.assertEqual(response.status_code, requests.codes.ok)
179         res = response.json()
180         self.assertIn('Xponder Roadm Link created successfully',
181                       res["output"]["result"])
182         time.sleep(2)
183
184     def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
185         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
186                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
187         self.assertEqual(response.status_code, requests.codes.ok)
188         res = response.json()
189         self.assertIn('Roadm Xponder links created successfully',
190                       res["output"]["result"])
191         time.sleep(2)
192
193     def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
194         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
195                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
196         self.assertEqual(response.status_code, requests.codes.ok)
197         res = response.json()
198         self.assertIn('Xponder Roadm Link created successfully',
199                       res["output"]["result"])
200         time.sleep(2)
201
202     def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
204                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
205         self.assertEqual(response.status_code, requests.codes.ok)
206         res = response.json()
207         self.assertIn('Roadm Xponder links created successfully',
208                       res["output"]["result"])
209         time.sleep(2)
210
211     def test_09_add_omsAttributes_roadma_roadmc(self):
212         # Config ROADMA-ROADMC oms-attributes
213         data = {"span": {
214             "auto-spanloss": "true",
215             "spanloss-base": 11.4,
216             "spanloss-current": 12,
217             "engineered-spanloss": 12.2,
218             "link-concatenation": [{
219                 "SRLG-Id": 0,
220                 "fiber-type": "smf",
221                 "SRLG-length": 100000,
222                 "pmd": 0.5}]}}
223         response = test_utils.add_oms_attr_request(
224             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
225         self.assertEqual(response.status_code, requests.codes.created)
226
227     def test_10_add_omsAttributes_roadmc_roadma(self):
228         # Config ROADMC-ROADMA oms-attributes
229         data = {"span": {
230             "auto-spanloss": "true",
231             "spanloss-base": 11.4,
232             "spanloss-current": 12,
233             "engineered-spanloss": 12.2,
234             "link-concatenation": [{
235                 "SRLG-Id": 0,
236                 "fiber-type": "smf",
237                 "SRLG-length": 100000,
238                 "pmd": 0.5}]}}
239         response = test_utils.add_oms_attr_request(
240             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
241         self.assertEqual(response.status_code, requests.codes.created)
242
243 # test service-create for OTUC4 service from xpdra2 to xpdrc2
244     def test_11_check_otn_topology(self):
245         response = test_utils.get_otn_topo_request()
246         self.assertEqual(response.status_code, requests.codes.ok)
247         res = response.json()
248         nbNode = len(res['network'][0]['node'])
249         self.assertEqual(nbNode, 4, 'There should be 4 nodes')
250         self.assertNotIn('ietf-network-topology:link', res['network'][0])
251
252     def test_12_create_OTUC4_service(self):
253         response = test_utils.service_create_request(self.cr_serv_sample_data)
254         self.assertEqual(response.status_code, requests.codes.ok)
255         res = response.json()
256         self.assertIn('PCE calculation in progress',
257                       res['output']['configuration-response-common']['response-message'])
258         time.sleep(self.WAITING)
259
260     def test_13_get_OTUC4_service1(self):
261         response = test_utils.get_service_list_request(
262             "services/service1-OTUC4")
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         self.assertEqual(
266             res['services'][0]['administrative-state'], 'inService')
267         self.assertEqual(
268             res['services'][0]['service-name'], 'service1-OTUC4')
269         self.assertEqual(
270             res['services'][0]['connection-type'], 'infrastructure')
271         self.assertEqual(
272             res['services'][0]['lifecycle-state'], 'planned')
273         self.assertEqual(
274             res['services'][0]['service-layer'], 'wdm')
275         self.assertEqual(
276             res['services'][0]['service-a-end']['service-rate'], 400)
277         self.assertEqual(
278             res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
279         time.sleep(2)
280         self.assertEqual(
281             res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
282         time.sleep(2)
283
284     # Check correct configuration of devices
285     def test_14_check_interface_och_xpdra2(self):
286         response = test_utils.check_netconf_node_request(
287             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
288         self.assertEqual(response.status_code, requests.codes.ok)
289         res = response.json()
290         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
291                                    'administrative-state': 'inService',
292                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
293                                    'type': 'org-openroadm-interfaces:otsi',
294                                    'supporting-port': 'L1'
295                                    }, **res['interface'][0]),
296                              res['interface'][0])
297
298         self.assertDictEqual(
299             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
300                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
301                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
302             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
303
304     def test_15_check_interface_OTSI_GROUP_xpdra2(self):
305         response = test_utils.check_netconf_node_request(
306             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
307         self.assertEqual(response.status_code, requests.codes.ok)
308         res = response.json()
309         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
310                         'administrative-state': 'inService',
311                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
312                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
313                         'type': 'org-openroadm-interfaces:otsi-group',
314                         'supporting-port': 'L1'
315                         }
316         input_dict_2 = {"group-id": 1,
317                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
318                         }
319         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
320                              res['interface'][0])
321         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
322                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
323                              res['interface'][0]
324                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
325
326     def test_16_check_interface_OTUC4_xpdra2(self):
327         response = test_utils.check_netconf_node_request(
328             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
329         self.assertEqual(response.status_code, requests.codes.ok)
330         res = response.json()
331         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
332                         'administrative-state': 'inService',
333                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
334                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
335                         'type': 'org-openroadm-interfaces:otnOtu',
336                         'supporting-port': 'L1'
337                         }
338         input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
339                         'expected-dapi': 'LY9PxYJqUbw=',
340                         'rate': 'org-openroadm-otn-common-types:OTUCn',
341                         'degthr-percentage': 100,
342                         'degm-intervals': 2,
343                         'otucn-n-rate': 4
344                         }
345         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
346                              res['interface'][0])
347         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
348                              res['interface'][0]
349                              ['org-openroadm-otn-otu-interfaces:otu'])
350
351     def test_17_check_interface_och_xpdrc2(self):
352         response = test_utils.check_netconf_node_request(
353             "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
354         self.assertEqual(response.status_code, requests.codes.ok)
355         res = response.json()
356         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
357                                    'administrative-state': 'inService',
358                                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
359                                    'type': 'org-openroadm-interfaces:otsi',
360                                    'supporting-port': 'L1'
361                                    }, **res['interface'][0]),
362                              res['interface'][0])
363
364         self.assertDictEqual(
365             dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
366                   'transmit-power': -5, 'modulation-format': 'dp-qam16'},
367                  **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
368             res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
369
370     def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
371         response = test_utils.check_netconf_node_request(
372             "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
373         self.assertEqual(response.status_code, requests.codes.ok)
374         res = response.json()
375         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
376                         'administrative-state': 'inService',
377                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
378                         'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
379                         'type': 'org-openroadm-interfaces:otsi-group',
380                         'supporting-port': 'L1'
381                         }
382         input_dict_2 = {"group-id": 1,
383                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
384                         }
385         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
386                              res['interface'][0])
387         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
388                                   ['org-openroadm-otsi-group-interfaces:otsi-group']),
389                              res['interface'][0]
390                              ['org-openroadm-otsi-group-interfaces:otsi-group'])
391
392     def test_19_check_interface_OTUC4_xpdrc2(self):
393         response = test_utils.check_netconf_node_request(
394             "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
395         self.assertEqual(response.status_code, requests.codes.ok)
396         res = response.json()
397         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
398                         'administrative-state': 'inService',
399                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
400                         'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
401                         'type': 'org-openroadm-interfaces:otnOtu',
402                         'supporting-port': 'L1'
403                         }
404         input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
405                         'expected-sapi': 'LY9PxYJqUbw=',
406                         'tx-sapi': 'Nmbu2MNHvc4=',
407                         'expected-dapi': 'Nmbu2MNHvc4=',
408                         'rate': 'org-openroadm-otn-common-types:OTUCn',
409                         'degthr-percentage': 100,
410                         'degm-intervals': 2,
411                         'otucn-n-rate': 4
412                         }
413
414         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
415                              res['interface'][0])
416
417         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
418                              res['interface'][0]
419                              ['org-openroadm-otn-otu-interfaces:otu'])
420
421     def test_20_check_no_interface_ODUC4_xpdra2(self):
422         response = test_utils.check_netconf_node_request(
423             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
424         self.assertEqual(response.status_code, requests.codes.conflict)
425         res = response.json()
426         self.assertIn(
427             {"error-type": "application", "error-tag": "data-missing",
428              "error-message": "Request could not be completed because the relevant data model content does not exist"},
429             res['errors']['error'])
430
431     def test_21_check_openroadm_topo_xpdra2(self):
432         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
433         self.assertEqual(response.status_code, requests.codes.ok)
434         res = response.json()
435         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
436         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
437         self.assertEqual({'frequency': 196.08125,
438                           'width': 75},
439                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
440         time.sleep(3)
441
442     def test_22_check_otn_topo_OTUC4_links(self):
443         response = test_utils.get_otn_topo_request()
444         self.assertEqual(response.status_code, requests.codes.ok)
445         res = response.json()
446         nb_links = len(res['network'][0]['ietf-network-topology:link'])
447         self.assertEqual(nb_links, 2)
448         listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
449                       'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
450         for link in res['network'][0]['ietf-network-topology:link']:
451             self.assertIn(link['link-id'], listLinkId)
452             self.assertEqual(
453                 link['transportpce-topology:otn-link-type'], 'OTUC4')
454             self.assertEqual(
455                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
456             self.assertEqual(
457                 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
458             self.assertEqual(
459                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
460             self.assertIn(
461                 link['org-openroadm-common-network:opposite-link'], listLinkId)
462
463 # test service-create for ODUC4 service from xpdra2 to xpdrc2
464     def test_23_create_ODUC4_service(self):
465         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
466         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
467         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
468         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
469         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
470         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
471         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
472
473         response = test_utils.service_create_request(self.cr_serv_sample_data)
474         self.assertEqual(response.status_code, requests.codes.ok)
475         res = response.json()
476         self.assertIn('PCE calculation in progress',
477                       res['output']['configuration-response-common']['response-message'])
478         time.sleep(self.WAITING)
479
480     def test_24_get_ODUC4_service1(self):
481         response = test_utils.get_service_list_request(
482             "services/service1-ODUC4")
483         self.assertEqual(response.status_code, requests.codes.ok)
484         res = response.json()
485         self.assertEqual(
486             res['services'][0]['administrative-state'], 'inService')
487         self.assertEqual(
488             res['services'][0]['service-name'], 'service1-ODUC4')
489         self.assertEqual(
490             res['services'][0]['connection-type'], 'infrastructure')
491         self.assertEqual(
492             res['services'][0]['lifecycle-state'], 'planned')
493         self.assertEqual(
494             res['services'][0]['service-layer'], 'wdm')
495         self.assertEqual(
496             res['services'][0]['service-a-end']['service-rate'], 400)
497         self.assertEqual(
498             res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
499         self.assertEqual(
500             res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
501         time.sleep(2)
502
503     def test_25_check_interface_ODUC4_xpdra2(self):
504         response = test_utils.check_netconf_node_request(
505             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
506         self.assertEqual(response.status_code, requests.codes.ok)
507         res = response.json()
508         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
509                         'administrative-state': 'inService',
510                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
511                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
512                         'type': 'org-openroadm-interfaces:otnOdu',
513                         'supporting-port': 'L1'}
514         # SAPI/DAPI are added in the Otu4 renderer
515         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
516                         'rate': 'org-openroadm-otn-common-types:ODUCn',
517                         'expected-dapi': 'Nmbu2MNHvc4=',
518                         'expected-sapi': 'Nmbu2MNHvc4=',
519                         'tx-dapi': 'Nmbu2MNHvc4=',
520                         'tx-sapi': 'LY9PxYJqUbw='}
521
522         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
523                              res['interface'][0])
524         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
525                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
526         self.assertDictEqual(
527             {'payload-type': '22', 'exp-payload-type': '22'},
528             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
529
530     def test_26_check_interface_ODUC4_xpdrc2(self):
531         response = test_utils.check_netconf_node_request(
532             "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
533         self.assertEqual(response.status_code, requests.codes.ok)
534         res = response.json()
535         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
536                         'administrative-state': 'inService',
537                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
538                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
539                         'type': 'org-openroadm-interfaces:otnOdu',
540                         'supporting-port': 'L1'}
541         # SAPI/DAPI are added in the Otu4 renderer
542         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
543                         'rate': 'org-openroadm-otn-common-types:ODUCn',
544                         'tx-sapi': 'Nmbu2MNHvc4=',
545                         'tx-dapi': 'LY9PxYJqUbw=',
546                         'expected-sapi': 'LY9PxYJqUbw=',
547                         'expected-dapi': 'LY9PxYJqUbw='
548                         }
549         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
550                              res['interface'][0])
551         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
552                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
553         self.assertDictEqual(
554             {'payload-type': '22', 'exp-payload-type': '22'},
555             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
556
557     def test_27_check_otn_topo_links(self):
558         response = test_utils.get_otn_topo_request()
559         self.assertEqual(response.status_code, requests.codes.ok)
560         res = response.json()
561         nb_links = len(res['network'][0]['ietf-network-topology:link'])
562         self.assertEqual(nb_links, 4)
563         for link in res['network'][0]['ietf-network-topology:link']:
564             linkId = link['link-id']
565             if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
566                            'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
567                 self.assertEqual(
568                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
569                 self.assertEqual(
570                     link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
571             elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
572                              'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
573                 self.assertEqual(
574                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
575                 self.assertEqual(
576                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
577                 self.assertEqual(
578                     link['transportpce-topology:otn-link-type'], 'ODUC4')
579                 self.assertEqual(
580                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
581                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
582                               ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
583                                'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
584             else:
585                 self.fail("this link should not exist")
586
587     def test_28_check_otn_topo_tp(self):
588         response = test_utils.get_otn_topo_request()
589         res = response.json()
590         for node in res['network'][0]['node']:
591             if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
592                 tpList = node['ietf-network-topology:termination-point']
593                 for tp in tpList:
594                     if tp['tp-id'] == 'XPDR2-NETWORK1':
595                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
596                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
597                         self.assertEqual(
598                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
599                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
600                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
601
602 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
603     def test_29_create_100GE_service_1(self):
604         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
605         self.cr_serv_sample_data["input"]["connection-type"] = "service"
606         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
607         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
608         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
609         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
610         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
612         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
613         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
614         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
615         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
616         response = test_utils.service_create_request(self.cr_serv_sample_data)
617         self.assertEqual(response.status_code, requests.codes.ok)
618         res = response.json()
619         self.assertIn('PCE calculation in progress',
620                       res['output']['configuration-response-common']['response-message'])
621         time.sleep(self.WAITING)
622
623     def test_30_get_100GE_service_1(self):
624         response = test_utils.get_service_list_request(
625             "services/service-100GE")
626         self.assertEqual(response.status_code, requests.codes.ok)
627         res = response.json()
628         self.assertEqual(
629             res['services'][0]['administrative-state'], 'inService')
630         self.assertEqual(
631             res['services'][0]['service-name'], 'service-100GE')
632         self.assertEqual(
633             res['services'][0]['connection-type'], 'service')
634         self.assertEqual(
635             res['services'][0]['lifecycle-state'], 'planned')
636         time.sleep(2)
637
638     def test_31_check_interface_100GE_CLIENT_xpdra2(self):
639         response = test_utils.check_netconf_node_request(
640             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
641         self.assertEqual(response.status_code, requests.codes.ok)
642         res = response.json()
643         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
644                         'administrative-state': 'inService',
645                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
646                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
647                         'supporting-port': 'C1'
648                         }
649         input_dict_2 = {'speed': 100000}
650         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
651                              res['interface'][0])
652         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
653                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
654
655     def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
656         response = test_utils.check_netconf_node_request(
657             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
658         self.assertEqual(response.status_code, requests.codes.ok)
659         res = response.json()
660         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
661                         'administrative-state': 'inService',
662                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
663                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
664                         'type': 'org-openroadm-interfaces:otnOdu',
665                         'supporting-port': 'C1'}
666         input_dict_2 = {
667             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
668             'rate': 'org-openroadm-otn-common-types:ODU4',
669             'monitoring-mode': 'terminated'}
670
671         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
672                              res['interface'][0])
673         self.assertDictEqual(dict(input_dict_2,
674                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
675                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
676         self.assertDictEqual(
677             {'payload-type': '07', 'exp-payload-type': '07'},
678             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
679
680     def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
681         response = test_utils.check_netconf_node_request(
682             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
683         self.assertEqual(response.status_code, requests.codes.ok)
684         res = response.json()
685         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
686                         'administrative-state': 'inService',
687                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
688                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
689                         'type': 'org-openroadm-interfaces:otnOdu',
690                         'supporting-port': 'L1'}
691         input_dict_2 = {
692             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
693             'rate': 'org-openroadm-otn-common-types:ODU4',
694             'monitoring-mode': 'not-terminated'}
695         input_dict_3 = {'trib-port-number': 1}
696
697         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
698                              res['interface'][0])
699         self.assertDictEqual(dict(input_dict_2,
700                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
701                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
702         self.assertDictEqual(dict(input_dict_3,
703                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
704                                       'parent-odu-allocation']),
705                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
706         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
707                       ['opucn-trib-slots'])
708         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
709                       ['opucn-trib-slots'])
710
711     def test_34_check_ODU4_connection_xpdra2(self):
712         response = test_utils.check_netconf_node_request(
713             "XPDR-A2",
714             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
715         self.assertEqual(response.status_code, requests.codes.ok)
716         res = response.json()
717         input_dict_1 = {
718             'connection-name':
719             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
720             'direction': 'bidirectional'
721         }
722
723         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
724                              res['odu-connection'][0])
725         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
726                              res['odu-connection'][0]['destination'])
727         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
728                              res['odu-connection'][0]['source'])
729
730     def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
731         response = test_utils.check_netconf_node_request(
732             "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
733         self.assertEqual(response.status_code, requests.codes.ok)
734         res = response.json()
735         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
736                         'administrative-state': 'inService',
737                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
738                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
739                         'supporting-port': 'C1'
740                         }
741         input_dict_2 = {'speed': 100000}
742         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
743                              res['interface'][0])
744         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
745                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
746
747     def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
748         response = test_utils.check_netconf_node_request(
749             "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
750         self.assertEqual(response.status_code, requests.codes.ok)
751         res = response.json()
752         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
753                         'administrative-state': 'inService',
754                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
755                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
756                         'type': 'org-openroadm-interfaces:otnOdu',
757                         'supporting-port': 'C1'}
758         input_dict_2 = {
759             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
760             'rate': 'org-openroadm-otn-common-types:ODU4',
761             'monitoring-mode': 'terminated'}
762
763         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
764                              res['interface'][0])
765         self.assertDictEqual(dict(input_dict_2,
766                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
767                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
768         self.assertDictEqual(
769             {'payload-type': '07', 'exp-payload-type': '07'},
770             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
771
772     def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
773         response = test_utils.check_netconf_node_request(
774             "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
775         self.assertEqual(response.status_code, requests.codes.ok)
776         res = response.json()
777         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
778                         'administrative-state': 'inService',
779                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
780                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
781                         'type': 'org-openroadm-interfaces:otnOdu',
782                         'supporting-port': 'C1'}
783         input_dict_2 = {
784             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
785             'rate': 'org-openroadm-otn-common-types:ODU4',
786             'monitoring-mode': 'not-terminated'}
787
788         input_dict_3 = {'trib-port-number': 1}
789
790         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
791                              res['interface'][0])
792         self.assertDictEqual(dict(input_dict_2,
793                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
794                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
795         self.assertDictEqual(dict(input_dict_3,
796                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
797                                       'parent-odu-allocation']),
798                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
799             'parent-odu-allocation'])
800         self.assertIn('1.1',
801                       res['interface'][0][
802                           'org-openroadm-otn-odu-interfaces:odu'][
803                           'parent-odu-allocation']['opucn-trib-slots'])
804         self.assertIn('1.20',
805                       res['interface'][0][
806                           'org-openroadm-otn-odu-interfaces:odu'][
807                           'parent-odu-allocation']['opucn-trib-slots'])
808
809     def test_38_check_ODU4_connection_xpdrc2(self):
810         response = test_utils.check_netconf_node_request(
811             "XPDR-C2",
812             "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
813         self.assertEqual(response.status_code, requests.codes.ok)
814         res = response.json()
815         input_dict_1 = {
816             'connection-name':
817             'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
818             'direction': 'bidirectional'
819         }
820
821         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
822                              res['odu-connection'][0])
823         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
824                              res['odu-connection'][0]['destination'])
825         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
826                              res['odu-connection'][0]['source'])
827
828     def test_39_check_otn_topo_links(self):
829         response = test_utils.get_otn_topo_request()
830         self.assertEqual(response.status_code, requests.codes.ok)
831         res = response.json()
832         nb_links = len(res['network'][0]['ietf-network-topology:link'])
833         self.assertEqual(nb_links, 4)
834         for link in res['network'][0]['ietf-network-topology:link']:
835             linkId = link['link-id']
836             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
837                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
838                 self.assertEqual(
839                     link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
840                 self.assertEqual(
841                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
842
843     def test_40_check_otn_topo_tp(self):
844         response = test_utils.get_otn_topo_request()
845         res = response.json()
846         for node in res['network'][0]['node']:
847             if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
848                 tpList = node['ietf-network-topology:termination-point']
849                 for tp in tpList:
850                     if tp['tp-id'] == 'XPDR2-NETWORK1':
851                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
852                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
853                         tsPoolList = list(range(1, 20))
854                         self.assertNotIn(
855                             tsPoolList, xpdrTpPortConAt['ts-pool'])
856                         self.assertEqual(
857                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
858                         self.assertNotIn(
859                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
860
861 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
862     def test_41_create_100GE_service_2(self):
863         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
864         self.cr_serv_sample_data["input"]["connection-type"] = "service"
865         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
866         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
867         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
868         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
869         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
870         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
871         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
872         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
873         response = test_utils.service_create_request(self.cr_serv_sample_data)
874         self.assertEqual(response.status_code, requests.codes.ok)
875         res = response.json()
876         self.assertIn('PCE calculation in progress',
877                       res['output']['configuration-response-common']['response-message'])
878         time.sleep(self.WAITING)
879
880     def test_42_get_100GE_service_2(self):
881         response = test_utils.get_service_list_request(
882             "services/service-100GE2")
883         self.assertEqual(response.status_code, requests.codes.ok)
884         res = response.json()
885         self.assertEqual(
886             res['services'][0]['administrative-state'], 'inService')
887         self.assertEqual(
888             res['services'][0]['service-name'], 'service-100GE2')
889         self.assertEqual(
890             res['services'][0]['connection-type'], 'service')
891         self.assertEqual(
892             res['services'][0]['lifecycle-state'], 'planned')
893         time.sleep(2)
894
895     def test_43_check_service_list(self):
896         response = test_utils.get_service_list_request("")
897         self.assertEqual(response.status_code, requests.codes.ok)
898         res = response.json()
899         self.assertEqual(len(res['service-list']['services']), 4)
900         time.sleep(2)
901
902     def test_44_check_otn_topo_links(self):
903         response = test_utils.get_otn_topo_request()
904         self.assertEqual(response.status_code, requests.codes.ok)
905         res = response.json()
906         nb_links = len(res['network'][0]['ietf-network-topology:link'])
907         self.assertEqual(nb_links, 4)
908         for link in res['network'][0]['ietf-network-topology:link']:
909             linkId = link['link-id']
910             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
911                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
912                 self.assertEqual(
913                     link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
914                 self.assertEqual(
915                     link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
916
917     def test_45_check_otn_topo_tp(self):
918         response = test_utils.get_otn_topo_request()
919         res = response.json()
920         for node in res['network'][0]['node']:
921             if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
922                 tpList = node['ietf-network-topology:termination-point']
923                 for tp in tpList:
924                     if tp['tp-id'] == 'XPDR2-NETWORK1':
925                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
926                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
927                         tsPoolList = list(range(1, 40))
928                         self.assertNotIn(
929                             tsPoolList, xpdrTpPortConAt['ts-pool'])
930                         self.assertEqual(
931                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
932                         self.assertNotIn(
933                             2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
934
935     def test_46_delete_100GE_service_2(self):
936         response = test_utils.service_delete_request("service-100GE2")
937         self.assertEqual(response.status_code, requests.codes.ok)
938         res = response.json()
939         self.assertIn('Renderer service delete in progress',
940                       res['output']['configuration-response-common']['response-message'])
941         time.sleep(self.WAITING)
942
943     def test_47_delete_100GE_service_1(self):
944         response = test_utils.service_delete_request("service-100GE")
945         self.assertEqual(response.status_code, requests.codes.ok)
946         res = response.json()
947         self.assertIn('Renderer service delete in progress',
948                       res['output']['configuration-response-common']['response-message'])
949         time.sleep(self.WAITING)
950
951     def test_48_check_service_list(self):
952         response = test_utils.get_service_list_request("")
953         self.assertEqual(response.status_code, requests.codes.ok)
954         res = response.json()
955         self.assertEqual(len(res['service-list']['services']), 2)
956         time.sleep(2)
957
958     def test_49_check_no_ODU4_connection_xpdra2(self):
959         response = test_utils.check_netconf_node_request("XPDR-A2", "")
960         self.assertEqual(response.status_code, requests.codes.ok)
961         res = response.json()
962         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
963         time.sleep(1)
964
965     def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
966         response = test_utils.check_netconf_node_request(
967             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
968         self.assertEqual(response.status_code, requests.codes.conflict)
969
970     def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
971         response = test_utils.check_netconf_node_request(
972             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
973         self.assertEqual(response.status_code, requests.codes.conflict)
974
975     def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
976         response = test_utils.check_netconf_node_request(
977             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
978         self.assertEqual(response.status_code, requests.codes.conflict)
979
980     def test_53_check_otn_topo_links(self):
981         response = test_utils.get_otn_topo_request()
982         self.assertEqual(response.status_code, requests.codes.ok)
983         res = response.json()
984         nb_links = len(res['network'][0]['ietf-network-topology:link'])
985         self.assertEqual(nb_links, 4)
986         for link in res['network'][0]['ietf-network-topology:link']:
987             linkId = link['link-id']
988             if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
989                            'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
990                 self.assertEqual(
991                     link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
992                 self.assertEqual(
993                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
994
995     def test_54_check_otn_topo_tp(self):
996         response = test_utils.get_otn_topo_request()
997         res = response.json()
998         for node in res['network'][0]['node']:
999             if (node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'):
1000                 tpList = node['ietf-network-topology:termination-point']
1001                 for tp in tpList:
1002                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1003                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1004                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1005                         self.assertEqual(
1006                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1007
1008     def test_55_delete_ODUC4_service(self):
1009         response = test_utils.service_delete_request("service1-ODUC4")
1010         self.assertEqual(response.status_code, requests.codes.ok)
1011         res = response.json()
1012         self.assertIn('Renderer service delete in progress',
1013                       res['output']['configuration-response-common']['response-message'])
1014         time.sleep(self.WAITING)
1015
1016     def test_56_check_service_list(self):
1017         response = test_utils.get_service_list_request("")
1018         self.assertEqual(response.status_code, requests.codes.ok)
1019         res = response.json()
1020         self.assertEqual(len(res['service-list']['services']), 1)
1021         time.sleep(2)
1022
1023     def test_57_check_no_interface_ODU4_xpdra2(self):
1024         response = test_utils.check_netconf_node_request(
1025             "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1026         self.assertEqual(response.status_code, requests.codes.conflict)
1027
    def test_58_check_otn_topo_links(self):
        # Delegates to test_22_check_otn_topo_OTUC4_links so that its
        # assertions are run again at this step of the sequence.
        self.test_22_check_otn_topo_OTUC4_links()
1030
1031     def test_59_check_otn_topo_tp(self):
1032         response = test_utils.get_otn_topo_request()
1033         res = response.json()
1034         for node in res['network'][0]['node']:
1035             if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
1036                 tpList = node['ietf-network-topology:termination-point']
1037                 for tp in tpList:
1038                     if tp['tp-id'] == 'XPDR2-NETWORK1':
1039                         self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1040                                          dict.keys(tp))
1041
1042     def test_60_delete_OTUC4_service(self):
1043         response = test_utils.service_delete_request("service1-OTUC4")
1044         self.assertEqual(response.status_code, requests.codes.ok)
1045         res = response.json()
1046         self.assertIn('Renderer service delete in progress',
1047                       res['output']['configuration-response-common']['response-message'])
1048         time.sleep(self.WAITING)
1049
1050     def test_61_get_no_service(self):
1051         response = test_utils.get_service_list_request("")
1052         self.assertEqual(response.status_code, requests.codes.conflict)
1053         res = response.json()
1054         self.assertIn(
1055             {"error-type": "application", "error-tag": "data-missing",
1056              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1057             res['errors']['error'])
1058         time.sleep(1)
1059
1060     def test_62_check_no_interface_OTUC4_xpdra2(self):
1061         response = test_utils.check_netconf_node_request(
1062             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1063         self.assertEqual(response.status_code, requests.codes.conflict)
1064
1065     def test_63_check_no_interface_OCH_xpdra2(self):
1066         response = test_utils.check_netconf_node_request(
1067             "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1068         self.assertEqual(response.status_code, requests.codes.conflict)
1069
1070     def test_64_check_no_interface_OTSI_xpdra2(self):
1071         response = test_utils.check_netconf_node_request(
1072             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1073         self.assertEqual(response.status_code, requests.codes.conflict)
1074
1075     def test_65_getLinks_OtnTopology(self):
1076         response = test_utils.get_otn_topo_request()
1077         self.assertEqual(response.status_code, requests.codes.ok)
1078         res = response.json()
1079         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1080
1081     def test_66_check_openroadm_topo_xpdra2(self):
1082         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1083         self.assertEqual(response.status_code, requests.codes.ok)
1084         res = response.json()
1085         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1086         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1087         self.assertNotIn('wavelength', dict.keys(
1088             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1089         time.sleep(3)
1090
1091     def test_67_check_openroadm_topology(self):
1092         response = test_utils.get_ordm_topo_request("")
1093         self.assertEqual(response.status_code, requests.codes.ok)
1094         res = response.json()
1095         links = res['network'][0]['ietf-network-topology:link']
1096         self.assertEqual(22, len(links), 'Topology should contain 22 links')
1097
1098     def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1099         response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1100                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1101         self.assertEqual(response.status_code, requests.codes.ok)
1102         res = response.json()
1103         self.assertIn('Xponder Roadm Link created successfully',
1104                       res["output"]["result"])
1105         time.sleep(2)
1106
1107     def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1108         response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1109                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
1110         self.assertEqual(response.status_code, requests.codes.ok)
1111         res = response.json()
1112         self.assertIn('Roadm Xponder links created successfully',
1113                       res["output"]["result"])
1114         time.sleep(2)
1115
1116     def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1117         response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1118                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1119         self.assertEqual(response.status_code, requests.codes.ok)
1120         res = response.json()
1121         self.assertIn('Xponder Roadm Link created successfully',
1122                       res["output"]["result"])
1123         time.sleep(2)
1124
1125     def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1126         response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1127                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
1128         self.assertEqual(response.status_code, requests.codes.ok)
1129         res = response.json()
1130         self.assertIn('Roadm Xponder links created successfully',
1131                       res["output"]["result"])
1132         time.sleep(2)
1133
1134 # test service-create for 400GE service from xpdra2 to xpdrc2
1135     def test_72_create_400GE_service(self):
1136         self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1137         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1138         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1139         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1140         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1141         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1142         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1143         response = test_utils.service_create_request(self.cr_serv_sample_data)
1144         self.assertEqual(response.status_code, requests.codes.ok)
1145         res = response.json()
1146         self.assertIn('PCE calculation in progress',
1147                       res['output']['configuration-response-common']['response-message'])
1148         time.sleep(self.WAITING)
1149
1150     def test_73_get_400GE_service(self):
1151         response = test_utils.get_service_list_request(
1152             "services/service-400GE")
1153         self.assertEqual(response.status_code, requests.codes.ok)
1154         res = response.json()
1155         self.assertEqual(
1156             res['services'][0]['administrative-state'], 'inService')
1157         self.assertEqual(
1158             res['services'][0]['service-name'], 'service-400GE')
1159         self.assertEqual(
1160             res['services'][0]['connection-type'], 'service')
1161         self.assertEqual(
1162             res['services'][0]['lifecycle-state'], 'planned')
1163         time.sleep(2)
1164
1165     def test_74_check_xc1_roadma(self):
1166         response = test_utils.check_netconf_node_request(
1167             "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1168         self.assertEqual(response.status_code, requests.codes.ok)
1169         res = response.json()
1170         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1171         self.assertDictEqual(
1172             dict({
1173                 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1174                 'opticalControlMode': 'gainLoss',
1175                 'target-output-power': -3.0
1176             }, **res['roadm-connections'][0]),
1177             res['roadm-connections'][0]
1178         )
1179         self.assertDictEqual(
1180             {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1181             res['roadm-connections'][0]['source'])
1182         self.assertDictEqual(
1183             {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1184             res['roadm-connections'][0]['destination'])
1185         time.sleep(5)
1186
1187     def test_75_check_topo_xpdra2(self):
1188         response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1189         self.assertEqual(response.status_code, requests.codes.ok)
1190         res = response.json()
1191         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1192         for ele in liste_tp:
1193             if ele['tp-id'] == 'XPDR1-NETWORK1':
1194                 self.assertEqual({'frequency': 196.08125,
1195                                   'width': 75},
1196                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1197             if ele['tp-id'] == 'XPDR1-CLIENT1':
1198                 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1199         time.sleep(3)
1200
1201     def test_76_check_topo_roadma_SRG1(self):
1202         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1203         self.assertEqual(response.status_code, requests.codes.ok)
1204         res = response.json()
1205         freq_map = base64.b64decode(
1206             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1207         freq_map_array = [int(x) for x in freq_map]
1208         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1209         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1210         for ele in liste_tp:
1211             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1212                 freq_map = base64.b64decode(
1213                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1214                 freq_map_array = [int(x) for x in freq_map]
1215                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1216             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1217                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1218         time.sleep(3)
1219
1220     def test_77_check_topo_roadma_DEG1(self):
1221         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1222         self.assertEqual(response.status_code, requests.codes.ok)
1223         res = response.json()
1224         freq_map = base64.b64decode(
1225             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1226         freq_map_array = [int(x) for x in freq_map]
1227         self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1228         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1229         for ele in liste_tp:
1230             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1231                 freq_map = base64.b64decode(
1232                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1233                 freq_map_array = [int(x) for x in freq_map]
1234                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1235             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1236                 freq_map = base64.b64decode(
1237                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1238                 freq_map_array = [int(x) for x in freq_map]
1239                 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1240         time.sleep(3)
1241
1242     def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1243         response = test_utils.check_netconf_node_request(
1244             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1245         self.assertEqual(response.status_code, requests.codes.ok)
1246         res = response.json()
1247         input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1248                         'administrative-state': 'inService',
1249                         'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1250                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
1251                         'supporting-port': 'C1'
1252                         }
1253         input_dict_2 = {'speed': 400000}
1254         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1255                              res['interface'][0])
1256         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1257                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1258
1259     def test_79_check_interface_OTSI_xpdra2(self):
1260         response = test_utils.check_netconf_node_request(
1261             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1262         self.assertEqual(response.status_code, requests.codes.ok)
1263         res = response.json()
1264         input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1265                         'administrative-state': 'inService',
1266                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1267                         'type': 'org-openroadm-interfaces:otsi',
1268                         'supporting-port': 'L1'}
1269         input_dict_2 = {
1270             "frequency": 196.0812,
1271             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1272             "fec": "org-openroadm-common-types:ofec",
1273             "transmit-power": -5,
1274             "provision-mode": "explicit",
1275             "modulation-format": "dp-qam16"}
1276
1277         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1278                              res['interface'][0])
1279         self.assertDictEqual(dict(input_dict_2,
1280                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1281                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1282         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1283                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1284
1285     def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1286         response = test_utils.check_netconf_node_request(
1287             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1288         self.assertEqual(response.status_code, requests.codes.ok)
1289         res = response.json()
1290         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1291                         'administrative-state': 'inService',
1292                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1293                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1294                         'type': 'org-openroadm-interfaces:otsi-group',
1295                         'supporting-port': 'L1'}
1296         input_dict_2 = {"group-id": 1,
1297                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1298
1299         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1300                              res['interface'][0])
1301         self.assertDictEqual(dict(input_dict_2,
1302                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1303                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1304
1305     def test_81_check_interface_OTUC4_xpdra2(self):
1306         response = test_utils.check_netconf_node_request(
1307             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1308         self.assertEqual(response.status_code, requests.codes.ok)
1309         res = response.json()
1310         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1311                         'administrative-state': 'inService',
1312                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1313                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1314                         'type': 'org-openroadm-interfaces:otnOtu',
1315                         'supporting-port': 'L1'}
1316         input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1317                         "degthr-percentage": 100,
1318                         "tim-detect-mode": "Disabled",
1319                         "otucn-n-rate": 4,
1320                         "degm-intervals": 2,
1321                         "expected-dapi": "AIGiVAQ4gDil"}
1322
1323         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1324                              res['interface'][0])
1325         self.assertDictEqual(dict(input_dict_2,
1326                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1327                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1328
1329     def test_82_check_interface_ODUC4_xpdra2(self):
1330         response = test_utils.check_netconf_node_request(
1331             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1332         self.assertEqual(response.status_code, requests.codes.ok)
1333         res = response.json()
1334         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1335                         'administrative-state': 'inService',
1336                         'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1337                         ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1338                         'type': 'org-openroadm-interfaces:otnOdu',
1339                         'supporting-port': 'L1'}
1340         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1341                         "tim-detect-mode": "Disabled",
1342                         "degm-intervals": 2,
1343                         "degthr-percentage": 100,
1344                         "monitoring-mode": "terminated",
1345                         "rate": "org-openroadm-otn-common-types:ODUCn",
1346                         "oducn-n-rate": 4}
1347         input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1348
1349         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1350                              res['interface'][0])
1351         self.assertDictEqual(dict(input_dict_2,
1352                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1353                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1354         self.assertDictEqual(dict(input_dict_3,
1355                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1356                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1357
1358     def test_83_delete_400GE_service(self):
1359         response = test_utils.service_delete_request("service-400GE")
1360         self.assertEqual(response.status_code, requests.codes.ok)
1361         res = response.json()
1362         self.assertIn('Renderer service delete in progress',
1363                       res['output']['configuration-response-common']['response-message'])
1364         time.sleep(self.WAITING)
1365
1366     def test_84_get_no_service(self):
1367         response = test_utils.get_service_list_request("")
1368         self.assertEqual(response.status_code, requests.codes.conflict)
1369         res = response.json()
1370         self.assertIn(
1371             {"error-type": "application", "error-tag": "data-missing",
1372              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1373             res['errors']['error'])
1374         time.sleep(1)
1375
1376     def test_85_check_no_interface_ODUC4_xpdra2(self):
1377         response = test_utils.check_netconf_node_request(
1378             "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1379         self.assertEqual(response.status_code, requests.codes.conflict)
1380
1381     def test_86_check_no_interface_OTUC4_xpdra2(self):
1382         response = test_utils.check_netconf_node_request(
1383             "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1384         self.assertEqual(response.status_code, requests.codes.conflict)
1385
1386     def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1387         response = test_utils.check_netconf_node_request(
1388             "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1389         self.assertEqual(response.status_code, requests.codes.conflict)
1390
1391     def test_88_check_no_interface_OTSI_xpdra2(self):
1392         response = test_utils.check_netconf_node_request(
1393             "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1394         self.assertEqual(response.status_code, requests.codes.conflict)
1395
1396     def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1397         response = test_utils.check_netconf_node_request(
1398             "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1399         self.assertEqual(response.status_code, requests.codes.conflict)
1400
1401     def test_90_disconnect_xponders_from_roadm(self):
1402         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1403         response = test_utils.get_ordm_topo_request("")
1404         self.assertEqual(response.status_code, requests.codes.ok)
1405         res = response.json()
1406         links = res['network'][0]['ietf-network-topology:link']
1407         for link in links:
1408             if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1409                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1410                 link_name = link["link-id"]
1411                 response = test_utils.delete_request(url+link_name)
1412                 self.assertEqual(response.status_code, requests.codes.ok)
1413
1414     def test_91_disconnect_xpdra2(self):
1415         response = test_utils.unmount_device("XPDR-A2")
1416         self.assertEqual(response.status_code, requests.codes.ok,
1417                          test_utils.CODE_SHOULD_BE_200)
1418
1419     def test_92_disconnect_xpdrc2(self):
1420         response = test_utils.unmount_device("XPDR-C2")
1421         self.assertEqual(response.status_code, requests.codes.ok,
1422                          test_utils.CODE_SHOULD_BE_200)
1423
1424     def test_93_disconnect_roadmA(self):
1425         response = test_utils.unmount_device("ROADM-A1")
1426         self.assertEqual(response.status_code, requests.codes.ok,
1427                          test_utils.CODE_SHOULD_BE_200)
1428
1429     def test_94_disconnect_roadmC(self):
1430         response = test_utils.unmount_device("ROADM-C1")
1431         self.assertEqual(response.status_code, requests.codes.ok,
1432                          test_utils.CODE_SHOULD_BE_200)
1433
1434
# When this file is executed as a script, run the module's tests through
# unittest with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)