3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
# NOTE(review): the `def` line of this fixture is not visible in this listing (embedded line
# numbers 20-24 are absent); presumably these statements are the body of setUpClass — confirm
# against the full file.
# Both calls store their return value in cls.processes, so the value returned by
# start_sims(['spdra']) replaces the one returned by start_tpce(). tearDownClass iterates
# cls.processes, so presumably start_sims returns a collection that also covers what
# start_tpce launched — TODO confirm in test_utils.
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    """Pass every entry of ``cls.processes`` to ``test_utils.shutdown_process``.

    A confirmation line is printed once the loop has finished.
    """
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
37 def test_01_connect_SPDR_SA1(self):
# Mount device "SPDR-SA1" through test_utils.mount_device (second argument 'spdra') and
# require HTTP 201 Created, then query it with test_utils.get_netconf_oper_request and
# require HTTP 200 OK.
# NOTE(review): embedded line numbers 40-41, 44-45 and 47 are absent from this listing, so
# the assignment of `res` and the assertion wrapping the connection-status lookup below are
# not visible here — confirm against the full file.
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 response = test_utils.get_netconf_oper_request("SPDR-SA1")
43 self.assertEqual(response.status_code, requests.codes.ok)
# Connection status of the first entry in the reply's 'node' list.
46 res['node'][0]['netconf-node-topology:connection-status'],
49 def test_02_get_portmapping_CLIENT1(self):
# Request the port mapping "SPDR-SA1/mapping/XPDR1-CLIENT1", require HTTP 200 OK and
# inspect the first entry of the reply's 'mapping' list.
50 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT1")
51 self.assertEqual(response.status_code, requests.codes.ok)
52 res_mapping = (response.json())['mapping'][0]
# The capabilities are compared as sets, so their order in the reply does not matter.
53 self.assertSetEqual({'org-openroadm-port-types:if-10GE-ODU2e',
54 'org-openroadm-port-types:if-10GE-ODU2',
55 'org-openroadm-port-types:if-10GE'},
56 set(res_mapping['supported-interface-capability']))
# NOTE(review): embedded line numbers 57-58 and 65-66 are absent from this listing, so the
# assertion that consumes the dict literal below is not visible — confirm against the full file.
59 {'supporting-port': 'CP1-SFP4-P1',
60 'supporting-circuit-pack-name': 'CP1-SFP4',
61 'logical-connection-point': 'XPDR1-CLIENT1',
62 'port-direction': 'bidirectional',
63 'port-qual': 'xpdr-client',
64 'lcp-hash-val': 'FqlcrxV7p30='},
68 def test_03_get_portmapping_NETWORK1(self):
# Request the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" and require HTTP 200 OK.
# NOTE(review): embedded line numbers 71-72, 77 and 83 are absent from this listing, so the
# assertion that consumes the dict literal below, the end of its capability list and the
# value it is compared with are not visible — confirm against the full file.
69 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
70 self.assertEqual(response.status_code, requests.codes.ok)
# Expected mapping entry for the network port.
73 {"logical-connection-point": "XPDR1-NETWORK1",
74 "supporting-port": "CP1-CFP0-P1",
75 "supported-interface-capability": [
76 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
78 "port-direction": "bidirectional",
79 "port-qual": "xpdr-network",
80 "supporting-circuit-pack-name": "CP1-CFP0",
81 "xponder-type": "mpdr",
82 'lcp-hash-val': 'Swfw02qXGyI='},
85 def test_04_service_path_create_OCH_OTU4(self):
# Send a "create" service-path request named "service_OCH_OTU4" (third argument "1") for
# dest-tp XPDR1-NETWORK1 on node SPDR-SA1, require HTTP 200 OK, then check the 'result'
# text and the 'success' flag under the reply's "output".
# NOTE(review): embedded line numbers 88, 90 and 93 are absent from this listing, so the
# assignment of `res` and the opening of the final assertion are not visible — confirm
# against the full file.
86 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
87 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
89 self.assertEqual(response.status_code, requests.codes.ok)
91 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
92 self.assertTrue(res["output"]["success"])
# Expected entry, passed together with res["output"]['node-interface'] to the elided assertion.
94 {'node-id': 'SPDR-SA1',
95 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
96 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
98 def test_05_get_portmapping_NETWORK1(self):
# Read the "SPDR-SA1/mapping/XPDR1-NETWORK1" port mapping again and require HTTP 200 OK.
# The visible expected entries carry the same keys and values as those listed in test_03.
# NOTE(review): embedded line numbers 102, 107 and 113 are absent from this listing, so the
# assertion that consumes the dict literal below, the end of its capability list and the
# value it is compared with are not visible — confirm against the full file.
99 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
100 self.assertEqual(response.status_code, requests.codes.ok)
101 res = response.json()
103 {"logical-connection-point": "XPDR1-NETWORK1",
104 "supporting-port": "CP1-CFP0-P1",
105 "supported-interface-capability": [
106 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
108 "port-direction": "bidirectional",
109 "port-qual": "xpdr-network",
110 "supporting-circuit-pack-name": "CP1-CFP0",
111 "xponder-type": "mpdr",
112 "lcp-hash-val": "Swfw02qXGyI="},
115 def test_06_check_interface_och(self):
# Read "interface/XPDR1-NETWORK1-1" from node SPDR-SA1, require HTTP 200 OK and check the
# first entry of the reply's 'interface' list.
116 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
117 self.assertEqual(response.status_code, requests.codes.ok)
118 res = response.json()
# dict(actual, **expected) copies the interface dict and overrides it with the expected
# values listed here.
# NOTE(review): embedded line numbers 119 and 125-127 are absent from this listing, so the
# end of this literal and the second argument of the assertion are not visible — presumably
# the overlay is compared with res['interface'][0] itself; confirm against the full file.
120 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
121 'administrative-state': 'inService',
122 'supporting-circuit-pack-name': 'CP1-CFP0',
123 'type': 'org-openroadm-interfaces:opticalChannel',
124 'supporting-port': 'CP1-CFP0-P1'
# The optical-channel container must equal this dict exactly (no extra or missing keys).
128 self.assertDictEqual(
129 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
130 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
131 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
133 def test_07_check_interface_OTU(self):
# Read "interface/XPDR1-NETWORK1-OTU" from node SPDR-SA1, require HTTP 200 OK and check the
# first entry of the reply's 'interface' list against two expectation dicts.
# NOTE(review): embedded line numbers 143-144, 150-152 and 154-155 are absent from this
# listing, so the ends of both dict literals and the second argument of the first
# assertDictEqual are not visible — confirm against the full file.
134 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
# Expected top-level interface attributes.
137 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
138 'administrative-state': 'inService',
139 'supporting-circuit-pack-name': 'CP1-CFP0',
140 'supporting-interface': 'XPDR1-NETWORK1-1',
141 'type': 'org-openroadm-interfaces:otnOtu',
142 'supporting-port': 'CP1-CFP0-P1'
# Expected OTU container attributes (the literal continues on lines not shown here).
145 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
146 'expected-sapi': 'Swfw02qXGyI=',
147 'tx-sapi': 'Swfw02qXGyI=',
148 'expected-dapi': 'Swfw02qXGyI=',
149 'rate': 'org-openroadm-otn-common-types:OTU4',
# The interface dict overlaid with input_dict_1 is the first argument of this assertion.
153 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# The OTU container must equal input_dict_2 exactly.
156 self.assertDictEqual(input_dict_2,
157 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
159 def test_08_otn_service_path_create_ODU4(self):
# Send a "create" OTN service-path request named "service_ODU4" (arguments "100G", "ODU")
# for network-tp XPDR1-NETWORK1 on node SPDR-SA1, require HTTP 200 OK, then check the
# 'result' text and the 'success' flag under the reply's "output".
# NOTE(review): embedded line numbers 162 and 167 are absent from this listing, so a
# statement after the request and the opening of the final assertion are not visible —
# confirm against the full file.
160 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
161 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
163 self.assertEqual(response.status_code, requests.codes.ok)
164 res = response.json()
165 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
166 self.assertTrue(res["output"]["success"])
# Expected entry, passed together with res["output"]['node-interface'] to the elided assertion.
168 {'node-id': 'SPDR-SA1',
169 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
171 def test_09_get_portmapping_NETWORK1(self):
# Read the "SPDR-SA1/mapping/XPDR1-NETWORK1" port mapping and require HTTP 200 OK.
# Compared with the entries listed in test_05, the expected mapping here additionally
# carries "supporting-odu4": "XPDR1-NETWORK1-ODU4".
# NOTE(review): embedded line numbers 175, 180 and 187 onwards are absent from this listing,
# so the assertion that consumes the dict literal below, the end of its capability list and
# the value it is compared with are not visible — confirm against the full file.
172 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
176 {"logical-connection-point": "XPDR1-NETWORK1",
177 "supporting-port": "CP1-CFP0-P1",
178 "supported-interface-capability": [
179 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
181 "port-direction": "bidirectional",
182 "port-qual": "xpdr-network",
183 "supporting-circuit-pack-name": "CP1-CFP0",
184 "xponder-type": "mpdr",
185 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
186 "lcp-hash-val": "Swfw02qXGyI="
190 def test_10_check_interface_ODU4(self):
# Read "interface/XPDR1-NETWORK1-ODU4" from node SPDR-SA1, require HTTP 200 OK and check the
# first entry of the reply's 'interface' list, its ODU container and the nested 'opu' dict.
# NOTE(review): embedded line numbers 204, 206, 208-209 and 211 are absent from this listing,
# so the second argument of the first assertDictEqual and the overlay argument of the second
# one are not visible — confirm against the full file.
191 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
192 self.assertEqual(response.status_code, requests.codes.ok)
193 res = response.json()
# Expected top-level interface attributes.
194 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
195 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
196 'type': 'org-openroadm-interfaces:otnOdu',
197 'supporting-port': 'CP1-CFP0-P1'}
# Expected ODU container attributes.
198 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
199 'rate': 'org-openroadm-otn-common-types:ODU4',
200 'expected-dapi': 'Swfw02qXGyI=',
201 'expected-sapi': 'Swfw02qXGyI=',
202 'tx-dapi': 'Swfw02qXGyI=',
203 'tx-sapi': 'Swfw02qXGyI='}
# The interface dict overlaid with input_dict_1 is the first argument of this assertion.
205 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# A copy of the ODU container with an overlay (presumably input_dict_2; the argument is
# elided) is compared with the ODU container itself.
207 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
210 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
# The 'opu' dict must equal these two payload-type entries exactly.
212 self.assertDictEqual(
213 {u'payload-type': u'21', u'exp-payload-type': u'21'},
214 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
216 def test_11_otn_service_path_create_10GE(self):
# Send a "create" OTN service-path request named "service1" (arguments "10G", "Ethernet")
# between client-tp XPDR1-CLIENT1 and network-tp XPDR1-NETWORK1 on node SPDR-SA1, with an
# extra dict giving the ethernet encoding, trib slot and trib port number. Require
# HTTP 200 OK, then check the 'result' text and the 'success' flag under the reply's "output".
# NOTE(review): embedded line numbers 221 and 226 are absent from this listing, so a
# statement after the request and the opening of the final assertion are not visible —
# confirm against the full file.
217 response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
218 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
219 "network-tp": "XPDR1-NETWORK1"}],
220 {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
225 self.assertTrue(res["output"]["success"])
# Expected entry, passed together with res["output"]['node-interface'] to the elided assertion.
227 {'node-id': 'SPDR-SA1',
228 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
229 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
230 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
232 def test_12_check_interface_10GE_CLIENT(self):
# Read "interface/XPDR1-CLIENT1-ETHERNET10G" from node SPDR-SA1, require HTTP 200 OK and
# check the first entry of the reply's 'interface' list and its ethernet container.
# NOTE(review): embedded line numbers 241, 243 and 245 are absent from this listing, so the
# end of input_dict, the second argument of the first assertDictEqual and the expected value
# of the ethernet container are not visible — confirm against the full file.
233 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
# Expected top-level interface attributes.
236 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
237 'administrative-state': 'inService',
238 'supporting-circuit-pack-name': 'CP1-SFP4',
239 'type': 'org-openroadm-interfaces:ethernetCsmacd',
240 'supporting-port': 'CP1-SFP4-P1'
# The interface dict overlaid with input_dict is the first argument of this assertion.
242 self.assertDictEqual(dict(res['interface'][0], **input_dict),
# The ethernet container is the last argument of this assertion; the expected dict is elided.
244 self.assertDictEqual(
246 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
248 def test_13_check_interface_ODU2E_CLIENT(self):
# Read "interface/XPDR1-CLIENT1-ODU2e-service1" from node SPDR-SA1, require HTTP 200 OK and
# check the first entry of the reply's 'interface' list, its ODU container and the nested
# 'opu' dict.
# NOTE(review): embedded line numbers 252, 259, 263, 265 and 267 are absent from this
# listing, so the opening of the second expectation dict, the second argument of the first
# assertDictEqual and the overlay argument of the second one are not visible — confirm
# against the full file.
249 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
# Expected top-level interface attributes.
253 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
254 'administrative-state': 'inService',
255 'supporting-circuit-pack-name': 'CP1-SFP4',
256 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
257 'type': 'org-openroadm-interfaces:otnOdu',
258 'supporting-port': 'CP1-SFP4-P1'}
# Tail of a dict literal whose opening line is elided (presumably the expected ODU
# container attributes — confirm).
260 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
261 'rate': 'org-openroadm-otn-common-types:ODU2e',
262 'monitoring-mode': 'terminated'}
# The interface dict overlaid with input_dict_1 is the first argument of this assertion.
264 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# A copy of the ODU container with an (elided) overlay is compared with the container itself.
266 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
268 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The 'opu' dict must equal these two payload-type entries exactly.
269 self.assertDictEqual(
270 {u'payload-type': u'03', u'exp-payload-type': u'03'},
271 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
273 def test_14_check_interface_ODU2E_NETWORK(self):
# Read "interface/XPDR1-NETWORK1-ODU2e-service1" from node SPDR-SA1, require HTTP 200 OK and
# check the first entry of the reply's 'interface' list, its ODU container and that
# container's 'parent-odu-allocation' dict.
# NOTE(review): embedded line numbers 282, 286, 288, 290, 292, 296 and 299-300 are absent
# from this listing, so the opening of the second expectation dict, several assertion
# arguments and the assertion applied to 'trib-slots' are not visible — confirm against the
# full file.
274 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
# Expected top-level interface attributes.
277 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
278 'supporting-circuit-pack-name': 'CP1-CFP0',
279 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
280 'type': 'org-openroadm-interfaces:otnOdu',
281 'supporting-port': 'CP1-CFP0-P1'}
# Tail of a dict literal whose opening line is elided (presumably the expected ODU
# container attributes — confirm).
283 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
284 'rate': 'org-openroadm-otn-common-types:ODU2e',
285 'monitoring-mode': 'monitored'}
# Expected entry of the 'parent-odu-allocation' dict; note the value is the integer 1.
287 input_dict_3 = {'trib-port-number': 1}
# The interface dict overlaid with input_dict_1 is the first argument of this assertion.
289 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# A copy of the ODU container with an (elided) overlay is compared with the container itself.
291 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
293 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The 'parent-odu-allocation' dict overlaid with input_dict_3 is compared with the
# 'parent-odu-allocation' dict itself.
294 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
295 'parent-odu-allocation'], **input_dict_3
297 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
298 'parent-odu-allocation'])
# Lookup of 'trib-slots' under 'parent-odu-allocation'; the enclosing assertion is elided.
301 'org-openroadm-otn-odu-interfaces:odu'][
302 'parent-odu-allocation']['trib-slots'])
304 def test_15_check_ODU2E_connection(self):
# Read the ODU connection "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1",
# require HTTP 200 OK and check the first entry of the reply's 'odu-connection' list.
# NOTE(review): embedded line numbers 306, 310-311 and 314-315 are absent from this listing,
# so an argument of the request call and the opening and end of the input_dict_1 literal are
# not visible — confirm against the full file.
305 response = test_utils.check_netconf_node_request(
307 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
# Visible entries of the expectation dict (presumably input_dict_1 — confirm).
312 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
313 'direction': 'bidirectional'
# dict(actual, **expected) copies the connection dict and overrides it with input_dict_1;
# requiring the result to equal the connection dict passes only when the reply already holds
# every expected key with the same value (extra keys in the reply are ignored).
316 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
317 res['odu-connection'][0])
# 'destination' and 'source' must each equal the single-entry dict given here exactly.
318 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
319 res['odu-connection'][0]['destination'])
320 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
321 res['odu-connection'][0]['source'])
323 def test_16_otn_service_path_delete_10GE(self):
# Send a "delete" OTN service-path request for "service1" with the same arguments that
# test_11 used for "create", require HTTP 200 OK, then check that the reply's "output"
# 'result' contains 'Request processed' and that its 'success' value is truthy.
# NOTE(review): embedded line number 328 is absent from this listing, so a statement between
# the request and the status check may be missing — confirm against the full file.
324 response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
325 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
326 "network-tp": "XPDR1-NETWORK1"}],
327 {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
329 self.assertEqual(response.status_code, requests.codes.ok)
330 res = response.json()
331 self.assertIn('Request processed', res["output"]["result"])
332 self.assertTrue(res["output"]["success"])
334 def test_17_check_no_ODU2E_connection(self):
# Read the ODU connection "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1" and
# require HTTP 404 Not Found. test_16, which sorts before this method, requested deletion of
# the "service1" path.
# NOTE(review): embedded line number 336 is absent from this listing, so an argument of the
# request call is not visible (sibling tests pass "SPDR-SA1" first) — confirm against the
# full file.
335 response = test_utils.check_netconf_node_request(
337 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
338 self.assertEqual(response.status_code, requests.codes.not_found)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1.

    test_16, which sorts before this method, requested deletion of the
    "service1" path.
    """
    resource = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect HTTP 404 when reading interface XPDR1-CLIENT1-ODU2e-service1 on SPDR-SA1.

    test_16, which sorts before this method, requested deletion of the
    "service1" path.
    """
    resource = "interface/XPDR1-CLIENT1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect HTTP 404 when reading interface XPDR1-CLIENT1-ETHERNET10G on SPDR-SA1.

    test_16, which sorts before this method, requested deletion of the
    "service1" path.
    """
    resource = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
352 def test_21_otn_service_path_delete_ODU4(self):
# Send a "delete" OTN service-path request for "service_ODU4" with the same arguments that
# test_08 used for "create", require HTTP 200 OK, then check that the reply's "output"
# 'result' contains 'Request processed' and that its 'success' value is truthy.
# NOTE(review): embedded line number 355 is absent from this listing, so a statement between
# the request and the status check may be missing — confirm against the full file.
353 response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
354 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
356 self.assertEqual(response.status_code, requests.codes.ok)
357 res = response.json()
358 self.assertIn('Request processed', res["output"]["result"])
359 self.assertTrue(res["output"]["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1.

    test_21, which sorts before this method, requested deletion of the
    "service_ODU4" path.
    """
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
365 def test_23_service_path_delete_OCH_OTU4(self):
# Send a "delete" service-path request for "service_OCH_OTU4" with the same arguments that
# test_04 used for "create", require HTTP 200 OK, then check that the reply's "output"
# 'result' contains 'Request processed' and that its 'success' value is truthy.
# NOTE(review): embedded line number 368 is absent from this listing, so a statement between
# the request and the status check may be missing — confirm against the full file.
366 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
367 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
369 self.assertEqual(response.status_code, requests.codes.ok)
370 res = response.json()
371 self.assertIn('Request processed', res["output"]["result"])
372 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-OTU on SPDR-SA1.

    test_23, which sorts before this method, requested deletion of the
    "service_OCH_OTU4" path.
    """
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_25_check_no_interface_OCH(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-1 on SPDR-SA1.

    test_23, which sorts before this method, requested deletion of the
    "service_OCH_OTU4" path.
    """
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", resource)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount device SPDR-SA1 and require an HTTP 200 OK reply.

    ``test_utils.CODE_SHOULD_BE_200`` is passed as the failure message.
    """
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Executed as a script: run this module's test cases with verbosity level 2.
    unittest.main(verbosity=2)