2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_input_data = {
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
43 "tx-direction": [{"index": 0}],
44 "rx-direction": [{"index": 0}],
48 "service-rate": "100",
50 "service-format": "Ethernet",
51 "clli": "SNJSCAMCJT4",
52 "tx-direction": [{"index": 0}],
53 "rx-direction": [{"index": 0}],
56 "due-date": "2016-11-28T00:00:01Z",
57 "operator-contact": "pw1234"
60 del_serv_input_data = {
61 "sdnc-request-header": {
62 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
63 "rpc-action": "service-delete",
64 "request-system-id": "appname",
65 "notification-url": "http://localhost:8585/NotificationServer/notify"},
66 "service-delete-req-info": {
67 "service-name": "TBD",
68 "tail-retention": "no"}
71 WAITING = 25 # nominal value is 300
72 NODE_VERSION_121 = '1.2.1'
73 NODE_VERSION_221 = '2.2.1'
74 NODE_VERSION_71 = '7.1'
78 cls.processes = test_utils_rfc8040.start_tpce()
79 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_121),
80 ('roadma', cls.NODE_VERSION_221),
81 ('roadmc', cls.NODE_VERSION_221),
82 ('xpdrc', cls.NODE_VERSION_71)])
85 def tearDownClass(cls):
86 # pylint: disable=not-an-iterable
87 for process in cls.processes:
88 test_utils_rfc8040.shutdown_process(process)
89 print("all processes killed")
92 def setUp(self): # instruction executed before each test method
93 # pylint: disable=consider-using-f-string
94 print("execution of {}".format(self.id().split(".")[-1]))
96 def test_01_connect_xpdrA(self):
97 response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
98 self.assertEqual(response.status_code,
99 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
101 def test_02_connect_xpdrC(self):
102 response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
103 self.assertEqual(response.status_code,
104 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
106 def test_03_connect_rdmA(self):
107 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
108 self.assertEqual(response.status_code,
109 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
111 def test_04_connect_rdmC(self):
112 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
113 self.assertEqual(response.status_code,
114 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
116 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
117 response = test_utils_rfc8040.transportpce_api_rpc_request(
118 'transportpce-networkutils', 'init-xpdr-rdm-links',
119 {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
120 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
121 self.assertEqual(response['status_code'], requests.codes.ok)
122 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
124 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
125 response = test_utils_rfc8040.transportpce_api_rpc_request(
126 'transportpce-networkutils', 'init-rdm-xpdr-links',
127 {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
128 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
129 self.assertEqual(response['status_code'], requests.codes.ok)
130 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
132 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
133 response = test_utils_rfc8040.transportpce_api_rpc_request(
134 'transportpce-networkutils', 'init-xpdr-rdm-links',
135 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
136 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
137 self.assertEqual(response['status_code'], requests.codes.ok)
138 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
140 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
141 response = test_utils_rfc8040.transportpce_api_rpc_request(
142 'transportpce-networkutils', 'init-rdm-xpdr-links',
143 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
144 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
145 self.assertEqual(response['status_code'], requests.codes.ok)
146 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
148 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
149 # Config ROADMA-ROADMC oms-attributes
151 "auto-spanloss": "true",
152 "spanloss-base": 11.4,
153 "spanloss-current": 12,
154 "engineered-spanloss": 12.2,
155 "link-concatenation": [{
158 "SRLG-length": 100000,
160 response = test_utils_rfc8040.add_oms_attr_request(
161 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
162 self.assertEqual(response.status_code, requests.codes.created)
164 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
165 # Config ROADMC-ROADMA oms-attributes
167 "auto-spanloss": "true",
168 "spanloss-base": 11.4,
169 "spanloss-current": 12,
170 "engineered-spanloss": 12.2,
171 "link-concatenation": [{
174 "SRLG-length": 100000,
176 response = test_utils_rfc8040.add_oms_attr_request(
177 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
178 self.assertEqual(response.status_code, requests.codes.created)
180 # test service-create for Eth service from xpdr to xpdr
181 def test_11_create_eth_service1(self):
182 response = test_utils_rfc8040.transportpce_api_rpc_request(
183 'org-openroadm-service', 'service-create',
184 self.cr_serv_input_data)
185 self.assertEqual(response['status_code'], requests.codes.ok)
186 self.assertIn('PCE calculation in progress',
187 response['output']['configuration-response-common']['response-message'])
188 time.sleep(self.WAITING)
190 def test_12_get_eth_service1(self):
191 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
192 "services", "service1")
193 self.assertEqual(response['status_code'], requests.codes.ok)
194 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
195 self.assertEqual(response['services'][0]['connection-type'], 'service')
196 self.assertEqual(response['services'][0]['service-name'], 'service1')
197 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
200 def test_13_change_status_line_port_xpdra(self):
201 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
204 "logical-connection-point": "XPDR1-NETWORK1",
206 "circuit-id": "XPDRA-NETWORK",
207 "administrative-state": "outOfService",
208 "port-qual": "xpdr-network"}]}
209 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
210 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
211 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
212 self.assertEqual(response.status_code, requests.codes.ok)
215 def test_14_check_update_portmapping(self):
216 response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
217 self.assertEqual(response['status_code'], requests.codes.ok)
218 mapping_list = response['nodes'][0]['mapping']
219 for mapping in mapping_list:
220 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
221 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
222 "Operational State should be 'OutOfService'")
223 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
224 "Administrative State should be 'OutOfService'")
226 self.assertEqual(mapping['port-oper-state'], 'InService',
227 "Operational State should be 'InService'")
228 self.assertEqual(mapping['port-admin-state'], 'InService',
229 "Administrative State should be 'InService'")
232 def test_15_check_update_openroadm_topo(self):
233 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
234 self.assertEqual(response['status_code'], requests.codes.ok)
235 node_list = response['network'][0]['node']
237 for node in node_list:
238 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
239 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
240 tp_list = node['ietf-network-topology:termination-point']
242 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
243 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
244 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
247 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
248 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
249 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
251 link_list = response['network'][0]['ietf-network-topology:link']
252 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
253 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
255 for link in link_list:
256 if link['link-id'] in updated_links:
257 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
258 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
261 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
262 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
263 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
266 def test_16_check_update_service1(self):
267 response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
268 self.assertEqual(response['status_code'], requests.codes.ok)
269 self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
270 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
273 def test_17_restore_status_line_port_xpdra(self):
274 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
277 "logical-connection-point": "XPDR1-NETWORK1",
279 "circuit-id": "XPDRA-NETWORK",
280 "administrative-state": "inService",
281 "port-qual": "xpdr-network"}]}
282 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
283 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
284 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
285 self.assertEqual(response.status_code, requests.codes.ok)
288 def test_18_check_update_portmapping_ok(self):
289 response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
290 self.assertEqual(response['status_code'], requests.codes.ok)
291 mapping_list = response['nodes'][0]['mapping']
292 for mapping in mapping_list:
293 self.assertEqual(mapping['port-oper-state'], 'InService',
294 "Operational State should be 'InService'")
295 self.assertEqual(mapping['port-admin-state'], 'InService',
296 "Administrative State should be 'InService'")
299 def test_19_check_update_openroadm_topo_ok(self):
300 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
301 self.assertEqual(response['status_code'], requests.codes.ok)
302 node_list = response['network'][0]['node']
303 for node in node_list:
304 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
305 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
306 tp_list = node['ietf-network-topology:termination-point']
308 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
309 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
311 link_list = response['network'][0]['ietf-network-topology:link']
312 for link in link_list:
313 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
314 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
317 def test_20_check_update_service1_ok(self):
318 self.test_12_get_eth_service1()
320 def test_21_change_status_port_roadma_srg(self):
321 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
324 "logical-connection-point": "SRG1-PP1",
325 "port-type": "client",
326 "circuit-id": "SRG1",
327 "administrative-state": "outOfService",
328 "port-qual": "roadm-external"}]}
329 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
330 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
331 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
332 self.assertEqual(response.status_code, requests.codes.ok)
335 def test_22_check_update_portmapping(self):
336 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
337 self.assertEqual(response['status_code'], requests.codes.ok)
338 mapping_list = response['nodes'][0]['mapping']
339 for mapping in mapping_list:
340 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
341 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
342 "Operational State should be 'OutOfService'")
343 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
344 "Administrative State should be 'OutOfService'")
346 self.assertEqual(mapping['port-oper-state'], 'InService',
347 "Operational State should be 'InService'")
348 self.assertEqual(mapping['port-admin-state'], 'InService',
349 "Administrative State should be 'InService'")
352 def test_23_check_update_openroadm_topo(self):
353 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
354 self.assertEqual(response['status_code'], requests.codes.ok)
355 node_list = response['network'][0]['node']
357 for node in node_list:
358 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
359 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
360 tp_list = node['ietf-network-topology:termination-point']
362 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
363 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
364 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
367 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
368 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
369 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
371 link_list = response['network'][0]['ietf-network-topology:link']
372 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
373 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
375 for link in link_list:
376 if link['link-id'] in updated_links:
377 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
378 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
381 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
382 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
383 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
386 def test_24_restore_status_port_roadma_srg(self):
387 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
390 "logical-connection-point": "SRG1-PP1",
391 "port-type": "client",
392 "circuit-id": "SRG1",
393 "administrative-state": "inService",
394 "port-qual": "roadm-external"}]}
395 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
396 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
397 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
398 self.assertEqual(response.status_code, requests.codes.ok)
401 def test_25_check_update_portmapping_ok(self):
402 self.test_18_check_update_portmapping_ok()
404 def test_26_check_update_openroadm_topo_ok(self):
405 self.test_19_check_update_openroadm_topo_ok()
407 def test_27_check_update_service1_ok(self):
408 self.test_12_get_eth_service1()
410 def test_28_change_status_line_port_roadma_deg(self):
411 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
414 "logical-connection-point": "DEG2-TTP-TXRX",
417 "administrative-state": "outOfService",
418 "port-qual": "roadm-external"}]}
419 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
420 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
421 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
422 self.assertEqual(response.status_code, requests.codes.ok)
425 def test_29_check_update_portmapping(self):
426 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
427 self.assertEqual(response['status_code'], requests.codes.ok)
428 mapping_list = response['nodes'][0]['mapping']
429 for mapping in mapping_list:
430 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
431 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
432 "Operational State should be 'OutOfService'")
433 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
434 "Administrative State should be 'OutOfService'")
436 self.assertEqual(mapping['port-oper-state'], 'InService',
437 "Operational State should be 'InService'")
438 self.assertEqual(mapping['port-admin-state'], 'InService',
439 "Administrative State should be 'InService'")
442 def test_30_check_update_openroadm_topo(self):
443 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
444 self.assertEqual(response['status_code'], requests.codes.ok)
445 node_list = response['network'][0]['node']
447 for node in node_list:
448 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
449 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
450 tp_list = node['ietf-network-topology:termination-point']
452 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
453 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
454 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
457 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
458 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
459 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
461 link_list = response['network'][0]['ietf-network-topology:link']
462 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
463 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
465 for link in link_list:
466 if link['link-id'] in updated_links:
467 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
468 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
471 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
472 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
473 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
476 def test_31_restore_status_line_port_roadma_srg(self):
477 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
480 "logical-connection-point": "DEG2-TTP-TXRX",
483 "administrative-state": "inService",
484 "port-qual": "roadm-external"}]}
485 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
486 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
487 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
488 self.assertEqual(response.status_code, requests.codes.ok)
491 def test_32_check_update_portmapping_ok(self):
492 self.test_18_check_update_portmapping_ok()
494 def test_33_check_update_openroadm_topo_ok(self):
495 self.test_19_check_update_openroadm_topo_ok()
497 def test_34_check_update_service1_ok(self):
498 self.test_12_get_eth_service1()
500 def test_35_change_status_line_port_xpdrc(self):
501 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
505 "administrative-state": "outOfService",
506 "port-qual": "xpdr-network"}]}
507 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
508 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
509 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
510 self.assertEqual(response.status_code, requests.codes.ok)
513 def test_36_check_update_portmapping(self):
514 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
515 self.assertEqual(response['status_code'], requests.codes.ok)
516 mapping_list = response['nodes'][0]['mapping']
517 for mapping in mapping_list:
518 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
519 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
520 "Operational State should be 'OutOfService'")
521 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
522 "Administrative State should be 'OutOfService'")
524 self.assertEqual(mapping['port-oper-state'], 'InService',
525 "Operational State should be 'InService'")
526 self.assertEqual(mapping['port-admin-state'], 'InService',
527 "Administrative State should be 'InService'")
530 def test_37_check_update_openroadm_topo(self):
531 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
532 self.assertEqual(response['status_code'], requests.codes.ok)
533 node_list = response['network'][0]['node']
535 for node in node_list:
536 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
537 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
538 tp_list = node['ietf-network-topology:termination-point']
540 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
541 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
542 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
545 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
546 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
547 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
549 link_list = response['network'][0]['ietf-network-topology:link']
550 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
551 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
553 for link in link_list:
554 if link['link-id'] in updated_links:
555 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
556 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
559 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
560 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
561 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
564 def test_38_restore_status_line_port_xpdrc(self):
565 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
569 "administrative-state": "inService",
570 "port-qual": "xpdr-network"}]}
571 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
572 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
573 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
574 self.assertEqual(response.status_code, requests.codes.ok)
577 def test_39_check_update_portmapping_ok(self):
578 self.test_18_check_update_portmapping_ok()
580 def test_40_check_update_openroadm_topo_ok(self):
581 self.test_19_check_update_openroadm_topo_ok()
583 def test_41_check_update_service1_ok(self):
584 self.test_12_get_eth_service1()
586 def test_42_change_status_port_roadma_srg(self):
587 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
590 "logical-connection-point": "SRG1-PP2",
591 "port-type": "client",
592 "circuit-id": "SRG1",
593 "administrative-state": "outOfService",
594 "port-qual": "roadm-external"}]}
595 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
596 data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
597 auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
598 self.assertEqual(response.status_code, requests.codes.ok)
601 def test_43_check_update_portmapping(self):
602 response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
603 self.assertEqual(response['status_code'], requests.codes.ok)
604 mapping_list = response['nodes'][0]['mapping']
605 for mapping in mapping_list:
606 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
607 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
608 "Operational State should be 'OutOfService'")
609 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
610 "Administrative State should be 'OutOfService'")
612 self.assertEqual(mapping['port-oper-state'], 'InService',
613 "Operational State should be 'InService'")
614 self.assertEqual(mapping['port-admin-state'], 'InService',
615 "Administrative State should be 'InService'")
618 def test_44_check_update_openroadm_topo(self):
619 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
620 self.assertEqual(response['status_code'], requests.codes.ok)
621 node_list = response['network'][0]['node']
623 for node in node_list:
624 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
625 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
626 tp_list = node['ietf-network-topology:termination-point']
628 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
629 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
630 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
633 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
634 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
635 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
637 link_list = response['network'][0]['ietf-network-topology:link']
639 for link in link_list:
640 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
641 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
642 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
645 def test_45_check_update_service1_ok(self):
646 self.test_12_get_eth_service1()
648 def test_46_delete_eth_service1(self):
649 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
650 response = test_utils_rfc8040.transportpce_api_rpc_request(
651 'org-openroadm-service', 'service-delete',
652 self.del_serv_input_data)
653 self.assertEqual(response['status_code'], requests.codes.ok)
654 self.assertIn('Renderer service delete in progress',
655 response['output']['configuration-response-common']['response-message'])
656 time.sleep(self.WAITING)
658 def test_47_disconnect_xponders_from_roadm(self):
659 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
660 self.assertEqual(response['status_code'], requests.codes.ok)
661 links = response['network'][0]['ietf-network-topology:link']
663 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
664 response = test_utils_rfc8040.del_ietf_network_link_request(
665 'openroadm-topology', link['link-id'], 'config')
666 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
668 def test_48_disconnect_XPDRA(self):
669 response = test_utils_rfc8040.unmount_device("XPDRA01")
670 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
672 def test_49_disconnect_XPDRC(self):
673 response = test_utils_rfc8040.unmount_device("XPDR-C1")
674 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
676 def test_50_disconnect_ROADMA(self):
677 response = test_utils_rfc8040.unmount_device("ROADM-A1")
678 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
680 def test_51_disconnect_ROADMC(self):
681 response = test_utils_rfc8040.unmount_device("ROADM-C1")
682 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
685 if __name__ == "__main__":
686 unittest.main(verbosity=2)