3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 from common import test_utils
# Test case whose methods drive the SPDR-SA1 node through test_utils helpers
# and assert on the HTTP responses those helpers return.
21 class TransportPCEtesting(unittest.TestCase):
# Expected port-mapping entry for logical connection point XPDR1-NETWORK1.
# It is referenced through self.NETWORK1_CHECK_DICT by the NETWORK1
# port-mapping tests, and test_09 adds a "supporting-odu4" key to it.
# NOTE(review): the list opened for "supported-interface-capability" has no
# visible closing bracket in this excerpt - a line appears to be missing.
24 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
25 "supporting-port": "CP1-CFP0-P1",
26 "supported-interface-capability": [
27 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
29 "port-direction": "bidirectional",
30 "port-qual": "xpdr-network",
31 "supporting-circuit-pack-name": "CP1-CFP0",
32 "xponder-type": "mpdr",
33 'lcp-hash-val': 'Swfw02qXGyI=',
34 'port-admin-state': 'InService',
35 'port-oper-state': 'InService'}
# NOTE(review): the two assignments below use `cls` but no enclosing `def` is
# visible in this excerpt; presumably they belong to a setUpClass hook - confirm.
# The second assignment rebinds cls.processes, so the value returned by
# start_tpce() is only kept if start_sims() hands it back - TODO confirm
# against test_utils.
39 cls.processes = test_utils.start_tpce()
40 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report it."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# Mount SPDR-SA1 with the 'spdra' simulator and expect HTTP 201, then call
# get_netconf_oper_request for the same node and expect HTTP 200.
52 def test_01_connect_SPDR_SA1(self):
53 response = test_utils.mount_device("SPDR-SA1", 'spdra')
54 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
57 response = test_utils.get_netconf_oper_request("SPDR-SA1")
58 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): the line below is the middle of a multi-line expression on the
# node's connection-status field; the statement that contains it and the
# assignment of `res` are not visible in this excerpt.
61 res['node'][0]['netconf-node-topology:connection-status'],
def test_02_get_portmapping_CLIENT4(self):
    """Check the port-mapping entry returned for XPDR1-CLIENT4 on SPDR-SA1.

    The request must answer HTTP 200; the first mapping of the JSON body is
    then compared field by field and must list the three 10GE capabilities.
    """
    response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
    self.assertEqual(response.status_code, requests.codes.ok)
    mapping = response.json()['mapping'][0]

    # (expected value, key) pairs, checked in this order.
    expected_fields = (
        ('CP1-SFP4-P1', 'supporting-port'),
        ('CP1-SFP4', 'supporting-circuit-pack-name'),
        ('XPDR1-CLIENT4', 'logical-connection-point'),
        ('bidirectional', 'port-direction'),
        ('xpdr-client', 'port-qual'),
        ('FqlcrxV7p3g=', 'lcp-hash-val'),
        ('InService', 'port-admin-state'),
        ('InService', 'port-oper-state'),
    )
    for expected, key in expected_fields:
        self.assertEqual(expected, mapping[key])

    expected_capabilities = (
        'org-openroadm-port-types:if-10GE-ODU2e',
        'org-openroadm-port-types:if-10GE-ODU2',
        'org-openroadm-port-types:if-10GE',
    )
    for capability in expected_capabilities:
        self.assertIn(capability, mapping['supported-interface-capability'])
# Request the XPDR1-NETWORK1 port mapping of SPDR-SA1 and expect HTTP 200.
80 def test_03_get_portmapping_NETWORK1(self):
81 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
82 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): NETWORK1_CHECK_DICT is passed as an argument here, but the call
# it belongs to (opening and closing lines) is not visible in this excerpt.
85 self.NETWORK1_CHECK_DICT,
# Ask test_utils.service_path_request to "create" the service path
# "service_OCH_OTU4" towards XPDR1-NETWORK1 on SPDR-SA1, then check the reply:
# HTTP 200, a result message about the Roadm-connection, and a truthy success flag.
88 def test_04_service_path_create_OCH_OTU4(self):
89 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
90 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
# NOTE(review): the argument list is not closed in this excerpt; the meaning of
# the numeric arguments is defined by test_utils.service_path_request.
91 196.1, 40, 196.075, 196.125, 761,
94 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): `res` is used below without a visible assignment in this excerpt.
96 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
97 self.assertTrue(res["output"]["success"])
# NOTE(review): the dict below is the first argument of a call whose opening
# line is not visible; it is checked against res["output"]['node-interface'].
99 {'node-id': 'SPDR-SA1',
100 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
101 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
# Request the XPDR1-NETWORK1 port mapping of SPDR-SA1 again, expect HTTP 200
# and decode the JSON body.
103 def test_05_get_portmapping_NETWORK1(self):
104 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
105 self.assertEqual(response.status_code, requests.codes.ok)
106 res = response.json()
# NOTE(review): NETWORK1_CHECK_DICT is passed as an argument here, but the call
# it belongs to is not visible in this excerpt.
108 self.NETWORK1_CHECK_DICT,
# Read interface XPDR1-NETWORK1-761:768 on SPDR-SA1, expect HTTP 200, and
# check its attributes and its optical-channel container.
111 def test_06_check_interface_och(self):
112 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
113 self.assertEqual(response.status_code, requests.codes.ok)
114 res = response.json()
# dict(actual, **expected) builds a copy of the returned interface with the
# expected values written over it.
# NOTE(review): the end of this statement (closing brackets and the second
# assertDictEqual argument) is not visible in this excerpt.
116 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
117 'administrative-state': 'inService',
118 'supporting-circuit-pack-name': 'CP1-CFP0',
119 'type': 'org-openroadm-interfaces:opticalChannel',
120 'supporting-port': 'CP1-CFP0-P1'
# The och container must equal this dict exactly.
124 self.assertDictEqual(
125 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
126 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
127 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# Read interface XPDR1-NETWORK1-OTU on SPDR-SA1, expect HTTP 200, and check
# its attributes and its otu container.
129 def test_07_check_interface_OTU(self):
130 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
131 self.assertEqual(response.status_code, requests.codes.ok)
132 res = response.json()
# Expected top-level interface attributes.
# NOTE(review): the closing brace of this dict is not visible in this excerpt.
133 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
134 'administrative-state': 'inService',
135 'supporting-circuit-pack-name': 'CP1-CFP0',
136 'supporting-interface': 'XPDR1-NETWORK1-761:768',
137 'type': 'org-openroadm-interfaces:otnOtu',
138 'supporting-port': 'CP1-CFP0-P1'
# Expected content of the otu container.
# NOTE(review): this dict is also left open in this excerpt; further keys may
# be missing.
141 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
142 'expected-sapi': 'Swfw02qXGyI=',
143 'tx-sapi': 'Swfw02qXGyI=',
144 'expected-dapi': 'Swfw02qXGyI=',
145 'rate': 'org-openroadm-otn-common-types:OTU4',
# NOTE(review): the second argument of the assertion below is not visible.
149 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
152 self.assertDictEqual(input_dict_2,
153 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Ask test_utils.otn_service_path_request to "create" the ODU service
# "service_ODU4" (100G) on SPDR-SA1 / XPDR1-NETWORK1, then check the reply:
# HTTP 200, the set-up message for SPDR-SA1, and a truthy success flag.
155 def test_08_otn_service_path_create_ODU4(self):
156 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
157 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
159 self.assertEqual(response.status_code, requests.codes.ok)
160 res = response.json()
161 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
162 self.assertTrue(res["output"]["success"])
# NOTE(review): the dict below is the first argument of a call whose opening
# line is not visible; it is checked against res["output"]['node-interface'].
164 {'node-id': 'SPDR-SA1',
165 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
# Request the XPDR1-NETWORK1 port mapping of SPDR-SA1, expect HTTP 200, and
# extend the expected mapping with the ODU4 interface name.
167 def test_09_get_portmapping_NETWORK1(self):
168 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
# Item assignment mutates the dict object itself, so the added key stays in
# NETWORK1_CHECK_DICT for any code that reads it afterwards.
171 self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
# NOTE(review): NETWORK1_CHECK_DICT is passed as an argument here, but the call
# it belongs to is not visible in this excerpt.
173 self.NETWORK1_CHECK_DICT,
# Read interface XPDR1-NETWORK1-ODU4 on SPDR-SA1, expect HTTP 200, and check
# its attributes, its odu container and the nested opu payload types.
176 def test_10_check_interface_ODU4(self):
177 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
# Expected top-level interface attributes.
180 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
181 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
182 'type': 'org-openroadm-interfaces:otnOdu',
183 'supporting-port': 'CP1-CFP0-P1'}
# Expected attributes of the odu container.
184 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
185 'rate': 'org-openroadm-otn-common-types:ODU4',
186 'expected-dapi': 'Swfw02qXGyI=',
187 'expected-sapi': 'Swfw02qXGyI=',
188 'tx-dapi': 'Swfw02qXGyI=',
189 'tx-sapi': 'Swfw02qXGyI='}
# NOTE(review): both assertions below are truncated in this excerpt (second
# argument of the first one, and the merge argument / closing of the second).
191 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
193 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
196 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
# The opu sub-container must equal this dict exactly.
198 self.assertDictEqual(
199 {u'payload-type': u'21', u'exp-payload-type': u'21'},
200 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
    """Create OTN service path 'service1' (10G Ethernet) and check the reply.

    The reply must be HTTP 200, carry the set-up message for SPDR-SA1 and a
    truthy success flag, and its first node-interface entry must name the
    connection, the Ethernet interface and both ODU2e interfaces.
    """
    node_list = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                  "network-tp": "XPDR1-NETWORK1"}]
    eth_params = {"ethernet-encoding": "eth encode",
                  "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "create", "service1", "10G", "Ethernet", node_list, eth_params)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
    self.assertTrue(output["success"])

    node_interface = output['node-interface'][0]
    self.assertEqual('SPDR-SA1', node_interface['node-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                  node_interface['connection-id'])
    self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
    for odu_name in ('XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT4-ODU2e-service1'):
        self.assertIn(odu_name, node_interface['odu-interface-id'])
# Read interface XPDR1-CLIENT4-ETHERNET10G on SPDR-SA1, expect HTTP 200, and
# check its attributes and its ethernet container.
220 def test_12_check_interface_10GE_CLIENT(self):
221 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
# Expected top-level interface attributes.
# NOTE(review): the closing brace of this dict is not visible in this excerpt.
224 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
225 'administrative-state': 'inService',
226 'supporting-circuit-pack-name': 'CP1-SFP4',
227 'type': 'org-openroadm-interfaces:ethernetCsmacd',
228 'supporting-port': 'CP1-SFP4-P1'
# NOTE(review): the second argument of the assertion below is not visible.
230 self.assertDictEqual(dict(res['interface'][0], **input_dict),
# NOTE(review): the expected value for the ethernet container (first argument
# of this assertion) is not visible in this excerpt.
232 self.assertDictEqual(
234 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# Read interface XPDR1-CLIENT4-ODU2e-service1 on SPDR-SA1, expect HTTP 200,
# and check its attributes, its odu container and the nested opu payload types.
236 def test_13_check_interface_ODU2E_CLIENT(self):
237 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
# Expected top-level interface attributes.
241 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
242 'administrative-state': 'inService',
243 'supporting-circuit-pack-name': 'CP1-SFP4',
244 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
245 'type': 'org-openroadm-interfaces:otnOdu',
246 'supporting-port': 'CP1-SFP4-P1'}
# NOTE(review): the three entries below end a dict whose opening line is not
# visible; presumably it holds the expected odu attributes - confirm.
248 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
249 'rate': 'org-openroadm-otn-common-types:ODU2e',
250 'monitoring-mode': 'terminated'}
# NOTE(review): both assertions below are truncated in this excerpt (second
# argument of the first one, merge argument of the second).
252 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
254 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
256 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The opu sub-container must equal this dict exactly.
257 self.assertDictEqual(
258 {u'payload-type': u'03', u'exp-payload-type': u'03'},
259 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Read interface XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1, expect HTTP 200,
# and check its attributes, its odu container and the parent-odu-allocation.
261 def test_14_check_interface_ODU2E_NETWORK(self):
262 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
# Expected top-level interface attributes.
265 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
266 'supporting-circuit-pack-name': 'CP1-CFP0',
267 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
268 'type': 'org-openroadm-interfaces:otnOdu',
269 'supporting-port': 'CP1-CFP0-P1'}
# NOTE(review): the three entries below end a dict whose opening line is not
# visible; presumably it holds the expected odu attributes - confirm.
271 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
272 'rate': 'org-openroadm-otn-common-types:ODU2e',
273 'monitoring-mode': 'monitored'}
# Expected value inside parent-odu-allocation.
275 input_dict_3 = {'trib-port-number': 1}
# NOTE(review): the assertions below are truncated in this excerpt: missing
# second argument (first), missing merge argument (second), missing closing
# of the dict(...) call (third).
277 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
279 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
281 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
282 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
283 'parent-odu-allocation'], **input_dict_3
285 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
286 'parent-odu-allocation'])
# NOTE(review): the two lines below end an expression that reads 'trib-slots'
# from parent-odu-allocation; the call they close is not visible.
289 'org-openroadm-otn-odu-interfaces:odu'][
290 'parent-odu-allocation']['trib-slots'])
# Read the ODU2e cross-connection between the CLIENT4 and NETWORK1 ODU2e
# interfaces, expect HTTP 200, and check its attributes, destination and source.
292 def test_15_check_ODU2E_connection(self):
# NOTE(review): only the resource path is visible as an argument of this call;
# a line between the opening parenthesis and the path appears to be missing.
293 response = test_utils.check_netconf_node_request(
295 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
296 self.assertEqual(response.status_code, requests.codes.ok)
297 res = response.json()
# NOTE(review): the two lines below are the tail of a dict literal (presumably
# the one bound to input_dict_1, used further down); its opening and closing
# lines are not visible in this excerpt.
300 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
301 'direction': 'bidirectional'
# Merging input_dict_1 over a copy of the returned connection and comparing
# with the unmodified connection passes only when every expected key is
# already present with the expected value.
304 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
305 res['odu-connection'][0])
# destination and source must each equal the given single-key dict exactly.
306 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
307 res['odu-connection'][0]['destination'])
308 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
309 res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
    """Delete OTN service path 'service1' (10G Ethernet) and check the reply.

    The reply must be HTTP 200, mention 'Request processed' in its result
    and carry a truthy success flag.
    """
    node_list = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                  "network-tp": "XPDR1-NETWORK1"}]
    eth_params = {"ethernet-encoding": "eth encode",
                  "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "delete", "service1", "10G", "Ethernet", node_list, eth_params)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_17_check_no_ODU2E_connection(self):
    """Expect HTTP 409 when reading the ODU2e cross-connection on SPDR-SA1.

    The node id is passed explicitly as the first argument, matching every
    other ``check_netconf_node_request`` call in this class; the resource
    path alone does not identify the device to query.
    """
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect HTTP 409 when reading XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1."""
    resource = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect HTTP 409 when reading XPDR1-CLIENT4-ODU2e-service1 on SPDR-SA1."""
    resource = "interface/XPDR1-CLIENT4-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect HTTP 409 when reading XPDR1-CLIENT4-ETHERNET10G on SPDR-SA1."""
    resource = "interface/XPDR1-CLIENT4-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
    """Delete OTN service path 'service_ODU4' (100G ODU) and check the reply.

    The reply must be HTTP 200, mention 'Request processed' in its result
    and carry a truthy success flag.
    """
    node_list = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.otn_service_path_request(
        "delete", "service_ODU4", "100G", "ODU", node_list)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect HTTP 409 when reading XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
# Ask test_utils.service_path_request to "delete" the service path
# "service_OCH_OTU4" on SPDR-SA1 / XPDR1-NETWORK1, then check the reply:
# HTTP 200, 'Request processed' in the result, and a truthy success flag.
354 def test_23_service_path_delete_OCH_OTU4(self):
355 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
356 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
# NOTE(review): the argument list is not closed in this excerpt; the meaning of
# the numeric arguments is defined by test_utils.service_path_request.
357 196.1, 40, 196.075, 196.125, 761,
360 self.assertEqual(response.status_code, requests.codes.ok)
361 res = response.json()
362 self.assertIn('Request processed', res["output"]["result"])
363 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect HTTP 409 when reading XPDR1-NETWORK1-OTU on SPDR-SA1."""
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
    """Expect HTTP 409 when reading the OCH interface on SPDR-SA1.

    The interface queried is 'XPDR1-NETWORK1-761:768', the och-interface-id
    asserted by test_04 and read back by test_06. Querying any other name
    would answer 409 whether or not the OCH interface still exists, so the
    check would prove nothing about the service-path deletion.
    """
    resource = "interface/XPDR1-NETWORK1-761:768"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount SPDR-SA1 and expect HTTP 200."""
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Run the test cases with verbose output when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)