2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
46 "port-rack": "000000.00",
47 "port-shelf": "Chassis#1"
50 "lgx-device-name": "Some lgx-device-name",
51 "lgx-port-name": "Some lgx-port-name",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
59 "port-rack": "000000.00",
60 "port-shelf": "Chassis#1"
63 "lgx-device-name": "Some lgx-device-name",
64 "lgx-port-name": "Some lgx-port-name",
65 "lgx-port-rack": "000000.00",
66 "lgx-port-shelf": "00"
72 "service-rate": "100",
74 "service-format": "Ethernet",
75 "clli": "SNJSCAMCJT4",
79 "port-rack": "000000.00",
80 "port-shelf": "Chassis#1"
83 "lgx-device-name": "Some lgx-device-name",
84 "lgx-port-name": "Some lgx-port-name",
85 "lgx-port-rack": "000000.00",
86 "lgx-port-shelf": "00"
92 "port-rack": "000000.00",
93 "port-shelf": "Chassis#1"
96 "lgx-device-name": "Some lgx-device-name",
97 "lgx-port-name": "Some lgx-port-name",
98 "lgx-port-rack": "000000.00",
99 "lgx-port-shelf": "00"
104 "due-date": "2016-11-28T00:00:01Z",
105 "operator-contact": "pw1234"
109 WAITING = 25 # nominal value is 300
110 NODE_VERSION_121 = '1.2.1'
111 NODE_VERSION_221 = '2.2.1'
112 NODE_VERSION_71 = '7.1'
116 cls.processes = test_utils.start_tpce()
117 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
118 ('roadma', cls.NODE_VERSION_221),
119 ('roadmc', cls.NODE_VERSION_221),
120 ('xpdrc', cls.NODE_VERSION_71)])
123 def tearDownClass(cls):
124 # pylint: disable=not-an-iterable
125 for process in cls.processes:
126 test_utils.shutdown_process(process)
127 print("all processes killed")
130 def setUp(self): # instruction executed before each test method
131 # pylint: disable=consider-using-f-string
132 print("execution of {}".format(self.id().split(".")[-1]))
134 def test_01_connect_xpdrA(self):
135 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
136 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
138 def test_02_connect_xpdrC(self):
139 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142 def test_03_connect_rdmA(self):
143 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_04_connect_rdmC(self):
147 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
151 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
152 "ROADM-A1", "1", "SRG1-PP1-TXRX")
153 self.assertEqual(response.status_code, requests.codes.ok)
154 res = response.json()
155 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
158 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
159 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
160 "ROADM-A1", "1", "SRG1-PP1-TXRX")
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
166 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
167 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
168 "ROADM-C1", "1", "SRG1-PP1-TXRX")
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
174 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
175 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
176 "ROADM-C1", "1", "SRG1-PP1-TXRX")
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
182 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
183 # Config ROADMA-ROADMC oms-attributes
185 "auto-spanloss": "true",
186 "spanloss-base": 11.4,
187 "spanloss-current": 12,
188 "engineered-spanloss": 12.2,
189 "link-concatenation": [{
192 "SRLG-length": 100000,
194 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
195 self.assertEqual(response.status_code, requests.codes.created)
197 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
198 # Config ROADMC-ROADMA oms-attributes
200 "auto-spanloss": "true",
201 "spanloss-base": 11.4,
202 "spanloss-current": 12,
203 "engineered-spanloss": 12.2,
204 "link-concatenation": [{
207 "SRLG-length": 100000,
209 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
210 self.assertEqual(response.status_code, requests.codes.created)
212 # test service-create for Eth service from xpdr to xpdr
213 def test_11_create_eth_service1(self):
214 self.cr_serv_sample_data["input"]["service-name"] = "service1"
215 response = test_utils.service_create_request(self.cr_serv_sample_data)
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
218 self.assertIn('PCE calculation in progress',
219 res['output']['configuration-response-common']['response-message'])
220 time.sleep(self.WAITING)
222 def test_12_get_eth_service1(self):
223 response = test_utils.get_service_list_request("services/service1")
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
227 res['services'][0]['administrative-state'], 'inService')
229 res['services'][0]['service-name'], 'service1')
231 res['services'][0]['connection-type'], 'service')
233 res['services'][0]['lifecycle-state'], 'planned')
236 def test_13_change_status_line_port_xpdra(self):
237 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
240 "logical-connection-point": "XPDR1-NETWORK1",
242 "circuit-id": "XPDRA-NETWORK",
243 "administrative-state": "outOfService",
244 "port-qual": "xpdr-network"}]}
245 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
246 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
247 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
248 self.assertEqual(response.status_code, requests.codes.ok)
251 def test_14_check_update_portmapping(self):
252 response = test_utils.portmapping_request("XPDRA01")
253 self.assertEqual(response.status_code, requests.codes.ok)
254 res = response.json()
255 mapping_list = res['nodes'][0]['mapping']
256 for mapping in mapping_list:
257 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
258 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
259 "Operational State should be 'OutOfService'")
260 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
261 "Administrative State should be 'OutOfService'")
263 self.assertEqual(mapping['port-oper-state'], 'InService',
264 "Operational State should be 'InService'")
265 self.assertEqual(mapping['port-admin-state'], 'InService',
266 "Administrative State should be 'InService'")
269 def test_15_check_update_openroadm_topo(self):
270 url = test_utils.URL_CONFIG_ORDM_TOPO
271 response = test_utils.get_request(url)
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 node_list = res['network'][0]['node']
276 for node in node_list:
277 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
278 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
279 tp_list = node['ietf-network-topology:termination-point']
281 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
282 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
283 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
286 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
287 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
288 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
290 link_list = res['network'][0]['ietf-network-topology:link']
291 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
292 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
294 for link in link_list:
295 if link['link-id'] in updated_links:
296 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
297 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
300 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
301 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
302 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
305 def test_16_check_update_service1(self):
306 response = test_utils.get_service_list_request("services/service1")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
309 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
310 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
313 def test_17_restore_status_line_port_xpdra(self):
314 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
317 "logical-connection-point": "XPDR1-NETWORK1",
319 "circuit-id": "XPDRA-NETWORK",
320 "administrative-state": "inService",
321 "port-qual": "xpdr-network"}]}
322 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
323 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
324 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
325 self.assertEqual(response.status_code, requests.codes.ok)
328 def test_18_check_update_portmapping_ok(self):
329 response = test_utils.portmapping_request("XPDRA01")
330 self.assertEqual(response.status_code, requests.codes.ok)
331 res = response.json()
332 mapping_list = res['nodes'][0]['mapping']
333 for mapping in mapping_list:
334 self.assertEqual(mapping['port-oper-state'], 'InService',
335 "Operational State should be 'InService'")
336 self.assertEqual(mapping['port-admin-state'], 'InService',
337 "Administrative State should be 'InService'")
340 def test_19_check_update_openroadm_topo_ok(self):
341 url = test_utils.URL_CONFIG_ORDM_TOPO
342 response = test_utils.get_request(url)
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
345 node_list = res['network'][0]['node']
346 for node in node_list:
347 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
348 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
349 tp_list = node['ietf-network-topology:termination-point']
351 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
352 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
354 link_list = res['network'][0]['ietf-network-topology:link']
355 for link in link_list:
356 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
357 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
360 def test_20_check_update_service1_ok(self):
361 self.test_12_get_eth_service1()
363 def test_21_change_status_port_roadma_srg(self):
364 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
367 "logical-connection-point": "SRG1-PP1",
368 "port-type": "client",
369 "circuit-id": "SRG1",
370 "administrative-state": "outOfService",
371 "port-qual": "roadm-external"}]}
372 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
373 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
374 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
375 self.assertEqual(response.status_code, requests.codes.ok)
378 def test_22_check_update_portmapping(self):
379 response = test_utils.portmapping_request("ROADM-A1")
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 mapping_list = res['nodes'][0]['mapping']
383 for mapping in mapping_list:
384 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
385 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
386 "Operational State should be 'OutOfService'")
387 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
388 "Administrative State should be 'OutOfService'")
390 self.assertEqual(mapping['port-oper-state'], 'InService',
391 "Operational State should be 'InService'")
392 self.assertEqual(mapping['port-admin-state'], 'InService',
393 "Administrative State should be 'InService'")
396 def test_23_check_update_openroadm_topo(self):
397 url = test_utils.URL_CONFIG_ORDM_TOPO
398 response = test_utils.get_request(url)
399 self.assertEqual(response.status_code, requests.codes.ok)
400 res = response.json()
401 node_list = res['network'][0]['node']
403 for node in node_list:
404 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
405 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
406 tp_list = node['ietf-network-topology:termination-point']
408 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
409 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
410 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
413 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
414 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
415 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
417 link_list = res['network'][0]['ietf-network-topology:link']
418 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
419 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
421 for link in link_list:
422 if link['link-id'] in updated_links:
423 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
424 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
427 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
428 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
429 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
432 def test_24_restore_status_port_roadma_srg(self):
433 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
436 "logical-connection-point": "SRG1-PP1",
437 "port-type": "client",
438 "circuit-id": "SRG1",
439 "administrative-state": "inService",
440 "port-qual": "roadm-external"}]}
441 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
442 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
443 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
444 self.assertEqual(response.status_code, requests.codes.ok)
447 def test_25_check_update_portmapping_ok(self):
448 self.test_18_check_update_portmapping_ok()
450 def test_26_check_update_openroadm_topo_ok(self):
451 self.test_19_check_update_openroadm_topo_ok()
453 def test_27_check_update_service1_ok(self):
454 self.test_12_get_eth_service1()
456 def test_28_change_status_line_port_roadma_deg(self):
457 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
460 "logical-connection-point": "DEG2-TTP-TXRX",
463 "administrative-state": "outOfService",
464 "port-qual": "roadm-external"}]}
465 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
466 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
467 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
468 self.assertEqual(response.status_code, requests.codes.ok)
471 def test_29_check_update_portmapping(self):
472 response = test_utils.portmapping_request("ROADM-A1")
473 self.assertEqual(response.status_code, requests.codes.ok)
474 res = response.json()
475 mapping_list = res['nodes'][0]['mapping']
476 for mapping in mapping_list:
477 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
478 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
479 "Operational State should be 'OutOfService'")
480 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
481 "Administrative State should be 'OutOfService'")
483 self.assertEqual(mapping['port-oper-state'], 'InService',
484 "Operational State should be 'InService'")
485 self.assertEqual(mapping['port-admin-state'], 'InService',
486 "Administrative State should be 'InService'")
489 def test_30_check_update_openroadm_topo(self):
490 url = test_utils.URL_CONFIG_ORDM_TOPO
491 response = test_utils.get_request(url)
492 self.assertEqual(response.status_code, requests.codes.ok)
493 res = response.json()
494 node_list = res['network'][0]['node']
496 for node in node_list:
497 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
498 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
499 tp_list = node['ietf-network-topology:termination-point']
501 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
502 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
503 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
506 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
507 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
508 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
510 link_list = res['network'][0]['ietf-network-topology:link']
511 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
512 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
514 for link in link_list:
515 if link['link-id'] in updated_links:
516 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
517 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
520 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
521 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
522 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
525 def test_31_restore_status_line_port_roadma_srg(self):
526 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
529 "logical-connection-point": "DEG2-TTP-TXRX",
532 "administrative-state": "inService",
533 "port-qual": "roadm-external"}]}
534 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
535 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
536 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
537 self.assertEqual(response.status_code, requests.codes.ok)
540 def test_32_check_update_portmapping_ok(self):
541 self.test_18_check_update_portmapping_ok()
543 def test_33_check_update_openroadm_topo_ok(self):
544 self.test_19_check_update_openroadm_topo_ok()
546 def test_34_check_update_service1_ok(self):
547 self.test_12_get_eth_service1()
549 def test_35_change_status_line_port_xpdrc(self):
550 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
554 "administrative-state": "outOfService",
555 "port-qual": "xpdr-network"}]}
556 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
557 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
558 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
559 self.assertEqual(response.status_code, requests.codes.ok)
562 def test_36_check_update_portmapping(self):
563 response = test_utils.portmapping_request("XPDR-C1")
564 self.assertEqual(response.status_code, requests.codes.ok)
565 res = response.json()
566 mapping_list = res['nodes'][0]['mapping']
567 for mapping in mapping_list:
568 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
569 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
570 "Operational State should be 'OutOfService'")
571 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
572 "Administrative State should be 'OutOfService'")
574 self.assertEqual(mapping['port-oper-state'], 'InService',
575 "Operational State should be 'InService'")
576 self.assertEqual(mapping['port-admin-state'], 'InService',
577 "Administrative State should be 'InService'")
580 def test_37_check_update_openroadm_topo(self):
581 url = test_utils.URL_CONFIG_ORDM_TOPO
582 response = test_utils.get_request(url)
583 self.assertEqual(response.status_code, requests.codes.ok)
584 res = response.json()
585 node_list = res['network'][0]['node']
587 for node in node_list:
588 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
589 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
590 tp_list = node['ietf-network-topology:termination-point']
592 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
593 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
594 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
597 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
598 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
599 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
601 link_list = res['network'][0]['ietf-network-topology:link']
602 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
603 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
605 for link in link_list:
606 if link['link-id'] in updated_links:
607 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
608 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
611 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
612 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
613 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
616 def test_38_restore_status_line_port_xpdrc(self):
617 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
621 "administrative-state": "inService",
622 "port-qual": "xpdr-network"}]}
623 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
624 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
625 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
626 self.assertEqual(response.status_code, requests.codes.ok)
629 def test_39_check_update_portmapping_ok(self):
630 self.test_18_check_update_portmapping_ok()
632 def test_40_check_update_openroadm_topo_ok(self):
633 self.test_19_check_update_openroadm_topo_ok()
635 def test_41_check_update_service1_ok(self):
636 self.test_12_get_eth_service1()
638 def test_42_change_status_port_roadma_srg(self):
639 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
642 "logical-connection-point": "SRG1-PP2",
643 "port-type": "client",
644 "circuit-id": "SRG1",
645 "administrative-state": "outOfService",
646 "port-qual": "roadm-external"}]}
647 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
648 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
649 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
650 self.assertEqual(response.status_code, requests.codes.ok)
653 def test_43_check_update_portmapping(self):
654 response = test_utils.portmapping_request("ROADM-A1")
655 self.assertEqual(response.status_code, requests.codes.ok)
656 res = response.json()
657 mapping_list = res['nodes'][0]['mapping']
658 for mapping in mapping_list:
659 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
660 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
661 "Operational State should be 'OutOfService'")
662 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
663 "Administrative State should be 'OutOfService'")
665 self.assertEqual(mapping['port-oper-state'], 'InService',
666 "Operational State should be 'InService'")
667 self.assertEqual(mapping['port-admin-state'], 'InService',
668 "Administrative State should be 'InService'")
671 def test_44_check_update_openroadm_topo(self):
672 url = test_utils.URL_CONFIG_ORDM_TOPO
673 response = test_utils.get_request(url)
674 self.assertEqual(response.status_code, requests.codes.ok)
675 res = response.json()
676 node_list = res['network'][0]['node']
678 for node in node_list:
679 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
680 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
681 tp_list = node['ietf-network-topology:termination-point']
683 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
684 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
685 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
688 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
689 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
690 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
692 link_list = res['network'][0]['ietf-network-topology:link']
694 for link in link_list:
695 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
696 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
697 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
700 def test_45_check_update_service1_ok(self):
701 self.test_12_get_eth_service1()
703 def test_46_delete_eth_service1(self):
704 response = test_utils.service_delete_request("service1")
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 self.assertIn('Renderer service delete in progress',
708 res['output']['configuration-response-common']['response-message'])
709 time.sleep(self.WAITING)
711 def test_47_disconnect_xponders_from_roadm(self):
712 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
713 response = test_utils.get_ordm_topo_request("")
714 self.assertEqual(response.status_code, requests.codes.ok)
715 res = response.json()
716 links = res['network'][0]['ietf-network-topology:link']
718 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
719 link_name = link["link-id"]
720 response = test_utils.delete_request(url+link_name)
721 self.assertEqual(response.status_code, requests.codes.ok)
723 def test_48_disconnect_XPDRA(self):
724 response = test_utils.unmount_device("XPDRA01")
725 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
727 def test_49_disconnect_XPDRC(self):
728 response = test_utils.unmount_device("XPDR-C1")
729 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
731 def test_50_disconnect_ROADMA(self):
732 response = test_utils.unmount_device("ROADM-A1")
733 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
735 def test_51_disconnect_ROADMC(self):
736 response = test_utils.unmount_device("ROADM-C1")
737 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
740 if __name__ == "__main__":
741 unittest.main(verbosity=2)