Adapt functional tests to Chlorine
[transportpce.git] / tests / transportpce_tests / tapi / test03_tapi_device_change_notifications.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13 import os
14 import json
15 import unittest
16 import time
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils  # nopep8
24
25 # pylint: disable=too-few-public-methods
26
27
28 class UuidServices:
29     def __init__(self):
30         # pylint: disable=invalid-name
31         self.pm = None
32         self.odu = None
33         self.dsr = None
34         self.eth = None
35
36
37 class TransportPCEFulltesting(unittest.TestCase):
38
39     cr_serv_input_data = {
40         "end-point": [
41             {
42                 "layer-protocol-name": "DSR",
43                 "service-interface-point": {
44                     "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
45                 },
46                 "administrative-state": "UNLOCKED",
47                 "operational-state": "ENABLED",
48                 "direction": "BIDIRECTIONAL",
49                 "role": "SYMMETRIC",
50                 "protection-role": "WORK",
51                 "local-id": "XPDR-C1-XPDR1",
52                 "name": [
53                         {
54                             "value-name": "OpenROADM node id",
55                             "value": "XPDR-C1-XPDR1"
56                         }
57                 ]
58             },
59             {
60                 "layer-protocol-name": "DSR",
61                 "service-interface-point": {
62                     "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
63                 },
64                 "administrative-state": "UNLOCKED",
65                 "operational-state": "ENABLED",
66                 "direction": "BIDIRECTIONAL",
67                 "role": "SYMMETRIC",
68                 "protection-role": "WORK",
69                 "local-id": "XPDR-A1-XPDR1",
70                 "name": [
71                         {
72                             "value-name": "OpenROADM node id",
73                             "value": "XPDR-A1-XPDR1"
74                         }
75                 ]
76             }
77         ],
78         "connectivity-constraint": {
79             "service-layer": "ETH",
80             "service-type": "POINT_TO_POINT_CONNECTIVITY",
81             "service-level": "Some service-level",
82             "requested-capacity": {
83                 "total-size": {
84                     "value": "100",
85                     "unit": "GB"
86                 }
87             }
88         },
89         "state": "Some state"
90     }
91
92     del_serv_input_data = {"service-id-or-name": "TBD"}
93
94     tapi_topo = {"topology-id-or-name": "TBD"}
95
96     node_details = {
97         "topology-id-or-name": "TBD",
98         "node-id-or-name": "TBD"
99     }
100
101     tapi_serv_details = {"service-id-or-name": "TBD"}
102
103     processes = []
104     uuid_services = UuidServices()
105     WAITING = 25  # nominal value is 300
106     NODE_VERSION_221 = '2.2.1'
107
108     @classmethod
109     def setUpClass(cls):
110         # pylint: disable=unsubscriptable-object
111         cls.init_failed = False
112         os.environ['JAVA_MIN_MEM'] = '1024M'
113         os.environ['JAVA_MAX_MEM'] = '4096M'
114         cls.processes = test_utils.start_tpce()
115         # TAPI feature is not installed by default in Karaf
116         if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
117             print("installing tapi feature...")
118             result = test_utils.install_karaf_feature("odl-transportpce-tapi")
119             if result.returncode != 0:
120                 cls.init_failed = True
121             print("Restarting OpenDaylight...")
122             test_utils.shutdown_process(cls.processes[0])
123             cls.processes[0] = test_utils.start_karaf()
124             test_utils.process_list[0] = cls.processes[0]
125 # cause troubles after Chlorine bump
126 #            cls.init_failed = not test_utils.wait_until_log_contains(
127 #                test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
128         if cls.init_failed:
129             print("tapi installation feature failed...")
130             test_utils.shutdown_process(cls.processes[0])
131             sys.exit(2)
132         cls.processes = test_utils.start_tpce()
133         cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
134                                                ('roadma', cls.NODE_VERSION_221),
135                                                ('roadmc', cls.NODE_VERSION_221),
136                                                ('xpdrc', cls.NODE_VERSION_221)])
137
138     @classmethod
139     def tearDownClass(cls):
140         # pylint: disable=not-an-iterable
141         for process in cls.processes:
142             test_utils.shutdown_process(process)
143         print("all processes killed")
144
145     def setUp(self):  # instruction executed before each test method
146         # pylint: disable=consider-using-f-string
147         print("execution of {}".format(self.id().split(".")[-1]))
148         time.sleep(1)
149
150     def test_01_connect_xpdrA(self):
151         response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
152         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153
154     def test_02_connect_xpdrC(self):
155         response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
156         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157
158     def test_03_connect_rdmA(self):
159         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
160         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161
162     def test_04_connect_rdmC(self):
163         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
164         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
167         response = test_utils.transportpce_api_rpc_request(
168             'transportpce-networkutils', 'init-xpdr-rdm-links',
169             {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
170                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
171         self.assertEqual(response['status_code'], requests.codes.ok)
172         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
173         time.sleep(2)
174
175     def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
176         response = test_utils.transportpce_api_rpc_request(
177             'transportpce-networkutils', 'init-rdm-xpdr-links',
178             {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
179                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
180         self.assertEqual(response['status_code'], requests.codes.ok)
181         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
182         time.sleep(2)
183
184     def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
185         response = test_utils.transportpce_api_rpc_request(
186             'transportpce-networkutils', 'init-xpdr-rdm-links',
187             {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
188                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
189         self.assertEqual(response['status_code'], requests.codes.ok)
190         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
191         time.sleep(2)
192
193     def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
194         response = test_utils.transportpce_api_rpc_request(
195             'transportpce-networkutils', 'init-rdm-xpdr-links',
196             {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
197                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
198         self.assertEqual(response['status_code'], requests.codes.ok)
199         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
200         time.sleep(2)
201
202     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
203         # Config ROADMA-ROADMC oms-attributes
204         data = {"span": {
205             "auto-spanloss": "true",
206             "spanloss-base": 11.4,
207             "spanloss-current": 12,
208             "engineered-spanloss": 12.2,
209             "link-concatenation": [{
210                 "SRLG-Id": 0,
211                 "fiber-type": "smf",
212                 "SRLG-length": 100000,
213                 "pmd": 0.5}]}}
214         response = test_utils.add_oms_attr_request(
215             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
216         self.assertEqual(response.status_code, requests.codes.created)
217
218     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
219         # Config ROADMC-ROADMA oms-attributes
220         data = {"span": {
221             "auto-spanloss": "true",
222             "spanloss-base": 11.4,
223             "spanloss-current": 12,
224             "engineered-spanloss": 12.2,
225             "link-concatenation": [{
226                 "SRLG-Id": 0,
227                 "fiber-type": "smf",
228                 "SRLG-length": 100000,
229                 "pmd": 0.5}]}}
230         response = test_utils.add_oms_attr_request(
231             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
232         self.assertEqual(response.status_code, requests.codes.created)
233
234 # test service-create for Eth service from xpdr to xpdr
235     def test_11_create_connectivity_service_Ethernet(self):
236         response = test_utils.transportpce_api_rpc_request(
237             'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
238         time.sleep(self.WAITING)
239         self.uuid_services.eth = response['output']['service']['uuid']
240         # pylint: disable=consider-using-f-string
241
242         input_dict_1 = {'administrative-state': 'LOCKED',
243                         'lifecycle-state': 'PLANNED',
244                         'operational-state': 'DISABLED',
245                         'service-type': 'POINT_TO_POINT_CONNECTIVITY',
246                         'service-layer': 'ETH',
247                         'connectivity-direction': 'BIDIRECTIONAL'
248                         }
249         input_dict_2 = {'value-name': 'OpenROADM node id',
250                         'value': 'XPDR-C1-XPDR1'}
251         input_dict_3 = {'value-name': 'OpenROADM node id',
252                         'value': 'XPDR-A1-XPDR1'}
253
254         self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
255                              response['output']['service'])
256         self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
257                              response['output']['service']['end-point'][0]['name'][0])
258         self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
259                              response['output']['service']['end-point'][1]['name'][0])
260         # If the gate fails is because of the waiting time not being enough
261 #        time.sleep(self.WAITING)
262
263     def test_12_get_service_Ethernet(self):
264         response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
265         self.assertEqual(response['status_code'], requests.codes.ok)
266         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
267         self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth))
268         self.assertEqual(response['services'][0]['connection-type'], 'service')
269         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
270         time.sleep(1)
271
272     def test_13_get_connectivity_service_Ethernet(self):
273         self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
274         response = test_utils.transportpce_api_rpc_request(
275             'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
276         self.assertEqual(response['status_code'], requests.codes.ok)
277         self.assertEqual(response['output']['service']['operational-state'], 'ENABLED')
278         self.assertEqual(response['output']['service']['name'][0]['value'], self.uuid_services.eth)
279         self.assertEqual(response['output']['service']['administrative-state'], 'UNLOCKED')
280         self.assertEqual(response['output']['service']['lifecycle-state'], 'INSTALLED')
281
282     def test_14_change_status_line_port_xpdrc(self):
283         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
284         body = {"ports": [{
285             "port-name": "1",
286             "port-type": "CFP2",
287             "administrative-state": "outOfService",
288             "port-qual": "xpdr-network"}]}
289         response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
290                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
291                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
292                                     timeout=test_utils.REQUEST_TIMEOUT)
293         self.assertEqual(response.status_code, requests.codes.ok)
294         time.sleep(2)
295
296     def test_15_check_update_portmapping(self):
297         response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
298         self.assertEqual(response['status_code'], requests.codes.ok)
299         mapping_list = response['nodes'][0]['mapping']
300         for mapping in mapping_list:
301             if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
302                 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
303                                  "Operational State should be 'OutOfService'")
304                 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
305                                  "Administrative State should be 'OutOfService'")
306             else:
307                 self.assertEqual(mapping['port-oper-state'], 'InService',
308                                  "Operational State should be 'InService'")
309                 self.assertEqual(mapping['port-admin-state'], 'InService',
310                                  "Administrative State should be 'InService'")
311         time.sleep(1)
312
313     def test_16_check_update_openroadm_topo(self):
314         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
315         self.assertEqual(response['status_code'], requests.codes.ok)
316         node_list = response['network'][0]['node']
317         nb_updated_tp = 0
318         for node in node_list:
319             self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
320             self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
321             tp_list = node['ietf-network-topology:termination-point']
322             for tp in tp_list:
323                 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
324                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
325                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
326                     nb_updated_tp += 1
327                 else:
328                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
329                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
330         self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
331
332         link_list = response['network'][0]['ietf-network-topology:link']
333         updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
334                          'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
335         nb_updated_link = 0
336         for link in link_list:
337             if link['link-id'] in updated_links:
338                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
339                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
340                 nb_updated_link += 1
341             else:
342                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
343                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
344         self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
345         time.sleep(1)
346
347     def test_17_check_update_tapi_neps(self):
348         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
349         self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
350         response = test_utils.transportpce_api_rpc_request(
351             'tapi-topology', 'get-node-details', self.node_details)
352         self.assertEqual(response['status_code'], requests.codes.ok)
353         nep_list = response['output']['node']['owned-node-edge-point']
354         nb_updated_neps = 0
355         for nep in nep_list:
356             if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
357                 self.assertEqual(nep['operational-state'], 'DISABLED',
358                                  "Operational State should be 'DISABLED'")
359                 self.assertEqual(nep['administrative-state'], 'LOCKED',
360                                  "Administrative State should be 'LOCKED'")
361                 nb_updated_neps += 1
362             else:
363                 self.assertEqual(nep['operational-state'], 'ENABLED',
364                                  "Operational State should be 'ENABLED'")
365                 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
366                                  "Administrative State should be 'UNLOCKED'")
367         self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
368         response = test_utils.transportpce_api_rpc_request(
369             'tapi-topology', 'get-node-details', self.node_details)
370         self.assertEqual(response['status_code'], requests.codes.ok)
371         nep_list = response['output']['node']['owned-node-edge-point']
372         for nep in nep_list:
373             if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
374                 self.assertEqual(nep['operational-state'], 'DISABLED',
375                                  "Operational State should be 'DISABLED'")
376                 self.assertEqual(nep['administrative-state'], 'LOCKED',
377                                  "Administrative State should be 'LOCKED'")
378                 nb_updated_neps += 1
379             else:
380                 self.assertEqual(nep['operational-state'], 'ENABLED',
381                                  "Operational State should be 'ENABLED'")
382                 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
383                                  "Administrative State should be 'UNLOCKED'")
384         self.assertEqual(nb_updated_neps, 4, "Only two xponder neps should have been modified")
385         time.sleep(1)
386
387     def test_18_check_update_tapi_links(self):
388         self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
389         response = test_utils.transportpce_api_rpc_request(
390             'tapi-topology', 'get-topology-details', self.tapi_topo)
391         time.sleep(2)
392         self.assertEqual(response['status_code'], requests.codes.ok)
393         link_list = response['output']['topology']['link']
394         nb_updated_link = 0
395         for link in link_list:
396             if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
397                 self.assertEqual(link['operational-state'], 'DISABLED')
398                 self.assertEqual(link['administrative-state'], 'LOCKED')
399                 nb_updated_link += 1
400             else:
401                 self.assertEqual(link['operational-state'], 'ENABLED')
402                 self.assertEqual(link['administrative-state'], 'UNLOCKED')
403         self.assertEqual(nb_updated_link, 2,
404                          "Only two xponder-output/input & xponder-transi links should have been modified")
405         time.sleep(1)
406
407     def test_19_check_update_service_Ethernet(self):
408         response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
409         self.assertEqual(response['status_code'], requests.codes.ok)
410         self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
411         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
412
413     def test_20_check_update_connectivity_service_Ethernet(self):
414         self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
415         response = test_utils.transportpce_api_rpc_request(
416             'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
417         self.assertEqual(response['status_code'], requests.codes.ok)
418         self.assertEqual(response['output']['service']['operational-state'], 'DISABLED')
419         self.assertEqual(response['output']['service']['administrative-state'], 'LOCKED')
420         time.sleep(1)
421
422     def test_21_restore_status_line_port_xpdrc(self):
423         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
424         body = {"ports": [{
425             "port-name": "1",
426             "port-type": "CFP2",
427             "administrative-state": "inService",
428             "port-qual": "xpdr-network"}]}
429         response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
430                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
431                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
432                                     timeout=test_utils.REQUEST_TIMEOUT)
433         self.assertEqual(response.status_code, requests.codes.ok)
434         time.sleep(2)
435
436     def test_22_check_update_portmapping_ok(self):
437         response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
438         self.assertEqual(response['status_code'], requests.codes.ok)
439         mapping_list = response['nodes'][0]['mapping']
440         for mapping in mapping_list:
441             self.assertEqual(mapping['port-oper-state'], 'InService',
442                              "Operational State should be 'InService'")
443             self.assertEqual(mapping['port-admin-state'], 'InService',
444                              "Administrative State should be 'InService'")
445         time.sleep(1)
446
447     def test_23_check_update_openroadm_topo_ok(self):
448         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
449         self.assertEqual(response['status_code'], requests.codes.ok)
450         node_list = response['network'][0]['node']
451         for node in node_list:
452             self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
453             self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
454             tp_list = node['ietf-network-topology:termination-point']
455             for tp in tp_list:
456                 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
457                 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
458
459         link_list = response['network'][0]['ietf-network-topology:link']
460         for link in link_list:
461             self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
462             self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
463         time.sleep(1)
464
465     def test_24_check_update_tapi_neps_ok(self):
466         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
467         self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
468         response = test_utils.transportpce_api_rpc_request(
469             'tapi-topology', 'get-node-details', self.node_details)
470         self.assertEqual(response['status_code'], requests.codes.ok)
471         nep_list = response['output']['node']['owned-node-edge-point']
472         for nep in nep_list:
473             self.assertEqual(nep['operational-state'], 'ENABLED',
474                              "Operational State should be 'ENABLED'")
475             self.assertEqual(nep['administrative-state'], 'UNLOCKED',
476                              "Administrative State should be 'UNLOCKED'")
477
478         self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
479         response = test_utils.transportpce_api_rpc_request(
480             'tapi-topology', 'get-node-details', self.node_details)
481         self.assertEqual(response['status_code'], requests.codes.ok)
482         nep_list = response['output']['node']['owned-node-edge-point']
483         for nep in nep_list:
484             self.assertEqual(nep['operational-state'], 'ENABLED',
485                              "Operational State should be 'ENABLED'")
486             self.assertEqual(nep['administrative-state'], 'UNLOCKED',
487                              "Administrative State should be 'UNLOCKED'")
488         time.sleep(1)
489
490     def test_25_check_update_tapi_links_ok(self):
491         self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
492         response = test_utils.transportpce_api_rpc_request(
493             'tapi-topology', 'get-topology-details', self.tapi_topo)
494         time.sleep(2)
495         link_list = response['output']['topology']['link']
496         for link in link_list:
497             self.assertEqual(link['operational-state'], 'ENABLED')
498             self.assertEqual(link['administrative-state'], 'UNLOCKED')
499         time.sleep(1)
500
501     def test_26_check_update_service1_ok(self):
502         self.test_12_get_service_Ethernet()
503
504     def test_27_check_update_connectivity_service_Ethernet_ok(self):
505         self.test_13_get_connectivity_service_Ethernet()
506
507     def test_28_change_status_port_roadma_srg(self):
508         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
509         body = {"ports": [{
510             "port-name": "C1",
511             "logical-connection-point": "SRG1-PP1",
512             "port-type": "client",
513             "circuit-id": "SRG1",
514             "administrative-state": "outOfService",
515             "port-qual": "roadm-external"}]}
516         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
517                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
518                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
519                                     timeout=test_utils.REQUEST_TIMEOUT)
520         self.assertEqual(response.status_code, requests.codes.ok)
521         time.sleep(2)
522
523     def test_29_check_update_portmapping(self):
524         response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
525         self.assertEqual(response['status_code'], requests.codes.ok)
526         mapping_list = response['nodes'][0]['mapping']
527         for mapping in mapping_list:
528             if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
529                 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
530                                  "Operational State should be 'OutOfService'")
531                 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
532                                  "Administrative State should be 'OutOfService'")
533             else:
534                 self.assertEqual(mapping['port-oper-state'], 'InService',
535                                  "Operational State should be 'InService'")
536                 self.assertEqual(mapping['port-admin-state'], 'InService',
537                                  "Administrative State should be 'InService'")
538         time.sleep(1)
539
540     def test_30_check_update_openroadm_topo(self):
541         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
542         self.assertEqual(response['status_code'], requests.codes.ok)
543         node_list = response['network'][0]['node']
544         nb_updated_tp = 0
545         for node in node_list:
546             self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
547             self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
548             tp_list = node['ietf-network-topology:termination-point']
549             for tp in tp_list:
550                 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
551                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
552                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
553                     nb_updated_tp += 1
554                 else:
555                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
556                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
557         self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
558
559         link_list = response['network'][0]['ietf-network-topology:link']
560         updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
561                          'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
562         nb_updated_link = 0
563         for link in link_list:
564             if link['link-id'] in updated_links:
565                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
566                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
567                 nb_updated_link += 1
568             else:
569                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
570                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
571         self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
572         time.sleep(1)
573
574     def test_31_check_update_tapi_neps(self):
575         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
576         self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
577         response = test_utils.transportpce_api_rpc_request(
578             'tapi-topology', 'get-node-details', self.node_details)
579         self.assertEqual(response['status_code'], requests.codes.ok)
580         nep_list = response['output']['node']['owned-node-edge-point']
581         nb_updated_neps = 0
582         for nep in nep_list:
583             if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
584                 self.assertEqual(nep['operational-state'], 'DISABLED',
585                                  "Operational State should be 'DISABLED'")
586                 self.assertEqual(nep['administrative-state'], 'LOCKED',
587                                  "Administrative State should be 'LOCKED'")
588                 nb_updated_neps += 1
589             else:
590                 self.assertEqual(nep['operational-state'], 'ENABLED',
591                                  "Operational State should be 'ENABLED'")
592                 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
593                                  "Administrative State should be 'UNLOCKED'")
594         self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
595         time.sleep(1)
596
597     def test_32_check_update_tapi_links(self):
598         self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
599         response = test_utils.transportpce_api_rpc_request(
600             'tapi-topology', 'get-topology-details', self.tapi_topo)
601         time.sleep(2)
602         link_list = response['output']['topology']['link']
603         nb_updated_link = 0
604         for link in link_list:
605             if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
606                 self.assertEqual(link['operational-state'], 'DISABLED')
607                 self.assertEqual(link['administrative-state'], 'LOCKED')
608                 nb_updated_link += 1
609             else:
610                 self.assertEqual(link['operational-state'], 'ENABLED')
611                 self.assertEqual(link['administrative-state'], 'UNLOCKED')
612         self.assertEqual(nb_updated_link, 1,
613                          "Only one xponder-output/input link should have been modified")
614         time.sleep(1)
615
616     def test_33_check_update_service_Ethernet(self):
617         self.test_19_check_update_service_Ethernet()
618
619     def test_34_check_update_connectivity_service_Ethernet(self):
620         self.test_20_check_update_connectivity_service_Ethernet()
621
622     def test_35_restore_status_port_roadma_srg(self):
623         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
624         body = {"ports": [{
625             "port-name": "C1",
626             "logical-connection-point": "SRG1-PP1",
627             "port-type": "client",
628             "circuit-id": "SRG1",
629             "administrative-state": "inService",
630             "port-qual": "roadm-external"}]}
631         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
632                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
633                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
634                                     timeout=test_utils.REQUEST_TIMEOUT)
635         self.assertEqual(response.status_code, requests.codes.ok)
636         time.sleep(2)
637
638     def test_36_check_update_portmapping_ok(self):
639         response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
640         self.assertEqual(response['status_code'], requests.codes.ok)
641         mapping_list = response['nodes'][0]['mapping']
642         for mapping in mapping_list:
643             self.assertEqual(mapping['port-oper-state'], 'InService',
644                              "Operational State should be 'InService'")
645             self.assertEqual(mapping['port-admin-state'], 'InService',
646                              "Administrative State should be 'InService'")
647         time.sleep(1)
648
649     def test_37_check_update_openroadm_topo_ok(self):
650         self.test_23_check_update_openroadm_topo_ok()
651
652     def test_38_check_update_tapi_neps_ok(self):
653         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
654         self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
655         response = test_utils.transportpce_api_rpc_request(
656             'tapi-topology', 'get-node-details', self.node_details)
657         self.assertEqual(response['status_code'], requests.codes.ok)
658         nep_list = response['output']['node']['owned-node-edge-point']
659         for nep in nep_list:
660             self.assertEqual(nep['operational-state'], 'ENABLED',
661                              "Operational State should be 'ENABLED'")
662             self.assertEqual(nep['administrative-state'], 'UNLOCKED',
663                              "Administrative State should be 'UNLOCKED'")
664
665         time.sleep(1)
666
667     def test_39_check_update_tapi_links_ok(self):
668         self.test_25_check_update_tapi_links_ok()
669
670     def test_40_check_update_service1_ok(self):
671         self.test_12_get_service_Ethernet()
672
673     def test_41_check_update_connectivity_service_Ethernet_ok(self):
674         self.test_13_get_connectivity_service_Ethernet()
675
676     def test_42_change_status_line_port_roadma_deg(self):
677         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
678         body = {"ports": [{
679             "port-name": "L1",
680             "logical-connection-point": "DEG2-TTP-TXRX",
681             "port-type": "LINE",
682             "circuit-id": "1",
683             "administrative-state": "outOfService",
684             "port-qual": "roadm-external"}]}
685         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
686                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
687                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
688                                     timeout=test_utils.REQUEST_TIMEOUT)
689         self.assertEqual(response.status_code, requests.codes.ok)
690         time.sleep(2)
691
692     def test_43_check_update_portmapping(self):
693         response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
694         self.assertEqual(response['status_code'], requests.codes.ok)
695         mapping_list = response['nodes'][0]['mapping']
696         for mapping in mapping_list:
697             if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
698                 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
699                                  "Operational State should be 'OutOfService'")
700                 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
701                                  "Administrative State should be 'OutOfService'")
702             else:
703                 self.assertEqual(mapping['port-oper-state'], 'InService',
704                                  "Operational State should be 'InService'")
705                 self.assertEqual(mapping['port-admin-state'], 'InService',
706                                  "Administrative State should be 'InService'")
707         time.sleep(1)
708
709     def test_44_check_update_openroadm_topo(self):
710         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
711         self.assertEqual(response['status_code'], requests.codes.ok)
712         node_list = response['network'][0]['node']
713         nb_updated_tp = 0
714         for node in node_list:
715             self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
716             self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
717             tp_list = node['ietf-network-topology:termination-point']
718             for tp in tp_list:
719                 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
720                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
721                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
722                     nb_updated_tp += 1
723                 else:
724                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
725                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
726         self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
727
728         link_list = response['network'][0]['ietf-network-topology:link']
729         updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
730                          'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
731         nb_updated_link = 0
732         for link in link_list:
733             if link['link-id'] in updated_links:
734                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
735                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
736                 nb_updated_link += 1
737             else:
738                 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
739                 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
740         self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
741         time.sleep(1)
742
743     def test_45_check_update_tapi_neps(self):
744         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
745         self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
746         response = test_utils.transportpce_api_rpc_request(
747             'tapi-topology', 'get-node-details', self.node_details)
748         self.assertEqual(response['status_code'], requests.codes.ok)
749         nep_list = response['output']['node']['owned-node-edge-point']
750         nb_updated_neps = 0
751         for nep in nep_list:
752             if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
753                 self.assertEqual(nep['operational-state'], 'DISABLED',
754                                  "Operational State should be 'DISABLED'")
755                 self.assertEqual(nep['administrative-state'], 'LOCKED',
756                                  "Administrative State should be 'LOCKED'")
757                 nb_updated_neps += 1
758             else:
759                 self.assertEqual(nep['operational-state'], 'ENABLED',
760                                  "Operational State should be 'ENABLED'")
761                 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
762                                  "Administrative State should be 'UNLOCKED'")
763         self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
764         time.sleep(1)
765
766     def test_46_check_update_tapi_links(self):
767         self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
768         response = test_utils.transportpce_api_rpc_request(
769             'tapi-topology', 'get-topology-details', self.tapi_topo)
770         time.sleep(2)
771         self.assertEqual(response['status_code'], requests.codes.ok)
772         link_list = response['output']['topology']['link']
773         nb_updated_link = 0
774         for link in link_list:
775             if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
776                 self.assertEqual(link['operational-state'], 'DISABLED')
777                 self.assertEqual(link['administrative-state'], 'LOCKED')
778                 nb_updated_link += 1
779             else:
780                 self.assertEqual(link['operational-state'], 'ENABLED')
781                 self.assertEqual(link['administrative-state'], 'UNLOCKED')
782         self.assertEqual(nb_updated_link, 1,
783                          "Only one rdm-rdm link should have been modified")
784         time.sleep(1)
785
786     def test_47_check_update_service_Ethernet(self):
787         self.test_19_check_update_service_Ethernet()
788
789     def test_48_check_update_connectivity_service_Ethernet(self):
790         self.test_20_check_update_connectivity_service_Ethernet()
791
792     def test_49_restore_status_line_port_roadma_deg(self):
793         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
794         body = {"ports": [{
795             "port-name": "L1",
796             "logical-connection-point": "DEG2-TTP-TXRX",
797             "port-type": "LINE",
798             "circuit-id": "1",
799             "administrative-state": "inService",
800             "port-qual": "roadm-external"}]}
801         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
802                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
803                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
804                                     timeout=test_utils.REQUEST_TIMEOUT)
805         self.assertEqual(response.status_code, requests.codes.ok)
806         time.sleep(2)
807
808     def test_50_check_update_portmapping_ok(self):
809         self.test_36_check_update_portmapping_ok()
810
811     def test_51_check_update_openroadm_topo_ok(self):
812         self.test_23_check_update_openroadm_topo_ok()
813
814     def test_52_check_update_tapi_neps_ok(self):
815         self.test_38_check_update_tapi_neps_ok()
816
817     def test_53_check_update_tapi_links_ok(self):
818         self.test_25_check_update_tapi_links_ok()
819
820     def test_54_check_update_service1_ok(self):
821         self.test_12_get_service_Ethernet()
822
823     def test_55_check_update_connectivity_service_Ethernet_ok(self):
824         self.test_13_get_connectivity_service_Ethernet()
825
826     def test_56_change_status_port_roadma_srg(self):
827         url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
828         body = {"ports": [{
829             "port-name": "C2",
830             "logical-connection-point": "SRG1-PP2",
831             "port-type": "client",
832             "circuit-id": "SRG1",
833             "administrative-state": "outOfService",
834             "port-qual": "roadm-external"}]}
835         response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
836                                     data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
837                                     auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
838                                     timeout=test_utils.REQUEST_TIMEOUT)
839         self.assertEqual(response.status_code, requests.codes.ok)
840         time.sleep(2)
841
842     def test_57_check_update_portmapping(self):
843         response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
844         self.assertEqual(response['status_code'], requests.codes.ok)
845         mapping_list = response['nodes'][0]['mapping']
846         for mapping in mapping_list:
847             if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
848                 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
849                                  "Operational State should be 'OutOfService'")
850                 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
851                                  "Administrative State should be 'OutOfService'")
852             else:
853                 self.assertEqual(mapping['port-oper-state'], 'InService',
854                                  "Operational State should be 'InService'")
855                 self.assertEqual(mapping['port-admin-state'], 'InService',
856                                  "Administrative State should be 'InService'")
857         time.sleep(1)
858
859     def test_58_check_update_openroadm_topo(self):
860         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
861         self.assertEqual(response['status_code'], requests.codes.ok)
862         node_list = response['network'][0]['node']
863         nb_updated_tp = 0
864         for node in node_list:
865             self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
866             self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
867             tp_list = node['ietf-network-topology:termination-point']
868             for tp in tp_list:
869                 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
870                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
871                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
872                     nb_updated_tp += 1
873                 else:
874                     self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
875                     self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
876         self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
877
878         link_list = response['network'][0]['ietf-network-topology:link']
879         nb_updated_link = 0
880         for link in link_list:
881             self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
882             self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
883         self.assertEqual(nb_updated_link, 0, "No link should have been modified")
884         time.sleep(1)
885
886     def test_59_check_update_tapi_neps(self):
887         self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
888         self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
889         response = test_utils.transportpce_api_rpc_request(
890             'tapi-topology', 'get-node-details', self.node_details)
891         self.assertEqual(response['status_code'], requests.codes.ok)
892         nep_list = response['output']['node']['owned-node-edge-point']
893         nb_updated_neps = 0
894         for nep in nep_list:
895             if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
896                 self.assertEqual(nep['operational-state'], 'DISABLED',
897                                  "Operational State should be 'DISABLED'")
898                 self.assertEqual(nep['administrative-state'], 'LOCKED',
899                                  "Administrative State should be 'LOCKED'")
900                 nb_updated_neps += 1
901             else:
902                 self.assertEqual(nep['operational-state'], 'ENABLED',
903                                  "Operational State should be 'ENABLED'")
904                 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
905                                  "Administrative State should be 'UNLOCKED'")
906         self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
907         time.sleep(1)
908
909     def test_60_check_update_tapi_links(self):
910         self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
911         response = test_utils.transportpce_api_rpc_request(
912             'tapi-topology', 'get-topology-details', self.tapi_topo)
913         time.sleep(2)
914         self.assertEqual(response['status_code'], requests.codes.ok)
915         link_list = response['output']['topology']['link']
916         nb_updated_link = 0
917         for link in link_list:
918             if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
919                 self.assertEqual(link['operational-state'], 'DISABLED')
920                 self.assertEqual(link['administrative-state'], 'LOCKED')
921                 nb_updated_link += 1
922             else:
923                 self.assertEqual(link['operational-state'], 'ENABLED')
924                 self.assertEqual(link['administrative-state'], 'UNLOCKED')
925         self.assertEqual(nb_updated_link, 0,
926                          "No link should have been modified")
927         time.sleep(1)
928
929     def test_61_check_update_service1_ok(self):
930         self.test_12_get_service_Ethernet()
931
932     def test_62_check_update_connectivity_service_Ethernet_ok(self):
933         self.test_13_get_connectivity_service_Ethernet()
934
935     def test_63_delete_connectivity_service_Ethernet(self):
936         self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth)
937         response = test_utils.transportpce_api_rpc_request(
938             'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
939         self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content))
940         time.sleep(self.WAITING)
941
942     def test_64_disconnect_xponders_from_roadm(self):
943         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
944         self.assertEqual(response['status_code'], requests.codes.ok)
945         links = response['network'][0]['ietf-network-topology:link']
946         for link in links:
947             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
948                 response = test_utils.del_ietf_network_link_request(
949                     'openroadm-topology', link['link-id'], 'config')
950                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
951
952     def test_65_disconnect_XPDRA(self):
953         response = test_utils.unmount_device("XPDR-A1")
954         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
955
956     def test_66_disconnect_XPDRC(self):
957         response = test_utils.unmount_device("XPDR-C1")
958         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
959
960     def test_67_disconnect_ROADMA(self):
961         response = test_utils.unmount_device("ROADM-A1")
962         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
963
964     def test_68_disconnect_ROADMC(self):
965         response = test_utils.unmount_device("ROADM-C1")
966         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
967
968
969 if __name__ == "__main__":
970     unittest.main(verbosity=2)