3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 from common import test_utils
21 class TransportPCEtesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['spdra'])
31 def tearDownClass(cls):
32 # pylint: disable=not-an-iterable
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
40 def test_01_connect_SPDR_SA1(self):
41 response = test_utils.mount_device("SPDR-SA1", 'spdra')
42 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 response = test_utils.get_netconf_oper_request("SPDR-SA1")
46 self.assertEqual(response.status_code, requests.codes.ok)
49 res['node'][0]['netconf-node-topology:connection-status'],
52 def test_02_get_portmapping_CLIENT4(self):
53 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
54 self.assertEqual(response.status_code, requests.codes.ok)
55 res_mapping = (response.json())['mapping'][0]
56 self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
57 self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
58 self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
59 self.assertEqual('bidirectional', res_mapping['port-direction'])
60 self.assertEqual('xpdr-client', res_mapping['port-qual'])
61 self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
62 self.assertEqual('InService', res_mapping['port-admin-state'])
63 self.assertEqual('InService', res_mapping['port-oper-state'])
64 self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
65 self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
66 self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
68 def test_03_get_portmapping_NETWORK1(self):
69 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
70 self.assertEqual(response.status_code, requests.codes.ok)
73 {"logical-connection-point": "XPDR1-NETWORK1",
74 "supporting-port": "CP1-CFP0-P1",
75 "supported-interface-capability": [
76 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
78 "port-direction": "bidirectional",
79 "port-qual": "xpdr-network",
80 "supporting-circuit-pack-name": "CP1-CFP0",
81 "xponder-type": "mpdr",
82 'lcp-hash-val': 'Swfw02qXGyI=',
83 'port-admin-state': 'InService',
84 'port-oper-state': 'InService'},
87 def test_04_service_path_create_OCH_OTU4(self):
88 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
89 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
90 196.1, 40, 196.075, 196.125, 761,
93 self.assertEqual(response.status_code, requests.codes.ok)
95 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
96 self.assertTrue(res["output"]["success"])
98 {'node-id': 'SPDR-SA1',
99 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
100 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
102 def test_05_get_portmapping_NETWORK1(self):
103 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
104 self.assertEqual(response.status_code, requests.codes.ok)
105 res = response.json()
107 {"logical-connection-point": "XPDR1-NETWORK1",
108 "supporting-port": "CP1-CFP0-P1",
109 "supported-interface-capability": [
110 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
112 "port-direction": "bidirectional",
113 "port-qual": "xpdr-network",
114 "supporting-circuit-pack-name": "CP1-CFP0",
115 "xponder-type": "mpdr",
116 "lcp-hash-val": "Swfw02qXGyI=",
117 "port-admin-state": "InService",
118 "port-oper-state": "InService"},
121 def test_06_check_interface_och(self):
122 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
126 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
127 'administrative-state': 'inService',
128 'supporting-circuit-pack-name': 'CP1-CFP0',
129 'type': 'org-openroadm-interfaces:opticalChannel',
130 'supporting-port': 'CP1-CFP0-P1'
134 self.assertDictEqual(
135 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
136 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
137 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
139 def test_07_check_interface_OTU(self):
140 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
141 self.assertEqual(response.status_code, requests.codes.ok)
142 res = response.json()
143 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
144 'administrative-state': 'inService',
145 'supporting-circuit-pack-name': 'CP1-CFP0',
146 'supporting-interface': 'XPDR1-NETWORK1-761:768',
147 'type': 'org-openroadm-interfaces:otnOtu',
148 'supporting-port': 'CP1-CFP0-P1'
151 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
152 'expected-sapi': 'Swfw02qXGyI=',
153 'tx-sapi': 'Swfw02qXGyI=',
154 'expected-dapi': 'Swfw02qXGyI=',
155 'rate': 'org-openroadm-otn-common-types:OTU4',
159 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
162 self.assertDictEqual(input_dict_2,
163 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
165 def test_08_otn_service_path_create_ODU4(self):
166 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
167 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
172 self.assertTrue(res["output"]["success"])
174 {'node-id': 'SPDR-SA1',
175 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
177 def test_09_get_portmapping_NETWORK1(self):
178 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
182 {"logical-connection-point": "XPDR1-NETWORK1",
183 "supporting-port": "CP1-CFP0-P1",
184 "supported-interface-capability": [
185 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
187 "port-direction": "bidirectional",
188 "port-qual": "xpdr-network",
189 "supporting-circuit-pack-name": "CP1-CFP0",
190 "xponder-type": "mpdr",
191 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
192 "lcp-hash-val": "Swfw02qXGyI=",
193 "port-admin-state": "InService",
194 "port-oper-state": "InService"
198 def test_10_check_interface_ODU4(self):
199 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
202 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
203 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
204 'type': 'org-openroadm-interfaces:otnOdu',
205 'supporting-port': 'CP1-CFP0-P1'}
206 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
207 'rate': 'org-openroadm-otn-common-types:ODU4',
208 'expected-dapi': 'Swfw02qXGyI=',
209 'expected-sapi': 'Swfw02qXGyI=',
210 'tx-dapi': 'Swfw02qXGyI=',
211 'tx-sapi': 'Swfw02qXGyI='}
213 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
215 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
218 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
220 self.assertDictEqual(
221 {u'payload-type': u'21', u'exp-payload-type': u'21'},
222 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
224 def test_11_otn_service_path_create_10GE(self):
225 response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
226 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
227 "network-tp": "XPDR1-NETWORK1"}],
228 {"ethernet-encoding": "eth encode",
229 "trib-slot": ["1"], "trib-port-number": "1"})
231 self.assertEqual(response.status_code, requests.codes.ok)
232 res = response.json()
233 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
234 self.assertTrue(res["output"]["success"])
235 self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id'])
236 self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
237 res["output"]['node-interface'][0]['connection-id'])
238 self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id'])
239 self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
240 self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
242 def test_12_check_interface_10GE_CLIENT(self):
243 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
244 self.assertEqual(response.status_code, requests.codes.ok)
245 res = response.json()
246 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
247 'administrative-state': 'inService',
248 'supporting-circuit-pack-name': 'CP1-SFP4',
249 'type': 'org-openroadm-interfaces:ethernetCsmacd',
250 'supporting-port': 'CP1-SFP4-P1'
252 self.assertDictEqual(dict(res['interface'][0], **input_dict),
254 self.assertDictEqual(
256 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
258 def test_13_check_interface_ODU2E_CLIENT(self):
259 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
260 self.assertEqual(response.status_code, requests.codes.ok)
261 res = response.json()
263 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
264 'administrative-state': 'inService',
265 'supporting-circuit-pack-name': 'CP1-SFP4',
266 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
267 'type': 'org-openroadm-interfaces:otnOdu',
268 'supporting-port': 'CP1-SFP4-P1'}
270 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
271 'rate': 'org-openroadm-otn-common-types:ODU2e',
272 'monitoring-mode': 'terminated'}
274 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
276 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
278 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
279 self.assertDictEqual(
280 {u'payload-type': u'03', u'exp-payload-type': u'03'},
281 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
283 def test_14_check_interface_ODU2E_NETWORK(self):
284 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
288 'supporting-circuit-pack-name': 'CP1-CFP0',
289 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
290 'type': 'org-openroadm-interfaces:otnOdu',
291 'supporting-port': 'CP1-CFP0-P1'}
293 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
294 'rate': 'org-openroadm-otn-common-types:ODU2e',
295 'monitoring-mode': 'monitored'}
297 input_dict_3 = {'trib-port-number': 1}
299 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
301 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
303 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
304 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
305 'parent-odu-allocation'], **input_dict_3
307 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
308 'parent-odu-allocation'])
311 'org-openroadm-otn-odu-interfaces:odu'][
312 'parent-odu-allocation']['trib-slots'])
314 def test_15_check_ODU2E_connection(self):
315 response = test_utils.check_netconf_node_request(
317 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
322 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
323 'direction': 'bidirectional'
326 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
327 res['odu-connection'][0])
328 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
329 res['odu-connection'][0]['destination'])
330 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
331 res['odu-connection'][0]['source'])
333 def test_16_otn_service_path_delete_10GE(self):
334 response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
335 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
336 "network-tp": "XPDR1-NETWORK1"}],
337 {"ethernet-encoding": "eth encode",
338 "trib-slot": ["1"], "trib-port-number": "1"})
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 self.assertIn('Request processed', res["output"]["result"])
343 self.assertTrue(res["output"]["success"])
345 def test_17_check_no_ODU2E_connection(self):
346 response = test_utils.check_netconf_node_request(
348 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
349 self.assertEqual(response.status_code, requests.codes.conflict)
351 def test_18_check_no_interface_ODU2E_NETWORK(self):
352 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
353 self.assertEqual(response.status_code, requests.codes.conflict)
355 def test_19_check_no_interface_ODU2E_CLIENT(self):
356 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
357 self.assertEqual(response.status_code, requests.codes.conflict)
359 def test_20_check_no_interface_10GE_CLIENT(self):
360 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
361 self.assertEqual(response.status_code, requests.codes.conflict)
363 def test_21_otn_service_path_delete_ODU4(self):
364 response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
365 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
367 self.assertEqual(response.status_code, requests.codes.ok)
368 res = response.json()
369 self.assertIn('Request processed', res["output"]["result"])
370 self.assertTrue(res["output"]["success"])
372 def test_22_check_no_interface_ODU4(self):
373 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
374 self.assertEqual(response.status_code, requests.codes.conflict)
376 def test_23_service_path_delete_OCH_OTU4(self):
377 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
378 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
379 196.1, 40, 196.075, 196.125, 761,
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
384 self.assertIn('Request processed', res["output"]["result"])
385 self.assertTrue(res["output"]["success"])
387 def test_24_check_no_interface_OTU4(self):
388 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
389 self.assertEqual(response.status_code, requests.codes.conflict)
391 def test_25_check_no_interface_OCH(self):
392 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
393 self.assertEqual(response.status_code, requests.codes.conflict)
395 def test_26_disconnect_SPDR_SA1(self):
396 response = test_utils.unmount_device("SPDR-SA1")
397 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
400 if __name__ == "__main__":
401 unittest.main(verbosity=2)