2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
45 "port-device-name": "1/0/C1",
48 "port-rack": "000000.00",
49 "port-shelf": "Chassis#1"
52 "lgx-device-name": "Some lgx-device-name",
53 "lgx-port-name": "Some lgx-port-name",
54 "lgx-port-rack": "000000.00",
55 "lgx-port-shelf": "00"
60 "port-device-name": "1/0/C1",
63 "port-rack": "000000.00",
64 "port-shelf": "Chassis#1"
67 "lgx-device-name": "Some lgx-device-name",
68 "lgx-port-name": "Some lgx-port-name",
69 "lgx-port-rack": "000000.00",
70 "lgx-port-shelf": "00"
76 "service-rate": "100",
78 "service-format": "Ethernet",
79 "clli": "SNJSCAMCJT4",
82 "port-device-name": "1/0/C1",
85 "port-rack": "000000.00",
86 "port-shelf": "Chassis#1"
89 "lgx-device-name": "Some lgx-device-name",
90 "lgx-port-name": "Some lgx-port-name",
91 "lgx-port-rack": "000000.00",
92 "lgx-port-shelf": "00"
97 "port-device-name": "1/0/C1",
100 "port-rack": "000000.00",
101 "port-shelf": "Chassis#1"
104 "lgx-device-name": "Some lgx-device-name",
105 "lgx-port-name": "Some lgx-port-name",
106 "lgx-port-rack": "000000.00",
107 "lgx-port-shelf": "00"
112 "due-date": "2016-11-28T00:00:01Z",
113 "operator-contact": "pw1234"
117 WAITING = 25 # nominal value is 300
118 NODE_VERSION_121 = '1.2.1'
119 NODE_VERSION_221 = '2.2.1'
120 NODE_VERSION_71 = '7.1'
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
126 ('roadma', cls.NODE_VERSION_221),
127 ('roadmc', cls.NODE_VERSION_221),
128 ('xpdrc', cls.NODE_VERSION_71)])
131 def tearDownClass(cls):
132 # pylint: disable=not-an-iterable
133 for process in cls.processes:
134 test_utils.shutdown_process(process)
135 print("all processes killed")
138 def setUp(self): # instruction executed before each test method
139 # pylint: disable=consider-using-f-string
140 print("execution of {}".format(self.id().split(".")[-1]))
142 def test_01_connect_xpdrA(self):
143 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_02_connect_xpdrC(self):
147 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_03_connect_rdmA(self):
151 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154 def test_04_connect_rdmC(self):
155 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
156 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
158 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
159 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
160 "ROADM-A1", "1", "SRG1-PP1-TXRX")
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
166 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
167 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
168 "ROADM-A1", "1", "SRG1-PP1-TXRX")
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
174 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
175 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
176 "ROADM-C1", "1", "SRG1-PP1-TXRX")
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
182 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
183 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
184 "ROADM-C1", "1", "SRG1-PP1-TXRX")
185 self.assertEqual(response.status_code, requests.codes.ok)
186 res = response.json()
187 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
190 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
191 # Config ROADMA-ROADMC oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
205 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
206 # Config ROADMC-ROADMA oms-attributes
208 "auto-spanloss": "true",
209 "spanloss-base": 11.4,
210 "spanloss-current": 12,
211 "engineered-spanloss": 12.2,
212 "link-concatenation": [{
215 "SRLG-length": 100000,
217 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
218 self.assertEqual(response.status_code, requests.codes.created)
220 # test service-create for Eth service from xpdr to xpdr
221 def test_11_create_eth_service1(self):
222 self.cr_serv_sample_data["input"]["service-name"] = "service1"
223 response = test_utils.service_create_request(self.cr_serv_sample_data)
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
226 self.assertIn('PCE calculation in progress',
227 res['output']['configuration-response-common']['response-message'])
228 time.sleep(self.WAITING)
230 def test_12_get_eth_service1(self):
231 response = test_utils.get_service_list_request("services/service1")
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
235 res['services'][0]['administrative-state'], 'inService')
237 res['services'][0]['service-name'], 'service1')
239 res['services'][0]['connection-type'], 'service')
241 res['services'][0]['lifecycle-state'], 'planned')
244 def test_13_change_status_line_port_xpdra(self):
245 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
248 "logical-connection-point": "XPDR1-NETWORK1",
250 "circuit-id": "XPDRA-NETWORK",
251 "administrative-state": "outOfService",
252 "port-qual": "xpdr-network"}]}
253 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
254 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
255 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
256 self.assertEqual(response.status_code, requests.codes.ok)
259 def test_14_check_update_portmapping(self):
260 response = test_utils.portmapping_request("XPDRA01")
261 self.assertEqual(response.status_code, requests.codes.ok)
262 res = response.json()
263 mapping_list = res['nodes'][0]['mapping']
264 for mapping in mapping_list:
265 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
266 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
267 "Operational State should be 'OutOfService'")
268 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
269 "Administrative State should be 'OutOfService'")
271 self.assertEqual(mapping['port-oper-state'], 'InService',
272 "Operational State should be 'InService'")
273 self.assertEqual(mapping['port-admin-state'], 'InService',
274 "Administrative State should be 'InService'")
277 def test_15_check_update_openroadm_topo(self):
278 url = test_utils.URL_CONFIG_ORDM_TOPO
279 response = test_utils.get_request(url)
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 node_list = res['network'][0]['node']
284 for node in node_list:
285 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
286 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
287 tp_list = node['ietf-network-topology:termination-point']
289 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
290 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
291 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
294 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
295 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
296 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
298 link_list = res['network'][0]['ietf-network-topology:link']
299 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
300 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
302 for link in link_list:
303 if link['link-id'] in updated_links:
304 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
305 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
308 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
309 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
310 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
313 def test_16_check_update_service1(self):
314 response = test_utils.get_service_list_request("services/service1")
315 self.assertEqual(response.status_code, requests.codes.ok)
316 res = response.json()
317 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
318 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
321 def test_17_restore_status_line_port_xpdra(self):
322 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
325 "logical-connection-point": "XPDR1-NETWORK1",
327 "circuit-id": "XPDRA-NETWORK",
328 "administrative-state": "inService",
329 "port-qual": "xpdr-network"}]}
330 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
331 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
332 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
333 self.assertEqual(response.status_code, requests.codes.ok)
336 def test_18_check_update_portmapping_ok(self):
337 response = test_utils.portmapping_request("XPDRA01")
338 self.assertEqual(response.status_code, requests.codes.ok)
339 res = response.json()
340 mapping_list = res['nodes'][0]['mapping']
341 for mapping in mapping_list:
342 self.assertEqual(mapping['port-oper-state'], 'InService',
343 "Operational State should be 'InService'")
344 self.assertEqual(mapping['port-admin-state'], 'InService',
345 "Administrative State should be 'InService'")
348 def test_19_check_update_openroadm_topo_ok(self):
349 url = test_utils.URL_CONFIG_ORDM_TOPO
350 response = test_utils.get_request(url)
351 self.assertEqual(response.status_code, requests.codes.ok)
352 res = response.json()
353 node_list = res['network'][0]['node']
354 for node in node_list:
355 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
356 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
357 tp_list = node['ietf-network-topology:termination-point']
359 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
360 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
362 link_list = res['network'][0]['ietf-network-topology:link']
363 for link in link_list:
364 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
365 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
368 def test_20_check_update_service1_ok(self):
369 self.test_12_get_eth_service1()
371 def test_21_change_status_port_roadma_srg(self):
372 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
375 "logical-connection-point": "SRG1-PP1",
376 "port-type": "client",
377 "circuit-id": "SRG1",
378 "administrative-state": "outOfService",
379 "port-qual": "roadm-external"}]}
380 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
381 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
382 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
383 self.assertEqual(response.status_code, requests.codes.ok)
386 def test_22_check_update_portmapping(self):
387 response = test_utils.portmapping_request("ROADM-A1")
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 mapping_list = res['nodes'][0]['mapping']
391 for mapping in mapping_list:
392 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
393 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
394 "Operational State should be 'OutOfService'")
395 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
396 "Administrative State should be 'OutOfService'")
398 self.assertEqual(mapping['port-oper-state'], 'InService',
399 "Operational State should be 'InService'")
400 self.assertEqual(mapping['port-admin-state'], 'InService',
401 "Administrative State should be 'InService'")
404 def test_23_check_update_openroadm_topo(self):
405 url = test_utils.URL_CONFIG_ORDM_TOPO
406 response = test_utils.get_request(url)
407 self.assertEqual(response.status_code, requests.codes.ok)
408 res = response.json()
409 node_list = res['network'][0]['node']
411 for node in node_list:
412 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
413 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
414 tp_list = node['ietf-network-topology:termination-point']
416 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
417 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
418 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
421 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
422 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
423 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
425 link_list = res['network'][0]['ietf-network-topology:link']
426 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
427 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
429 for link in link_list:
430 if link['link-id'] in updated_links:
431 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
432 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
435 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
436 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
437 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
440 def test_24_restore_status_port_roadma_srg(self):
441 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
444 "logical-connection-point": "SRG1-PP1",
445 "port-type": "client",
446 "circuit-id": "SRG1",
447 "administrative-state": "inService",
448 "port-qual": "roadm-external"}]}
449 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
450 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
451 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
452 self.assertEqual(response.status_code, requests.codes.ok)
455 def test_25_check_update_portmapping_ok(self):
456 self.test_18_check_update_portmapping_ok()
458 def test_26_check_update_openroadm_topo_ok(self):
459 self.test_19_check_update_openroadm_topo_ok()
461 def test_27_check_update_service1_ok(self):
462 self.test_12_get_eth_service1()
464 def test_28_change_status_line_port_roadma_deg(self):
465 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
468 "logical-connection-point": "DEG2-TTP-TXRX",
471 "administrative-state": "outOfService",
472 "port-qual": "roadm-external"}]}
473 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
474 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
475 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
476 self.assertEqual(response.status_code, requests.codes.ok)
479 def test_29_check_update_portmapping(self):
480 response = test_utils.portmapping_request("ROADM-A1")
481 self.assertEqual(response.status_code, requests.codes.ok)
482 res = response.json()
483 mapping_list = res['nodes'][0]['mapping']
484 for mapping in mapping_list:
485 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
486 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
487 "Operational State should be 'OutOfService'")
488 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
489 "Administrative State should be 'OutOfService'")
491 self.assertEqual(mapping['port-oper-state'], 'InService',
492 "Operational State should be 'InService'")
493 self.assertEqual(mapping['port-admin-state'], 'InService',
494 "Administrative State should be 'InService'")
497 def test_30_check_update_openroadm_topo(self):
498 url = test_utils.URL_CONFIG_ORDM_TOPO
499 response = test_utils.get_request(url)
500 self.assertEqual(response.status_code, requests.codes.ok)
501 res = response.json()
502 node_list = res['network'][0]['node']
504 for node in node_list:
505 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
506 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
507 tp_list = node['ietf-network-topology:termination-point']
509 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
510 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
511 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
514 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
515 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
516 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
518 link_list = res['network'][0]['ietf-network-topology:link']
519 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
520 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
522 for link in link_list:
523 if link['link-id'] in updated_links:
524 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
525 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
528 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
529 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
530 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
533 def test_31_restore_status_line_port_roadma_srg(self):
534 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
537 "logical-connection-point": "DEG2-TTP-TXRX",
540 "administrative-state": "inService",
541 "port-qual": "roadm-external"}]}
542 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
543 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
544 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
545 self.assertEqual(response.status_code, requests.codes.ok)
548 def test_32_check_update_portmapping_ok(self):
549 self.test_18_check_update_portmapping_ok()
551 def test_33_check_update_openroadm_topo_ok(self):
552 self.test_19_check_update_openroadm_topo_ok()
554 def test_34_check_update_service1_ok(self):
555 self.test_12_get_eth_service1()
557 def test_35_change_status_line_port_xpdrc(self):
558 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
562 "administrative-state": "outOfService",
563 "port-qual": "xpdr-network"}]}
564 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
565 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
566 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
567 self.assertEqual(response.status_code, requests.codes.ok)
570 def test_36_check_update_portmapping(self):
571 response = test_utils.portmapping_request("XPDR-C1")
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 mapping_list = res['nodes'][0]['mapping']
575 for mapping in mapping_list:
576 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
577 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
578 "Operational State should be 'OutOfService'")
579 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
580 "Administrative State should be 'OutOfService'")
582 self.assertEqual(mapping['port-oper-state'], 'InService',
583 "Operational State should be 'InService'")
584 self.assertEqual(mapping['port-admin-state'], 'InService',
585 "Administrative State should be 'InService'")
588 def test_37_check_update_openroadm_topo(self):
589 url = test_utils.URL_CONFIG_ORDM_TOPO
590 response = test_utils.get_request(url)
591 self.assertEqual(response.status_code, requests.codes.ok)
592 res = response.json()
593 node_list = res['network'][0]['node']
595 for node in node_list:
596 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
597 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
598 tp_list = node['ietf-network-topology:termination-point']
600 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
601 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
602 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
605 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
606 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
607 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
609 link_list = res['network'][0]['ietf-network-topology:link']
610 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
611 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
613 for link in link_list:
614 if link['link-id'] in updated_links:
615 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
616 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
619 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
620 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
621 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
624 def test_38_restore_status_line_port_xpdrc(self):
625 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
629 "administrative-state": "inService",
630 "port-qual": "xpdr-network"}]}
631 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
632 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
633 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
634 self.assertEqual(response.status_code, requests.codes.ok)
637 def test_39_check_update_portmapping_ok(self):
638 self.test_18_check_update_portmapping_ok()
640 def test_40_check_update_openroadm_topo_ok(self):
641 self.test_19_check_update_openroadm_topo_ok()
643 def test_41_check_update_service1_ok(self):
644 self.test_12_get_eth_service1()
646 def test_42_change_status_port_roadma_srg(self):
647 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
650 "logical-connection-point": "SRG1-PP2",
651 "port-type": "client",
652 "circuit-id": "SRG1",
653 "administrative-state": "outOfService",
654 "port-qual": "roadm-external"}]}
655 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
656 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
657 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
658 self.assertEqual(response.status_code, requests.codes.ok)
661 def test_43_check_update_portmapping(self):
662 response = test_utils.portmapping_request("ROADM-A1")
663 self.assertEqual(response.status_code, requests.codes.ok)
664 res = response.json()
665 mapping_list = res['nodes'][0]['mapping']
666 for mapping in mapping_list:
667 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
668 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
669 "Operational State should be 'OutOfService'")
670 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
671 "Administrative State should be 'OutOfService'")
673 self.assertEqual(mapping['port-oper-state'], 'InService',
674 "Operational State should be 'InService'")
675 self.assertEqual(mapping['port-admin-state'], 'InService',
676 "Administrative State should be 'InService'")
679 def test_44_check_update_openroadm_topo(self):
680 url = test_utils.URL_CONFIG_ORDM_TOPO
681 response = test_utils.get_request(url)
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
684 node_list = res['network'][0]['node']
686 for node in node_list:
687 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
688 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
689 tp_list = node['ietf-network-topology:termination-point']
691 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
692 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
693 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
696 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
697 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
698 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
700 link_list = res['network'][0]['ietf-network-topology:link']
702 for link in link_list:
703 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
704 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
705 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
708 def test_45_check_update_service1_ok(self):
709 self.test_12_get_eth_service1()
711 def test_46_delete_eth_service1(self):
712 response = test_utils.service_delete_request("service1")
713 self.assertEqual(response.status_code, requests.codes.ok)
714 res = response.json()
715 self.assertIn('Renderer service delete in progress',
716 res['output']['configuration-response-common']['response-message'])
717 time.sleep(self.WAITING)
719 def test_47_disconnect_xponders_from_roadm(self):
720 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
721 response = test_utils.get_ordm_topo_request("")
722 self.assertEqual(response.status_code, requests.codes.ok)
723 res = response.json()
724 links = res['network'][0]['ietf-network-topology:link']
726 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
727 link_name = link["link-id"]
728 response = test_utils.delete_request(url+link_name)
729 self.assertEqual(response.status_code, requests.codes.ok)
731 def test_48_disconnect_XPDRA(self):
732 response = test_utils.unmount_device("XPDRA01")
733 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
735 def test_49_disconnect_XPDRC(self):
736 response = test_utils.unmount_device("XPDR-C1")
737 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
739 def test_50_disconnect_ROADMA(self):
740 response = test_utils.unmount_device("ROADM-A1")
741 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
743 def test_51_disconnect_ROADMC(self):
744 response = test_utils.unmount_device("ROADM-C1")
745 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
748 if __name__ == "__main__":
749 unittest.main(verbosity=2)