3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
# unittest.TestCase whose numbered methods (test_01 .. test_26) target the
# SPDR-SA1 node: mount, port-mapping checks, service path creation and
# interface checks, service path deletion and checks, then unmount.
19 class TransportPCEtesting(unittest.TestCase):
# Class-level start-up statements: call test_utils.start_tpce() and then
# test_utils.start_sims(['spdra']), storing each result in cls.processes.
# NOTE(review): the second assignment replaces the value stored by the first.
# Presumably both helpers return the same shared process list - confirm,
# otherwise tearDownClass never sees what start_tpce() returned.
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    # Shut down every process recorded on the class, one by one, and
    # report once all of them have been handled.
    recorded = cls.processes
    for proc in recorded:
        test_utils.shutdown_process(proc)
    print("all processes killed")
# Mount device "SPDR-SA1" (simulator key 'spdra') and expect status `created`,
# then request its NETCONF operational data and expect status `ok`.
# NOTE(review): the statements that build `res` and open the check on
# 'netconf-node-topology:connection-status' are not visible here - confirm
# the expected value against the complete file.
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 response = test_utils.get_netconf_oper_request("SPDR-SA1")
43 self.assertEqual(response.status_code, requests.codes.ok)
46 res['node'][0]['netconf-node-topology:connection-status'],
# Request the port mapping "SPDR-SA1/mapping/XPDR1-CLIENT1" and expect status
# `ok`. The dict literal below is the expected XPDR1-CLIENT1 entry: three 10GE
# interface capabilities, port CP1-SFP4-P1 on circuit pack CP1-SFP4,
# bidirectional, qualifier 'xpdr-client'.
# NOTE(review): the assertion call that consumes this dict is not visible
# here - presumably it is checked against the reply's mapping list; confirm.
49 def test_02_get_portmapping_CLIENT1(self):
50 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT1")
51 self.assertEqual(response.status_code, requests.codes.ok)
54 {'supported-interface-capability': [
55 'org-openroadm-port-types:if-10GE-ODU2e',
56 'org-openroadm-port-types:if-10GE-ODU2',
57 'org-openroadm-port-types:if-10GE'],
58 'supporting-port': 'CP1-SFP4-P1',
59 'supporting-circuit-pack-name': 'CP1-SFP4',
60 'logical-connection-point': 'XPDR1-CLIENT1',
61 'port-direction': 'bidirectional',
62 'port-qual': 'xpdr-client',
63 'lcp-hash-val': 'FqlcrxV7p30='},
# Request the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" and expect status
# `ok`. The dict literal below is the expected XPDR1-NETWORK1 entry: capability
# if-OCH-OTU4-ODU4, port CP1-CFP0-P1 on circuit pack CP1-CFP0, xponder type
# 'mpdr', qualifier 'xpdr-network'.
# NOTE(review): the assertion call that consumes this dict is not visible
# here - confirm how it is compared with the reply.
66 def test_03_get_portmapping_NETWORK1(self):
67 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
68 self.assertEqual(response.status_code, requests.codes.ok)
71 {"logical-connection-point": "XPDR1-NETWORK1",
72 "supporting-port": "CP1-CFP0-P1",
73 "supported-interface-capability": [
74 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
76 "port-direction": "bidirectional",
77 "port-qual": "xpdr-network",
78 "supporting-circuit-pack-name": "CP1-CFP0",
79 "xponder-type": "mpdr",
80 'lcp-hash-val': 'Swfw02qXGyI='},
# Send a "create" service-path request named "service_OCH_OTU4" (third
# argument "1") for node SPDR-SA1 with dest-tp XPDR1-NETWORK1.
# Expects status `ok`, a result text containing
# 'Roadm-connection successfully created for nodes: ', a truthy success flag,
# and a node-interface entry naming the OTU and OCH interfaces listed below.
# NOTE(review): the assignment of `res` and the call wrapping the expected
# node-interface dict are not visible here - confirm against the full file.
83 def test_04_service_path_create_OCH_OTU4(self):
84 response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
85 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
87 self.assertEqual(response.status_code, requests.codes.ok)
89 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
90 self.assertTrue(res["output"]["success"])
92 {'node-id': 'SPDR-SA1',
93 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
94 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
# Request the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" again (this test
# is numbered right after the OCH/OTU4 service path creation) and expect
# status `ok`. The dict literal below is the expected XPDR1-NETWORK1 entry.
# NOTE(review): the assertion call that consumes this dict is not visible
# here - confirm how it is compared with the reply.
96 def test_05_get_portmapping_NETWORK1(self):
97 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
98 self.assertEqual(response.status_code, requests.codes.ok)
101 {"logical-connection-point": "XPDR1-NETWORK1",
102 "supporting-port": "CP1-CFP0-P1",
103 "supported-interface-capability": [
104 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
106 "port-direction": "bidirectional",
107 "port-qual": "xpdr-network",
108 "supporting-circuit-pack-name": "CP1-CFP0",
109 "xponder-type": "mpdr",
110 "lcp-hash-val": "Swfw02qXGyI="},
# Read "interface/XPDR1-NETWORK1-1" from node SPDR-SA1 and expect status `ok`.
# First check: the reply's first interface entry is merged with the expected
# general attributes (name, admin state, circuit pack, type, port).
# NOTE(review): the end of that call is not visible here - presumably the
# merged dict is compared with the unmodified entry; confirm.
# Second check: the 'org-openroadm-optical-channel-interfaces:och' container
# must equal exactly frequency 196.1, rate R100G, transmit-power -5 and
# modulation-format 'dp-qpsk'.
113 def test_06_check_interface_och(self):
114 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
115 self.assertEqual(response.status_code, requests.codes.ok)
116 res = response.json()
118 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
119 'administrative-state': 'inService',
120 'supporting-circuit-pack-name': 'CP1-CFP0',
121 'type': 'org-openroadm-interfaces:opticalChannel',
122 'supporting-port': 'CP1-CFP0-P1'
126 self.assertDictEqual(
127 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
128 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
129 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# Read "interface/XPDR1-NETWORK1-OTU" from node SPDR-SA1 and expect status `ok`.
# input_dict_1 holds the expected general attributes (supported by interface
# XPDR1-NETWORK1-1); input_dict_2 holds the expected content of the
# 'org-openroadm-otn-otu-interfaces:otu' container, where all four SAPI/DAPI
# values are 'Swfw02qXGyI='.
# NOTE(review): the closing parts of both dict literals and of the first
# assertDictEqual call are not visible here, so further keys may be expected -
# confirm against the complete file.
131 def test_07_check_interface_OTU(self):
132 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
133 self.assertEqual(response.status_code, requests.codes.ok)
134 res = response.json()
135 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
136 'administrative-state': 'inService',
137 'supporting-circuit-pack-name': 'CP1-CFP0',
138 'supporting-interface': 'XPDR1-NETWORK1-1',
139 'type': 'org-openroadm-interfaces:otnOtu',
140 'supporting-port': 'CP1-CFP0-P1'
143 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
144 'expected-sapi': 'Swfw02qXGyI=',
145 'tx-sapi': 'Swfw02qXGyI=',
146 'expected-dapi': 'Swfw02qXGyI=',
147 'rate': 'org-openroadm-otn-common-types:OTU4',
151 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
154 self.assertDictEqual(input_dict_2,
155 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Send a "create" OTN service-path request named "service_ODU4" with arguments
# "100G" and "ODU" for node SPDR-SA1, network-tp XPDR1-NETWORK1.
# Expects status `ok`, a result text containing
# 'Otn Service path was set up successfully for node :SPDR-SA1', a truthy
# success flag, and a node-interface entry naming XPDR1-NETWORK1-ODU4.
# NOTE(review): the call wrapping the expected node-interface dict is not
# visible here - confirm against the complete file.
157 def test_08_otn_service_path_create_ODU4(self):
158 response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
159 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
164 self.assertTrue(res["output"]["success"])
166 {'node-id': 'SPDR-SA1',
167 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
# Request the port mapping "SPDR-SA1/mapping/XPDR1-NETWORK1" and expect status
# `ok`. The expected entry below additionally carries
# "supporting-odu4": "XPDR1-NETWORK1-ODU4".
# NOTE(review): the assertion call that consumes this dict is not visible
# here - confirm how it is compared with the reply.
169 def test_09_get_portmapping_NETWORK1(self):
170 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
171 self.assertEqual(response.status_code, requests.codes.ok)
172 res = response.json()
174 {"logical-connection-point": "XPDR1-NETWORK1",
175 "supporting-port": "CP1-CFP0-P1",
176 "supported-interface-capability": [
177 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
179 "port-direction": "bidirectional",
180 "port-qual": "xpdr-network",
181 "supporting-circuit-pack-name": "CP1-CFP0",
182 "xponder-type": "mpdr",
183 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
184 "lcp-hash-val": "Swfw02qXGyI="
# Read "interface/XPDR1-NETWORK1-ODU4" from node SPDR-SA1 and expect status `ok`.
# input_dict_1 holds the expected general attributes (supported by interface
# XPDR1-NETWORK1-OTU); input_dict_2 holds the expected ODU attributes
# (function ODU-TTP, rate ODU4, SAPI/DAPI values 'Swfw02qXGyI=').
# The last check requires the 'opu' sub-container to equal exactly
# payload-type '21' and exp-payload-type '21'.
# NOTE(review): parts of the first two assertDictEqual calls are not visible
# here - presumably each merges the expected dict into the reply and compares
# with the unmodified reply; confirm.
188 def test_10_check_interface_ODU4(self):
189 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
190 self.assertEqual(response.status_code, requests.codes.ok)
191 res = response.json()
192 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
193 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
194 'type': 'org-openroadm-interfaces:otnOdu',
195 'supporting-port': 'CP1-CFP0-P1'}
196 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
197 'rate': 'org-openroadm-otn-common-types:ODU4',
198 'expected-dapi': 'Swfw02qXGyI=',
199 'expected-sapi': 'Swfw02qXGyI=',
200 'tx-dapi': 'Swfw02qXGyI=',
201 'tx-sapi': 'Swfw02qXGyI='}
203 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
205 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
208 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
210 self.assertDictEqual(
211 {u'payload-type': u'21', u'exp-payload-type': u'21'},
212 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Send a "create" OTN service-path request named "service1" with arguments
# "10G" and "Ethernet" for node SPDR-SA1 between client-tp XPDR1-CLIENT1 and
# network-tp XPDR1-NETWORK1, plus an extra dict (ethernet-encoding,
# trib-slot ["1"], trib-port-number "1").
# Expects status `ok`, the 'set up successfully' result text, a truthy success
# flag, and a node-interface entry listing the connection, the two ODU2e
# interfaces and the 10G Ethernet interface.
# NOTE(review): the call wrapping the expected node-interface dict is not
# visible here - confirm against the complete file.
214 def test_11_otn_service_path_create_10GE(self):
215 response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
216 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
217 "network-tp": "XPDR1-NETWORK1"}],
218 {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
222 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
223 self.assertTrue(res["output"]["success"])
225 {'node-id': 'SPDR-SA1',
226 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
227 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
228 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
# Read "interface/XPDR1-CLIENT1-ETHERNET10G" from node SPDR-SA1 and expect
# status `ok`. input_dict holds the expected general attributes (type
# ethernetCsmacd, port CP1-SFP4-P1 on circuit pack CP1-SFP4).
# NOTE(review): the end of input_dict, the rest of the first assertDictEqual
# and the expected value for the
# 'org-openroadm-ethernet-interfaces:ethernet' container are not visible
# here - confirm against the complete file.
230 def test_12_check_interface_10GE_CLIENT(self):
231 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
234 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
235 'administrative-state': 'inService',
236 'supporting-circuit-pack-name': 'CP1-SFP4',
237 'type': 'org-openroadm-interfaces:ethernetCsmacd',
238 'supporting-port': 'CP1-SFP4-P1'
240 self.assertDictEqual(dict(res['interface'][0], **input_dict),
242 self.assertDictEqual(
244 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# Read "interface/XPDR1-CLIENT1-ODU2e-service1" from node SPDR-SA1 and expect
# status `ok`. input_dict_1 holds the expected general attributes (supported
# by interface XPDR1-CLIENT1-ETHERNET10G). The second group of expected values
# is odu-function ODU-TTP-CTP, rate ODU2e and monitoring-mode 'terminated'.
# The last check requires the 'opu' sub-container to equal exactly
# payload-type '03' and exp-payload-type '03'.
# NOTE(review): the opening of the second dict literal and parts of the first
# two assertDictEqual calls are not visible here - confirm against the
# complete file.
246 def test_13_check_interface_ODU2E_CLIENT(self):
247 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
251 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
252 'administrative-state': 'inService',
253 'supporting-circuit-pack-name': 'CP1-SFP4',
254 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
255 'type': 'org-openroadm-interfaces:otnOdu',
256 'supporting-port': 'CP1-SFP4-P1'}
258 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
259 'rate': 'org-openroadm-otn-common-types:ODU2e',
260 'monitoring-mode': 'terminated'}
262 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
264 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
266 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
267 self.assertDictEqual(
268 {u'payload-type': u'03', u'exp-payload-type': u'03'},
269 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Read "interface/XPDR1-NETWORK1-ODU2e-service1" from node SPDR-SA1 and expect
# status `ok`. input_dict_1 holds the expected general attributes (supported
# by interface XPDR1-NETWORK1-ODU4). The second group of expected values is
# odu-function ODU-CTP, rate ODU2e and monitoring-mode 'monitored'.
# input_dict_3 expects trib-port-number 1 inside 'parent-odu-allocation'.
# NOTE(review): the opening of the second dict literal, parts of the
# assertDictEqual calls and the start of the final 'trib-slots' check are not
# visible here - confirm against the complete file.
271 def test_14_check_interface_ODU2E_NETWORK(self):
272 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
276 'supporting-circuit-pack-name': 'CP1-CFP0',
277 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
278 'type': 'org-openroadm-interfaces:otnOdu',
279 'supporting-port': 'CP1-CFP0-P1'}
281 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
282 'rate': 'org-openroadm-otn-common-types:ODU2e',
283 'monitoring-mode': 'monitored'}
285 input_dict_3 = {'trib-port-number': 1}
287 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
289 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
291 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
292 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
293 'parent-odu-allocation'], **input_dict_3
295 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
296 'parent-odu-allocation'])
299 'org-openroadm-otn-odu-interfaces:odu'][
300 'parent-odu-allocation']['trib-slots'])
# Read the odu-connection
# "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1" and expect
# status `ok`.
# The first assertDictEqual merges input_dict_1 into the reply's first
# odu-connection entry and compares the result with the unmodified entry, so
# it passes only when every key of input_dict_1 is already present in the
# reply with the same value (extra reply keys are tolerated).
# The last two checks require 'destination' and 'source' to equal exactly the
# network-side dst-if and the client-side src-if.
# NOTE(review): the first argument of the request and the opening of
# input_dict_1 are not visible here - confirm against the complete file.
302 def test_15_check_ODU2E_connection(self):
303 response = test_utils.check_netconf_node_request(
305 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
306 self.assertEqual(response.status_code, requests.codes.ok)
307 res = response.json()
310 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
311 'direction': 'bidirectional'
314 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
315 res['odu-connection'][0])
316 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
317 res['odu-connection'][0]['destination'])
318 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
319 res['odu-connection'][0]['source'])
# Send a "delete" OTN service-path request for "service1" ("10G", "Ethernet")
# with the same node list and extra dict used at creation time.
# Expects status `ok`, a result text containing 'Request processed' and a
# truthy success flag.
321 def test_16_otn_service_path_delete_10GE(self):
322 response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
323 [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
324 "network-tp": "XPDR1-NETWORK1"}],
325 {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
327 self.assertEqual(response.status_code, requests.codes.ok)
328 res = response.json()
329 self.assertIn('Request processed', res["output"]["result"])
330 self.assertTrue(res["output"]["success"])
# Read the odu-connection
# "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1" and expect
# status `not_found`.
# NOTE(review): the first argument of the request is not visible here -
# presumably the node id "SPDR-SA1" as in the sibling tests; confirm.
332 def test_17_check_no_ODU2E_connection(self):
333 response = test_utils.check_netconf_node_request(
335 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
336 self.assertEqual(response.status_code, requests.codes.not_found)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    # The network-side ODU2e interface of service1 must no longer be
    # readable on SPDR-SA1: the lookup has to answer not_found.
    interface_path = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    # The client-side ODU2e interface of service1 must no longer be
    # readable on SPDR-SA1: the lookup has to answer not_found.
    interface_path = "interface/XPDR1-CLIENT1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_20_check_no_interface_10GE_CLIENT(self):
    # The client 10G Ethernet interface must no longer be readable on
    # SPDR-SA1: the lookup has to answer not_found.
    interface_path = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
# Send a "delete" OTN service-path request for "service_ODU4" ("100G", "ODU")
# on node SPDR-SA1, network-tp XPDR1-NETWORK1.
# Expects status `ok`, a result text containing 'Request processed' and a
# truthy success flag.
350 def test_21_otn_service_path_delete_ODU4(self):
351 response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
352 [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('Request processed', res["output"]["result"])
357 self.assertTrue(res["output"]["success"])
def test_22_check_no_interface_ODU4(self):
    # The ODU4 interface must no longer be readable on SPDR-SA1: the
    # lookup has to answer not_found.
    interface_path = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
# Send a "delete" service-path request for "service_OCH_OTU4" (third argument
# "1") on node SPDR-SA1, dest-tp XPDR1-NETWORK1.
# Expects status `ok`, a result text containing 'Request processed' and a
# truthy success flag.
363 def test_23_service_path_delete_OCH_OTU4(self):
364 response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
365 [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
367 self.assertEqual(response.status_code, requests.codes.ok)
368 res = response.json()
369 self.assertIn('Request processed', res["output"]["result"])
370 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    # The OTU interface must no longer be readable on SPDR-SA1: the
    # lookup has to answer not_found.
    interface_path = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_25_check_no_interface_OCH(self):
    # The OCH interface XPDR1-NETWORK1-1 must no longer be readable on
    # SPDR-SA1: the lookup has to answer not_found.
    interface_path = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_26_disconnect_SPDR_SA1(self):
    # Unmount SPDR-SA1; the request has to answer with status ok, and the
    # shared CODE_SHOULD_BE_200 text is used as the failure message.
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Executed as a script: run the test case with per-test verbose output.
    unittest.main(
        verbosity=2,
    )