3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 from common import test_utils
# unittest.TestCase subclass holding the numbered test_NN_* methods of this module.
21 class TransportPCEtesting(unittest.TestCase):
# Class-level setup statements: they store the return values of
# test_utils.start_tpce() and test_utils.start_sims(['spdra']) in cls.processes.
# NOTE(review): the enclosing method header (presumably a setUpClass classmethod)
# is not visible in this listing -- confirm against the original file.
# NOTE(review): cls.processes is assigned twice, so only the value returned by
# start_sims() is kept. Confirm that this value also covers whatever start_tpce()
# started; otherwise tearDownClass would not shut those processes down.
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes`` and print a completion message."""
    # Entries are passed to test_utils.shutdown_process one by one, in list order.
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# Mounts the device "SPDR-SA1" (second argument 'spdra') through test_utils and
# expects the mount request to answer with requests.codes.created (HTTP 201).
# It then queries the NETCONF operational data of the node and expects HTTP 200.
# NOTE(review): the embedded line numbering jumps (41->44, 45->48, 48->51), so some
# statements appear to be missing from this listing: `res` is read below without a
# visible assignment, and the call consuming the connection-status value is not
# shown. Confirm against the original file before editing this test.
39 def test_01_connect_SPDR_SA1(self):
40 response = test_utils.mount_device("SPDR-SA1", 'spdra')
41 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Read back the NETCONF operational state of the freshly mounted node.
44 response = test_utils.get_netconf_oper_request("SPDR-SA1")
45 self.assertEqual(response.status_code, requests.codes.ok)
# Value under test: the connection status of the first returned node entry.
48 res['node'][0]['netconf-node-topology:connection-status'],
def test_02_get_portmapping_CLIENT4(self):
    """Read the XPDR1-CLIENT4 port mapping of SPDR-SA1 and verify its attributes."""
    reply = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned 'mapping' list is inspected.
    mapping = reply.json()['mapping'][0]

    # (attribute, expected value) pairs; a tuple keeps the check order fixed.
    expected_fields = (
        ('supporting-port', 'CP1-SFP4-P1'),
        ('supporting-circuit-pack-name', 'CP1-SFP4'),
        ('logical-connection-point', 'XPDR1-CLIENT4'),
        ('port-direction', 'bidirectional'),
        ('port-qual', 'xpdr-client'),
        ('lcp-hash-val', 'FqlcrxV7p3g='),
    )
    for attribute, expected in expected_fields:
        self.assertEqual(expected, mapping[attribute])

    # Each of these capabilities must be listed for the client port.
    expected_capabilities = (
        'org-openroadm-port-types:if-10GE-ODU2e',
        'org-openroadm-port-types:if-10GE-ODU2',
        'org-openroadm-port-types:if-10GE',
    )
    for capability in expected_capabilities:
        self.assertIn(capability, mapping['supported-interface-capability'])
# Requests the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" and expects HTTP 200.
# The dict literal below describes the expected XPDR1-NETWORK1 mapping entry.
# NOTE(review): the embedded numbering skips 68-69, 74 and 80, so the statement that
# parses the response, the assertion call that consumes this dict, the closing of
# the capability list and the value it is compared with are not visible in this
# listing. Confirm against the original file.
65 def test_03_get_portmapping_NETWORK1(self):
66 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
67 self.assertEqual(response.status_code, requests.codes.ok)
70 {"logical-connection-point": "XPDR1-NETWORK1",
71 "supporting-port": "CP1-CFP0-P1",
72 "supported-interface-capability": [
73 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
75 "port-direction": "bidirectional",
76 "port-qual": "xpdr-network",
77 "supporting-circuit-pack-name": "CP1-CFP0",
78 "xponder-type": "mpdr",
79 'lcp-hash-val': 'Swfw02qXGyI='},
# Sends a "create" service-path request named "service_OCH_OTU4" (third argument
# "1") for node SPDR-SA1 / XPDR1-NETWORK1 and expects HTTP 200. The output must
# report success, contain the 'Roadm-connection successfully created' text, and
# reference the OTU and OCH interface ids listed below.
# NOTE(review): numbering skips 85, 87 and 90 -- `res` is read without a visible
# assignment and the call that consumes the node-interface dict is not shown.
# Confirm against the original file.
82 def test_04_service_path_create_OCH_OTU4(self):
83 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
84 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
86 self.assertEqual(response.status_code, requests.codes.ok)
88 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
89 self.assertTrue(res["output"]["success"])
91 {'node-id': 'SPDR-SA1',
92 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
93 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
# Requests the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" again and expects
# HTTP 200. The dict literal is the expected mapping entry; its visible keys and
# values match the ones listed in test_03.
# NOTE(review): numbering skips 98-99, 104 and 110, so the response parsing, the
# assertion call, the closing of the capability list and the compared value are
# not visible in this listing. Confirm against the original file.
95 def test_05_get_portmapping_NETWORK1(self):
96 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
97 self.assertEqual(response.status_code, requests.codes.ok)
100 {"logical-connection-point": "XPDR1-NETWORK1",
101 "supporting-port": "CP1-CFP0-P1",
102 "supported-interface-capability": [
103 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
105 "port-direction": "bidirectional",
106 "port-qual": "xpdr-network",
107 "supporting-circuit-pack-name": "CP1-CFP0",
108 "xponder-type": "mpdr",
109 "lcp-hash-val": "Swfw02qXGyI="},
# Reads interface "XPDR1-NETWORK1-1" of node SPDR-SA1, expects HTTP 200, and checks
# the first returned interface entry.
# The first assertion overlays the expected key/values on a copy of the returned
# interface (dict(actual, **expected)).
# NOTE(review): numbering skips 116 and 122-124, so the end of that first assertion
# (closing of the dict and the value it is compared with -- presumably
# res['interface'][0] itself) is not visible. Confirm against the original file.
112 def test_06_check_interface_och(self):
113 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
114 self.assertEqual(response.status_code, requests.codes.ok)
115 res = response.json()
117 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
118 'administrative-state': 'inService',
119 'supporting-circuit-pack-name': 'CP1-CFP0',
120 'type': 'org-openroadm-interfaces:opticalChannel',
121 'supporting-port': 'CP1-CFP0-P1'
# The optical-channel container must equal this dict exactly.
125 self.assertDictEqual(
126 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
127 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
128 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# Reads interface "XPDR1-NETWORK1-OTU" of node SPDR-SA1, expects HTTP 200, and
# checks the first returned interface entry against two expected dicts:
# input_dict_1 (top-level interface attributes) and input_dict_2 (content of the
# 'org-openroadm-otn-otu-interfaces:otu' container).
# NOTE(review): numbering skips 140-141, 147-149 and 151-152, so the closing of both
# dict literals (and possibly further input_dict_2 entries) and the second argument
# of the first assertDictEqual are not visible. Confirm against the original file.
130 def test_07_check_interface_OTU(self):
131 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
132 self.assertEqual(response.status_code, requests.codes.ok)
133 res = response.json()
134 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
135 'administrative-state': 'inService',
136 'supporting-circuit-pack-name': 'CP1-CFP0',
137 'supporting-interface': 'XPDR1-NETWORK1-1',
138 'type': 'org-openroadm-interfaces:otnOtu',
139 'supporting-port': 'CP1-CFP0-P1'
142 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
143 'expected-sapi': 'Swfw02qXGyI=',
144 'tx-sapi': 'Swfw02qXGyI=',
145 'expected-dapi': 'Swfw02qXGyI=',
146 'rate': 'org-openroadm-otn-common-types:OTU4',
150 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
153 self.assertDictEqual(input_dict_2,
154 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Sends a "create" OTN service-path request named "service_ODU4" ("100G", "ODU")
# for node SPDR-SA1 / XPDR1-NETWORK1 and expects HTTP 200. The output must report
# success, contain the 'Otn Service path was set up successfully' text, and
# reference the ODU interface id 'XPDR1-NETWORK1-ODU4'.
# NOTE(review): numbering skips 159 and 164 -- the call that consumes the
# node-interface dict below is not visible. Confirm against the original file.
156 def test_08_otn_service_path_create_ODU4(self):
157 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
158 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
160 self.assertEqual(response.status_code, requests.codes.ok)
161 res = response.json()
162 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
163 self.assertTrue(res["output"]["success"])
165 {'node-id': 'SPDR-SA1',
166 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
# Requests the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" and expects HTTP 200.
# Compared with the expected entries of test_03/test_05, the dict below additionally
# carries "supporting-odu4": "XPDR1-NETWORK1-ODU4".
# NOTE(review): numbering skips 172, 177 and 184-186, so the assertion call that
# consumes this dict, the closing of the capability list, the closing of the dict
# and the compared value are not visible. Confirm against the original file.
168 def test_09_get_portmapping_NETWORK1(self):
169 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
173 {"logical-connection-point": "XPDR1-NETWORK1",
174 "supporting-port": "CP1-CFP0-P1",
175 "supported-interface-capability": [
176 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
178 "port-direction": "bidirectional",
179 "port-qual": "xpdr-network",
180 "supporting-circuit-pack-name": "CP1-CFP0",
181 "xponder-type": "mpdr",
182 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
183 "lcp-hash-val": "Swfw02qXGyI="
# Reads interface "XPDR1-NETWORK1-ODU4" of node SPDR-SA1, expects HTTP 200, and
# checks the first returned interface entry: input_dict_1 holds the expected
# top-level attributes, input_dict_2 the expected values of the
# 'org-openroadm-otn-odu-interfaces:odu' container; the 'opu' sub-container must
# equal the payload-type dict in the last assertion.
# NOTE(review): numbering skips 201, 203, 205-206 and 208, so the second arguments /
# closings of the first two assertDictEqual calls are not visible (input_dict_2 is
# presumably used there). Confirm against the original file.
187 def test_10_check_interface_ODU4(self):
188 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
191 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
192 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
193 'type': 'org-openroadm-interfaces:otnOdu',
194 'supporting-port': 'CP1-CFP0-P1'}
195 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
196 'rate': 'org-openroadm-otn-common-types:ODU4',
197 'expected-dapi': 'Swfw02qXGyI=',
198 'expected-sapi': 'Swfw02qXGyI=',
199 'tx-dapi': 'Swfw02qXGyI=',
200 'tx-sapi': 'Swfw02qXGyI='}
202 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
204 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
207 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
209 self.assertDictEqual(
210 {u'payload-type': u'21', u'exp-payload-type': u'21'},
211 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
    """Create the 10GE OTN service path 'service1' on SPDR-SA1 and check the reported result."""
    node_list = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                  "network-tp": "XPDR1-NETWORK1"}]
    extra_settings = {"ethernet-encoding": "eth encode",
                      "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "create", "service1", "10G", "Ethernet", node_list, extra_settings)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
    self.assertTrue(output["success"])

    # The remaining checks all look at the first 'node-interface' entry; it is
    # fetched only after the success checks so the failure order is unchanged.
    node_interface = output['node-interface'][0]
    self.assertEqual('SPDR-SA1', node_interface['node-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                  node_interface['connection-id'])
    self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
    for odu_interface in ('XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT4-ODU2e-service1'):
        self.assertIn(odu_interface, node_interface['odu-interface-id'])
# Reads interface "XPDR1-CLIENT4-ETHERNET10G" of node SPDR-SA1, expects HTTP 200,
# and checks the first returned interface entry against input_dict (top-level
# attributes) and against an expected 'org-openroadm-ethernet-interfaces:ethernet'
# container.
# NOTE(review): numbering skips 240, 242 and 244, so the closing of input_dict, the
# second argument of the first assertDictEqual and the expected ethernet dict are
# not visible in this listing. Confirm against the original file.
231 def test_12_check_interface_10GE_CLIENT(self):
232 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
236 'administrative-state': 'inService',
237 'supporting-circuit-pack-name': 'CP1-SFP4',
238 'type': 'org-openroadm-interfaces:ethernetCsmacd',
239 'supporting-port': 'CP1-SFP4-P1'
241 self.assertDictEqual(dict(res['interface'][0], **input_dict),
243 self.assertDictEqual(
245 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# Reads interface "XPDR1-CLIENT4-ODU2e-service1" of node SPDR-SA1, expects HTTP 200,
# and checks the first returned interface entry: input_dict_1 holds the expected
# top-level attributes; a second dict (odu-function / rate / monitoring-mode) holds
# expected values of the 'org-openroadm-otn-odu-interfaces:odu' container; the 'opu'
# sub-container must equal the payload-type dict in the last assertion.
# NOTE(review): numbering skips 251, 258, 262, 264 and 266, so the opening of the
# second dict (presumably `input_dict_2 = {`) and parts of the first two
# assertDictEqual calls are not visible. Confirm against the original file.
247 def test_13_check_interface_ODU2E_CLIENT(self):
248 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
252 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
253 'administrative-state': 'inService',
254 'supporting-circuit-pack-name': 'CP1-SFP4',
255 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
256 'type': 'org-openroadm-interfaces:otnOdu',
257 'supporting-port': 'CP1-SFP4-P1'}
259 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
260 'rate': 'org-openroadm-otn-common-types:ODU2e',
261 'monitoring-mode': 'terminated'}
263 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
265 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
267 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
268 self.assertDictEqual(
269 {u'payload-type': u'03', u'exp-payload-type': u'03'},
270 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Reads interface "XPDR1-NETWORK1-ODU2e-service1" of node SPDR-SA1, expects HTTP
# 200, and checks the first returned interface entry: input_dict_1 holds the
# expected top-level attributes; a second dict (odu-function / rate /
# monitoring-mode) holds expected values of the
# 'org-openroadm-otn-odu-interfaces:odu' container; input_dict_3 holds the expected
# 'trib-port-number' of its 'parent-odu-allocation'; the last visible fragment reads
# the 'trib-slots' of that allocation.
# NOTE(review): numbering skips 281, 285, 287, 289, 291, 295 and 298-299, so the
# opening of the second dict (presumably `input_dict_2 = {`), parts of the
# assertDictEqual calls and the whole start of the final 'trib-slots' check are not
# visible. Confirm against the original file.
272 def test_14_check_interface_ODU2E_NETWORK(self):
273 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
277 'supporting-circuit-pack-name': 'CP1-CFP0',
278 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
279 'type': 'org-openroadm-interfaces:otnOdu',
280 'supporting-port': 'CP1-CFP0-P1'}
282 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
283 'rate': 'org-openroadm-otn-common-types:ODU2e',
284 'monitoring-mode': 'monitored'}
286 input_dict_3 = {'trib-port-number': 1}
288 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
290 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
292 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
293 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
294 'parent-odu-allocation'], **input_dict_3
296 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
297 'parent-odu-allocation'])
300 'org-openroadm-otn-odu-interfaces:odu'][
301 'parent-odu-allocation']['trib-slots'])
# Reads the odu-connection
# "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1", expects HTTP 200,
# and checks the first returned 'odu-connection' entry: expected attributes from
# input_dict_1 are overlaid on a copy of the entry and compared with the entry
# itself, then 'destination' and 'source' must equal the dicts given below.
# NOTE(review): numbering skips 305, 309-310 and 313-314. The first argument of
# check_netconf_node_request (every complete call in this file passes "SPDR-SA1"
# there) and the opening/closing of input_dict_1 are not visible in this listing.
# Confirm against the original file.
303 def test_15_check_ODU2E_connection(self):
304 response = test_utils.check_netconf_node_request(
306 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
311 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
312 'direction': 'bidirectional'
315 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
316 res['odu-connection'][0])
317 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
318 res['odu-connection'][0]['destination'])
319 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
320 res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
    """Delete the 10GE OTN service path 'service1' on SPDR-SA1 and check the reported result."""
    node_list = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                  "network-tp": "XPDR1-NETWORK1"}]
    extra_settings = {"ethernet-encoding": "eth encode",
                      "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request(
        "delete", "service1", "10G", "Ethernet", node_list, extra_settings)
    self.assertEqual(reply.status_code, requests.codes.ok)

    # The output must acknowledge the request and flag it as successful.
    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_17_check_no_ODU2E_connection(self):
    """Check that the ODU2e cross-connection can no longer be read from SPDR-SA1.

    The lookup of the odu-connection is expected to answer with
    ``requests.codes.conflict`` (HTTP 409).
    """
    # The node id goes first and the resource path second, matching every other
    # check_netconf_node_request call in this class.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect HTTP 409 when reading interface XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect HTTP 409 when reading interface XPDR1-CLIENT4-ODU2e-service1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect HTTP 409 when reading interface XPDR1-CLIENT4-ETHERNET10G on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
    """Delete the ODU4 OTN service path 'service_ODU4' on SPDR-SA1 and check the reported result."""
    node_list = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.otn_service_path_request(
        "delete", "service_ODU4", "100G", "ODU", node_list)
    self.assertEqual(reply.status_code, requests.codes.ok)

    # The output must acknowledge the request and flag it as successful.
    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect HTTP 409 when reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
    """Delete the service path 'service_OCH_OTU4' on SPDR-SA1 and check the reported result."""
    node_list = [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.service_path_request(
        "delete", "service_OCH_OTU4", "1", node_list)
    self.assertEqual(reply.status_code, requests.codes.ok)

    # The output must acknowledge the request and flag it as successful.
    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect HTTP 409 when reading interface XPDR1-NETWORK1-OTU on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
    """Expect HTTP 409 when reading interface XPDR1-NETWORK1-1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
    # requests.codes.conflict is the status this test treats as "interface gone".
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount the SPDR-SA1 device and expect the request to answer HTTP 200."""
    reply = test_utils.unmount_device("SPDR-SA1")
    # The third positional argument is the message shown if the status differs.
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run the tests of this module with verbosity level 2 when
# the file is executed directly (not when it is imported).
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )