3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    # Class-level cleanup: pass every entry of cls.processes to
    # test_utils.shutdown_process, then print a confirmation line.
    # NOTE(review): the @classmethod decorator is not visible in this view --
    # confirm it directly precedes this definition.
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 response = test_utils.get_netconf_oper_request("SPDR-SA1")
43 self.assertEqual(response.status_code, requests.codes.ok)
46 res['node'][0]['netconf-node-topology:connection-status'],
49 def test_02_get_portmapping_CLIENT4(self):
50 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
51 self.assertEqual(response.status_code, requests.codes.ok)
53 self.assertEqual('CP1-SFP4-P1', res['mapping'][0]['supporting-port'])
54 self.assertEqual('CP1-SFP4', res['mapping'][0]['supporting-circuit-pack-name'])
55 self.assertEqual('XPDR1-CLIENT4', res['mapping'][0]['logical-connection-point'])
56 self.assertEqual('bidirectional', res['mapping'][0]['port-direction'])
57 self.assertEqual('xpdr-client', res['mapping'][0]['port-qual'])
58 self.assertEqual('FqlcrxV7p3g=', res['mapping'][0]['lcp-hash-val'])
59 self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res['mapping'][0]['supported-interface-capability'])
60 self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res['mapping'][0]['supported-interface-capability'])
61 self.assertIn('org-openroadm-port-types:if-10GE', res['mapping'][0]['supported-interface-capability'])
63 def test_03_get_portmapping_NETWORK1(self):
64 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
65 self.assertEqual(response.status_code, requests.codes.ok)
68 {"logical-connection-point": "XPDR1-NETWORK1",
69 "supporting-port": "CP1-CFP0-P1",
70 "supported-interface-capability": [
71 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
73 "port-direction": "bidirectional",
74 "port-qual": "xpdr-network",
75 "supporting-circuit-pack-name": "CP1-CFP0",
76 "xponder-type": "mpdr",
77 'lcp-hash-val': 'Swfw02qXGyI='},
80 def test_04_service_path_create_OCH_OTU4(self):
81 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
82 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
84 self.assertEqual(response.status_code, requests.codes.ok)
86 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
87 self.assertTrue(res["output"]["success"])
89 {'node-id': 'SPDR-SA1',
90 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
91 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
93 def test_05_get_portmapping_NETWORK1(self):
94 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
95 self.assertEqual(response.status_code, requests.codes.ok)
98 {"logical-connection-point": "XPDR1-NETWORK1",
99 "supporting-port": "CP1-CFP0-P1",
100 "supported-interface-capability": [
101 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
103 "port-direction": "bidirectional",
104 "port-qual": "xpdr-network",
105 "supporting-circuit-pack-name": "CP1-CFP0",
106 "xponder-type": "mpdr",
107 "lcp-hash-val": "Swfw02qXGyI="},
110 def test_06_check_interface_och(self):
111 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
112 self.assertEqual(response.status_code, requests.codes.ok)
113 res = response.json()
115 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
116 'administrative-state': 'inService',
117 'supporting-circuit-pack-name': 'CP1-CFP0',
118 'type': 'org-openroadm-interfaces:opticalChannel',
119 'supporting-port': 'CP1-CFP0-P1'
123 self.assertDictEqual(
124 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
125 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
126 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
128 def test_07_check_interface_OTU(self):
129 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
130 self.assertEqual(response.status_code, requests.codes.ok)
131 res = response.json()
132 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
133 'administrative-state': 'inService',
134 'supporting-circuit-pack-name': 'CP1-CFP0',
135 'supporting-interface': 'XPDR1-NETWORK1-1',
136 'type': 'org-openroadm-interfaces:otnOtu',
137 'supporting-port': 'CP1-CFP0-P1'
140 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
141 'expected-sapi': 'Swfw02qXGyI=',
142 'tx-sapi': 'Swfw02qXGyI=',
143 'expected-dapi': 'Swfw02qXGyI=',
144 'rate': 'org-openroadm-otn-common-types:OTU4',
148 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
151 self.assertDictEqual(input_dict_2,
152 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
154 def test_08_otn_service_path_create_ODU4(self):
155 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
156 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
158 self.assertEqual(response.status_code, requests.codes.ok)
159 res = response.json()
160 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
161 self.assertTrue(res["output"]["success"])
163 {'node-id': 'SPDR-SA1',
164 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
166 def test_09_get_portmapping_NETWORK1(self):
167 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
171 {"logical-connection-point": "XPDR1-NETWORK1",
172 "supporting-port": "CP1-CFP0-P1",
173 "supported-interface-capability": [
174 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
176 "port-direction": "bidirectional",
177 "port-qual": "xpdr-network",
178 "supporting-circuit-pack-name": "CP1-CFP0",
179 "xponder-type": "mpdr",
180 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
181 "lcp-hash-val": "Swfw02qXGyI="
185 def test_10_check_interface_ODU4(self):
186 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
189 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
190 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
191 'type': 'org-openroadm-interfaces:otnOdu',
192 'supporting-port': 'CP1-CFP0-P1'}
193 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
194 'rate': 'org-openroadm-otn-common-types:ODU4',
195 'expected-dapi': 'Swfw02qXGyI=',
196 'expected-sapi': 'Swfw02qXGyI=',
197 'tx-dapi': 'Swfw02qXGyI=',
198 'tx-sapi': 'Swfw02qXGyI='}
200 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
202 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
205 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
207 self.assertDictEqual(
208 {u'payload-type': u'21', u'exp-payload-type': u'21'},
209 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
    # Send a "create" otn-service-path request for the 10G Ethernet service
    # "service1" on SPDR-SA1 (XPDR1-CLIENT4 <-> XPDR1-NETWORK1) and verify the
    # reply: HTTP 200, success message/flag, and the names reported under the
    # first 'node-interface' entry.
    nodes = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    extra = {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
                                                nodes, extra)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
    self.assertTrue(output["success"])

    # Every remaining check targets the first reported node-interface entry.
    node_interface = output['node-interface'][0]
    self.assertEqual('SPDR-SA1', node_interface['node-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                  node_interface['connection-id'])
    self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
    self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
    self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])
228 def test_12_check_interface_10GE_CLIENT(self):
229 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
232 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
233 'administrative-state': 'inService',
234 'supporting-circuit-pack-name': 'CP1-SFP4',
235 'type': 'org-openroadm-interfaces:ethernetCsmacd',
236 'supporting-port': 'CP1-SFP4-P1'
238 self.assertDictEqual(dict(res['interface'][0], **input_dict),
240 self.assertDictEqual(
242 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
244 def test_13_check_interface_ODU2E_CLIENT(self):
245 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
249 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
250 'administrative-state': 'inService',
251 'supporting-circuit-pack-name': 'CP1-SFP4',
252 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
253 'type': 'org-openroadm-interfaces:otnOdu',
254 'supporting-port': 'CP1-SFP4-P1'}
256 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
257 'rate': 'org-openroadm-otn-common-types:ODU2e',
258 'monitoring-mode': 'terminated'}
260 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
262 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
264 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
265 self.assertDictEqual(
266 {u'payload-type': u'03', u'exp-payload-type': u'03'},
267 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
269 def test_14_check_interface_ODU2E_NETWORK(self):
270 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
274 'supporting-circuit-pack-name': 'CP1-CFP0',
275 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
276 'type': 'org-openroadm-interfaces:otnOdu',
277 'supporting-port': 'CP1-CFP0-P1'}
279 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
280 'rate': 'org-openroadm-otn-common-types:ODU2e',
281 'monitoring-mode': 'monitored'}
283 input_dict_3 = {'trib-port-number': 1}
285 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
287 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
289 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
290 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
291 'parent-odu-allocation'], **input_dict_3
293 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
294 'parent-odu-allocation'])
297 'org-openroadm-otn-odu-interfaces:odu'][
298 'parent-odu-allocation']['trib-slots'])
300 def test_15_check_ODU2E_connection(self):
301 response = test_utils.check_netconf_node_request(
303 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
304 self.assertEqual(response.status_code, requests.codes.ok)
305 res = response.json()
308 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
309 'direction': 'bidirectional'
312 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
313 res['odu-connection'][0])
314 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
315 res['odu-connection'][0]['destination'])
316 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
317 res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
    # Send a "delete" otn-service-path request for the 10G Ethernet service
    # "service1" with the same node and trib parameters used by the create
    # request in test_11, and verify the reply: HTTP 200, 'Request processed'
    # in the result text, and a truthy success flag.
    nodes = [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}]
    extra = {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"}
    reply = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
                                                nodes, extra)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
330 def test_17_check_no_ODU2E_connection(self):
331 response = test_utils.check_netconf_node_request(
333 "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
334 self.assertEqual(response.status_code, requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    # Query the network-side ODU2e interface on SPDR-SA1 and expect HTTP 409
    # (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_16 -- confirm.
    resource = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    # Query the client-side ODU2e interface on SPDR-SA1 and expect HTTP 409
    # (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_16 -- confirm.
    resource = "interface/XPDR1-CLIENT4-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
    # Query the client 10G Ethernet interface on SPDR-SA1 and expect HTTP 409
    # (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_16 -- confirm.
    resource = "interface/XPDR1-CLIENT4-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
    # Send a "delete" otn-service-path request for the 100G ODU service
    # "service_ODU4" on SPDR-SA1 / XPDR1-NETWORK1 and verify the reply:
    # HTTP 200, 'Request processed' in the result text, truthy success flag.
    nodes = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
                                                nodes)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_22_check_no_interface_ODU4(self):
    # Query the ODU4 interface on SPDR-SA1 and expect HTTP 409
    # (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_21 -- confirm.
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
    # Send a "delete" service-path request for "service_OCH_OTU4" (third
    # argument "1", matching the create call in test_04) on SPDR-SA1 /
    # XPDR1-NETWORK1 and verify the reply: HTTP 200, 'Request processed' in
    # the result text, truthy success flag.
    nodes = [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}]
    reply = test_utils.service_path_request("delete", "service_OCH_OTU4", "1", nodes)
    self.assertEqual(reply.status_code, requests.codes.ok)

    output = reply.json()["output"]
    self.assertIn('Request processed', output["result"])
    self.assertTrue(output["success"])
def test_24_check_no_interface_OTU4(self):
    # Query the OTU interface on SPDR-SA1 and expect HTTP 409
    # (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_23 -- confirm.
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
    # Query the OCH interface (XPDR1-NETWORK1-1) on SPDR-SA1 and expect
    # HTTP 409 (requests.codes.conflict).
    # NOTE(review): presumably 409 is how the controller reports that the
    # interface no longer exists after the delete in test_23 -- confirm.
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
    # Unmount the SPDR-SA1 device through test_utils and require HTTP 200;
    # test_utils.CODE_SHOULD_BE_200 is used as the assertion failure message.
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
# When this file is executed as a script, run the module's test cases with
# unittest at verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)