2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
23 class TransportPCEFulltesting(unittest.TestCase):
26 cr_serv_sample_data = {"input": {
27 "sdnc-request-header": {
28 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
29 "rpc-action": "service-create",
30 "request-system-id": "appname",
31 "notification-url": "http://localhost:8585/NotificationServer/notify"
33 "service-name": "service1",
34 "common-id": "ASATT1234567",
35 "connection-type": "service",
37 "service-rate": "100",
39 "service-format": "Ethernet",
40 "clli": "SNJSCAMCJP8",
43 "port-device-name": "1/0/C1",
46 "port-rack": "000000.00",
47 "port-shelf": "Chassis#1"
50 "lgx-device-name": "Some lgx-device-name",
51 "lgx-port-name": "Some lgx-port-name",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
58 "port-device-name": "1/0/C1",
61 "port-rack": "000000.00",
62 "port-shelf": "Chassis#1"
65 "lgx-device-name": "Some lgx-device-name",
66 "lgx-port-name": "Some lgx-port-name",
67 "lgx-port-rack": "000000.00",
68 "lgx-port-shelf": "00"
74 "service-rate": "100",
76 "service-format": "Ethernet",
77 "clli": "SNJSCAMCJT4",
80 "port-device-name": "1/0/C1",
83 "port-rack": "000000.00",
84 "port-shelf": "Chassis#1"
87 "lgx-device-name": "Some lgx-device-name",
88 "lgx-port-name": "Some lgx-port-name",
89 "lgx-port-rack": "000000.00",
90 "lgx-port-shelf": "00"
95 "port-device-name": "1/0/C1",
98 "port-rack": "000000.00",
99 "port-shelf": "Chassis#1"
102 "lgx-device-name": "Some lgx-device-name",
103 "lgx-port-name": "Some lgx-port-name",
104 "lgx-port-rack": "000000.00",
105 "lgx-port-shelf": "00"
110 "due-date": "2016-11-28T00:00:01Z",
111 "operator-contact": "pw1234"
115 WAITING = 25 # nominal value is 300
116 NODE_VERSION_121 = '1.2.1'
117 NODE_VERSION_221 = '2.2.1'
118 NODE_VERSION_71 = '7.1'
122 cls.processes = test_utils.start_tpce()
123 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
124 ('roadma', cls.NODE_VERSION_221),
125 ('roadmc', cls.NODE_VERSION_221),
126 ('xpdrc', cls.NODE_VERSION_71)])
129 def tearDownClass(cls):
130 # pylint: disable=not-an-iterable
131 for process in cls.processes:
132 test_utils.shutdown_process(process)
133 print("all processes killed")
136 def setUp(self): # instruction executed before each test method
137 print("execution of {}".format(self.id().split(".")[-1]))
139 def test_01_connect_xpdrA(self):
140 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
141 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
143 def test_02_connect_xpdrC(self):
144 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
145 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
147 def test_03_connect_rdmA(self):
148 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
149 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_04_connect_rdmC(self):
152 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
153 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
156 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
157 "ROADM-A1", "1", "SRG1-PP1-TXRX")
158 self.assertEqual(response.status_code, requests.codes.ok)
159 res = response.json()
160 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
163 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
164 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
165 "ROADM-A1", "1", "SRG1-PP1-TXRX")
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
171 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
172 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
173 "ROADM-C1", "1", "SRG1-PP1-TXRX")
174 self.assertEqual(response.status_code, requests.codes.ok)
175 res = response.json()
176 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
179 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
180 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
181 "ROADM-C1", "1", "SRG1-PP1-TXRX")
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
184 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
187 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
188 # Config ROADMA-ROADMC oms-attributes
190 "auto-spanloss": "true",
191 "spanloss-base": 11.4,
192 "spanloss-current": 12,
193 "engineered-spanloss": 12.2,
194 "link-concatenation": [{
197 "SRLG-length": 100000,
199 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
200 self.assertEqual(response.status_code, requests.codes.created)
202 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
203 # Config ROADMC-ROADMA oms-attributes
205 "auto-spanloss": "true",
206 "spanloss-base": 11.4,
207 "spanloss-current": 12,
208 "engineered-spanloss": 12.2,
209 "link-concatenation": [{
212 "SRLG-length": 100000,
214 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
215 self.assertEqual(response.status_code, requests.codes.created)
217 # test service-create for Eth service from xpdr to xpdr
218 def test_11_create_eth_service1(self):
219 self.cr_serv_sample_data["input"]["service-name"] = "service1"
220 response = test_utils.service_create_request(self.cr_serv_sample_data)
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
223 self.assertIn('PCE calculation in progress',
224 res['output']['configuration-response-common']['response-message'])
225 time.sleep(self.WAITING)
227 def test_12_get_eth_service1(self):
228 response = test_utils.get_service_list_request("services/service1")
229 self.assertEqual(response.status_code, requests.codes.ok)
230 res = response.json()
232 res['services'][0]['administrative-state'], 'inService')
234 res['services'][0]['service-name'], 'service1')
236 res['services'][0]['connection-type'], 'service')
238 res['services'][0]['lifecycle-state'], 'planned')
241 def test_13_change_status_line_port_xpdra(self):
242 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
245 "logical-connection-point": "XPDR1-NETWORK1",
247 "circuit-id": "XPDRA-NETWORK",
248 "administrative-state": "outOfService",
249 "port-qual": "xpdr-network"}]}
250 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
251 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
252 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
253 self.assertEqual(response.status_code, requests.codes.ok)
256 def test_14_check_update_portmapping(self):
257 response = test_utils.portmapping_request("XPDRA01")
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 mapping_list = res['nodes'][0]['mapping']
261 for mapping in mapping_list:
262 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
263 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
264 "Operational State should be 'OutOfService'")
265 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
266 "Administrative State should be 'OutOfService'")
268 self.assertEqual(mapping['port-oper-state'], 'InService',
269 "Operational State should be 'InService'")
270 self.assertEqual(mapping['port-admin-state'], 'InService',
271 "Administrative State should be 'InService'")
274 def test_15_check_update_openroadm_topo(self):
275 url = test_utils.URL_CONFIG_ORDM_TOPO
276 response = test_utils.get_request(url)
277 self.assertEqual(response.status_code, requests.codes.ok)
278 res = response.json()
279 node_list = res['network'][0]['node']
281 for node in node_list:
282 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
283 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
284 tp_list = node['ietf-network-topology:termination-point']
286 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
287 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
288 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
291 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
292 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
293 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
295 link_list = res['network'][0]['ietf-network-topology:link']
296 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
297 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
299 for link in link_list:
300 if link['link-id'] in updated_links:
301 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
302 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
305 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
306 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
307 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
310 def test_16_check_update_service1(self):
311 response = test_utils.get_service_list_request("services/service1")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
315 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
318 def test_17_restore_status_line_port_xpdra(self):
319 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
322 "logical-connection-point": "XPDR1-NETWORK1",
324 "circuit-id": "XPDRA-NETWORK",
325 "administrative-state": "inService",
326 "port-qual": "xpdr-network"}]}
327 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
328 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
329 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
330 self.assertEqual(response.status_code, requests.codes.ok)
333 def test_18_check_update_portmapping_ok(self):
334 response = test_utils.portmapping_request("XPDRA01")
335 self.assertEqual(response.status_code, requests.codes.ok)
336 res = response.json()
337 mapping_list = res['nodes'][0]['mapping']
338 for mapping in mapping_list:
339 self.assertEqual(mapping['port-oper-state'], 'InService',
340 "Operational State should be 'InService'")
341 self.assertEqual(mapping['port-admin-state'], 'InService',
342 "Administrative State should be 'InService'")
345 def test_19_check_update_openroadm_topo_ok(self):
346 url = test_utils.URL_CONFIG_ORDM_TOPO
347 response = test_utils.get_request(url)
348 self.assertEqual(response.status_code, requests.codes.ok)
349 res = response.json()
350 node_list = res['network'][0]['node']
351 for node in node_list:
352 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
353 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
354 tp_list = node['ietf-network-topology:termination-point']
356 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
357 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
359 link_list = res['network'][0]['ietf-network-topology:link']
360 for link in link_list:
361 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
362 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
365 def test_20_check_update_service1_ok(self):
366 self.test_12_get_eth_service1()
368 def test_21_change_status_port_roadma_srg(self):
369 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
372 "logical-connection-point": "SRG1-PP1",
373 "port-type": "client",
374 "circuit-id": "SRG1",
375 "administrative-state": "outOfService",
376 "port-qual": "roadm-external"}]}
377 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
378 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
379 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
380 self.assertEqual(response.status_code, requests.codes.ok)
383 def test_22_check_update_portmapping(self):
384 response = test_utils.portmapping_request("ROADM-A1")
385 self.assertEqual(response.status_code, requests.codes.ok)
386 res = response.json()
387 mapping_list = res['nodes'][0]['mapping']
388 for mapping in mapping_list:
389 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
390 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
391 "Operational State should be 'OutOfService'")
392 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
393 "Administrative State should be 'OutOfService'")
395 self.assertEqual(mapping['port-oper-state'], 'InService',
396 "Operational State should be 'InService'")
397 self.assertEqual(mapping['port-admin-state'], 'InService',
398 "Administrative State should be 'InService'")
401 def test_23_check_update_openroadm_topo(self):
402 url = test_utils.URL_CONFIG_ORDM_TOPO
403 response = test_utils.get_request(url)
404 self.assertEqual(response.status_code, requests.codes.ok)
405 res = response.json()
406 node_list = res['network'][0]['node']
408 for node in node_list:
409 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
410 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
411 tp_list = node['ietf-network-topology:termination-point']
413 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
414 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
415 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
418 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
419 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
420 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
422 link_list = res['network'][0]['ietf-network-topology:link']
423 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
424 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
426 for link in link_list:
427 if link['link-id'] in updated_links:
428 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
429 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
432 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
433 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
434 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
437 def test_24_restore_status_port_roadma_srg(self):
438 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
441 "logical-connection-point": "SRG1-PP1",
442 "port-type": "client",
443 "circuit-id": "SRG1",
444 "administrative-state": "inService",
445 "port-qual": "roadm-external"}]}
446 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
447 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
448 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
449 self.assertEqual(response.status_code, requests.codes.ok)
452 def test_25_check_update_portmapping_ok(self):
453 self.test_18_check_update_portmapping_ok()
455 def test_26_check_update_openroadm_topo_ok(self):
456 self.test_19_check_update_openroadm_topo_ok()
458 def test_27_check_update_service1_ok(self):
459 self.test_12_get_eth_service1()
461 def test_28_change_status_line_port_roadma_deg(self):
462 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
465 "logical-connection-point": "DEG2-TTP-TXRX",
468 "administrative-state": "outOfService",
469 "port-qual": "roadm-external"}]}
470 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
471 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
472 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
473 self.assertEqual(response.status_code, requests.codes.ok)
476 def test_29_check_update_portmapping(self):
477 response = test_utils.portmapping_request("ROADM-A1")
478 self.assertEqual(response.status_code, requests.codes.ok)
479 res = response.json()
480 mapping_list = res['nodes'][0]['mapping']
481 for mapping in mapping_list:
482 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
483 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
484 "Operational State should be 'OutOfService'")
485 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
486 "Administrative State should be 'OutOfService'")
488 self.assertEqual(mapping['port-oper-state'], 'InService',
489 "Operational State should be 'InService'")
490 self.assertEqual(mapping['port-admin-state'], 'InService',
491 "Administrative State should be 'InService'")
494 def test_30_check_update_openroadm_topo(self):
495 url = test_utils.URL_CONFIG_ORDM_TOPO
496 response = test_utils.get_request(url)
497 self.assertEqual(response.status_code, requests.codes.ok)
498 res = response.json()
499 node_list = res['network'][0]['node']
501 for node in node_list:
502 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
503 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
504 tp_list = node['ietf-network-topology:termination-point']
506 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
507 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
508 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
511 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
512 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
513 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
515 link_list = res['network'][0]['ietf-network-topology:link']
516 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
517 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
519 for link in link_list:
520 if link['link-id'] in updated_links:
521 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
522 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
525 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
526 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
527 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
530 def test_31_restore_status_line_port_roadma_srg(self):
531 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
534 "logical-connection-point": "DEG2-TTP-TXRX",
537 "administrative-state": "inService",
538 "port-qual": "roadm-external"}]}
539 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
540 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
541 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
542 self.assertEqual(response.status_code, requests.codes.ok)
545 def test_32_check_update_portmapping_ok(self):
546 self.test_18_check_update_portmapping_ok()
548 def test_33_check_update_openroadm_topo_ok(self):
549 self.test_19_check_update_openroadm_topo_ok()
551 def test_34_check_update_service1_ok(self):
552 self.test_12_get_eth_service1()
554 def test_35_change_status_line_port_xpdrc(self):
555 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
559 "administrative-state": "outOfService",
560 "port-qual": "xpdr-network"}]}
561 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
562 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
563 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
564 self.assertEqual(response.status_code, requests.codes.ok)
567 def test_36_check_update_portmapping(self):
568 response = test_utils.portmapping_request("XPDR-C1")
569 self.assertEqual(response.status_code, requests.codes.ok)
570 res = response.json()
571 mapping_list = res['nodes'][0]['mapping']
572 for mapping in mapping_list:
573 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
574 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
575 "Operational State should be 'OutOfService'")
576 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
577 "Administrative State should be 'OutOfService'")
579 self.assertEqual(mapping['port-oper-state'], 'InService',
580 "Operational State should be 'InService'")
581 self.assertEqual(mapping['port-admin-state'], 'InService',
582 "Administrative State should be 'InService'")
585 def test_37_check_update_openroadm_topo(self):
586 url = test_utils.URL_CONFIG_ORDM_TOPO
587 response = test_utils.get_request(url)
588 self.assertEqual(response.status_code, requests.codes.ok)
589 res = response.json()
590 node_list = res['network'][0]['node']
592 for node in node_list:
593 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
594 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
595 tp_list = node['ietf-network-topology:termination-point']
597 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
598 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
599 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
602 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
603 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
604 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
606 link_list = res['network'][0]['ietf-network-topology:link']
607 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
608 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
610 for link in link_list:
611 if link['link-id'] in updated_links:
612 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
613 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
616 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
617 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
618 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
621 def test_38_restore_status_line_port_xpdrc(self):
622 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
626 "administrative-state": "inService",
627 "port-qual": "xpdr-network"}]}
628 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
629 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
630 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
631 self.assertEqual(response.status_code, requests.codes.ok)
634 def test_39_check_update_portmapping_ok(self):
635 self.test_18_check_update_portmapping_ok()
637 def test_40_check_update_openroadm_topo_ok(self):
638 self.test_19_check_update_openroadm_topo_ok()
640 def test_41_check_update_service1_ok(self):
641 self.test_12_get_eth_service1()
643 def test_42_change_status_port_roadma_srg(self):
644 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
647 "logical-connection-point": "SRG1-PP2",
648 "port-type": "client",
649 "circuit-id": "SRG1",
650 "administrative-state": "outOfService",
651 "port-qual": "roadm-external"}]}
652 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
653 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
654 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
655 self.assertEqual(response.status_code, requests.codes.ok)
658 def test_43_check_update_portmapping(self):
659 response = test_utils.portmapping_request("ROADM-A1")
660 self.assertEqual(response.status_code, requests.codes.ok)
661 res = response.json()
662 mapping_list = res['nodes'][0]['mapping']
663 for mapping in mapping_list:
664 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
665 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
666 "Operational State should be 'OutOfService'")
667 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
668 "Administrative State should be 'OutOfService'")
670 self.assertEqual(mapping['port-oper-state'], 'InService',
671 "Operational State should be 'InService'")
672 self.assertEqual(mapping['port-admin-state'], 'InService',
673 "Administrative State should be 'InService'")
676 def test_44_check_update_openroadm_topo(self):
677 url = test_utils.URL_CONFIG_ORDM_TOPO
678 response = test_utils.get_request(url)
679 self.assertEqual(response.status_code, requests.codes.ok)
680 res = response.json()
681 node_list = res['network'][0]['node']
683 for node in node_list:
684 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
685 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
686 tp_list = node['ietf-network-topology:termination-point']
688 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
689 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
690 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
693 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
694 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
695 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
697 link_list = res['network'][0]['ietf-network-topology:link']
699 for link in link_list:
700 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
701 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
702 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
705 def test_45_check_update_service1_ok(self):
706 self.test_12_get_eth_service1()
708 def test_46_delete_eth_service1(self):
709 response = test_utils.service_delete_request("service1")
710 self.assertEqual(response.status_code, requests.codes.ok)
711 res = response.json()
712 self.assertIn('Renderer service delete in progress',
713 res['output']['configuration-response-common']['response-message'])
714 time.sleep(self.WAITING)
716 def test_47_disconnect_xponders_from_roadm(self):
717 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
718 response = test_utils.get_ordm_topo_request("")
719 self.assertEqual(response.status_code, requests.codes.ok)
720 res = response.json()
721 links = res['network'][0]['ietf-network-topology:link']
723 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
724 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
725 link_name = link["link-id"]
726 response = test_utils.delete_request(url+link_name)
727 self.assertEqual(response.status_code, requests.codes.ok)
729 def test_48_disconnect_XPDRA(self):
730 response = test_utils.unmount_device("XPDRA01")
731 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
733 def test_49_disconnect_XPDRC(self):
734 response = test_utils.unmount_device("XPDR-C1")
735 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
737 def test_50_disconnect_ROADMA(self):
738 response = test_utils.unmount_device("ROADM-A1")
739 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
741 def test_51_disconnect_ROADMC(self):
742 response = test_utils.unmount_device("ROADM-C1")
743 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
746 if __name__ == "__main__":
747 unittest.main(verbosity=2)