3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEtesting(unittest.TestCase):
29 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
30 "supporting-port": "CP1-CFP0-P1",
31 "supported-interface-capability": [
32 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
34 "port-direction": "bidirectional",
35 "port-qual": "xpdr-network",
36 "supporting-circuit-pack-name": "CP1-CFP0",
37 "xponder-type": "mpdr",
38 'lcp-hash-val': 'Swfw02qXGyI=',
39 'port-admin-state': 'InService',
40 'port-oper-state': 'InService'}
41 NODE_VERSION = '2.2.1'
45 cls.processes = test_utils.start_tpce()
46 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down each process held in ``cls.processes`` and print a confirmation."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    # Printed once, after every process has been handed to shutdown_process.
    print("all processes killed")
58 def test_01_connect_SPDR_SA1(self):
59 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
60 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
63 response = test_utils.get_netconf_oper_request("SPDR-SA1")
64 self.assertEqual(response.status_code, requests.codes.ok)
67 res['node'][0]['netconf-node-topology:connection-status'],
def test_02_get_portmapping_CLIENT4(self):
    """Check the port-mapping entry returned for XPDR1-CLIENT4 on SPDR-SA1."""
    response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
    self.assertEqual(response.status_code, requests.codes.ok)
    mapping = response.json()['mapping'][0]

    # (attribute, expected value) pairs, compared one by one in this order so
    # the first mismatch is the one reported.
    expected_attributes = (
        ('supporting-port', 'CP1-SFP4-P1'),
        ('supporting-circuit-pack-name', 'CP1-SFP4'),
        ('logical-connection-point', 'XPDR1-CLIENT4'),
        ('port-direction', 'bidirectional'),
        ('port-qual', 'xpdr-client'),
        ('lcp-hash-val', 'FqlcrxV7p3g='),
        ('port-admin-state', 'InService'),
        ('port-oper-state', 'InService'),
    )
    for attribute, expected in expected_attributes:
        self.assertEqual(expected, mapping[attribute])

    # Each of these capabilities must be listed for the client port.
    capabilities = mapping['supported-interface-capability']
    for capability in ('org-openroadm-port-types:if-10GE-ODU2e',
                       'org-openroadm-port-types:if-10GE-ODU2',
                       'org-openroadm-port-types:if-10GE'):
        self.assertIn(capability, capabilities)
86 def test_03_get_portmapping_NETWORK1(self):
87 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
88 self.assertEqual(response.status_code, requests.codes.ok)
91 self.NETWORK1_CHECK_DICT,
94 def test_04_service_path_create_OCH_OTU4(self):
95 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
96 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
97 196.1, 40, 196.075, 196.125, 761,
100 self.assertEqual(response.status_code, requests.codes.ok)
101 res = response.json()
102 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
103 self.assertTrue(res["output"]["success"])
105 {'node-id': 'SPDR-SA1',
106 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
107 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
109 def test_05_get_portmapping_NETWORK1(self):
110 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
111 self.assertEqual(response.status_code, requests.codes.ok)
112 res = response.json()
114 self.NETWORK1_CHECK_DICT,
117 def test_06_check_interface_och(self):
118 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
119 self.assertEqual(response.status_code, requests.codes.ok)
120 res = response.json()
122 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
123 'administrative-state': 'inService',
124 'supporting-circuit-pack-name': 'CP1-CFP0',
125 'type': 'org-openroadm-interfaces:opticalChannel',
126 'supporting-port': 'CP1-CFP0-P1'
130 self.assertDictEqual(
131 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
132 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
133 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
135 def test_07_check_interface_OTU(self):
136 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
137 self.assertEqual(response.status_code, requests.codes.ok)
138 res = response.json()
139 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
140 'administrative-state': 'inService',
141 'supporting-circuit-pack-name': 'CP1-CFP0',
142 'supporting-interface': 'XPDR1-NETWORK1-761:768',
143 'type': 'org-openroadm-interfaces:otnOtu',
144 'supporting-port': 'CP1-CFP0-P1'
147 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
148 'expected-sapi': 'Swfw02qXGyI=',
149 'tx-sapi': 'Swfw02qXGyI=',
150 'expected-dapi': 'Swfw02qXGyI=',
151 'rate': 'org-openroadm-otn-common-types:OTU4',
155 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
158 self.assertDictEqual(input_dict_2,
159 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
161 def test_08_otn_service_path_create_ODU4(self):
162 response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU",
163 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
168 self.assertTrue(res["output"]["success"])
170 {'node-id': 'SPDR-SA1',
171 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
173 def test_09_get_portmapping_NETWORK1(self):
174 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
179 self.NETWORK1_CHECK_DICT,
182 def test_10_check_interface_ODU4(self):
183 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
187 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
188 'type': 'org-openroadm-interfaces:otnOdu',
189 'supporting-port': 'CP1-CFP0-P1'}
190 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
191 'rate': 'org-openroadm-otn-common-types:ODU4',
192 'expected-dapi': 'Swfw02qXGyI=',
193 'expected-sapi': 'Swfw02qXGyI=',
194 'tx-dapi': 'Swfw02qXGyI=',
195 'tx-sapi': 'Swfw02qXGyI='}
197 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
199 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
202 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
204 self.assertDictEqual(
205 {'payload-type': '21', 'exp-payload-type': '21'},
206 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
    """Send an OTN service-path 'create' request for "service1" and check the reported interfaces."""
    nodes = [{"node-id": "SPDR-SA1",
              "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    extra = {"ethernet-encoding": "eth encode",
             "trib-slot": ["1"],
             "trib-port-number": "1"}
    response = test_utils.otn_service_path_request("create", "service1", "10", "Ethernet", nodes, extra)
    self.assertEqual(response.status_code, requests.codes.ok)

    output = response.json()["output"]
    self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
    self.assertTrue(output["success"])

    # Only the first 'node-interface' entry is inspected.
    node_interface = output['node-interface'][0]
    self.assertEqual('SPDR-SA1', node_interface['node-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                  node_interface['connection-id'])
    self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
    odu_interfaces = node_interface['odu-interface-id']
    self.assertIn('XPDR1-NETWORK1-ODU2e-service1', odu_interfaces)
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1', odu_interfaces)
226 def test_12_check_interface_10GE_CLIENT(self):
227 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
228 self.assertEqual(response.status_code, requests.codes.ok)
229 res = response.json()
230 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
231 'administrative-state': 'inService',
232 'supporting-circuit-pack-name': 'CP1-SFP4',
233 'type': 'org-openroadm-interfaces:ethernetCsmacd',
234 'supporting-port': 'CP1-SFP4-P1'
236 self.assertDictEqual(dict(res['interface'][0], **input_dict),
238 self.assertDictEqual(
240 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
242 def test_13_check_interface_ODU2E_CLIENT(self):
243 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
244 self.assertEqual(response.status_code, requests.codes.ok)
245 res = response.json()
247 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
248 'administrative-state': 'inService',
249 'supporting-circuit-pack-name': 'CP1-SFP4',
250 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
251 'type': 'org-openroadm-interfaces:otnOdu',
252 'supporting-port': 'CP1-SFP4-P1'}
254 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
255 'rate': 'org-openroadm-otn-common-types:ODU2e',
256 'monitoring-mode': 'terminated'}
258 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
260 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
262 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
263 self.assertDictEqual(
264 {'payload-type': '03', 'exp-payload-type': '03'},
265 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
267 def test_14_check_interface_ODU2E_NETWORK(self):
268 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
269 self.assertEqual(response.status_code, requests.codes.ok)
270 res = response.json()
271 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
272 'supporting-circuit-pack-name': 'CP1-CFP0',
273 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
274 'type': 'org-openroadm-interfaces:otnOdu',
275 'supporting-port': 'CP1-CFP0-P1'}
277 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
278 'rate': 'org-openroadm-otn-common-types:ODU2e',
279 'monitoring-mode': 'monitored'}
281 input_dict_3 = {'trib-port-number': 1}
283 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
285 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
287 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
288 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
289 'parent-odu-allocation'], **input_dict_3
291 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
292 'parent-odu-allocation'])
295 'org-openroadm-otn-odu-interfaces:odu'][
296 'parent-odu-allocation']['trib-slots'])
298 def test_15_check_ODU2E_connection(self):
299 response = test_utils.check_netconf_node_request(
301 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
302 self.assertEqual(response.status_code, requests.codes.ok)
303 res = response.json()
306 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
307 'direction': 'bidirectional'
310 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
311 res['odu-connection'][0])
312 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
313 res['odu-connection'][0]['destination'])
314 self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
315 res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
    """Send an OTN service-path 'delete' request for "service1" and check it is reported as processed."""
    nodes = [{"node-id": "SPDR-SA1",
              "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    extra = {"ethernet-encoding": "eth encode",
             "trib-slot": ["1"],
             "trib-port-number": "1"}
    response = test_utils.otn_service_path_request("delete", "service1", "10", "Ethernet", nodes, extra)
    self.assertEqual(response.status_code, requests.codes.ok)

    output = response.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_17_check_no_ODU2E_connection(self):
    """Expect the 'conflict' status when reading the ODU2e cross-connection on SPDR-SA1."""
    # check_netconf_node_request takes the node id first and the resource path
    # second, as the interface checks of this class do; the node id must not be omitted.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect the 'conflict' status when reading interface XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1."""
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1").status_code
    self.assertEqual(status, requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect the 'conflict' status when reading interface XPDR1-CLIENT4-ODU2e-service1 on SPDR-SA1."""
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1").status_code
    self.assertEqual(status, requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect the 'conflict' status when reading interface XPDR1-CLIENT4-ETHERNET10G on SPDR-SA1."""
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G").status_code
    self.assertEqual(status, requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
    """Send an OTN service-path 'delete' request for "service_ODU4" and check it is reported as processed."""
    nodes = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
    response = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU", nodes)
    self.assertEqual(response.status_code, requests.codes.ok)

    output = response.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect the 'conflict' status when reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4").status_code
    self.assertEqual(status, requests.codes.conflict)
360 def test_23_service_path_delete_OCH_OTU4(self):
361 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
362 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
363 196.1, 40, 196.075, 196.125, 761,
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 self.assertIn('Request processed', res["output"]["result"])
369 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect the 'conflict' status when reading interface XPDR1-NETWORK1-OTU on SPDR-SA1."""
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU").status_code
    self.assertEqual(status, requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
    """Expect the 'conflict' status when reading the OCH interface on SPDR-SA1.

    The OCH interface reported and checked elsewhere in this test class is
    named 'XPDR1-NETWORK1-761:768'. Probing any other name would succeed
    even if that interface were still present, so the real name is used.
    """
    response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount SPDR-SA1 and check that the request answers with the 'ok' status."""
    unmount_response = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(unmount_response.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Script entry point: run the tests of this module with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)