3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
29 def tearDownClass(cls):
30 for process in cls.processes:
31 test_utils.shutdown_process(process)
32 print("all processes killed")
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 url = "{}/operational/network-topology:network-topology/topology/topology-netconf/node/SPDR-SA1"
43 response = test_utils.get_request(url)
44 self.assertEqual(response.status_code, requests.codes.ok)
47 res['node'][0]['netconf-node-topology:connection-status'],
50 def test_02_get_portmapping_CLIENT1(self):
51 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
52 response = test_utils.get_request(url)
53 self.assertEqual(response.status_code, requests.codes.ok)
56 {'supported-interface-capability': [
57 'org-openroadm-port-types:if-10GE-ODU2e',
58 'org-openroadm-port-types:if-10GE-ODU2',
59 'org-openroadm-port-types:if-10GE'],
60 'supporting-port': 'CP1-SFP4-P1',
61 'supporting-circuit-pack-name': 'CP1-SFP4',
62 'logical-connection-point': 'XPDR1-CLIENT1',
63 'port-direction': 'bidirectional',
64 'port-qual': 'xpdr-client',
65 'lcp-hash-val': 'FqlcrxV7p30='},
68 def test_03_get_portmapping_NETWORK1(self):
69 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
70 response = test_utils.get_request(url)
71 self.assertEqual(response.status_code, requests.codes.ok)
74 {"logical-connection-point": "XPDR1-NETWORK1",
75 "supporting-port": "CP1-CFP0-P1",
76 "supported-interface-capability": [
77 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
79 "port-direction": "bidirectional",
80 "port-qual": "xpdr-network",
81 "supporting-circuit-pack-name": "CP1-CFP0",
82 "xponder-type": "mpdr",
83 'lcp-hash-val': 'Swfw02qXGyI='},
86 def test_04_service_path_create_OCH_OTU4(self):
87 url = "{}/operations/transportpce-device-renderer:service-path"
88 data = {"renderer:input": {
89 "service-name": "service_ODU4",
91 "modulation-format": "qpsk",
92 "operation": "create",
94 {"node-id": "SPDR-SA1",
95 "dest-tp": "XPDR1-NETWORK1"}]}}
96 response = test_utils.post_request(url, data)
98 self.assertEqual(response.status_code, requests.codes.ok)
100 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
101 self.assertTrue(res["output"]["success"])
103 {'node-id': 'SPDR-SA1',
104 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
105 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
107 def test_05_get_portmapping_NETWORK1(self):
108 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
109 response = test_utils.get_request(url)
110 self.assertEqual(response.status_code, requests.codes.ok)
111 res = response.json()
113 {"logical-connection-point": "XPDR1-NETWORK1",
114 "supporting-port": "CP1-CFP0-P1",
115 "supported-interface-capability": [
116 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
118 "port-direction": "bidirectional",
119 "port-qual": "xpdr-network",
120 "supporting-circuit-pack-name": "CP1-CFP0",
121 "xponder-type": "mpdr",
122 "lcp-hash-val": "Swfw02qXGyI="},
125 def test_06_check_interface_och(self):
126 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
127 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
128 "interface/XPDR1-NETWORK1-1"
130 response = test_utils.get_request(url)
131 self.assertEqual(response.status_code, requests.codes.ok)
132 res = response.json()
134 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
135 'administrative-state': 'inService',
136 'supporting-circuit-pack-name': 'CP1-CFP0',
137 'type': 'org-openroadm-interfaces:opticalChannel',
138 'supporting-port': 'CP1-CFP0-P1'
142 self.assertDictEqual(
143 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
144 u'transmit-power': -5},
145 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
147 def test_07_check_interface_OTU(self):
148 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
149 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
150 "interface/XPDR1-NETWORK1-OTU"
152 response = test_utils.get_request(url)
153 self.assertEqual(response.status_code, requests.codes.ok)
154 res = response.json()
155 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
156 'administrative-state': 'inService',
157 'supporting-circuit-pack-name': 'CP1-CFP0',
158 'supporting-interface': 'XPDR1-NETWORK1-1',
159 'type': 'org-openroadm-interfaces:otnOtu',
160 'supporting-port': 'CP1-CFP0-P1'
163 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
164 'expected-sapi': 'Swfw02qXGyI=',
165 'tx-sapi': 'Swfw02qXGyI=',
166 'expected-dapi': 'Swfw02qXGyI=',
167 'rate': 'org-openroadm-otn-common-types:OTU4',
171 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
174 self.assertDictEqual(input_dict_2,
175 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
177 def test_08_otn_service_path_create_ODU4(self):
178 url = "{}/operations/transportpce-device-renderer:otn-service-path"
179 data = {"renderer:input": {
180 "service-name": "service_ODU4",
181 "operation": "create",
182 "service-rate": "100G",
183 "service-type": "ODU",
185 {"node-id": "SPDR-SA1",
186 "network-tp": "XPDR1-NETWORK1"}]}}
187 response = test_utils.post_request(url, data)
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
191 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
192 self.assertTrue(res["output"]["success"])
194 {'node-id': 'SPDR-SA1',
195 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
197 def test_09_get_portmapping_NETWORK1(self):
198 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
199 response = test_utils.get_request(url)
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
203 {"logical-connection-point": "XPDR1-NETWORK1",
204 "supporting-port": "CP1-CFP0-P1",
205 "supported-interface-capability": [
206 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
208 "port-direction": "bidirectional",
209 "port-qual": "xpdr-network",
210 "supporting-circuit-pack-name": "CP1-CFP0",
211 "xponder-type": "mpdr",
212 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
213 "lcp-hash-val": "Swfw02qXGyI="
217 def test_10_check_interface_ODU4(self):
218 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
219 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
220 "interface/XPDR1-NETWORK1-ODU4"
222 response = test_utils.get_request(url)
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
226 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
227 'type': 'org-openroadm-interfaces:otnOdu',
228 'supporting-port': 'CP1-CFP0-P1'}
229 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
230 'rate': 'org-openroadm-otn-common-types:ODU4',
231 'expected-dapi': 'Swfw02qXGyI=',
232 'expected-sapi': 'Swfw02qXGyI=',
233 'tx-dapi': 'Swfw02qXGyI=',
234 'tx-sapi': 'Swfw02qXGyI='}
236 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
238 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
241 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
243 self.assertDictEqual(
244 {u'payload-type': u'21', u'exp-payload-type': u'21'},
245 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
247 def test_11_otn_service_path_create_10GE(self):
248 url = "{}/operations/transportpce-device-renderer:otn-service-path"
249 data = {"renderer:input": {
250 "service-name": "service1",
251 "operation": "create",
252 "service-rate": "10G",
253 "service-type": "Ethernet",
254 "ethernet-encoding": "eth encode",
256 "trib-port-number": "1",
258 {"node-id": "SPDR-SA1",
259 "client-tp": "XPDR1-CLIENT1",
260 "network-tp": "XPDR1-NETWORK1"}]}}
261 response = test_utils.post_request(url, data)
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
266 self.assertTrue(res["output"]["success"])
268 {'node-id': 'SPDR-SA1',
269 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
270 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
271 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
273 def test_12_check_interface_10GE_CLIENT(self):
274 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
275 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
276 "interface/XPDR1-CLIENT1-ETHERNET10G"
278 response = test_utils.get_request(url)
279 self.assertEqual(response.status_code, requests.codes.ok)
280 res = response.json()
281 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
282 'administrative-state': 'inService',
283 'supporting-circuit-pack-name': 'CP1-SFP4',
284 'type': 'org-openroadm-interfaces:ethernetCsmacd',
285 'supporting-port': 'CP1-SFP4-P1'
287 self.assertDictEqual(dict(res['interface'][0], **input_dict),
289 self.assertDictEqual(
291 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
293 def test_13_check_interface_ODU2E_CLIENT(self):
294 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
295 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
296 "interface/XPDR1-CLIENT1-ODU2e-service1"
298 response = test_utils.get_request(url)
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
302 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
303 'administrative-state': 'inService',
304 'supporting-circuit-pack-name': 'CP1-SFP4',
305 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
306 'type': 'org-openroadm-interfaces:otnOdu',
307 'supporting-port': 'CP1-SFP4-P1'}
309 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
310 'rate': 'org-openroadm-otn-common-types:ODU2e',
311 'monitoring-mode': 'terminated'}
313 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
315 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
317 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
318 self.assertDictEqual(
319 {u'payload-type': u'03', u'exp-payload-type': u'03'},
320 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
322 def test_14_check_interface_ODU2E_NETWORK(self):
323 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
324 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
325 "interface/XPDR1-NETWORK1-ODU2e-service1"
327 response = test_utils.get_request(url)
328 self.assertEqual(response.status_code, requests.codes.ok)
329 res = response.json()
330 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
331 'supporting-circuit-pack-name': 'CP1-CFP0',
332 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
333 'type': 'org-openroadm-interfaces:otnOdu',
334 'supporting-port': 'CP1-CFP0-P1'}
336 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
337 'rate': 'org-openroadm-otn-common-types:ODU2e',
338 'monitoring-mode': 'monitored'}
340 input_dict_3 = {'trib-port-number': 1}
342 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
344 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
346 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
347 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
348 'parent-odu-allocation'], **input_dict_3
350 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
351 'parent-odu-allocation'])
354 'org-openroadm-otn-odu-interfaces:odu'][
355 'parent-odu-allocation']['trib-slots'])
357 def test_15_check_ODU2E_connection(self):
358 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
359 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
360 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
362 response = test_utils.get_request(url)
363 self.assertEqual(response.status_code, requests.codes.ok)
364 res = response.json()
367 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
368 'direction': 'bidirectional'
371 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
372 res['odu-connection'][0])
373 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
374 res['odu-connection'][0]['destination'])
375 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
376 res['odu-connection'][0]['source'])
378 def test_16_otn_service_path_delete_10GE(self):
379 url = "{}/operations/transportpce-device-renderer:otn-service-path"
380 data = {"renderer:input": {
381 "service-name": "service1",
382 "operation": "delete",
383 "service-rate": "10G",
384 "service-type": "Ethernet",
385 "ethernet-encoding": "eth encode",
387 "trib-port-number": "1",
389 {"node-id": "SPDR-SA1",
390 "client-tp": "XPDR1-CLIENT1",
391 "network-tp": "XPDR1-NETWORK1"}]}}
392 response = test_utils.post_request(url, data)
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 self.assertIn('Request processed', res["output"]["result"])
397 self.assertTrue(res["output"]["success"])
399 def test_17_check_no_ODU2E_connection(self):
400 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
401 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
402 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
404 response = test_utils.get_request(url)
405 self.assertEqual(response.status_code, requests.codes.not_found)
407 def test_18_check_no_interface_ODU2E_NETWORK(self):
408 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
409 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
410 "interface/XPDR1-NETWORK1-ODU2e-service1"
412 response = test_utils.get_request(url)
413 self.assertEqual(response.status_code, requests.codes.not_found)
415 def test_19_check_no_interface_ODU2E_CLIENT(self):
416 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
417 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
418 "interface/XPDR1-CLIENT1-ODU2e-service1"
420 response = test_utils.get_request(url)
421 self.assertEqual(response.status_code, requests.codes.not_found)
423 def test_20_check_no_interface_10GE_CLIENT(self):
424 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
425 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
426 "interface/XPDR1-CLIENT1-ETHERNET10G"
428 response = test_utils.get_request(url)
429 self.assertEqual(response.status_code, requests.codes.not_found)
431 def test_21_otn_service_path_delete_ODU4(self):
432 url = "{}/operations/transportpce-device-renderer:otn-service-path"
433 data = {"renderer:input": {
434 "service-name": "service_ODU4",
435 "operation": "delete",
436 "service-rate": "100G",
437 "service-type": "ODU",
439 {"node-id": "SPDR-SA1",
440 "network-tp": "XPDR1-NETWORK1"}]}}
441 response = test_utils.post_request(url, data)
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 self.assertIn('Request processed', res["output"]["result"])
446 self.assertTrue(res["output"]["success"])
448 def test_22_check_no_interface_ODU4(self):
449 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
450 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
451 "interface/XPDR1-NETWORK1-ODU4"
453 response = test_utils.get_request(url)
454 self.assertEqual(response.status_code, requests.codes.not_found)
456 def test_23_service_path_delete_OCH_OTU4(self):
457 url = "{}/operations/transportpce-device-renderer:service-path"
458 data = {"renderer:input": {
459 "service-name": "service_OTU4",
461 "modulation-format": "qpsk",
462 "operation": "delete",
464 {"node-id": "SPDR-SA1",
465 "dest-tp": "XPDR1-NETWORK1"}]}}
466 response = test_utils.post_request(url, data)
468 self.assertEqual(response.status_code, requests.codes.ok)
469 res = response.json()
470 self.assertIn('Request processed', res["output"]["result"])
471 self.assertTrue(res["output"]["success"])
473 def test_24_check_no_interface_OTU4(self):
474 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
475 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
476 "interface/XPDR1-NETWORK1-OTU"
478 response = test_utils.get_request(url)
479 self.assertEqual(response.status_code, requests.codes.not_found)
481 def test_25_check_no_interface_OCH(self):
482 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
483 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
484 "interface/XPDR1-NETWORK1-1"
486 response = test_utils.get_request(url)
487 self.assertEqual(response.status_code, requests.codes.not_found)
489 def test_26_disconnect_SPDR_SA1(self):
490 response = test_utils.unmount_device("SPDR-SA1")
491 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
494 if __name__ == "__main__":
495 unittest.main(verbosity=2)