3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process, then
    # print a single completion message.
    # NOTE(review): the print is placed after the loop (emitted once) —
    # confirm against the original indentation.
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 url = "{}/operational/network-topology:network-topology/topology/topology-netconf/node/SPDR-SA1"
43 response = test_utils.get_request(url)
44 self.assertEqual(response.status_code, requests.codes.ok)
47 res['node'][0]['netconf-node-topology:connection-status'],
50 def test_02_get_portmapping_CLIENT1(self):
51 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
52 response = test_utils.get_request(url)
53 self.assertEqual(response.status_code, requests.codes.ok)
56 {'supported-interface-capability': [
57 'org-openroadm-port-types:if-10GE-ODU2e',
58 'org-openroadm-port-types:if-10GE-ODU2',
59 'org-openroadm-port-types:if-10GE'],
60 'supporting-port': 'CP1-SFP4-P1',
61 'supporting-circuit-pack-name': 'CP1-SFP4',
62 'logical-connection-point': 'XPDR1-CLIENT1',
63 'port-direction': 'bidirectional',
64 'port-qual': 'xpdr-client',
65 'lcp-hash-val': 'FqlcrxV7p30='},
68 def test_03_get_portmapping_NETWORK1(self):
69 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
70 response = test_utils.get_request(url)
71 self.assertEqual(response.status_code, requests.codes.ok)
74 {"logical-connection-point": "XPDR1-NETWORK1",
75 "supporting-port": "CP1-CFP0-P1",
76 "supported-interface-capability": [
77 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
79 "port-direction": "bidirectional",
80 "port-qual": "xpdr-network",
81 "supporting-circuit-pack-name": "CP1-CFP0",
82 "xponder-type": "mpdr",
83 'lcp-hash-val': 'Swfw02qXGyI='},
86 def test_04_service_path_create_OCH_OTU4(self):
87 url = "{}/operations/transportpce-device-renderer:service-path"
88 data = {"renderer:input": {
89 "service-name": "service_ODU4",
91 "modulation-format": "qpsk",
92 "operation": "create",
94 {"node-id": "SPDR-SA1",
95 "dest-tp": "XPDR1-NETWORK1"}]}}
96 response = test_utils.post_request(url, data)
98 self.assertEqual(response.status_code, requests.codes.ok)
100 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
101 self.assertTrue(res["output"]["success"])
103 {'node-id': 'SPDR-SA1',
104 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
105 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
107 def test_05_get_portmapping_NETWORK1(self):
108 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
109 response = test_utils.get_request(url)
110 self.assertEqual(response.status_code, requests.codes.ok)
111 res = response.json()
113 {"logical-connection-point": "XPDR1-NETWORK1",
114 "supporting-port": "CP1-CFP0-P1",
115 "supported-interface-capability": [
116 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
118 "port-direction": "bidirectional",
119 "port-qual": "xpdr-network",
120 "supporting-circuit-pack-name": "CP1-CFP0",
121 "xponder-type": "mpdr",
122 "lcp-hash-val": "Swfw02qXGyI="},
125 def test_06_check_interface_och(self):
126 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
127 self.assertEqual(response.status_code, requests.codes.ok)
128 res = response.json()
130 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
131 'administrative-state': 'inService',
132 'supporting-circuit-pack-name': 'CP1-CFP0',
133 'type': 'org-openroadm-interfaces:opticalChannel',
134 'supporting-port': 'CP1-CFP0-P1'
138 self.assertDictEqual(
139 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
140 u'transmit-power': -5},
141 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
143 def test_07_check_interface_OTU(self):
144 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
145 self.assertEqual(response.status_code, requests.codes.ok)
146 res = response.json()
147 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
148 'administrative-state': 'inService',
149 'supporting-circuit-pack-name': 'CP1-CFP0',
150 'supporting-interface': 'XPDR1-NETWORK1-1',
151 'type': 'org-openroadm-interfaces:otnOtu',
152 'supporting-port': 'CP1-CFP0-P1'
155 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
156 'expected-sapi': 'Swfw02qXGyI=',
157 'tx-sapi': 'Swfw02qXGyI=',
158 'expected-dapi': 'Swfw02qXGyI=',
159 'rate': 'org-openroadm-otn-common-types:OTU4',
163 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
166 self.assertDictEqual(input_dict_2,
167 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
169 def test_08_otn_service_path_create_ODU4(self):
170 url = "{}/operations/transportpce-device-renderer:otn-service-path"
171 data = {"renderer:input": {
172 "service-name": "service_ODU4",
173 "operation": "create",
174 "service-rate": "100G",
175 "service-type": "ODU",
177 {"node-id": "SPDR-SA1",
178 "network-tp": "XPDR1-NETWORK1"}]}}
179 response = test_utils.post_request(url, data)
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
183 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
184 self.assertTrue(res["output"]["success"])
186 {'node-id': 'SPDR-SA1',
187 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
189 def test_09_get_portmapping_NETWORK1(self):
190 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
191 response = test_utils.get_request(url)
192 self.assertEqual(response.status_code, requests.codes.ok)
193 res = response.json()
195 {"logical-connection-point": "XPDR1-NETWORK1",
196 "supporting-port": "CP1-CFP0-P1",
197 "supported-interface-capability": [
198 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
200 "port-direction": "bidirectional",
201 "port-qual": "xpdr-network",
202 "supporting-circuit-pack-name": "CP1-CFP0",
203 "xponder-type": "mpdr",
204 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
205 "lcp-hash-val": "Swfw02qXGyI="
209 def test_10_check_interface_ODU4(self):
210 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
213 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
214 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
215 'type': 'org-openroadm-interfaces:otnOdu',
216 'supporting-port': 'CP1-CFP0-P1'}
217 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
218 'rate': 'org-openroadm-otn-common-types:ODU4',
219 'expected-dapi': 'Swfw02qXGyI=',
220 'expected-sapi': 'Swfw02qXGyI=',
221 'tx-dapi': 'Swfw02qXGyI=',
222 'tx-sapi': 'Swfw02qXGyI='}
224 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
226 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
229 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
231 self.assertDictEqual(
232 {u'payload-type': u'21', u'exp-payload-type': u'21'},
233 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
235 def test_11_otn_service_path_create_10GE(self):
236 url = "{}/operations/transportpce-device-renderer:otn-service-path"
237 data = {"renderer:input": {
238 "service-name": "service1",
239 "operation": "create",
240 "service-rate": "10G",
241 "service-type": "Ethernet",
242 "ethernet-encoding": "eth encode",
244 "trib-port-number": "1",
246 {"node-id": "SPDR-SA1",
247 "client-tp": "XPDR1-CLIENT1",
248 "network-tp": "XPDR1-NETWORK1"}]}}
249 response = test_utils.post_request(url, data)
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
254 self.assertTrue(res["output"]["success"])
256 {'node-id': 'SPDR-SA1',
257 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
258 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
259 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
261 def test_12_check_interface_10GE_CLIENT(self):
262 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
266 'administrative-state': 'inService',
267 'supporting-circuit-pack-name': 'CP1-SFP4',
268 'type': 'org-openroadm-interfaces:ethernetCsmacd',
269 'supporting-port': 'CP1-SFP4-P1'
271 self.assertDictEqual(dict(res['interface'][0], **input_dict),
273 self.assertDictEqual(
275 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
277 def test_13_check_interface_ODU2E_CLIENT(self):
278 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
279 self.assertEqual(response.status_code, requests.codes.ok)
280 res = response.json()
282 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
283 'administrative-state': 'inService',
284 'supporting-circuit-pack-name': 'CP1-SFP4',
285 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
286 'type': 'org-openroadm-interfaces:otnOdu',
287 'supporting-port': 'CP1-SFP4-P1'}
289 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
290 'rate': 'org-openroadm-otn-common-types:ODU2e',
291 'monitoring-mode': 'terminated'}
293 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
295 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
297 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
298 self.assertDictEqual(
299 {u'payload-type': u'03', u'exp-payload-type': u'03'},
300 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
302 def test_14_check_interface_ODU2E_NETWORK(self):
303 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
304 self.assertEqual(response.status_code, requests.codes.ok)
305 res = response.json()
306 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
307 'supporting-circuit-pack-name': 'CP1-CFP0',
308 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
309 'type': 'org-openroadm-interfaces:otnOdu',
310 'supporting-port': 'CP1-CFP0-P1'}
312 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
313 'rate': 'org-openroadm-otn-common-types:ODU2e',
314 'monitoring-mode': 'monitored'}
316 input_dict_3 = {'trib-port-number': 1}
318 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
320 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
322 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
323 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
324 'parent-odu-allocation'], **input_dict_3
326 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
327 'parent-odu-allocation'])
330 'org-openroadm-otn-odu-interfaces:odu'][
331 'parent-odu-allocation']['trib-slots'])
333 def test_15_check_ODU2E_connection(self):
334 response = test_utils.check_netconf_node_request(
336 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
341 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
342 'direction': 'bidirectional'
345 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
346 res['odu-connection'][0])
347 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
348 res['odu-connection'][0]['destination'])
349 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
350 res['odu-connection'][0]['source'])
352 def test_16_otn_service_path_delete_10GE(self):
353 url = "{}/operations/transportpce-device-renderer:otn-service-path"
354 data = {"renderer:input": {
355 "service-name": "service1",
356 "operation": "delete",
357 "service-rate": "10G",
358 "service-type": "Ethernet",
359 "ethernet-encoding": "eth encode",
361 "trib-port-number": "1",
363 {"node-id": "SPDR-SA1",
364 "client-tp": "XPDR1-CLIENT1",
365 "network-tp": "XPDR1-NETWORK1"}]}}
366 response = test_utils.post_request(url, data)
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 self.assertIn('Request processed', res["output"]["result"])
371 self.assertTrue(res["output"]["success"])
373 def test_17_check_no_ODU2E_connection(self):
374 response = test_utils.check_netconf_node_request(
376 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
377 self.assertEqual(response.status_code, requests.codes.not_found)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    # Look up the network-side ODU2e interface of service1 on SPDR-SA1 and
    # require the request to come back with the "not found" status code.
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    # Look up the client-side ODU2e interface of service1 on SPDR-SA1 and
    # require the request to come back with the "not found" status code.
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-CLIENT1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_20_check_no_interface_10GE_CLIENT(self):
    # Look up the 10G Ethernet client interface on SPDR-SA1 and require the
    # request to come back with the "not found" status code.
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
391 def test_21_otn_service_path_delete_ODU4(self):
392 url = "{}/operations/transportpce-device-renderer:otn-service-path"
393 data = {"renderer:input": {
394 "service-name": "service_ODU4",
395 "operation": "delete",
396 "service-rate": "100G",
397 "service-type": "ODU",
399 {"node-id": "SPDR-SA1",
400 "network-tp": "XPDR1-NETWORK1"}]}}
401 response = test_utils.post_request(url, data)
403 self.assertEqual(response.status_code, requests.codes.ok)
404 res = response.json()
405 self.assertIn('Request processed', res["output"]["result"])
406 self.assertTrue(res["output"]["success"])
def test_22_check_no_interface_ODU4(self):
    # Look up the ODU4 interface of XPDR1-NETWORK1 on SPDR-SA1 and require
    # the request to come back with the "not found" status code.
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
412 def test_23_service_path_delete_OCH_OTU4(self):
413 url = "{}/operations/transportpce-device-renderer:service-path"
414 data = {"renderer:input": {
415 "service-name": "service_OTU4",
417 "modulation-format": "qpsk",
418 "operation": "delete",
420 {"node-id": "SPDR-SA1",
421 "dest-tp": "XPDR1-NETWORK1"}]}}
422 response = test_utils.post_request(url, data)
424 self.assertEqual(response.status_code, requests.codes.ok)
425 res = response.json()
426 self.assertIn('Request processed', res["output"]["result"])
427 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    # Look up the OTU interface of XPDR1-NETWORK1 on SPDR-SA1 and require
    # the request to come back with the "not found" status code.
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_25_check_no_interface_OCH(self):
    # Look up interface XPDR1-NETWORK1-1 (the och-interface-id reported by the
    # service-path creation test) on SPDR-SA1 and require "not found".
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_26_disconnect_SPDR_SA1(self):
    # Unmount SPDR-SA1 through the shared helper and require an OK status;
    # test_utils.CODE_SHOULD_BE_200 is passed as the assertion failure message.
    unmount_reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200)
# Script entry point: when this file is executed directly, run the module's
# tests through unittest with verbosity=2 so each test is listed individually.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )