2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
45 "port-device-name": "1/0/C1",
48 "port-rack": "000000.00",
49 "port-shelf": "Chassis#1"
52 "lgx-device-name": "Some lgx-device-name",
53 "lgx-port-name": "Some lgx-port-name",
54 "lgx-port-rack": "000000.00",
55 "lgx-port-shelf": "00"
60 "port-device-name": "1/0/C1",
63 "port-rack": "000000.00",
64 "port-shelf": "Chassis#1"
67 "lgx-device-name": "Some lgx-device-name",
68 "lgx-port-name": "Some lgx-port-name",
69 "lgx-port-rack": "000000.00",
70 "lgx-port-shelf": "00"
76 "service-rate": "100",
78 "service-format": "Ethernet",
79 "clli": "SNJSCAMCJT4",
82 "port-device-name": "1/0/C1",
85 "port-rack": "000000.00",
86 "port-shelf": "Chassis#1"
89 "lgx-device-name": "Some lgx-device-name",
90 "lgx-port-name": "Some lgx-port-name",
91 "lgx-port-rack": "000000.00",
92 "lgx-port-shelf": "00"
97 "port-device-name": "1/0/C1",
100 "port-rack": "000000.00",
101 "port-shelf": "Chassis#1"
104 "lgx-device-name": "Some lgx-device-name",
105 "lgx-port-name": "Some lgx-port-name",
106 "lgx-port-rack": "000000.00",
107 "lgx-port-shelf": "00"
112 "due-date": "2016-11-28T00:00:01Z",
113 "operator-contact": "pw1234"
117 WAITING = 25 # nominal value is 300
118 NODE_VERSION_121 = '1.2.1'
119 NODE_VERSION_221 = '2.2.1'
120 NODE_VERSION_71 = '7.1'
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
126 ('roadma', cls.NODE_VERSION_221),
127 ('roadmc', cls.NODE_VERSION_221),
128 ('xpdrc', cls.NODE_VERSION_71)])
131 def tearDownClass(cls):
132 # pylint: disable=not-an-iterable
133 for process in cls.processes:
134 test_utils.shutdown_process(process)
135 print("all processes killed")
138 def setUp(self): # instruction executed before each test method
139 print("execution of {}".format(self.id().split(".")[-1]))
141 def test_01_connect_xpdrA(self):
142 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
143 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
145 def test_02_connect_xpdrC(self):
146 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
147 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_03_connect_rdmA(self):
150 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
151 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153 def test_04_connect_rdmC(self):
154 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
155 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
158 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
159 "ROADM-A1", "1", "SRG1-PP1-TXRX")
160 self.assertEqual(response.status_code, requests.codes.ok)
161 res = response.json()
162 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
165 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
166 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
167 "ROADM-A1", "1", "SRG1-PP1-TXRX")
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
170 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
173 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
174 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
175 "ROADM-C1", "1", "SRG1-PP1-TXRX")
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
178 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
181 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
182 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
183 "ROADM-C1", "1", "SRG1-PP1-TXRX")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
189 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
190 # Config ROADMA-ROADMC oms-attributes
192 "auto-spanloss": "true",
193 "spanloss-base": 11.4,
194 "spanloss-current": 12,
195 "engineered-spanloss": 12.2,
196 "link-concatenation": [{
199 "SRLG-length": 100000,
201 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
202 self.assertEqual(response.status_code, requests.codes.created)
204 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
205 # Config ROADMC-ROADMA oms-attributes
207 "auto-spanloss": "true",
208 "spanloss-base": 11.4,
209 "spanloss-current": 12,
210 "engineered-spanloss": 12.2,
211 "link-concatenation": [{
214 "SRLG-length": 100000,
216 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
217 self.assertEqual(response.status_code, requests.codes.created)
219 # test service-create for Eth service from xpdr to xpdr
220 def test_11_create_eth_service1(self):
221 self.cr_serv_sample_data["input"]["service-name"] = "service1"
222 response = test_utils.service_create_request(self.cr_serv_sample_data)
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 self.assertIn('PCE calculation in progress',
226 res['output']['configuration-response-common']['response-message'])
227 time.sleep(self.WAITING)
229 def test_12_get_eth_service1(self):
230 response = test_utils.get_service_list_request("services/service1")
231 self.assertEqual(response.status_code, requests.codes.ok)
232 res = response.json()
234 res['services'][0]['administrative-state'], 'inService')
236 res['services'][0]['service-name'], 'service1')
238 res['services'][0]['connection-type'], 'service')
240 res['services'][0]['lifecycle-state'], 'planned')
243 def test_13_change_status_line_port_xpdra(self):
244 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
247 "logical-connection-point": "XPDR1-NETWORK1",
249 "circuit-id": "XPDRA-NETWORK",
250 "administrative-state": "outOfService",
251 "port-qual": "xpdr-network"}]}
252 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
253 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
254 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
255 self.assertEqual(response.status_code, requests.codes.ok)
258 def test_14_check_update_portmapping(self):
259 response = test_utils.portmapping_request("XPDRA01")
260 self.assertEqual(response.status_code, requests.codes.ok)
261 res = response.json()
262 mapping_list = res['nodes'][0]['mapping']
263 for mapping in mapping_list:
264 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
265 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
266 "Operational State should be 'OutOfService'")
267 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
268 "Administrative State should be 'OutOfService'")
270 self.assertEqual(mapping['port-oper-state'], 'InService',
271 "Operational State should be 'InService'")
272 self.assertEqual(mapping['port-admin-state'], 'InService',
273 "Administrative State should be 'InService'")
276 def test_15_check_update_openroadm_topo(self):
277 url = test_utils.URL_CONFIG_ORDM_TOPO
278 response = test_utils.get_request(url)
279 self.assertEqual(response.status_code, requests.codes.ok)
280 res = response.json()
281 node_list = res['network'][0]['node']
283 for node in node_list:
284 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
285 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
286 tp_list = node['ietf-network-topology:termination-point']
288 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
289 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
290 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
293 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
294 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
295 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
297 link_list = res['network'][0]['ietf-network-topology:link']
298 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
299 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
301 for link in link_list:
302 if link['link-id'] in updated_links:
303 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
304 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
307 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
308 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
309 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
312 def test_16_check_update_service1(self):
313 response = test_utils.get_service_list_request("services/service1")
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
316 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
317 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
320 def test_17_restore_status_line_port_xpdra(self):
321 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
324 "logical-connection-point": "XPDR1-NETWORK1",
326 "circuit-id": "XPDRA-NETWORK",
327 "administrative-state": "inService",
328 "port-qual": "xpdr-network"}]}
329 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
330 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
331 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
332 self.assertEqual(response.status_code, requests.codes.ok)
335 def test_18_check_update_portmapping_ok(self):
336 response = test_utils.portmapping_request("XPDRA01")
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
339 mapping_list = res['nodes'][0]['mapping']
340 for mapping in mapping_list:
341 self.assertEqual(mapping['port-oper-state'], 'InService',
342 "Operational State should be 'InService'")
343 self.assertEqual(mapping['port-admin-state'], 'InService',
344 "Administrative State should be 'InService'")
347 def test_19_check_update_openroadm_topo_ok(self):
348 url = test_utils.URL_CONFIG_ORDM_TOPO
349 response = test_utils.get_request(url)
350 self.assertEqual(response.status_code, requests.codes.ok)
351 res = response.json()
352 node_list = res['network'][0]['node']
353 for node in node_list:
354 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
355 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
356 tp_list = node['ietf-network-topology:termination-point']
358 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
359 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
361 link_list = res['network'][0]['ietf-network-topology:link']
362 for link in link_list:
363 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
364 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
367 def test_20_check_update_service1_ok(self):
368 self.test_12_get_eth_service1()
370 def test_21_change_status_port_roadma_srg(self):
371 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
374 "logical-connection-point": "SRG1-PP1",
375 "port-type": "client",
376 "circuit-id": "SRG1",
377 "administrative-state": "outOfService",
378 "port-qual": "roadm-external"}]}
379 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
380 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
381 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
382 self.assertEqual(response.status_code, requests.codes.ok)
385 def test_22_check_update_portmapping(self):
386 response = test_utils.portmapping_request("ROADM-A1")
387 self.assertEqual(response.status_code, requests.codes.ok)
388 res = response.json()
389 mapping_list = res['nodes'][0]['mapping']
390 for mapping in mapping_list:
391 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
392 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
393 "Operational State should be 'OutOfService'")
394 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
395 "Administrative State should be 'OutOfService'")
397 self.assertEqual(mapping['port-oper-state'], 'InService',
398 "Operational State should be 'InService'")
399 self.assertEqual(mapping['port-admin-state'], 'InService',
400 "Administrative State should be 'InService'")
403 def test_23_check_update_openroadm_topo(self):
404 url = test_utils.URL_CONFIG_ORDM_TOPO
405 response = test_utils.get_request(url)
406 self.assertEqual(response.status_code, requests.codes.ok)
407 res = response.json()
408 node_list = res['network'][0]['node']
410 for node in node_list:
411 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
412 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
413 tp_list = node['ietf-network-topology:termination-point']
415 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
416 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
417 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
420 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
421 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
422 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
424 link_list = res['network'][0]['ietf-network-topology:link']
425 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
426 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
428 for link in link_list:
429 if link['link-id'] in updated_links:
430 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
431 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
434 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
435 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
436 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
439 def test_24_restore_status_port_roadma_srg(self):
440 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
443 "logical-connection-point": "SRG1-PP1",
444 "port-type": "client",
445 "circuit-id": "SRG1",
446 "administrative-state": "inService",
447 "port-qual": "roadm-external"}]}
448 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
449 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
450 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
451 self.assertEqual(response.status_code, requests.codes.ok)
454 def test_25_check_update_portmapping_ok(self):
455 self.test_18_check_update_portmapping_ok()
457 def test_26_check_update_openroadm_topo_ok(self):
458 self.test_19_check_update_openroadm_topo_ok()
460 def test_27_check_update_service1_ok(self):
461 self.test_12_get_eth_service1()
463 def test_28_change_status_line_port_roadma_deg(self):
464 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
467 "logical-connection-point": "DEG2-TTP-TXRX",
470 "administrative-state": "outOfService",
471 "port-qual": "roadm-external"}]}
472 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
473 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
474 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
475 self.assertEqual(response.status_code, requests.codes.ok)
478 def test_29_check_update_portmapping(self):
479 response = test_utils.portmapping_request("ROADM-A1")
480 self.assertEqual(response.status_code, requests.codes.ok)
481 res = response.json()
482 mapping_list = res['nodes'][0]['mapping']
483 for mapping in mapping_list:
484 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
485 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
486 "Operational State should be 'OutOfService'")
487 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
488 "Administrative State should be 'OutOfService'")
490 self.assertEqual(mapping['port-oper-state'], 'InService',
491 "Operational State should be 'InService'")
492 self.assertEqual(mapping['port-admin-state'], 'InService',
493 "Administrative State should be 'InService'")
496 def test_30_check_update_openroadm_topo(self):
497 url = test_utils.URL_CONFIG_ORDM_TOPO
498 response = test_utils.get_request(url)
499 self.assertEqual(response.status_code, requests.codes.ok)
500 res = response.json()
501 node_list = res['network'][0]['node']
503 for node in node_list:
504 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
505 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
506 tp_list = node['ietf-network-topology:termination-point']
508 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
509 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
510 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
513 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
514 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
515 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
517 link_list = res['network'][0]['ietf-network-topology:link']
518 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
519 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
521 for link in link_list:
522 if link['link-id'] in updated_links:
523 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
524 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
527 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
528 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
529 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
532 def test_31_restore_status_line_port_roadma_srg(self):
533 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
536 "logical-connection-point": "DEG2-TTP-TXRX",
539 "administrative-state": "inService",
540 "port-qual": "roadm-external"}]}
541 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
542 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
543 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
544 self.assertEqual(response.status_code, requests.codes.ok)
547 def test_32_check_update_portmapping_ok(self):
548 self.test_18_check_update_portmapping_ok()
550 def test_33_check_update_openroadm_topo_ok(self):
551 self.test_19_check_update_openroadm_topo_ok()
553 def test_34_check_update_service1_ok(self):
554 self.test_12_get_eth_service1()
556 def test_35_change_status_line_port_xpdrc(self):
557 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
561 "administrative-state": "outOfService",
562 "port-qual": "xpdr-network"}]}
563 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
564 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
565 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
566 self.assertEqual(response.status_code, requests.codes.ok)
569 def test_36_check_update_portmapping(self):
570 response = test_utils.portmapping_request("XPDR-C1")
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 mapping_list = res['nodes'][0]['mapping']
574 for mapping in mapping_list:
575 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
576 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
577 "Operational State should be 'OutOfService'")
578 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
579 "Administrative State should be 'OutOfService'")
581 self.assertEqual(mapping['port-oper-state'], 'InService',
582 "Operational State should be 'InService'")
583 self.assertEqual(mapping['port-admin-state'], 'InService',
584 "Administrative State should be 'InService'")
587 def test_37_check_update_openroadm_topo(self):
588 url = test_utils.URL_CONFIG_ORDM_TOPO
589 response = test_utils.get_request(url)
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 node_list = res['network'][0]['node']
594 for node in node_list:
595 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
596 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
597 tp_list = node['ietf-network-topology:termination-point']
599 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
600 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
601 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
604 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
605 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
606 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
608 link_list = res['network'][0]['ietf-network-topology:link']
609 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
610 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
612 for link in link_list:
613 if link['link-id'] in updated_links:
614 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
615 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
618 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
619 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
620 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
623 def test_38_restore_status_line_port_xpdrc(self):
624 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
628 "administrative-state": "inService",
629 "port-qual": "xpdr-network"}]}
630 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
631 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
632 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
633 self.assertEqual(response.status_code, requests.codes.ok)
636 def test_39_check_update_portmapping_ok(self):
637 self.test_18_check_update_portmapping_ok()
639 def test_40_check_update_openroadm_topo_ok(self):
640 self.test_19_check_update_openroadm_topo_ok()
642 def test_41_check_update_service1_ok(self):
643 self.test_12_get_eth_service1()
645 def test_42_change_status_port_roadma_srg(self):
646 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
649 "logical-connection-point": "SRG1-PP2",
650 "port-type": "client",
651 "circuit-id": "SRG1",
652 "administrative-state": "outOfService",
653 "port-qual": "roadm-external"}]}
654 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
655 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
656 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
657 self.assertEqual(response.status_code, requests.codes.ok)
660 def test_43_check_update_portmapping(self):
661 response = test_utils.portmapping_request("ROADM-A1")
662 self.assertEqual(response.status_code, requests.codes.ok)
663 res = response.json()
664 mapping_list = res['nodes'][0]['mapping']
665 for mapping in mapping_list:
666 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
667 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
668 "Operational State should be 'OutOfService'")
669 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
670 "Administrative State should be 'OutOfService'")
672 self.assertEqual(mapping['port-oper-state'], 'InService',
673 "Operational State should be 'InService'")
674 self.assertEqual(mapping['port-admin-state'], 'InService',
675 "Administrative State should be 'InService'")
678 def test_44_check_update_openroadm_topo(self):
679 url = test_utils.URL_CONFIG_ORDM_TOPO
680 response = test_utils.get_request(url)
681 self.assertEqual(response.status_code, requests.codes.ok)
682 res = response.json()
683 node_list = res['network'][0]['node']
685 for node in node_list:
686 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
687 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
688 tp_list = node['ietf-network-topology:termination-point']
690 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
691 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
692 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
695 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
696 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
697 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
699 link_list = res['network'][0]['ietf-network-topology:link']
701 for link in link_list:
702 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
703 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
704 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
707 def test_45_check_update_service1_ok(self):
708 self.test_12_get_eth_service1()
710 def test_46_delete_eth_service1(self):
711 response = test_utils.service_delete_request("service1")
712 self.assertEqual(response.status_code, requests.codes.ok)
713 res = response.json()
714 self.assertIn('Renderer service delete in progress',
715 res['output']['configuration-response-common']['response-message'])
716 time.sleep(self.WAITING)
718 def test_47_disconnect_xponders_from_roadm(self):
719 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
720 response = test_utils.get_ordm_topo_request("")
721 self.assertEqual(response.status_code, requests.codes.ok)
722 res = response.json()
723 links = res['network'][0]['ietf-network-topology:link']
725 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
726 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
727 link_name = link["link-id"]
728 response = test_utils.delete_request(url+link_name)
729 self.assertEqual(response.status_code, requests.codes.ok)
731 def test_48_disconnect_XPDRA(self):
732 response = test_utils.unmount_device("XPDRA01")
733 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
735 def test_49_disconnect_XPDRC(self):
736 response = test_utils.unmount_device("XPDR-C1")
737 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
739 def test_50_disconnect_ROADMA(self):
740 response = test_utils.unmount_device("ROADM-A1")
741 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
743 def test_51_disconnect_ROADMC(self):
744 response = test_utils.unmount_device("ROADM-C1")
745 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
748 if __name__ == "__main__":
749 unittest.main(verbosity=2)