2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 sys.path.append('transportpce_tests/common/')
21 class TransportPCEFulltesting(unittest.TestCase):
24 cr_serv_sample_data = {"input": {
25 "sdnc-request-header": {
26 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
27 "rpc-action": "service-create",
28 "request-system-id": "appname",
29 "notification-url": "http://localhost:8585/NotificationServer/notify"
31 "service-name": "service1",
32 "common-id": "ASATT1234567",
33 "connection-type": "service",
35 "service-rate": "100",
37 "service-format": "Ethernet",
38 "clli": "SNJSCAMCJP8",
41 "port-device-name": "1/0/C1",
44 "port-rack": "000000.00",
45 "port-shelf": "Chassis#1"
48 "lgx-device-name": "Some lgx-device-name",
49 "lgx-port-name": "Some lgx-port-name",
50 "lgx-port-rack": "000000.00",
51 "lgx-port-shelf": "00"
56 "port-device-name": "1/0/C1",
59 "port-rack": "000000.00",
60 "port-shelf": "Chassis#1"
63 "lgx-device-name": "Some lgx-device-name",
64 "lgx-port-name": "Some lgx-port-name",
65 "lgx-port-rack": "000000.00",
66 "lgx-port-shelf": "00"
72 "service-rate": "100",
74 "service-format": "Ethernet",
75 "clli": "SNJSCAMCJT4",
78 "port-device-name": "1/0/C1",
81 "port-rack": "000000.00",
82 "port-shelf": "Chassis#1"
85 "lgx-device-name": "Some lgx-device-name",
86 "lgx-port-name": "Some lgx-port-name",
87 "lgx-port-rack": "000000.00",
88 "lgx-port-shelf": "00"
93 "port-device-name": "1/0/C1",
96 "port-rack": "000000.00",
97 "port-shelf": "Chassis#1"
100 "lgx-device-name": "Some lgx-device-name",
101 "lgx-port-name": "Some lgx-port-name",
102 "lgx-port-rack": "000000.00",
103 "lgx-port-shelf": "00"
108 "due-date": "2016-11-28T00:00:01Z",
109 "operator-contact": "pw1234"
113 WAITING = 25 # nominal value is 300
114 NODE_VERSION_121 = '1.2.1'
115 NODE_VERSION_221 = '2.2.1'
116 NODE_VERSION_71 = '7.1'
120 cls.processes = test_utils.start_tpce()
121 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
122 ('roadma', cls.NODE_VERSION_221),
123 ('roadmc', cls.NODE_VERSION_221),
124 ('xpdrc', cls.NODE_VERSION_71)])
127 def tearDownClass(cls):
128 # pylint: disable=not-an-iterable
129 for process in cls.processes:
130 test_utils.shutdown_process(process)
131 print("all processes killed")
134 def setUp(self): # instruction executed before each test method
135 print("execution of {}".format(self.id().split(".")[-1]))
137 def test_01_connect_xpdrA(self):
138 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
139 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
141 def test_02_connect_xpdrC(self):
142 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
143 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
145 def test_03_connect_rdmA(self):
146 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
147 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_04_connect_rdmC(self):
150 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
151 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
154 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
155 "ROADM-A1", "1", "SRG1-PP1-TXRX")
156 self.assertEqual(response.status_code, requests.codes.ok)
157 res = response.json()
158 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
161 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
162 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
163 "ROADM-A1", "1", "SRG1-PP1-TXRX")
164 self.assertEqual(response.status_code, requests.codes.ok)
165 res = response.json()
166 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
169 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
170 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
171 "ROADM-C1", "1", "SRG1-PP1-TXRX")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
174 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
177 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
178 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
179 "ROADM-C1", "1", "SRG1-PP1-TXRX")
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
185 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
186 # Config ROADMA-ROADMC oms-attributes
188 "auto-spanloss": "true",
189 "spanloss-base": 11.4,
190 "spanloss-current": 12,
191 "engineered-spanloss": 12.2,
192 "link-concatenation": [{
195 "SRLG-length": 100000,
197 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
198 self.assertEqual(response.status_code, requests.codes.created)
200 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
201 # Config ROADMC-ROADMA oms-attributes
203 "auto-spanloss": "true",
204 "spanloss-base": 11.4,
205 "spanloss-current": 12,
206 "engineered-spanloss": 12.2,
207 "link-concatenation": [{
210 "SRLG-length": 100000,
212 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
213 self.assertEqual(response.status_code, requests.codes.created)
215 # test service-create for Eth service from xpdr to xpdr
216 def test_11_create_eth_service1(self):
217 self.cr_serv_sample_data["input"]["service-name"] = "service1"
218 response = test_utils.service_create_request(self.cr_serv_sample_data)
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 self.assertIn('PCE calculation in progress',
222 res['output']['configuration-response-common']['response-message'])
223 time.sleep(self.WAITING)
225 def test_12_get_eth_service1(self):
226 response = test_utils.get_service_list_request("services/service1")
227 self.assertEqual(response.status_code, requests.codes.ok)
228 res = response.json()
230 res['services'][0]['administrative-state'], 'inService')
232 res['services'][0]['service-name'], 'service1')
234 res['services'][0]['connection-type'], 'service')
236 res['services'][0]['lifecycle-state'], 'planned')
239 def test_13_change_status_line_port_xpdra(self):
240 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
243 "logical-connection-point": "XPDR1-NETWORK1",
245 "circuit-id": "XPDRA-NETWORK",
246 "administrative-state": "outOfService",
247 "port-qual": "xpdr-network"}]}
248 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
249 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
250 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
251 self.assertEqual(response.status_code, requests.codes.ok)
254 def test_14_check_update_portmapping(self):
255 response = test_utils.portmapping_request("XPDRA01")
256 self.assertEqual(response.status_code, requests.codes.ok)
257 res = response.json()
258 mapping_list = res['nodes'][0]['mapping']
259 for mapping in mapping_list:
260 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
261 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
262 "Operational State should be 'OutOfService'")
263 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
264 "Administrative State should be 'OutOfService'")
266 self.assertEqual(mapping['port-oper-state'], 'InService',
267 "Operational State should be 'InService'")
268 self.assertEqual(mapping['port-admin-state'], 'InService',
269 "Administrative State should be 'InService'")
272 def test_15_check_update_openroadm_topo(self):
273 url = test_utils.URL_CONFIG_ORDM_TOPO
274 response = test_utils.get_request(url)
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
277 node_list = res['network'][0]['node']
279 for node in node_list:
280 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
281 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
282 tp_list = node['ietf-network-topology:termination-point']
284 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
285 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
286 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
289 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
290 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
291 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
293 link_list = res['network'][0]['ietf-network-topology:link']
294 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
295 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
297 for link in link_list:
298 if link['link-id'] in updated_links:
299 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
300 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
303 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
304 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
305 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
308 def test_16_check_update_service1(self):
309 response = test_utils.get_service_list_request("services/service1")
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
312 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
313 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
316 def test_17_restore_status_line_port_xpdra(self):
317 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
320 "logical-connection-point": "XPDR1-NETWORK1",
322 "circuit-id": "XPDRA-NETWORK",
323 "administrative-state": "inService",
324 "port-qual": "xpdr-network"}]}
325 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
326 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
327 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
328 self.assertEqual(response.status_code, requests.codes.ok)
331 def test_18_check_update_portmapping_ok(self):
332 response = test_utils.portmapping_request("XPDRA01")
333 self.assertEqual(response.status_code, requests.codes.ok)
334 res = response.json()
335 mapping_list = res['nodes'][0]['mapping']
336 for mapping in mapping_list:
337 self.assertEqual(mapping['port-oper-state'], 'InService',
338 "Operational State should be 'InService'")
339 self.assertEqual(mapping['port-admin-state'], 'InService',
340 "Administrative State should be 'InService'")
343 def test_19_check_update_openroadm_topo_ok(self):
344 url = test_utils.URL_CONFIG_ORDM_TOPO
345 response = test_utils.get_request(url)
346 self.assertEqual(response.status_code, requests.codes.ok)
347 res = response.json()
348 node_list = res['network'][0]['node']
349 for node in node_list:
350 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
351 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
352 tp_list = node['ietf-network-topology:termination-point']
354 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
355 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
357 link_list = res['network'][0]['ietf-network-topology:link']
358 for link in link_list:
359 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
360 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
363 def test_20_check_update_service1_ok(self):
364 self.test_12_get_eth_service1()
366 def test_21_change_status_port_roadma_srg(self):
367 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
370 "logical-connection-point": "SRG1-PP1",
371 "port-type": "client",
372 "circuit-id": "SRG1",
373 "administrative-state": "outOfService",
374 "port-qual": "roadm-external"}]}
375 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
376 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
377 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
378 self.assertEqual(response.status_code, requests.codes.ok)
381 def test_22_check_update_portmapping(self):
382 response = test_utils.portmapping_request("ROADM-A1")
383 self.assertEqual(response.status_code, requests.codes.ok)
384 res = response.json()
385 mapping_list = res['nodes'][0]['mapping']
386 for mapping in mapping_list:
387 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
388 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
389 "Operational State should be 'OutOfService'")
390 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
391 "Administrative State should be 'OutOfService'")
393 self.assertEqual(mapping['port-oper-state'], 'InService',
394 "Operational State should be 'InService'")
395 self.assertEqual(mapping['port-admin-state'], 'InService',
396 "Administrative State should be 'InService'")
399 def test_23_check_update_openroadm_topo(self):
400 url = test_utils.URL_CONFIG_ORDM_TOPO
401 response = test_utils.get_request(url)
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 node_list = res['network'][0]['node']
406 for node in node_list:
407 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
408 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
409 tp_list = node['ietf-network-topology:termination-point']
411 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
412 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
413 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
416 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
417 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
418 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
420 link_list = res['network'][0]['ietf-network-topology:link']
421 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
422 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
424 for link in link_list:
425 if link['link-id'] in updated_links:
426 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
427 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
430 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
431 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
432 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
435 def test_24_restore_status_port_roadma_srg(self):
436 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
439 "logical-connection-point": "SRG1-PP1",
440 "port-type": "client",
441 "circuit-id": "SRG1",
442 "administrative-state": "inService",
443 "port-qual": "roadm-external"}]}
444 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
445 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
446 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
447 self.assertEqual(response.status_code, requests.codes.ok)
450 def test_25_check_update_portmapping_ok(self):
451 self.test_18_check_update_portmapping_ok()
453 def test_26_check_update_openroadm_topo_ok(self):
454 self.test_19_check_update_openroadm_topo_ok()
456 def test_27_check_update_service1_ok(self):
457 self.test_12_get_eth_service1()
459 def test_28_change_status_line_port_roadma_deg(self):
460 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
463 "logical-connection-point": "DEG2-TTP-TXRX",
466 "administrative-state": "outOfService",
467 "port-qual": "roadm-external"}]}
468 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
469 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
470 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
471 self.assertEqual(response.status_code, requests.codes.ok)
474 def test_29_check_update_portmapping(self):
475 response = test_utils.portmapping_request("ROADM-A1")
476 self.assertEqual(response.status_code, requests.codes.ok)
477 res = response.json()
478 mapping_list = res['nodes'][0]['mapping']
479 for mapping in mapping_list:
480 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
481 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
482 "Operational State should be 'OutOfService'")
483 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
484 "Administrative State should be 'OutOfService'")
486 self.assertEqual(mapping['port-oper-state'], 'InService',
487 "Operational State should be 'InService'")
488 self.assertEqual(mapping['port-admin-state'], 'InService',
489 "Administrative State should be 'InService'")
492 def test_30_check_update_openroadm_topo(self):
493 url = test_utils.URL_CONFIG_ORDM_TOPO
494 response = test_utils.get_request(url)
495 self.assertEqual(response.status_code, requests.codes.ok)
496 res = response.json()
497 node_list = res['network'][0]['node']
499 for node in node_list:
500 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
501 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
502 tp_list = node['ietf-network-topology:termination-point']
504 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
505 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
506 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
509 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
510 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
511 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
513 link_list = res['network'][0]['ietf-network-topology:link']
514 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
515 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
517 for link in link_list:
518 if link['link-id'] in updated_links:
519 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
520 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
523 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
524 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
525 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
528 def test_31_restore_status_line_port_roadma_srg(self):
529 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
532 "logical-connection-point": "DEG2-TTP-TXRX",
535 "administrative-state": "inService",
536 "port-qual": "roadm-external"}]}
537 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
538 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
539 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
540 self.assertEqual(response.status_code, requests.codes.ok)
543 def test_32_check_update_portmapping_ok(self):
544 self.test_18_check_update_portmapping_ok()
546 def test_33_check_update_openroadm_topo_ok(self):
547 self.test_19_check_update_openroadm_topo_ok()
549 def test_34_check_update_service1_ok(self):
550 self.test_12_get_eth_service1()
552 def test_35_change_status_line_port_xpdrc(self):
553 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
557 "administrative-state": "outOfService",
558 "port-qual": "xpdr-network"}]}
559 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
560 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
561 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
562 self.assertEqual(response.status_code, requests.codes.ok)
565 def test_36_check_update_portmapping(self):
566 response = test_utils.portmapping_request("XPDR-C1")
567 self.assertEqual(response.status_code, requests.codes.ok)
568 res = response.json()
569 mapping_list = res['nodes'][0]['mapping']
570 for mapping in mapping_list:
571 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
572 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
573 "Operational State should be 'OutOfService'")
574 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
575 "Administrative State should be 'OutOfService'")
577 self.assertEqual(mapping['port-oper-state'], 'InService',
578 "Operational State should be 'InService'")
579 self.assertEqual(mapping['port-admin-state'], 'InService',
580 "Administrative State should be 'InService'")
583 def test_37_check_update_openroadm_topo(self):
584 url = test_utils.URL_CONFIG_ORDM_TOPO
585 response = test_utils.get_request(url)
586 self.assertEqual(response.status_code, requests.codes.ok)
587 res = response.json()
588 node_list = res['network'][0]['node']
590 for node in node_list:
591 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
592 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
593 tp_list = node['ietf-network-topology:termination-point']
595 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
596 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
597 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
600 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
601 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
602 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
604 link_list = res['network'][0]['ietf-network-topology:link']
605 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
606 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
608 for link in link_list:
609 if link['link-id'] in updated_links:
610 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
611 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
614 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
615 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
616 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
619 def test_38_restore_status_line_port_xpdrc(self):
620 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
624 "administrative-state": "inService",
625 "port-qual": "xpdr-network"}]}
626 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
627 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
628 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
629 self.assertEqual(response.status_code, requests.codes.ok)
632 def test_39_check_update_portmapping_ok(self):
633 self.test_18_check_update_portmapping_ok()
635 def test_40_check_update_openroadm_topo_ok(self):
636 self.test_19_check_update_openroadm_topo_ok()
638 def test_41_check_update_service1_ok(self):
639 self.test_12_get_eth_service1()
641 def test_42_change_status_port_roadma_srg(self):
642 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
645 "logical-connection-point": "SRG1-PP2",
646 "port-type": "client",
647 "circuit-id": "SRG1",
648 "administrative-state": "outOfService",
649 "port-qual": "roadm-external"}]}
650 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
651 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
652 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
653 self.assertEqual(response.status_code, requests.codes.ok)
656 def test_43_check_update_portmapping(self):
657 response = test_utils.portmapping_request("ROADM-A1")
658 self.assertEqual(response.status_code, requests.codes.ok)
659 res = response.json()
660 mapping_list = res['nodes'][0]['mapping']
661 for mapping in mapping_list:
662 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
663 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
664 "Operational State should be 'OutOfService'")
665 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
666 "Administrative State should be 'OutOfService'")
668 self.assertEqual(mapping['port-oper-state'], 'InService',
669 "Operational State should be 'InService'")
670 self.assertEqual(mapping['port-admin-state'], 'InService',
671 "Administrative State should be 'InService'")
674 def test_44_check_update_openroadm_topo(self):
675 url = test_utils.URL_CONFIG_ORDM_TOPO
676 response = test_utils.get_request(url)
677 self.assertEqual(response.status_code, requests.codes.ok)
678 res = response.json()
679 node_list = res['network'][0]['node']
681 for node in node_list:
682 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
683 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
684 tp_list = node['ietf-network-topology:termination-point']
686 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
687 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
688 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
691 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
692 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
693 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
695 link_list = res['network'][0]['ietf-network-topology:link']
697 for link in link_list:
698 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
699 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
700 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
703 def test_45_check_update_service1_ok(self):
704 self.test_12_get_eth_service1()
706 def test_46_delete_eth_service1(self):
707 response = test_utils.service_delete_request("service1")
708 self.assertEqual(response.status_code, requests.codes.ok)
709 res = response.json()
710 self.assertIn('Renderer service delete in progress',
711 res['output']['configuration-response-common']['response-message'])
712 time.sleep(self.WAITING)
714 def test_47_disconnect_xponders_from_roadm(self):
715 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
716 response = test_utils.get_ordm_topo_request("")
717 self.assertEqual(response.status_code, requests.codes.ok)
718 res = response.json()
719 links = res['network'][0]['ietf-network-topology:link']
721 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
722 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
723 link_name = link["link-id"]
724 response = test_utils.delete_request(url+link_name)
725 self.assertEqual(response.status_code, requests.codes.ok)
727 def test_48_disconnect_XPDRA(self):
728 response = test_utils.unmount_device("XPDRA01")
729 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
731 def test_49_disconnect_XPDRC(self):
732 response = test_utils.unmount_device("XPDR-C1")
733 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
735 def test_50_disconnect_ROADMA(self):
736 response = test_utils.unmount_device("ROADM-A1")
737 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
739 def test_51_disconnect_ROADMC(self):
740 response = test_utils.unmount_device("ROADM-C1")
741 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
744 if __name__ == "__main__":
745 unittest.main(verbosity=2)