3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
20 import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
    """Port-mapping and service-path tests run against the node SPDR-SA1.

    Attributes:
        NETWORK1_CHECK_DICT: mapping entry referenced by the XPDR1-NETWORK1
            port-mapping tests.
        NODE_VERSION: version string handed to ``test_utils`` when starting
            and mounting the ``spdra`` simulator.
    """

    NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
                           "supporting-port": "CP1-CFP0-P1",
                           # The capability list holds a single entry; it has
                           # to be closed before the next key/value pair.
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-OCH-OTU4-ODU4"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "xpdr-network",
                           "supporting-circuit-pack-name": "CP1-CFP0",
                           "xponder-type": "mpdr",
                           'lcp-hash-val': 'Swfw02qXGyI=',
                           'port-admin-state': 'InService',
                           'port-oper-state': 'InService'}
    NODE_VERSION = '2.2.1'
# Calls test_utils.start_tpce(), then test_utils.start_sims() for the 'spdra'
# simulator at NODE_VERSION, storing each result in cls.processes.
# NOTE(review): the def line owning these statements is not present in this
# copy of the file (presumably setUpClass). The second assignment rebinds
# cls.processes, so whatever start_tpce() returned stays reachable only if
# start_sims() returns it too - confirm in test_utils.
42 cls.processes = test_utils.start_tpce()
43 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes`` and print a confirmation."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# Mounts SPDR-SA1 (simulator 'spdra' at NODE_VERSION) and expects a `created`
# status, then requests its netconf operational data and expects `ok`.
# NOTE(review): the last line is an argument of a call whose opening and closing
# lines are not present in this copy of the file; `res` is not bound here either.
55 def test_01_connect_SPDR_SA1(self):
56 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
57 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
60 response = test_utils.get_netconf_oper_request("SPDR-SA1")
61 self.assertEqual(response.status_code, requests.codes.ok)
64 res['node'][0]['netconf-node-topology:connection-status'],
def test_02_get_portmapping_CLIENT4(self):
    """Check the first mapping entry returned for XPDR1-CLIENT4 on SPDR-SA1.

    The request must answer with an ``ok`` status; the scalar fields are
    then compared one by one, and three interface capabilities must be
    listed under ``supported-interface-capability``.
    """
    reply = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    mapping = reply.json()['mapping'][0]

    # Kept as an ordered sequence so the checks run in the same order as
    # the individual assertions they replace.
    expected_fields = (
        ('supporting-port', 'CP1-SFP4-P1'),
        ('supporting-circuit-pack-name', 'CP1-SFP4'),
        ('logical-connection-point', 'XPDR1-CLIENT4'),
        ('port-direction', 'bidirectional'),
        ('port-qual', 'xpdr-client'),
        ('lcp-hash-val', 'FqlcrxV7p3g='),
        ('port-admin-state', 'InService'),
        ('port-oper-state', 'InService'),
    )
    for field, wanted in expected_fields:
        self.assertEqual(wanted, mapping[field])

    expected_capabilities = (
        'org-openroadm-port-types:if-10GE-ODU2e',
        'org-openroadm-port-types:if-10GE-ODU2',
        'org-openroadm-port-types:if-10GE',
    )
    for capability in expected_capabilities:
        self.assertIn(capability, mapping['supported-interface-capability'])
# Requests the XPDR1-NETWORK1 port mapping of SPDR-SA1 and expects an `ok` status.
# NOTE(review): NETWORK1_CHECK_DICT is an argument of a call whose opening and
# closing lines are not present in this copy of the file, so the exact check
# applied to it cannot be told from here.
83 def test_03_get_portmapping_NETWORK1(self):
84 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
85 self.assertEqual(response.status_code, requests.codes.ok)
88 self.NETWORK1_CHECK_DICT,
# Sends a "create" service-path request named service_OCH_OTU4 for
# XPDR1-NETWORK1 on SPDR-SA1, expects an `ok` status, a success flag and a
# result text containing 'Roadm-connection successfully created for nodes: '.
# The trailing dict names the OTU and OCH interfaces expected under
# output/node-interface.
# NOTE(review): the service_path_request(...) call is left open after `761,`,
# `res` is never bound, and the call receiving the trailing dict has no opening
# line in this copy of the file.
91 def test_04_service_path_create_OCH_OTU4(self):
92 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
93 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
94 196.1, 40, 196.075, 196.125, 761,
97 self.assertEqual(response.status_code, requests.codes.ok)
99 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
100 self.assertTrue(res["output"]["success"])
102 {'node-id': 'SPDR-SA1',
103 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
104 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
# Requests the XPDR1-NETWORK1 port mapping of SPDR-SA1 again, expects an `ok`
# status and decodes the JSON body into `res`.
# NOTE(review): NETWORK1_CHECK_DICT is an argument of a call whose opening and
# closing lines are not present in this copy of the file.
106 def test_05_get_portmapping_NETWORK1(self):
107 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
111 self.NETWORK1_CHECK_DICT,
# Reads interface XPDR1-NETWORK1-761:768 from SPDR-SA1, expects an `ok` status,
# then checks the generic interface fields and the optical-channel container.
114 def test_06_check_interface_och(self):
115 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
116 self.assertEqual(response.status_code, requests.codes.ok)
117 res = response.json()
# dict(actual, **expected) copies the returned interface and overlays the
# expected values on top of it.
# NOTE(review): the dict literal and this assertDictEqual call are not closed in
# this copy of the file; the second argument is presumably res['interface'][0].
119 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
120 'administrative-state': 'inService',
121 'supporting-circuit-pack-name': 'CP1-CFP0',
122 'type': 'org-openroadm-interfaces:opticalChannel',
123 'supporting-port': 'CP1-CFP0-P1'
# The och container must equal this dict exactly (no extra or missing keys).
127 self.assertDictEqual(
128 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
129 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
130 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# Reads interface XPDR1-NETWORK1-OTU from SPDR-SA1 and expects an `ok` status.
# input_dict_1 holds the expected generic interface fields, input_dict_2 the
# expected content of the 'org-openroadm-otn-otu-interfaces:otu' container.
# NOTE(review): neither dict literal is closed in this copy of the file, and the
# first assertDictEqual call lacks its second argument.
132 def test_07_check_interface_OTU(self):
133 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
134 self.assertEqual(response.status_code, requests.codes.ok)
135 res = response.json()
136 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
137 'administrative-state': 'inService',
138 'supporting-circuit-pack-name': 'CP1-CFP0',
139 'supporting-interface': 'XPDR1-NETWORK1-761:768',
140 'type': 'org-openroadm-interfaces:otnOtu',
141 'supporting-port': 'CP1-CFP0-P1'
144 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
145 'expected-sapi': 'Swfw02qXGyI=',
146 'tx-sapi': 'Swfw02qXGyI=',
147 'expected-dapi': 'Swfw02qXGyI=',
148 'rate': 'org-openroadm-otn-common-types:OTU4',
# Copy of the returned interface with the expected fields overlaid.
152 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# The otu container is compared against input_dict_2 as a whole.
155 self.assertDictEqual(input_dict_2,
156 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Sends a "create" otn-service-path request named service_ODU4 for
# XPDR1-NETWORK1 on SPDR-SA1, expects an `ok` status, a success flag and a
# result text containing the set-up confirmation for node SPDR-SA1.
# The trailing dict names the ODU interface expected under output/node-interface.
# NOTE(review): the call receiving the trailing dict has no opening line in this
# copy of the file.
158 def test_08_otn_service_path_create_ODU4(self):
159 response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU",
160 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
164 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
165 self.assertTrue(res["output"]["success"])
167 {'node-id': 'SPDR-SA1',
168 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
# Requests the XPDR1-NETWORK1 port mapping of SPDR-SA1 and expects an `ok` status.
# NOTE(review): NETWORK1_CHECK_DICT is an argument of a call whose opening and
# closing lines are not present in this copy of the file.
170 def test_09_get_portmapping_NETWORK1(self):
171 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
# Adds a key to the dict stored on the class, so the change stays visible to
# any code that reads NETWORK1_CHECK_DICT after this test has run.
174 self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
176 self.NETWORK1_CHECK_DICT,
# Reads interface XPDR1-NETWORK1-ODU4 from SPDR-SA1 and expects an `ok` status.
# input_dict_1 holds the expected generic interface fields, input_dict_2 the
# expected fields of the 'org-openroadm-otn-odu-interfaces:odu' container.
# NOTE(review): the first two assertDictEqual calls are incomplete in this copy
# of the file (missing arguments and closing parentheses).
179 def test_10_check_interface_ODU4(self):
180 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
183 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
184 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
185 'type': 'org-openroadm-interfaces:otnOdu',
186 'supporting-port': 'CP1-CFP0-P1'}
187 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
188 'rate': 'org-openroadm-otn-common-types:ODU4',
189 'expected-dapi': 'Swfw02qXGyI=',
190 'expected-sapi': 'Swfw02qXGyI=',
191 'tx-dapi': 'Swfw02qXGyI=',
192 'tx-sapi': 'Swfw02qXGyI='}
# Copy of the returned interface with the expected fields overlaid.
194 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
196 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
199 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
# The opu sub-container must equal this dict exactly.
201 self.assertDictEqual(
202 {'payload-type': '21', 'exp-payload-type': '21'},
203 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
    """Create the 10GE OTN service path ``service1`` on SPDR-SA1.

    The request must answer with an ``ok`` status, report success, and list
    the connection, Ethernet and ODU2e interfaces in the first
    ``node-interface`` entry of the output.
    """
    nodes = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    eth_settings = {"ethernet-encoding": "eth encode",
                    "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "create", "service1", "10", "Ethernet", nodes, eth_settings)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
    self.assertTrue(output["success"])

    # Looked up only after the success checks, to keep the failure order.
    node_interface = output['node-interface'][0]
    self.assertEqual('SPDR-SA1', node_interface['node-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                  node_interface['connection-id'])
    self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
    for odu_name in ('XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT4-ODU2e-service1'):
        self.assertIn(odu_name, node_interface['odu-interface-id'])
# Reads interface XPDR1-CLIENT4-ETHERNET10G from SPDR-SA1 and expects an `ok`
# status; input_dict holds the expected generic interface fields.
# NOTE(review): the dict literal is not closed, the first assertDictEqual lacks
# its second argument and the second one lacks its first argument in this copy
# of the file.
223 def test_12_check_interface_10GE_CLIENT(self):
224 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
227 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
228 'administrative-state': 'inService',
229 'supporting-circuit-pack-name': 'CP1-SFP4',
230 'type': 'org-openroadm-interfaces:ethernetCsmacd',
231 'supporting-port': 'CP1-SFP4-P1'
# Copy of the returned interface with the expected fields overlaid.
233 self.assertDictEqual(dict(res['interface'][0], **input_dict),
235 self.assertDictEqual(
237 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# Reads interface XPDR1-CLIENT4-ODU2e-service1 from SPDR-SA1 and expects an `ok`
# status; input_dict_1 holds the expected generic interface fields.
# NOTE(review): the three 'odu-function' / 'rate' / 'monitoring-mode' entries
# belong to a dict whose opening line is not present in this copy of the file
# (presumably input_dict_2), and the first two assertDictEqual calls are
# incomplete.
239 def test_13_check_interface_ODU2E_CLIENT(self):
240 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
241 self.assertEqual(response.status_code, requests.codes.ok)
242 res = response.json()
244 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
245 'administrative-state': 'inService',
246 'supporting-circuit-pack-name': 'CP1-SFP4',
247 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
248 'type': 'org-openroadm-interfaces:otnOdu',
249 'supporting-port': 'CP1-SFP4-P1'}
251 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
252 'rate': 'org-openroadm-otn-common-types:ODU2e',
253 'monitoring-mode': 'terminated'}
# Copy of the returned interface with the expected fields overlaid.
255 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
257 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
259 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The opu sub-container must equal this dict exactly.
260 self.assertDictEqual(
261 {'payload-type': '03', 'exp-payload-type': '03'},
262 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Reads interface XPDR1-NETWORK1-ODU2e-service1 from SPDR-SA1 and expects an
# `ok` status; input_dict_1 holds the expected generic interface fields and
# input_dict_3 the expected tributary port number of 'parent-odu-allocation'.
# NOTE(review): the 'odu-function' / 'rate' / 'monitoring-mode' entries belong
# to a dict whose opening line is not present in this copy of the file
# (presumably input_dict_2); several assertion calls are incomplete, and the
# final 'trib-slots' lookup has lost the start of its statement.
264 def test_14_check_interface_ODU2E_NETWORK(self):
265 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
266 self.assertEqual(response.status_code, requests.codes.ok)
267 res = response.json()
268 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
269 'supporting-circuit-pack-name': 'CP1-CFP0',
270 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
271 'type': 'org-openroadm-interfaces:otnOdu',
272 'supporting-port': 'CP1-CFP0-P1'}
274 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
275 'rate': 'org-openroadm-otn-common-types:ODU2e',
276 'monitoring-mode': 'monitored'}
278 input_dict_3 = {'trib-port-number': 1}
# Copy of the returned interface with the expected fields overlaid.
280 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
282 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
284 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# Same overlay comparison, applied to the parent-odu-allocation container.
285 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
286 'parent-odu-allocation'], **input_dict_3
288 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
289 'parent-odu-allocation'])
292 'org-openroadm-otn-odu-interfaces:odu'][
293 'parent-odu-allocation']['trib-slots'])
# Reads the ODU2e cross-connection between XPDR1-CLIENT4 and XPDR1-NETWORK1,
# expects an `ok` status, then checks its generic fields, destination and source.
# NOTE(review): only the resource path is visible as an argument of
# check_netconf_node_request here, whereas the interface checks in this file
# pass the node id "SPDR-SA1" first - confirm. The connection name and
# 'direction' entries belong to a dict whose opening line is not present in this
# copy of the file (presumably input_dict_1).
295 def test_15_check_ODU2E_connection(self):
296 response = test_utils.check_netconf_node_request(
298 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
303 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
304 'direction': 'bidirectional'
# A copy of the returned connection with the expected fields overlaid equals the
# returned connection only if those fields already carry the expected values.
307 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
308 res['odu-connection'][0])
309 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
310 res['odu-connection'][0]['destination'])
311 self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
312 res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
    """Delete the 10GE OTN service path ``service1`` on SPDR-SA1.

    The request must answer with an ``ok`` status, a result text containing
    'Request processed' and a truthy success flag.
    """
    nodes = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    eth_settings = {"ethernet-encoding": "eth encode",
                    "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "delete", "service1", "10", "Ethernet", nodes, eth_settings)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_17_check_no_ODU2E_connection(self):
    """Expect a ``conflict`` status for the ODU2e cross-connection on SPDR-SA1.

    The node id is passed explicitly as the first argument, as the interface
    checks of this test case do; without it the resource path would be taken
    as the node argument.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect a ``conflict`` status for the network-side ODU2e interface."""
    interface_path = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect a ``conflict`` status for the client-side ODU2e interface."""
    interface_path = "interface/XPDR1-CLIENT4-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect a ``conflict`` status for the client 10GE Ethernet interface."""
    interface_path = "interface/XPDR1-CLIENT4-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
    """Delete the ODU4 OTN service path ``service_ODU4`` on SPDR-SA1.

    The request must answer with an ``ok`` status, a result text containing
    'Request processed' and a truthy success flag.
    """
    nodes = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU", nodes)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect a ``conflict`` status for the network ODU4 interface."""
    interface_path = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
# Sends a "delete" service-path request named service_OCH_OTU4 for
# XPDR1-NETWORK1 on SPDR-SA1, expects an `ok` status, a result text containing
# 'Request processed' and a truthy success flag.
# NOTE(review): the service_path_request(...) call is left open after `761,` in
# this copy of the file; its remaining arguments and closing parenthesis are
# not present.
357 def test_23_service_path_delete_OCH_OTU4(self):
358 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
359 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
360 196.1, 40, 196.075, 196.125, 761,
363 self.assertEqual(response.status_code, requests.codes.ok)
364 res = response.json()
365 self.assertIn('Request processed', res["output"]["result"])
366 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect a ``conflict`` status for the network OTU interface."""
    interface_path = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
    """Expect a ``conflict`` status for the network OCH interface.

    The interface probed is XPDR1-NETWORK1-761:768, the OCH interface name
    that the service-path creation test expects in its output and that the
    OCH interface check reads. Probing any other name would pass whether or
    not that interface still exists.
    """
    response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount SPDR-SA1 and expect an ``ok`` status."""
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Run the test case with per-test output when executed as a script.
    unittest.main(verbosity=2)