2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
25 class TransportPCEFulltesting(unittest.TestCase):
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
31 "rpc-action": "service-create",
32 "request-system-id": "appname",
33 "notification-url": "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
43 "tx-direction": [{"index": 0}],
44 "rx-direction": [{"index": 0}],
48 "service-rate": "100",
50 "service-format": "Ethernet",
51 "clli": "SNJSCAMCJT4",
52 "tx-direction": [{"index": 0}],
53 "rx-direction": [{"index": 0}],
56 "due-date": "2016-11-28T00:00:01Z",
57 "operator-contact": "pw1234"
61 WAITING = 25 # nominal value is 300
62 NODE_VERSION_121 = '1.2.1'
63 NODE_VERSION_221 = '2.2.1'
64 NODE_VERSION_71 = '7.1'
68 cls.processes = test_utils.start_tpce()
69 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
70 ('roadma', cls.NODE_VERSION_221),
71 ('roadmc', cls.NODE_VERSION_221),
72 ('xpdrc', cls.NODE_VERSION_71)])
75 def tearDownClass(cls):
76 # pylint: disable=not-an-iterable
77 for process in cls.processes:
78 test_utils.shutdown_process(process)
79 print("all processes killed")
82 def setUp(self): # instruction executed before each test method
83 # pylint: disable=consider-using-f-string
84 print("execution of {}".format(self.id().split(".")[-1]))
86 def test_01_connect_xpdrA(self):
87 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
88 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
90 def test_02_connect_xpdrC(self):
91 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
92 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
94 def test_03_connect_rdmA(self):
95 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
96 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
98 def test_04_connect_rdmC(self):
99 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
100 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
102 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
103 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
104 "ROADM-A1", "1", "SRG1-PP1-TXRX")
105 self.assertEqual(response.status_code, requests.codes.ok)
106 res = response.json()
107 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
110 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
111 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
112 "ROADM-A1", "1", "SRG1-PP1-TXRX")
113 self.assertEqual(response.status_code, requests.codes.ok)
114 res = response.json()
115 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
118 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
119 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
120 "ROADM-C1", "1", "SRG1-PP1-TXRX")
121 self.assertEqual(response.status_code, requests.codes.ok)
122 res = response.json()
123 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
126 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
127 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
128 "ROADM-C1", "1", "SRG1-PP1-TXRX")
129 self.assertEqual(response.status_code, requests.codes.ok)
130 res = response.json()
131 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
134 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
135 # Config ROADMA-ROADMC oms-attributes
137 "auto-spanloss": "true",
138 "spanloss-base": 11.4,
139 "spanloss-current": 12,
140 "engineered-spanloss": 12.2,
141 "link-concatenation": [{
144 "SRLG-length": 100000,
146 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
147 self.assertEqual(response.status_code, requests.codes.created)
149 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
150 # Config ROADMC-ROADMA oms-attributes
152 "auto-spanloss": "true",
153 "spanloss-base": 11.4,
154 "spanloss-current": 12,
155 "engineered-spanloss": 12.2,
156 "link-concatenation": [{
159 "SRLG-length": 100000,
161 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
162 self.assertEqual(response.status_code, requests.codes.created)
164 # test service-create for Eth service from xpdr to xpdr
165 def test_11_create_eth_service1(self):
166 self.cr_serv_sample_data["input"]["service-name"] = "service1"
167 response = test_utils.service_create_request(self.cr_serv_sample_data)
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
170 self.assertIn('PCE calculation in progress',
171 res['output']['configuration-response-common']['response-message'])
172 time.sleep(self.WAITING)
174 def test_12_get_eth_service1(self):
175 response = test_utils.get_service_list_request("services/service1")
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
179 res['services'][0]['administrative-state'], 'inService')
181 res['services'][0]['service-name'], 'service1')
183 res['services'][0]['connection-type'], 'service')
185 res['services'][0]['lifecycle-state'], 'planned')
188 def test_13_change_status_line_port_xpdra(self):
189 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
192 "logical-connection-point": "XPDR1-NETWORK1",
194 "circuit-id": "XPDRA-NETWORK",
195 "administrative-state": "outOfService",
196 "port-qual": "xpdr-network"}]}
197 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
198 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
199 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
200 self.assertEqual(response.status_code, requests.codes.ok)
203 def test_14_check_update_portmapping(self):
204 response = test_utils.portmapping_request("XPDRA01")
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 mapping_list = res['nodes'][0]['mapping']
208 for mapping in mapping_list:
209 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
210 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
211 "Operational State should be 'OutOfService'")
212 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
213 "Administrative State should be 'OutOfService'")
215 self.assertEqual(mapping['port-oper-state'], 'InService',
216 "Operational State should be 'InService'")
217 self.assertEqual(mapping['port-admin-state'], 'InService',
218 "Administrative State should be 'InService'")
221 def test_15_check_update_openroadm_topo(self):
222 url = test_utils.URL_CONFIG_ORDM_TOPO
223 response = test_utils.get_request(url)
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
226 node_list = res['network'][0]['node']
228 for node in node_list:
229 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
230 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
231 tp_list = node['ietf-network-topology:termination-point']
233 if node['node-id'] == 'XPDRA01-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
234 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
235 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
238 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
239 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
240 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
242 link_list = res['network'][0]['ietf-network-topology:link']
243 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
244 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
246 for link in link_list:
247 if link['link-id'] in updated_links:
248 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
249 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
252 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
253 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
254 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
257 def test_16_check_update_service1(self):
258 response = test_utils.get_service_list_request("services/service1")
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
261 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
262 self.assertEqual(res['services'][0]['administrative-state'], 'outOfService')
265 def test_17_restore_status_line_port_xpdra(self):
266 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
269 "logical-connection-point": "XPDR1-NETWORK1",
271 "circuit-id": "XPDRA-NETWORK",
272 "administrative-state": "inService",
273 "port-qual": "xpdr-network"}]}
274 response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
275 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
276 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
277 self.assertEqual(response.status_code, requests.codes.ok)
280 def test_18_check_update_portmapping_ok(self):
281 response = test_utils.portmapping_request("XPDRA01")
282 self.assertEqual(response.status_code, requests.codes.ok)
283 res = response.json()
284 mapping_list = res['nodes'][0]['mapping']
285 for mapping in mapping_list:
286 self.assertEqual(mapping['port-oper-state'], 'InService',
287 "Operational State should be 'InService'")
288 self.assertEqual(mapping['port-admin-state'], 'InService',
289 "Administrative State should be 'InService'")
292 def test_19_check_update_openroadm_topo_ok(self):
293 url = test_utils.URL_CONFIG_ORDM_TOPO
294 response = test_utils.get_request(url)
295 self.assertEqual(response.status_code, requests.codes.ok)
296 res = response.json()
297 node_list = res['network'][0]['node']
298 for node in node_list:
299 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
300 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
301 tp_list = node['ietf-network-topology:termination-point']
303 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
304 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
306 link_list = res['network'][0]['ietf-network-topology:link']
307 for link in link_list:
308 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
309 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
312 def test_20_check_update_service1_ok(self):
313 self.test_12_get_eth_service1()
315 def test_21_change_status_port_roadma_srg(self):
316 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
319 "logical-connection-point": "SRG1-PP1",
320 "port-type": "client",
321 "circuit-id": "SRG1",
322 "administrative-state": "outOfService",
323 "port-qual": "roadm-external"}]}
324 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
325 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
326 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
327 self.assertEqual(response.status_code, requests.codes.ok)
330 def test_22_check_update_portmapping(self):
331 response = test_utils.portmapping_request("ROADM-A1")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 mapping_list = res['nodes'][0]['mapping']
335 for mapping in mapping_list:
336 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
337 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
338 "Operational State should be 'OutOfService'")
339 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
340 "Administrative State should be 'OutOfService'")
342 self.assertEqual(mapping['port-oper-state'], 'InService',
343 "Operational State should be 'InService'")
344 self.assertEqual(mapping['port-admin-state'], 'InService',
345 "Administrative State should be 'InService'")
348 def test_23_check_update_openroadm_topo(self):
349 url = test_utils.URL_CONFIG_ORDM_TOPO
350 response = test_utils.get_request(url)
351 self.assertEqual(response.status_code, requests.codes.ok)
352 res = response.json()
353 node_list = res['network'][0]['node']
355 for node in node_list:
356 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
357 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
358 tp_list = node['ietf-network-topology:termination-point']
360 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
361 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
362 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
365 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
366 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
367 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
369 link_list = res['network'][0]['ietf-network-topology:link']
370 updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
371 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
373 for link in link_list:
374 if link['link-id'] in updated_links:
375 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
376 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
379 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
380 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
381 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
384 def test_24_restore_status_port_roadma_srg(self):
385 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
388 "logical-connection-point": "SRG1-PP1",
389 "port-type": "client",
390 "circuit-id": "SRG1",
391 "administrative-state": "inService",
392 "port-qual": "roadm-external"}]}
393 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
394 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
395 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
396 self.assertEqual(response.status_code, requests.codes.ok)
399 def test_25_check_update_portmapping_ok(self):
400 self.test_18_check_update_portmapping_ok()
402 def test_26_check_update_openroadm_topo_ok(self):
403 self.test_19_check_update_openroadm_topo_ok()
405 def test_27_check_update_service1_ok(self):
406 self.test_12_get_eth_service1()
408 def test_28_change_status_line_port_roadma_deg(self):
409 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
412 "logical-connection-point": "DEG2-TTP-TXRX",
415 "administrative-state": "outOfService",
416 "port-qual": "roadm-external"}]}
417 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
418 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
419 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
420 self.assertEqual(response.status_code, requests.codes.ok)
423 def test_29_check_update_portmapping(self):
424 response = test_utils.portmapping_request("ROADM-A1")
425 self.assertEqual(response.status_code, requests.codes.ok)
426 res = response.json()
427 mapping_list = res['nodes'][0]['mapping']
428 for mapping in mapping_list:
429 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
430 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
431 "Operational State should be 'OutOfService'")
432 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
433 "Administrative State should be 'OutOfService'")
435 self.assertEqual(mapping['port-oper-state'], 'InService',
436 "Operational State should be 'InService'")
437 self.assertEqual(mapping['port-admin-state'], 'InService',
438 "Administrative State should be 'InService'")
441 def test_30_check_update_openroadm_topo(self):
442 url = test_utils.URL_CONFIG_ORDM_TOPO
443 response = test_utils.get_request(url)
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 node_list = res['network'][0]['node']
448 for node in node_list:
449 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
450 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
451 tp_list = node['ietf-network-topology:termination-point']
453 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
454 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
455 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
458 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
459 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
460 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
462 link_list = res['network'][0]['ietf-network-topology:link']
463 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
464 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
466 for link in link_list:
467 if link['link-id'] in updated_links:
468 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
469 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
472 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
473 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
474 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
477 def test_31_restore_status_line_port_roadma_srg(self):
478 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
481 "logical-connection-point": "DEG2-TTP-TXRX",
484 "administrative-state": "inService",
485 "port-qual": "roadm-external"}]}
486 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
487 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
488 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
489 self.assertEqual(response.status_code, requests.codes.ok)
492 def test_32_check_update_portmapping_ok(self):
493 self.test_18_check_update_portmapping_ok()
495 def test_33_check_update_openroadm_topo_ok(self):
496 self.test_19_check_update_openroadm_topo_ok()
498 def test_34_check_update_service1_ok(self):
499 self.test_12_get_eth_service1()
501 def test_35_change_status_line_port_xpdrc(self):
502 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
506 "administrative-state": "outOfService",
507 "port-qual": "xpdr-network"}]}
508 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
509 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
510 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
511 self.assertEqual(response.status_code, requests.codes.ok)
514 def test_36_check_update_portmapping(self):
515 response = test_utils.portmapping_request("XPDR-C1")
516 self.assertEqual(response.status_code, requests.codes.ok)
517 res = response.json()
518 mapping_list = res['nodes'][0]['mapping']
519 for mapping in mapping_list:
520 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
521 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
522 "Operational State should be 'OutOfService'")
523 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
524 "Administrative State should be 'OutOfService'")
526 self.assertEqual(mapping['port-oper-state'], 'InService',
527 "Operational State should be 'InService'")
528 self.assertEqual(mapping['port-admin-state'], 'InService',
529 "Administrative State should be 'InService'")
532 def test_37_check_update_openroadm_topo(self):
533 url = test_utils.URL_CONFIG_ORDM_TOPO
534 response = test_utils.get_request(url)
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 node_list = res['network'][0]['node']
539 for node in node_list:
540 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
541 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
542 tp_list = node['ietf-network-topology:termination-point']
544 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
545 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
546 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
549 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
550 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
551 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
553 link_list = res['network'][0]['ietf-network-topology:link']
554 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
555 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
557 for link in link_list:
558 if link['link-id'] in updated_links:
559 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
560 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
563 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
564 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
565 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
568 def test_38_restore_status_line_port_xpdrc(self):
569 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
573 "administrative-state": "inService",
574 "port-qual": "xpdr-network"}]}
575 response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
576 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
577 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
578 self.assertEqual(response.status_code, requests.codes.ok)
581 def test_39_check_update_portmapping_ok(self):
582 self.test_18_check_update_portmapping_ok()
584 def test_40_check_update_openroadm_topo_ok(self):
585 self.test_19_check_update_openroadm_topo_ok()
587 def test_41_check_update_service1_ok(self):
588 self.test_12_get_eth_service1()
590 def test_42_change_status_port_roadma_srg(self):
591 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
594 "logical-connection-point": "SRG1-PP2",
595 "port-type": "client",
596 "circuit-id": "SRG1",
597 "administrative-state": "outOfService",
598 "port-qual": "roadm-external"}]}
599 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
600 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
601 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
602 self.assertEqual(response.status_code, requests.codes.ok)
605 def test_43_check_update_portmapping(self):
606 response = test_utils.portmapping_request("ROADM-A1")
607 self.assertEqual(response.status_code, requests.codes.ok)
608 res = response.json()
609 mapping_list = res['nodes'][0]['mapping']
610 for mapping in mapping_list:
611 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
612 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
613 "Operational State should be 'OutOfService'")
614 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
615 "Administrative State should be 'OutOfService'")
617 self.assertEqual(mapping['port-oper-state'], 'InService',
618 "Operational State should be 'InService'")
619 self.assertEqual(mapping['port-admin-state'], 'InService',
620 "Administrative State should be 'InService'")
623 def test_44_check_update_openroadm_topo(self):
624 url = test_utils.URL_CONFIG_ORDM_TOPO
625 response = test_utils.get_request(url)
626 self.assertEqual(response.status_code, requests.codes.ok)
627 res = response.json()
628 node_list = res['network'][0]['node']
630 for node in node_list:
631 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
632 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
633 tp_list = node['ietf-network-topology:termination-point']
635 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
636 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
637 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
640 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
641 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
642 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
644 link_list = res['network'][0]['ietf-network-topology:link']
646 for link in link_list:
647 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
648 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
649 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
652 def test_45_check_update_service1_ok(self):
653 self.test_12_get_eth_service1()
655 def test_46_delete_eth_service1(self):
656 response = test_utils.service_delete_request("service1")
657 self.assertEqual(response.status_code, requests.codes.ok)
658 res = response.json()
659 self.assertIn('Renderer service delete in progress',
660 res['output']['configuration-response-common']['response-message'])
661 time.sleep(self.WAITING)
663 def test_47_disconnect_xponders_from_roadm(self):
664 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
665 response = test_utils.get_ordm_topo_request("")
666 self.assertEqual(response.status_code, requests.codes.ok)
667 res = response.json()
668 links = res['network'][0]['ietf-network-topology:link']
670 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
671 link_name = link["link-id"]
672 response = test_utils.delete_request(url+link_name)
673 self.assertEqual(response.status_code, requests.codes.ok)
675 def test_48_disconnect_XPDRA(self):
676 response = test_utils.unmount_device("XPDRA01")
677 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
679 def test_49_disconnect_XPDRC(self):
680 response = test_utils.unmount_device("XPDR-C1")
681 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
683 def test_50_disconnect_ROADMA(self):
684 response = test_utils.unmount_device("ROADM-A1")
685 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
687 def test_51_disconnect_ROADMC(self):
688 response = test_utils.unmount_device("ROADM-C1")
689 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
692 if __name__ == "__main__":
693 unittest.main(verbosity=2)