2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 sys.path.append('transportpce_tests/common/')
22 class TransportPCEFulltesting(unittest.TestCase):
25 cr_serv_sample_data = {"input": {
26 "sdnc-request-header": {
27 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
28 "rpc-action": "service-create",
29 "request-system-id": "appname",
30 "notification-url": "http://localhost:8585/NotificationServer/notify"
32 "service-name": "service1",
33 "common-id": "ASATT1234567",
34 "connection-type": "service",
36 "service-rate": "100",
38 "service-format": "Ethernet",
39 "clli": "SNJSCAMCJP8",
42 "port-device-name": "1/0/C1",
45 "port-rack": "000000.00",
46 "port-shelf": "Chassis#1"
49 "lgx-device-name": "Some lgx-device-name",
50 "lgx-port-name": "Some lgx-port-name",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
57 "port-device-name": "1/0/C1",
60 "port-rack": "000000.00",
61 "port-shelf": "Chassis#1"
64 "lgx-device-name": "Some lgx-device-name",
65 "lgx-port-name": "Some lgx-port-name",
66 "lgx-port-rack": "000000.00",
67 "lgx-port-shelf": "00"
73 "service-rate": "100",
75 "service-format": "Ethernet",
76 "clli": "SNJSCAMCJT4",
79 "port-device-name": "1/0/C1",
82 "port-rack": "000000.00",
83 "port-shelf": "Chassis#1"
86 "lgx-device-name": "Some lgx-device-name",
87 "lgx-port-name": "Some lgx-port-name",
88 "lgx-port-rack": "000000.00",
89 "lgx-port-shelf": "00"
94 "port-device-name": "1/0/C1",
97 "port-rack": "000000.00",
98 "port-shelf": "Chassis#1"
101 "lgx-device-name": "Some lgx-device-name",
102 "lgx-port-name": "Some lgx-port-name",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
109 "due-date": "2016-11-28T00:00:01Z",
110 "operator-contact": "pw1234"
114 WAITING = 25 # nominal value is 300
115 NODE_VERSION_121 = '1.2.1'
116 NODE_VERSION_221 = '2.2.1'
117 NODE_VERSION_71 = '7.1'
121 cls.processes = test_utils.start_tpce()
122 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
123 ('roadma', cls.NODE_VERSION_221),
124 ('roadmc', cls.NODE_VERSION_221),
125 ('xpdrc', cls.NODE_VERSION_71)])
128 def tearDownClass(cls):
129 # pylint: disable=not-an-iterable
130 for process in cls.processes:
131 test_utils.shutdown_process(process)
132 print("all processes killed")
135 def setUp(self): # instruction executed before each test method
136 print("execution of {}".format(self.id().split(".")[-1]))
138 def test_01_connect_xpdrA(self):
139 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142 def test_02_connect_xpdrC(self):
143 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_03_connect_rdmA(self):
147 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_04_connect_rdmC(self):
151 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
155 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
156 "ROADM-A1", "1", "SRG1-PP1-TXRX")
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
162 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
163 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
164 "ROADM-A1", "1", "SRG1-PP1-TXRX")
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
170 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
171 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
172 "ROADM-C1", "1", "SRG1-PP1-TXRX")
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
175 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
178 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
179 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
180 "ROADM-C1", "1", "SRG1-PP1-TXRX")
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
183 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
186 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
187 # Config ROADMA-ROADMC oms-attributes
189 "auto-spanloss": "true",
190 "spanloss-base": 11.4,
191 "spanloss-current": 12,
192 "engineered-spanloss": 12.2,
193 "link-concatenation": [{
196 "SRLG-length": 100000,
198 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
199 self.assertEqual(response.status_code, requests.codes.created)
201 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
202 # Config ROADMC-ROADMA oms-attributes
204 "auto-spanloss": "true",
205 "spanloss-base": 11.4,
206 "spanloss-current": 12,
207 "engineered-spanloss": 12.2,
208 "link-concatenation": [{
211 "SRLG-length": 100000,
213 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
214 self.assertEqual(response.status_code, requests.codes.created)
216 # test service-create for Eth service from xpdr to xpdr
217 def test_11_create_eth_service1(self):
218 self.cr_serv_sample_data["input"]["service-name"] = "service1"
219 response = test_utils.service_create_request(self.cr_serv_sample_data)
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
222 self.assertIn('PCE calculation in progress',
223 res['output']['configuration-response-common']['response-message'])
224 time.sleep(self.WAITING)
226 def test_12_get_eth_service1(self):
227 response = test_utils.get_service_list_request("services/service1")
228 self.assertEqual(response.status_code, requests.codes.ok)
229 res = response.json()
231 res['services'][0]['administrative-state'], 'inService')
233 res['services'][0]['service-name'], 'service1')
235 res['services'][0]['connection-type'], 'service')
237 res['services'][0]['lifecycle-state'], 'planned')
240 def test_13_change_status_line_port_xpdra(self):
241 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
244 "logical-connection-point": "XPDR1-NETWORK1",
246 "circuit-id": "XPDRA-NETWORK",
247 "administrative-state": "outOfService",
248 "port-qual": "xpdr-network"}]}
249 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
250 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
251 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
252 self.assertEqual(response.status_code, requests.codes.ok)
255 def test_14_check_update_portmapping(self):
256 response = test_utils.portmapping_request("XPDRA01")
257 self.assertEqual(response.status_code, requests.codes.ok)
258 res = response.json()
259 mapping_list = res['nodes'][0]['mapping']
260 for mapping in mapping_list:
261 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
262 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
263 "Operational State should be 'OutOfService'")
264 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
265 "Administrative State should be 'OutOfService'")
267 self.assertEqual(mapping['port-oper-state'], 'InService',
268 "Operational State should be 'InService'")
269 self.assertEqual(mapping['port-admin-state'], 'InService',
270 "Administrative State should be 'InService'")
273 def test_15_check_update_openroadm_topo(self):
274 url = test_utils.URL_CONFIG_ORDM_TOPO
275 response = test_utils.get_request(url)
276 self.assertEqual(response.status_code, requests.codes.ok)
277 res = response.json()
278 node_list = res['network'][0]['node']
280 for node in node_list:
281 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
282 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
283 tp_list = node['ietf-network-topology:termination-point']
285 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
286 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
287 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
290 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
291 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
292 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
294 link_list = res['network'][0]['ietf-network-topology:link']
295 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
296 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
298 for link in link_list:
299 if link['link-id'] in updated_links:
300 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
301 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
304 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
305 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
306 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
309 def test_16_check_update_service1(self):
310 response = test_utils.get_service_list_request("services/service1")
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
313 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
314 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
317 def test_17_restore_status_line_port_xpdra(self):
318 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
321 "logical-connection-point": "XPDR1-NETWORK1",
323 "circuit-id": "XPDRA-NETWORK",
324 "administrative-state": "inService",
325 "port-qual": "xpdr-network"}]}
326 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
327 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
328 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
329 self.assertEqual(response.status_code, requests.codes.ok)
332 def test_18_check_update_portmapping_ok(self):
333 response = test_utils.portmapping_request("XPDRA01")
334 self.assertEqual(response.status_code, requests.codes.ok)
335 res = response.json()
336 mapping_list = res['nodes'][0]['mapping']
337 for mapping in mapping_list:
338 self.assertEqual(mapping['port-oper-state'], 'InService',
339 "Operational State should be 'InService'")
340 self.assertEqual(mapping['port-admin-state'], 'InService',
341 "Administrative State should be 'InService'")
344 def test_19_check_update_openroadm_topo_ok(self):
345 url = test_utils.URL_CONFIG_ORDM_TOPO
346 response = test_utils.get_request(url)
347 self.assertEqual(response.status_code, requests.codes.ok)
348 res = response.json()
349 node_list = res['network'][0]['node']
350 for node in node_list:
351 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
352 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
353 tp_list = node['ietf-network-topology:termination-point']
355 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
356 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
358 link_list = res['network'][0]['ietf-network-topology:link']
359 for link in link_list:
360 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
361 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
364 def test_20_check_update_service1_ok(self):
365 self.test_12_get_eth_service1()
367 def test_21_change_status_port_roadma_srg(self):
368 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
371 "logical-connection-point": "SRG1-PP1",
372 "port-type": "client",
373 "circuit-id": "SRG1",
374 "administrative-state": "outOfService",
375 "port-qual": "roadm-external"}]}
376 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
377 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
378 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
379 self.assertEqual(response.status_code, requests.codes.ok)
382 def test_22_check_update_portmapping(self):
383 response = test_utils.portmapping_request("ROADM-A1")
384 self.assertEqual(response.status_code, requests.codes.ok)
385 res = response.json()
386 mapping_list = res['nodes'][0]['mapping']
387 for mapping in mapping_list:
388 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
389 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
390 "Operational State should be 'OutOfService'")
391 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
392 "Administrative State should be 'OutOfService'")
394 self.assertEqual(mapping['port-oper-state'], 'InService',
395 "Operational State should be 'InService'")
396 self.assertEqual(mapping['port-admin-state'], 'InService',
397 "Administrative State should be 'InService'")
400 def test_23_check_update_openroadm_topo(self):
401 url = test_utils.URL_CONFIG_ORDM_TOPO
402 response = test_utils.get_request(url)
403 self.assertEqual(response.status_code, requests.codes.ok)
404 res = response.json()
405 node_list = res['network'][0]['node']
407 for node in node_list:
408 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
409 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
410 tp_list = node['ietf-network-topology:termination-point']
412 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
413 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
414 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
417 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
418 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
419 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
421 link_list = res['network'][0]['ietf-network-topology:link']
422 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
423 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
425 for link in link_list:
426 if link['link-id'] in updated_links:
427 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
428 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
431 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
432 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
433 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
436 def test_24_restore_status_port_roadma_srg(self):
437 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
440 "logical-connection-point": "SRG1-PP1",
441 "port-type": "client",
442 "circuit-id": "SRG1",
443 "administrative-state": "inService",
444 "port-qual": "roadm-external"}]}
445 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
446 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
447 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
448 self.assertEqual(response.status_code, requests.codes.ok)
451 def test_25_check_update_portmapping_ok(self):
452 self.test_18_check_update_portmapping_ok()
454 def test_26_check_update_openroadm_topo_ok(self):
455 self.test_19_check_update_openroadm_topo_ok()
457 def test_27_check_update_service1_ok(self):
458 self.test_12_get_eth_service1()
460 def test_28_change_status_line_port_roadma_deg(self):
461 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
464 "logical-connection-point": "DEG2-TTP-TXRX",
467 "administrative-state": "outOfService",
468 "port-qual": "roadm-external"}]}
469 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
470 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
471 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
472 self.assertEqual(response.status_code, requests.codes.ok)
475 def test_29_check_update_portmapping(self):
476 response = test_utils.portmapping_request("ROADM-A1")
477 self.assertEqual(response.status_code, requests.codes.ok)
478 res = response.json()
479 mapping_list = res['nodes'][0]['mapping']
480 for mapping in mapping_list:
481 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
482 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
483 "Operational State should be 'OutOfService'")
484 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
485 "Administrative State should be 'OutOfService'")
487 self.assertEqual(mapping['port-oper-state'], 'InService',
488 "Operational State should be 'InService'")
489 self.assertEqual(mapping['port-admin-state'], 'InService',
490 "Administrative State should be 'InService'")
493 def test_30_check_update_openroadm_topo(self):
494 url = test_utils.URL_CONFIG_ORDM_TOPO
495 response = test_utils.get_request(url)
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 node_list = res['network'][0]['node']
500 for node in node_list:
501 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
502 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
503 tp_list = node['ietf-network-topology:termination-point']
505 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
506 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
507 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
510 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
511 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
512 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
514 link_list = res['network'][0]['ietf-network-topology:link']
515 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
516 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
518 for link in link_list:
519 if link['link-id'] in updated_links:
520 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
521 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
524 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
525 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
526 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
529 def test_31_restore_status_line_port_roadma_srg(self):
530 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
533 "logical-connection-point": "DEG2-TTP-TXRX",
536 "administrative-state": "inService",
537 "port-qual": "roadm-external"}]}
538 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
539 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
540 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
541 self.assertEqual(response.status_code, requests.codes.ok)
544 def test_32_check_update_portmapping_ok(self):
545 self.test_18_check_update_portmapping_ok()
547 def test_33_check_update_openroadm_topo_ok(self):
548 self.test_19_check_update_openroadm_topo_ok()
550 def test_34_check_update_service1_ok(self):
551 self.test_12_get_eth_service1()
553 def test_35_change_status_line_port_xpdrc(self):
554 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
558 "administrative-state": "outOfService",
559 "port-qual": "xpdr-network"}]}
560 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
561 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
562 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
563 self.assertEqual(response.status_code, requests.codes.ok)
566 def test_36_check_update_portmapping(self):
567 response = test_utils.portmapping_request("XPDR-C1")
568 self.assertEqual(response.status_code, requests.codes.ok)
569 res = response.json()
570 mapping_list = res['nodes'][0]['mapping']
571 for mapping in mapping_list:
572 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
573 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
574 "Operational State should be 'OutOfService'")
575 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
576 "Administrative State should be 'OutOfService'")
578 self.assertEqual(mapping['port-oper-state'], 'InService',
579 "Operational State should be 'InService'")
580 self.assertEqual(mapping['port-admin-state'], 'InService',
581 "Administrative State should be 'InService'")
584 def test_37_check_update_openroadm_topo(self):
585 url = test_utils.URL_CONFIG_ORDM_TOPO
586 response = test_utils.get_request(url)
587 self.assertEqual(response.status_code, requests.codes.ok)
588 res = response.json()
589 node_list = res['network'][0]['node']
591 for node in node_list:
592 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
593 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
594 tp_list = node['ietf-network-topology:termination-point']
596 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
597 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
598 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
601 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
602 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
603 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
605 link_list = res['network'][0]['ietf-network-topology:link']
606 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
607 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
609 for link in link_list:
610 if link['link-id'] in updated_links:
611 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
612 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
615 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
616 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
617 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
620 def test_38_restore_status_line_port_xpdrc(self):
621 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
625 "administrative-state": "inService",
626 "port-qual": "xpdr-network"}]}
627 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
628 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
629 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
630 self.assertEqual(response.status_code, requests.codes.ok)
633 def test_39_check_update_portmapping_ok(self):
634 self.test_18_check_update_portmapping_ok()
636 def test_40_check_update_openroadm_topo_ok(self):
637 self.test_19_check_update_openroadm_topo_ok()
639 def test_41_check_update_service1_ok(self):
640 self.test_12_get_eth_service1()
642 def test_42_change_status_port_roadma_srg(self):
643 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
646 "logical-connection-point": "SRG1-PP2",
647 "port-type": "client",
648 "circuit-id": "SRG1",
649 "administrative-state": "outOfService",
650 "port-qual": "roadm-external"}]}
651 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
652 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
653 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
654 self.assertEqual(response.status_code, requests.codes.ok)
657 def test_43_check_update_portmapping(self):
658 response = test_utils.portmapping_request("ROADM-A1")
659 self.assertEqual(response.status_code, requests.codes.ok)
660 res = response.json()
661 mapping_list = res['nodes'][0]['mapping']
662 for mapping in mapping_list:
663 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
664 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
665 "Operational State should be 'OutOfService'")
666 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
667 "Administrative State should be 'OutOfService'")
669 self.assertEqual(mapping['port-oper-state'], 'InService',
670 "Operational State should be 'InService'")
671 self.assertEqual(mapping['port-admin-state'], 'InService',
672 "Administrative State should be 'InService'")
675 def test_44_check_update_openroadm_topo(self):
676 url = test_utils.URL_CONFIG_ORDM_TOPO
677 response = test_utils.get_request(url)
678 self.assertEqual(response.status_code, requests.codes.ok)
679 res = response.json()
680 node_list = res['network'][0]['node']
682 for node in node_list:
683 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
684 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
685 tp_list = node['ietf-network-topology:termination-point']
687 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
688 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
689 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
692 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
693 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
694 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
696 link_list = res['network'][0]['ietf-network-topology:link']
698 for link in link_list:
699 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
700 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
701 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
704 def test_45_check_update_service1_ok(self):
705 self.test_12_get_eth_service1()
707 def test_46_delete_eth_service1(self):
708 response = test_utils.service_delete_request("service1")
709 self.assertEqual(response.status_code, requests.codes.ok)
710 res = response.json()
711 self.assertIn('Renderer service delete in progress',
712 res['output']['configuration-response-common']['response-message'])
713 time.sleep(self.WAITING)
715 def test_47_disconnect_xponders_from_roadm(self):
716 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
717 response = test_utils.get_ordm_topo_request("")
718 self.assertEqual(response.status_code, requests.codes.ok)
719 res = response.json()
720 links = res['network'][0]['ietf-network-topology:link']
722 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
723 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
724 link_name = link["link-id"]
725 response = test_utils.delete_request(url+link_name)
726 self.assertEqual(response.status_code, requests.codes.ok)
728 def test_48_disconnect_XPDRA(self):
729 response = test_utils.unmount_device("XPDRA01")
730 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
732 def test_49_disconnect_XPDRC(self):
733 response = test_utils.unmount_device("XPDR-C1")
734 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
736 def test_50_disconnect_ROADMA(self):
737 response = test_utils.unmount_device("ROADM-A1")
738 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
740 def test_51_disconnect_ROADMC(self):
741 response = test_utils.unmount_device("ROADM-C1")
742 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
745 if __name__ == "__main__":
746 unittest.main(verbosity=2)