2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_input_data = {
# Request payload that test_11_create_eth_service1 passes to the 'service-create' RPC.
# NOTE(review): several lines of this literal (nested-dict openers/closers and some
# entries) are not visible in this view, so the nesting cannot be read from here.
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
43 "tx-direction": [{"index": 0}],
44 "rx-direction": [{"index": 0}],
48 "service-rate": "100",
50 "service-format": "Ethernet",
51 "clli": "SNJSCAMCJT4",
52 "tx-direction": [{"index": 0}],
53 "rx-direction": [{"index": 0}],
56 "due-date": "2016-11-28T00:00:01Z",
57 "operator-contact": "pw1234"
60 del_serv_input_data = {
# Request payload for the 'service-delete' RPC. test_46_delete_eth_service1 overwrites
# the "TBD" service-name placeholder with "service1" before sending it.
# NOTE(review): the closing brace of this literal is not visible in this view.
61 "sdnc-request-header": {
62 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
63 "rpc-action": "service-delete",
64 "request-system-id": "appname",
65 "notification-url": "http://localhost:8585/NotificationServer/notify"},
66 "service-delete-req-info": {
67 "service-name": "TBD",
68 "tail-retention": "no"}
# Version strings paired with simulator names when simulators are started and mounted.
NODE_VERSION_121 = '1.2.1'
NODE_VERSION_221 = '2.2.1'
NODE_VERSION_71 = '7.1'
# Pause, in seconds, handed to time.sleep() after the service create/delete requests.
WAITING = 25  # nominal value is 300
78 cls.processes = test_utils_rfc8040.start_tpce()
# NOTE(review): the next assignment rebinds cls.processes, so the start_tpce() result
# is only retained if start_sims() returns it as well — confirm in test_utils_rfc8040.
# Starts four simulators: xpdra (1.2.1), roadma and roadmc (2.2.1), xpdrc (7.1).
# NOTE(review): the decorator/def lines enclosing these statements are not visible here.
79 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_121),
80 ('roadma', cls.NODE_VERSION_221),
81 ('roadmc', cls.NODE_VERSION_221),
82 ('xpdrc', cls.NODE_VERSION_71)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        test_utils_rfc8040.shutdown_process(running_process)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # pylint: disable=consider-using-f-string
    # The test id is dot-separated; its last component is the method name.
    method_name = self.id().rpartition(".")[2]
    print("execution of {}".format(method_name))
def test_01_connect_xpdrA(self):
    """Mount node XPDRA01 using the ('xpdra', 1.2.1) simulator and expect `created`."""
    simulator = ('xpdra', self.NODE_VERSION_121)
    mount_reply = test_utils_rfc8040.mount_device("XPDRA01", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount node XPDR-C1 using the ('xpdrc', 7.1) simulator and expect `created`."""
    simulator = ('xpdrc', self.NODE_VERSION_71)
    mount_reply = test_utils_rfc8040.mount_device("XPDR-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount node ROADM-A1 using the ('roadma', 2.2.1) simulator and expect `created`."""
    simulator = ('roadma', self.NODE_VERSION_221)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount node ROADM-C1 using the ('roadmc', 2.2.1) simulator and expect `created`."""
    simulator = ('roadmc', self.NODE_VERSION_221)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Call the init-xpdr-rdm-links RPC for XPDRA01 / ROADM-A1 and check the result text."""
    links_input = {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
                   'rdm-node': 'ROADM-A1', 'srg-num': '1',
                   'termination-point-num': 'SRG1-PP1-TXRX'}
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Call the init-rdm-xpdr-links RPC for ROADM-A1 / XPDRA01 and check the result text."""
    links_input = {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
                   'rdm-node': 'ROADM-A1', 'srg-num': '1',
                   'termination-point-num': 'SRG1-PP1-TXRX'}
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Call the init-xpdr-rdm-links RPC for XPDR-C1 / ROADM-C1 and check the result text."""
    links_input = {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
                   'rdm-node': 'ROADM-C1', 'srg-num': '1',
                   'termination-point-num': 'SRG1-PP1-TXRX'}
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Call the init-rdm-xpdr-links RPC for ROADM-C1 / XPDR-C1 and check the result text."""
    links_input = {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
                   'rdm-node': 'ROADM-C1', 'srg-num': '1',
                   'termination-point-num': 'SRG1-PP1-TXRX'}
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
148 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
# Sends OMS attributes (span-loss values plus a link-concatenation entry) for the link
# ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX and expects a `created` status.
# NOTE(review): the statement opening the `data` literal and some of its entries are
# not visible in this view.
149 # Config ROADMA-ROADMC oms-attributes
151 "auto-spanloss": "true",
152 "spanloss-base": 11.4,
153 "spanloss-current": 12,
154 "engineered-spanloss": 12.2,
155 "link-concatenation": [{
158 "SRLG-length": 100000,
160 response = test_utils_rfc8040.add_oms_attr_request(
161 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
162 self.assertEqual(response.status_code, requests.codes.created)
164 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
# Sends OMS attributes (span-loss values plus a link-concatenation entry) for the link
# ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX and expects a `created` status.
# NOTE(review): the statement opening the `data` literal and some of its entries are
# not visible in this view.
165 # Config ROADMC-ROADMA oms-attributes
167 "auto-spanloss": "true",
168 "spanloss-base": 11.4,
169 "spanloss-current": 12,
170 "engineered-spanloss": 12.2,
171 "link-concatenation": [{
174 "SRLG-length": 100000,
176 response = test_utils_rfc8040.add_oms_attr_request(
177 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
178 self.assertEqual(response.status_code, requests.codes.created)
180 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    """Send the service-create RPC with ``cr_serv_input_data`` and check the reply."""
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', self.cr_serv_input_data)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    reply_message = rpc_reply['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', reply_message)
    # Fixed pause of WAITING seconds before the next test method runs.
    time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
    """Fetch 'service1' from the service list and check its recorded attributes."""
    reply = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
    self.assertEqual(reply['status_code'], requests.codes.ok)
    service = reply['services'][0]
    expected_attributes = (
        ('administrative-state', 'inService'),
        ('connection-type', 'service'),
        ('service-name', 'service1'),
        ('lifecycle-state', 'planned'),
    )
    for attribute, expected_value in expected_attributes:
        self.assertEqual(service[attribute], expected_value)
200 def test_13_change_status_line_port_xpdra(self):
# PUTs a port payload with administrative-state "outOfService" for port 1 of
# circuit-pack 1/0/1-PLUG-NET straight to the RESTCONF endpoint at 127.0.0.1:8130
# and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal is not visible in this view.
201 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
204 "logical-connection-point": "XPDR1-NETWORK1",
206 "circuit-id": "XPDRA-NETWORK",
207 "administrative-state": "outOfService",
208 "port-qual": "xpdr-network"}]}
209 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
210 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
211 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
212 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
213 self.assertEqual(response.status_code, requests.codes.ok)
216 def test_14_check_update_portmapping(self):
# Reads the port mapping of XPDRA01 and checks that XPDR1-NETWORK1 reports
# 'OutOfService' for both its oper and admin state.
# NOTE(review): the 'InService' assertions below presumably belong to an `else:`
# branch (all other mappings) that is not visible in this view — confirm.
217 response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
218 self.assertEqual(response['status_code'], requests.codes.ok)
219 mapping_list = response['nodes'][0]['mapping']
220 for mapping in mapping_list:
221 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
222 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
223 "Operational State should be 'OutOfService'")
224 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
225 "Administrative State should be 'OutOfService'")
227 self.assertEqual(mapping['port-oper-state'], 'InService',
228 "Operational State should be 'InService'")
229 self.assertEqual(mapping['port-admin-state'], 'InService',
230 "Administrative State should be 'InService'")
233 def test_15_check_update_openroadm_topo(self):
# Reads 'openroadm-topology' and checks node, termination-point and link states:
# every node must be inService, tp XPDR1-NETWORK1 of node XPDRA01-XPDR1 must be
# outOfService, and the two links listed in updated_links must be outOfService.
# NOTE(review): the inner loop over tp_list, the `else:` separators and the
# nb_updated_tp / nb_updated_link initialisation and increments are not visible in
# this view; the 'inService' assertions presumably cover the remaining items — confirm.
234 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
235 self.assertEqual(response['status_code'], requests.codes.ok)
236 node_list = response['network'][0]['node']
238 for node in node_list:
239 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
240 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
241 tp_list = node['ietf-network-topology:termination-point']
243 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
244 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
245 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
248 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
249 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
250 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
252 link_list = response['network'][0]['ietf-network-topology:link']
253 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
254 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
256 for link in link_list:
257 if link['link-id'] in updated_links:
258 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
259 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
262 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
263 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
264 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
def test_16_check_update_service1(self):
    """Check that 'service1' is operationally outOfService but administratively inService."""
    reply = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
    self.assertEqual(reply['status_code'], requests.codes.ok)
    service = reply['services'][0]
    self.assertEqual(service['operational-state'], 'outOfService')
    self.assertEqual(service['administrative-state'], 'inService')
274 def test_17_restore_status_line_port_xpdra(self):
# PUTs a port payload with administrative-state "inService" for port 1 of
# circuit-pack 1/0/1-PLUG-NET straight to the RESTCONF endpoint at 127.0.0.1:8130
# and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal is not visible in this view.
275 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
278 "logical-connection-point": "XPDR1-NETWORK1",
280 "circuit-id": "XPDRA-NETWORK",
281 "administrative-state": "inService",
282 "port-qual": "xpdr-network"}]}
283 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
284 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
285 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
286 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
287 self.assertEqual(response.status_code, requests.codes.ok)
def test_18_check_update_portmapping_ok(self):
    """Check that every port mapping of XPDRA01 reports 'InService' oper and admin state."""
    reply = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    for entry in reply['nodes'][0]['mapping']:
        self.assertEqual(entry['port-oper-state'], 'InService',
                         "Operational State should be 'InService'")
        self.assertEqual(entry['port-admin-state'], 'InService',
                         "Administrative State should be 'InService'")
301 def test_19_check_update_openroadm_topo_ok(self):
# Reads 'openroadm-topology' and asserts inService operational/administrative state
# on every node, termination point and link.
# NOTE(review): `tp` is used below, but the loop header over tp_list is not visible
# in this view.
302 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
303 self.assertEqual(response['status_code'], requests.codes.ok)
304 node_list = response['network'][0]['node']
305 for node in node_list:
306 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
307 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
308 tp_list = node['ietf-network-topology:termination-point']
310 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
311 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
313 link_list = response['network'][0]['ietf-network-topology:link']
314 for link in link_list:
315 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
316 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
319 def test_20_check_update_service1_ok(self):
320 self.test_12_get_eth_service1()
322 def test_21_change_status_port_roadma_srg(self):
# PUTs a port payload with administrative-state "outOfService" for port C1 of
# circuit-pack 3/0 (logical connection point SRG1-PP1) straight to the RESTCONF
# endpoint at 127.0.0.1:8141 and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal is not visible in this view.
323 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
326 "logical-connection-point": "SRG1-PP1",
327 "port-type": "client",
328 "circuit-id": "SRG1",
329 "administrative-state": "outOfService",
330 "port-qual": "roadm-external"}]}
331 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
332 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
333 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
334 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
335 self.assertEqual(response.status_code, requests.codes.ok)
338 def test_22_check_update_portmapping(self):
# Reads the port mapping of ROADM-A1 and checks that SRG1-PP1-TXRX reports
# 'OutOfService' for both its oper and admin state.
# NOTE(review): the 'InService' assertions below presumably belong to an `else:`
# branch (all other mappings) that is not visible in this view — confirm.
339 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
340 self.assertEqual(response['status_code'], requests.codes.ok)
341 mapping_list = response['nodes'][0]['mapping']
342 for mapping in mapping_list:
343 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
344 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
345 "Operational State should be 'OutOfService'")
346 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
347 "Administrative State should be 'OutOfService'")
349 self.assertEqual(mapping['port-oper-state'], 'InService',
350 "Operational State should be 'InService'")
351 self.assertEqual(mapping['port-admin-state'], 'InService',
352 "Administrative State should be 'InService'")
355 def test_23_check_update_openroadm_topo(self):
# Reads 'openroadm-topology' and checks node, termination-point and link states:
# every node must be inService, tp SRG1-PP1-TXRX of node ROADM-A1-SRG1 must be
# outOfService, and the two links listed in updated_links must be outOfService.
# NOTE(review): the inner loop over tp_list, the `else:` separators and the
# nb_updated_tp / nb_updated_link initialisation and increments are not visible in
# this view; the 'inService' assertions presumably cover the remaining items — confirm.
356 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
357 self.assertEqual(response['status_code'], requests.codes.ok)
358 node_list = response['network'][0]['node']
360 for node in node_list:
361 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
362 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
363 tp_list = node['ietf-network-topology:termination-point']
365 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
366 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
367 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
370 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
371 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
372 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
374 link_list = response['network'][0]['ietf-network-topology:link']
375 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
376 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
378 for link in link_list:
379 if link['link-id'] in updated_links:
380 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
381 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
384 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
385 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
386 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
389 def test_24_restore_status_port_roadma_srg(self):
# PUTs a port payload with administrative-state "inService" for port C1 of
# circuit-pack 3/0 (logical connection point SRG1-PP1) straight to the RESTCONF
# endpoint at 127.0.0.1:8141 and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal is not visible in this view.
390 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
393 "logical-connection-point": "SRG1-PP1",
394 "port-type": "client",
395 "circuit-id": "SRG1",
396 "administrative-state": "inService",
397 "port-qual": "roadm-external"}]}
398 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
399 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
400 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
401 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
402 self.assertEqual(response.status_code, requests.codes.ok)
405 def test_25_check_update_portmapping_ok(self):
406 self.test_18_check_update_portmapping_ok()
408 def test_26_check_update_openroadm_topo_ok(self):
409 self.test_19_check_update_openroadm_topo_ok()
411 def test_27_check_update_service1_ok(self):
412 self.test_12_get_eth_service1()
414 def test_28_change_status_line_port_roadma_deg(self):
# PUTs a port payload with administrative-state "outOfService" for port L1 of
# circuit-pack 2/0 (logical connection point DEG2-TTP-TXRX) straight to the RESTCONF
# endpoint at 127.0.0.1:8141 and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal and some of its entries are
# not visible in this view.
415 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
418 "logical-connection-point": "DEG2-TTP-TXRX",
421 "administrative-state": "outOfService",
422 "port-qual": "roadm-external"}]}
423 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
424 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
425 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
426 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
427 self.assertEqual(response.status_code, requests.codes.ok)
430 def test_29_check_update_portmapping(self):
# Reads the port mapping of ROADM-A1 and checks that DEG2-TTP-TXRX reports
# 'OutOfService' for both its oper and admin state.
# NOTE(review): the 'InService' assertions below presumably belong to an `else:`
# branch (all other mappings) that is not visible in this view — confirm.
431 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
432 self.assertEqual(response['status_code'], requests.codes.ok)
433 mapping_list = response['nodes'][0]['mapping']
434 for mapping in mapping_list:
435 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
436 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
437 "Operational State should be 'OutOfService'")
438 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
439 "Administrative State should be 'OutOfService'")
441 self.assertEqual(mapping['port-oper-state'], 'InService',
442 "Operational State should be 'InService'")
443 self.assertEqual(mapping['port-admin-state'], 'InService',
444 "Administrative State should be 'InService'")
447 def test_30_check_update_openroadm_topo(self):
# Reads 'openroadm-topology' and checks node, termination-point and link states:
# every node must be inService, tp DEG2-TTP-TXRX of node ROADM-A1-DEG2 must be
# outOfService, and the two links listed in updated_links must be outOfService.
# NOTE(review): the inner loop over tp_list, the `else:` separators and the
# nb_updated_tp / nb_updated_link initialisation and increments are not visible in
# this view; the 'inService' assertions presumably cover the remaining items — confirm.
# NOTE(review): updated_links names the two ROADM-A1 <-> ROADM-C1 links, yet the final
# assertion message speaks of xponder-output/input links — looks copy-pasted; confirm.
448 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
449 self.assertEqual(response['status_code'], requests.codes.ok)
450 node_list = response['network'][0]['node']
452 for node in node_list:
453 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
454 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
455 tp_list = node['ietf-network-topology:termination-point']
457 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
458 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
459 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
462 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
463 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
464 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
466 link_list = response['network'][0]['ietf-network-topology:link']
467 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
468 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
470 for link in link_list:
471 if link['link-id'] in updated_links:
472 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
473 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
476 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
477 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
478 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
481 def test_31_restore_status_line_port_roadma_srg(self):
# PUTs a port payload with administrative-state "inService" for port L1 of
# circuit-pack 2/0 (logical connection point DEG2-TTP-TXRX) straight to the RESTCONF
# endpoint at 127.0.0.1:8141 and expects an `ok` status.
# NOTE(review): the method name says "srg" but the port addressed is the DEG2 one
# changed by test_28 — the name looks like a leftover; confirm.
# NOTE(review): the statement opening the `body` literal and some of its entries are
# not visible in this view.
482 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
485 "logical-connection-point": "DEG2-TTP-TXRX",
488 "administrative-state": "inService",
489 "port-qual": "roadm-external"}]}
490 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
491 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
492 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
493 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
494 self.assertEqual(response.status_code, requests.codes.ok)
497 def test_32_check_update_portmapping_ok(self):
498 self.test_18_check_update_portmapping_ok()
500 def test_33_check_update_openroadm_topo_ok(self):
501 self.test_19_check_update_openroadm_topo_ok()
503 def test_34_check_update_service1_ok(self):
504 self.test_12_get_eth_service1()
506 def test_35_change_status_line_port_xpdrc(self):
# PUTs a port payload with administrative-state "outOfService" for port 1 of
# circuit-pack 1/0/1-PLUG-NET straight to the RESTCONF endpoint at 127.0.0.1:8154
# and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal and some of its entries are
# not visible in this view.
507 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
511 "administrative-state": "outOfService",
512 "port-qual": "xpdr-network"}]}
513 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
514 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
515 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
516 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
517 self.assertEqual(response.status_code, requests.codes.ok)
520 def test_36_check_update_portmapping(self):
# Reads the port mapping of XPDR-C1 and checks that XPDR1-NETWORK1 reports
# 'OutOfService' for both its oper and admin state.
# NOTE(review): the 'InService' assertions below presumably belong to an `else:`
# branch (all other mappings) that is not visible in this view — confirm.
521 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
522 self.assertEqual(response['status_code'], requests.codes.ok)
523 mapping_list = response['nodes'][0]['mapping']
524 for mapping in mapping_list:
525 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
526 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
527 "Operational State should be 'OutOfService'")
528 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
529 "Administrative State should be 'OutOfService'")
531 self.assertEqual(mapping['port-oper-state'], 'InService',
532 "Operational State should be 'InService'")
533 self.assertEqual(mapping['port-admin-state'], 'InService',
534 "Administrative State should be 'InService'")
537 def test_37_check_update_openroadm_topo(self):
# Reads 'openroadm-topology' and checks node, termination-point and link states:
# every node must be inService, tp XPDR1-NETWORK1 of node XPDR-C1-XPDR1 must be
# outOfService, and the two links listed in updated_links must be outOfService.
# NOTE(review): the inner loop over tp_list, the `else:` separators and the
# nb_updated_tp / nb_updated_link initialisation and increments are not visible in
# this view; the 'inService' assertions presumably cover the remaining items — confirm.
538 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
539 self.assertEqual(response['status_code'], requests.codes.ok)
540 node_list = response['network'][0]['node']
542 for node in node_list:
543 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
544 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
545 tp_list = node['ietf-network-topology:termination-point']
547 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
548 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
549 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
552 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
553 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
554 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
556 link_list = response['network'][0]['ietf-network-topology:link']
557 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
558 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
560 for link in link_list:
561 if link['link-id'] in updated_links:
562 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
563 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
566 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
567 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
568 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
571 def test_38_restore_status_line_port_xpdrc(self):
# PUTs a port payload with administrative-state "inService" for port 1 of
# circuit-pack 1/0/1-PLUG-NET straight to the RESTCONF endpoint at 127.0.0.1:8154
# and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal and some of its entries are
# not visible in this view.
572 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
576 "administrative-state": "inService",
577 "port-qual": "xpdr-network"}]}
578 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
579 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
580 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
581 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
582 self.assertEqual(response.status_code, requests.codes.ok)
585 def test_39_check_update_portmapping_ok(self):
586 self.test_18_check_update_portmapping_ok()
588 def test_40_check_update_openroadm_topo_ok(self):
589 self.test_19_check_update_openroadm_topo_ok()
591 def test_41_check_update_service1_ok(self):
592 self.test_12_get_eth_service1()
594 def test_42_change_status_port_roadma_srg(self):
# PUTs a port payload with administrative-state "outOfService" for port C2 of
# circuit-pack 3/0 (logical connection point SRG1-PP2) straight to the RESTCONF
# endpoint at 127.0.0.1:8141 and expects an `ok` status.
# NOTE(review): the statement opening the `body` literal is not visible in this view.
595 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
598 "logical-connection-point": "SRG1-PP2",
599 "port-type": "client",
600 "circuit-id": "SRG1",
601 "administrative-state": "outOfService",
602 "port-qual": "roadm-external"}]}
603 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
604 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
605 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
606 timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
607 self.assertEqual(response.status_code, requests.codes.ok)
610 def test_43_check_update_portmapping(self):
# Reads the port mapping of ROADM-A1 and checks that SRG1-PP2-TXRX reports
# 'OutOfService' for both its oper and admin state.
# NOTE(review): the 'InService' assertions below presumably belong to an `else:`
# branch (all other mappings) that is not visible in this view — confirm.
611 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
612 self.assertEqual(response['status_code'], requests.codes.ok)
613 mapping_list = response['nodes'][0]['mapping']
614 for mapping in mapping_list:
615 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
616 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
617 "Operational State should be 'OutOfService'")
618 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
619 "Administrative State should be 'OutOfService'")
621 self.assertEqual(mapping['port-oper-state'], 'InService',
622 "Operational State should be 'InService'")
623 self.assertEqual(mapping['port-admin-state'], 'InService',
624 "Administrative State should be 'InService'")
627 def test_44_check_update_openroadm_topo(self):
# Reads 'openroadm-topology' and checks that every node is inService, that tp
# SRG1-PP2-TXRX of node ROADM-A1-SRG1 is outOfService, and that every link is still
# inService (nb_updated_link is expected to be 0).
# NOTE(review): the inner loop over tp_list, the `else:` separator and the
# nb_updated_tp / nb_updated_link initialisation and increments are not visible in
# this view; the tp 'inService' assertions presumably cover the remaining tps — confirm.
628 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
629 self.assertEqual(response['status_code'], requests.codes.ok)
630 node_list = response['network'][0]['node']
632 for node in node_list:
633 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
634 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
635 tp_list = node['ietf-network-topology:termination-point']
637 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
638 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
639 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
642 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
643 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
644 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
646 link_list = response['network'][0]['ietf-network-topology:link']
648 for link in link_list:
649 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
650 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
651 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
654 def test_45_check_update_service1_ok(self):
655 self.test_12_get_eth_service1()
def test_46_delete_eth_service1(self):
    """Send the service-delete RPC for 'service1' and check the reply."""
    # The shared class-level payload is updated in place with the service name.
    delete_request = self.del_serv_input_data
    delete_request["service-delete-req-info"]["service-name"] = "service1"
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    reply_message = rpc_reply['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', reply_message)
    # Fixed pause of WAITING seconds before the next test method runs.
    time.sleep(self.WAITING)
667 def test_47_disconnect_xponders_from_roadm(self):
# Reads 'openroadm-topology' and, for links whose link-type is XPONDER-OUTPUT or
# XPONDER-INPUT, issues a delete request and accepts an `ok` or `no_content` status.
# NOTE(review): `link` is used below, but the loop header over `links` is not visible
# in this view. `response` is rebound by each delete call; `links` was captured first.
668 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
669 self.assertEqual(response['status_code'], requests.codes.ok)
670 links = response['network'][0]['ietf-network-topology:link']
672 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
673 response = test_utils_rfc8040.del_ietf_network_link_request(
674 'openroadm-topology', link['link-id'], 'config')
675 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_XPDRA(self):
    """Unmount node XPDRA01 and accept an `ok` or `no_content` status."""
    unmount_reply = test_utils_rfc8040.unmount_device("XPDRA01")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_49_disconnect_XPDRC(self):
    """Unmount node XPDR-C1 and accept an `ok` or `no_content` status."""
    unmount_reply = test_utils_rfc8040.unmount_device("XPDR-C1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_50_disconnect_ROADMA(self):
    """Unmount node ROADM-A1 and accept an `ok` or `no_content` status."""
    unmount_reply = test_utils_rfc8040.unmount_device("ROADM-A1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_51_disconnect_ROADMC(self):
    """Unmount node ROADM-C1 and accept an `ok` or `no_content` status."""
    unmount_reply = test_utils_rfc8040.unmount_device("ROADM-C1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
if __name__ == "__main__":
    # Script entry point: hand control to the unittest runner with verbose output.
    unittest.main(verbosity=2)