3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
# unittest.TestCase subclass grouping the numbered test_NN_* methods below; every
# visible test drives the single node "SPDR-SA1" through test_utils helpers.
19 class TransportPCEtesting(unittest.TestCase):
# NOTE(review): the `def` enclosing the next two statements is not visible in this
# excerpt (presumably a class-level setup hook — confirm against the full file).
# Both statements assign cls.processes, so the value returned by start_sims replaces
# the one returned by start_tpce; tearDownClass only iterates the final value.
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes``, then print a confirmation line."""
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
# test_01: mount device "SPDR-SA1" through test_utils.mount_device (second argument
# 'spdra') and expect HTTP 201 (requests.codes.created).
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Second step: query the node with test_utils.get_netconf_oper_request and expect HTTP 200.
42 response = test_utils.get_netconf_oper_request("SPDR-SA1")
43 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): `res` is not assigned in the visible lines and the call that consumes
# the connection-status value below is not visible either — confirm against the full file.
46 res['node'][0]['netconf-node-topology:connection-status'],
# test_02: GET the port-mapping entry XPDR1-CLIENT1 of node SPDR-SA1 and expect HTTP 200.
49 def test_02_get_portmapping_CLIENT1(self):
# The "{}" placeholder is passed through unformatted; presumably test_utils.get_request
# fills in the base URL — confirm.
50 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
51 response = test_utils.get_request(url)
52 self.assertEqual(response.status_code, requests.codes.ok)
# Mapping entry for XPDR1-CLIENT1 used by this test.
# NOTE(review): the assertion that wraps this dict literal is not visible in this excerpt.
55 {'supported-interface-capability': [
56 'org-openroadm-port-types:if-10GE-ODU2e',
57 'org-openroadm-port-types:if-10GE-ODU2',
58 'org-openroadm-port-types:if-10GE'],
59 'supporting-port': 'CP1-SFP4-P1',
60 'supporting-circuit-pack-name': 'CP1-SFP4',
61 'logical-connection-point': 'XPDR1-CLIENT1',
62 'port-direction': 'bidirectional',
63 'port-qual': 'xpdr-client',
64 'lcp-hash-val': 'FqlcrxV7p30='},
# test_03: GET the port-mapping entry XPDR1-NETWORK1 of node SPDR-SA1 and expect HTTP 200.
67 def test_03_get_portmapping_NETWORK1(self):
# The "{}" placeholder is passed through unformatted; presumably test_utils.get_request
# fills in the base URL — confirm.
68 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
69 response = test_utils.get_request(url)
70 self.assertEqual(response.status_code, requests.codes.ok)
# Mapping entry for XPDR1-NETWORK1 used by this test.
# NOTE(review): the assertion wrapping this dict and the line closing the
# "supported-interface-capability" list are not visible in this excerpt.
73 {"logical-connection-point": "XPDR1-NETWORK1",
74 "supporting-port": "CP1-CFP0-P1",
75 "supported-interface-capability": [
76 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
78 "port-direction": "bidirectional",
79 "port-qual": "xpdr-network",
80 "supporting-circuit-pack-name": "CP1-CFP0",
81 "xponder-type": "mpdr",
82 'lcp-hash-val': 'Swfw02qXGyI='},
# test_04: POST a "create" operation to the device-renderer service-path RPC for
# node SPDR-SA1 / dest-tp XPDR1-NETWORK1 and check the reply.
85 def test_04_service_path_create_OCH_OTU4(self):
86 url = "{}/operations/transportpce-device-renderer:service-path"
87 data = {"renderer:input": {
# NOTE(review): this create request uses service-name "service_ODU4", whereas the
# matching delete in test_23 sends "service_OTU4" — confirm which name is intended.
88 "service-name": "service_ODU4",
# NOTE(review): some payload lines are not visible in this excerpt (including the key
# that opens the node list below).
90 "modulation-format": "qpsk",
91 "operation": "create",
93 {"node-id": "SPDR-SA1",
94 "dest-tp": "XPDR1-NETWORK1"}]}}
95 response = test_utils.post_request(url, data)
97 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): `res` is not assigned in the visible lines (presumably response.json() — confirm).
99 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
100 self.assertTrue(res["output"]["success"])
# Expected per-node interface record; it is checked against res["output"]['node-interface'],
# but the opening of that assertion call is not visible here.
102 {'node-id': 'SPDR-SA1',
103 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
104 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
# test_05: GET the XPDR1-NETWORK1 port-mapping entry again (numbered to run after the
# test_04 create) and expect HTTP 200. The visible expected entry carries the same
# keys and values as the one in test_03.
106 def test_05_get_portmapping_NETWORK1(self):
107 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
108 response = test_utils.get_request(url)
109 self.assertEqual(response.status_code, requests.codes.ok)
110 res = response.json()
# NOTE(review): the assertion wrapping this dict and the line closing the
# "supported-interface-capability" list are not visible in this excerpt.
112 {"logical-connection-point": "XPDR1-NETWORK1",
113 "supporting-port": "CP1-CFP0-P1",
114 "supported-interface-capability": [
115 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
117 "port-direction": "bidirectional",
118 "port-qual": "xpdr-network",
119 "supporting-circuit-pack-name": "CP1-CFP0",
120 "xponder-type": "mpdr",
121 "lcp-hash-val": "Swfw02qXGyI="},
# test_06: read interface XPDR1-NETWORK1-1 of SPDR-SA1 via
# test_utils.check_netconf_node_request, expect HTTP 200 and check its content.
124 def test_06_check_interface_och(self):
125 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
126 self.assertEqual(response.status_code, requests.codes.ok)
127 res = response.json()
# dict(actual, **expected) is a copy of the returned interface overlaid with the
# expected attributes.
# NOTE(review): the end of this call is not visible; presumably it is compared against
# the returned interface itself, as test_15 does, so that only the listed keys are
# checked — confirm.
129 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
130 'administrative-state': 'inService',
131 'supporting-circuit-pack-name': 'CP1-CFP0',
132 'type': 'org-openroadm-interfaces:opticalChannel',
133 'supporting-port': 'CP1-CFP0-P1'
# The och container must equal this dict exactly (no overlay is applied here).
137 self.assertDictEqual(
138 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
139 u'transmit-power': -5},
140 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# test_07: read interface XPDR1-NETWORK1-OTU of SPDR-SA1, expect HTTP 200 and check
# both its top-level attributes and its otu container.
142 def test_07_check_interface_OTU(self):
143 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
144 self.assertEqual(response.status_code, requests.codes.ok)
145 res = response.json()
# Expected top-level interface attributes.
# NOTE(review): the line closing this dict literal is not visible in this excerpt.
146 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
147 'administrative-state': 'inService',
148 'supporting-circuit-pack-name': 'CP1-CFP0',
149 'supporting-interface': 'XPDR1-NETWORK1-1',
150 'type': 'org-openroadm-interfaces:otnOtu',
151 'supporting-port': 'CP1-CFP0-P1'
# Expected otu container content; the four SAPI/DAPI fields all carry the same value
# as the 'lcp-hash-val' of XPDR1-NETWORK1 seen in the port-mapping tests.
# NOTE(review): the remaining entries and the closing brace are not visible here.
154 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
155 'expected-sapi': 'Swfw02qXGyI=',
156 'tx-sapi': 'Swfw02qXGyI=',
157 'expected-dapi': 'Swfw02qXGyI=',
158 'rate': 'org-openroadm-otn-common-types:OTU4',
# Overlay check on the top-level attributes; the second argument of this call is not
# visible (presumably the returned interface itself — confirm).
162 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
# The otu container must equal input_dict_2 exactly.
165 self.assertDictEqual(input_dict_2,
166 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# test_08: POST a "create" operation to the device-renderer otn-service-path RPC
# (service-rate 100G, service-type ODU) for SPDR-SA1 / network-tp XPDR1-NETWORK1.
168 def test_08_otn_service_path_create_ODU4(self):
169 url = "{}/operations/transportpce-device-renderer:otn-service-path"
170 data = {"renderer:input": {
171 "service-name": "service_ODU4",
172 "operation": "create",
173 "service-rate": "100G",
174 "service-type": "ODU",
# NOTE(review): the key that opens the node list below is not visible in this excerpt.
176 {"node-id": "SPDR-SA1",
177 "network-tp": "XPDR1-NETWORK1"}]}}
178 response = test_utils.post_request(url, data)
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
183 self.assertTrue(res["output"]["success"])
# Expected per-node interface record; it is checked against res["output"]['node-interface'],
# but the opening of that assertion call is not visible here.
185 {'node-id': 'SPDR-SA1',
186 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
# test_09: GET the XPDR1-NETWORK1 port-mapping entry (numbered to run after the
# test_08 ODU4 create) and expect HTTP 200. Compared with the entry in test_05, the
# visible expected entry additionally contains "supporting-odu4".
188 def test_09_get_portmapping_NETWORK1(self):
189 url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
190 response = test_utils.get_request(url)
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
# NOTE(review): the assertion wrapping this dict, the line closing the
# "supported-interface-capability" list and the end of the dict are not visible here.
194 {"logical-connection-point": "XPDR1-NETWORK1",
195 "supporting-port": "CP1-CFP0-P1",
196 "supported-interface-capability": [
197 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
199 "port-direction": "bidirectional",
200 "port-qual": "xpdr-network",
201 "supporting-circuit-pack-name": "CP1-CFP0",
202 "xponder-type": "mpdr",
203 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
204 "lcp-hash-val": "Swfw02qXGyI="
# test_10: read interface XPDR1-NETWORK1-ODU4 of SPDR-SA1, expect HTTP 200 and check
# its top-level attributes, its odu container and the nested opu payload types.
208 def test_10_check_interface_ODU4(self):
209 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
210 self.assertEqual(response.status_code, requests.codes.ok)
211 res = response.json()
# Expected top-level interface attributes (the ODU4 is stacked on XPDR1-NETWORK1-OTU).
212 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
213 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
214 'type': 'org-openroadm-interfaces:otnOdu',
215 'supporting-port': 'CP1-CFP0-P1'}
# Expected attributes of the odu container.
216 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
217 'rate': 'org-openroadm-otn-common-types:ODU4',
218 'expected-dapi': 'Swfw02qXGyI=',
219 'expected-sapi': 'Swfw02qXGyI=',
220 'tx-dapi': 'Swfw02qXGyI=',
221 'tx-sapi': 'Swfw02qXGyI='}
# NOTE(review): parts of the next two assertDictEqual calls are not visible in this
# excerpt (the second argument of the first call, and the overlay argument of the
# second call — presumably **input_dict_2; confirm).
223 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
225 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
228 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
# The opu sub-container must equal this dict exactly.
230 self.assertDictEqual(
231 {u'payload-type': u'21', u'exp-payload-type': u'21'},
232 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# test_11: POST a "create" operation to the device-renderer otn-service-path RPC for
# a 10G Ethernet service named "service1" between XPDR1-CLIENT1 and XPDR1-NETWORK1
# on SPDR-SA1, then check the reply.
234 def test_11_otn_service_path_create_10GE(self):
235 url = "{}/operations/transportpce-device-renderer:otn-service-path"
236 data = {"renderer:input": {
237 "service-name": "service1",
238 "operation": "create",
239 "service-rate": "10G",
240 "service-type": "Ethernet",
241 "ethernet-encoding": "eth encode",
# NOTE(review): some payload lines are not visible in this excerpt (including the key
# that opens the node list below).
243 "trib-port-number": "1",
245 {"node-id": "SPDR-SA1",
246 "client-tp": "XPDR1-CLIENT1",
247 "network-tp": "XPDR1-NETWORK1"}]}}
248 response = test_utils.post_request(url, data)
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
253 self.assertTrue(res["output"]["success"])
# Expected per-node record: one ODU connection, two ODU2e interfaces and one Ethernet
# interface. It is checked against res["output"]['node-interface'], but the opening of
# that assertion call is not visible here.
255 {'node-id': 'SPDR-SA1',
256 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
257 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
258 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
# test_12: read interface XPDR1-CLIENT1-ETHERNET10G of SPDR-SA1, expect HTTP 200 and
# check its top-level attributes and its ethernet container.
260 def test_12_check_interface_10GE_CLIENT(self):
261 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
# Expected top-level interface attributes.
# NOTE(review): the line closing this dict literal is not visible in this excerpt.
264 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
265 'administrative-state': 'inService',
266 'supporting-circuit-pack-name': 'CP1-SFP4',
267 'type': 'org-openroadm-interfaces:ethernetCsmacd',
268 'supporting-port': 'CP1-SFP4-P1'
# Overlay check on the top-level attributes; the second argument of this call is not
# visible (presumably the returned interface itself — confirm).
270 self.assertDictEqual(dict(res['interface'][0], **input_dict),
# NOTE(review): the expected value compared with the ethernet container is not
# visible in this excerpt.
272 self.assertDictEqual(
274 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# test_13: read interface XPDR1-CLIENT1-ODU2e-service1 of SPDR-SA1, expect HTTP 200
# and check its top-level attributes, its odu container and the nested opu payload types.
276 def test_13_check_interface_ODU2E_CLIENT(self):
277 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
278 self.assertEqual(response.status_code, requests.codes.ok)
279 res = response.json()
# Expected top-level attributes (the client ODU2e is stacked on the 10G Ethernet interface).
281 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
282 'administrative-state': 'inService',
283 'supporting-circuit-pack-name': 'CP1-SFP4',
284 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
285 'type': 'org-openroadm-interfaces:otnOdu',
286 'supporting-port': 'CP1-SFP4-P1'}
# NOTE(review): the line opening this second dict literal is not visible in this
# excerpt (presumably `input_dict_2 = {` — confirm).
288 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
289 'rate': 'org-openroadm-otn-common-types:ODU2e',
290 'monitoring-mode': 'terminated'}
# NOTE(review): parts of the next two assertDictEqual calls are not visible in this
# excerpt (second argument of the first call, overlay argument of the second).
292 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
294 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
296 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The opu sub-container must equal this dict exactly.
297 self.assertDictEqual(
298 {u'payload-type': u'03', u'exp-payload-type': u'03'},
299 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# test_14: read interface XPDR1-NETWORK1-ODU2e-service1 of SPDR-SA1, expect HTTP 200
# and check its top-level attributes, its odu container and the parent-odu-allocation.
301 def test_14_check_interface_ODU2E_NETWORK(self):
302 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
# Expected top-level attributes (the network ODU2e is stacked on XPDR1-NETWORK1-ODU4).
305 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
306 'supporting-circuit-pack-name': 'CP1-CFP0',
307 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
308 'type': 'org-openroadm-interfaces:otnOdu',
309 'supporting-port': 'CP1-CFP0-P1'}
# NOTE(review): the line opening this second dict literal is not visible in this
# excerpt (presumably `input_dict_2 = {` — confirm).
311 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
312 'rate': 'org-openroadm-otn-common-types:ODU2e',
313 'monitoring-mode': 'monitored'}
# The expected value is the integer 1, while the test_11 request sent the string "1".
315 input_dict_3 = {'trib-port-number': 1}
# NOTE(review): several argument lines of the assertions below are not visible in
# this excerpt; only the visible fragments are kept.
317 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
319 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
321 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# Overlay check of input_dict_3 on the parent-odu-allocation container.
322 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
323 'parent-odu-allocation'], **input_dict_3
325 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
326 'parent-odu-allocation'])
# Tail of a check on the 'trib-slots' value; the assertion and its expected value are
# not visible in this excerpt.
329 'org-openroadm-otn-odu-interfaces:odu'][
330 'parent-odu-allocation']['trib-slots'])
# test_15: read the ODU connection between the client and network ODU2e interfaces,
# expect HTTP 200 and check its attributes, destination and source.
332 def test_15_check_ODU2E_connection(self):
# NOTE(review): the first argument line of this call is not visible in this excerpt
# (the other tests pass "SPDR-SA1" in that position — confirm).
333 response = test_utils.check_netconf_node_request(
335 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
336 self.assertEqual(response.status_code, requests.codes.ok)
337 res = response.json()
# NOTE(review): the opening of the input_dict_1 literal (and the key paired with the
# connection name below) is not visible in this excerpt.
340 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
341 'direction': 'bidirectional'
# dict(actual, **expected) equals actual only when every expected key is already
# present in actual with the same value, so extra keys in the reply are tolerated.
344 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
345 res['odu-connection'][0])
# destination and source must each equal the given single-key dict exactly.
346 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
347 res['odu-connection'][0]['destination'])
348 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
349 res['odu-connection'][0]['source'])
# test_16: POST a "delete" operation to the device-renderer otn-service-path RPC for
# the 10G Ethernet service "service1"; the visible payload fields match the test_11
# create request apart from "operation".
351 def test_16_otn_service_path_delete_10GE(self):
352 url = "{}/operations/transportpce-device-renderer:otn-service-path"
353 data = {"renderer:input": {
354 "service-name": "service1",
355 "operation": "delete",
356 "service-rate": "10G",
357 "service-type": "Ethernet",
358 "ethernet-encoding": "eth encode",
# NOTE(review): some payload lines are not visible in this excerpt (including the key
# that opens the node list below).
360 "trib-port-number": "1",
362 {"node-id": "SPDR-SA1",
363 "client-tp": "XPDR1-CLIENT1",
364 "network-tp": "XPDR1-NETWORK1"}]}}
365 response = test_utils.post_request(url, data)
# Expect HTTP 200, a result text containing 'Request processed' and a truthy success flag.
367 self.assertEqual(response.status_code, requests.codes.ok)
368 res = response.json()
369 self.assertIn('Request processed', res["output"]["result"])
370 self.assertTrue(res["output"]["success"])
# test_17: the ODU connection created by test_11 must no longer be readable
# (expects HTTP 404, requests.codes.not_found).
372 def test_17_check_no_ODU2E_connection(self):
# NOTE(review): the first argument line of this call is not visible in this excerpt
# (the other tests pass "SPDR-SA1" in that position — confirm).
373 response = test_utils.check_netconf_node_request(
375 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
376 self.assertEqual(response.status_code, requests.codes.not_found)
def test_18_check_no_interface_ODU2E_NETWORK(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-NETWORK1-ODU2e-service1",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_19_check_no_interface_ODU2E_CLIENT(self):
    """Expect HTTP 404 when reading interface XPDR1-CLIENT1-ODU2e-service1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-CLIENT1-ODU2e-service1",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_20_check_no_interface_10GE_CLIENT(self):
    """Expect HTTP 404 when reading interface XPDR1-CLIENT1-ETHERNET10G on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-CLIENT1-ETHERNET10G",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
# test_21: POST a "delete" operation to the device-renderer otn-service-path RPC for
# the ODU4 service; the visible payload fields match the test_08 create request apart
# from "operation".
390 def test_21_otn_service_path_delete_ODU4(self):
391 url = "{}/operations/transportpce-device-renderer:otn-service-path"
392 data = {"renderer:input": {
393 "service-name": "service_ODU4",
394 "operation": "delete",
395 "service-rate": "100G",
396 "service-type": "ODU",
# NOTE(review): the key that opens the node list below is not visible in this excerpt.
398 {"node-id": "SPDR-SA1",
399 "network-tp": "XPDR1-NETWORK1"}]}}
400 response = test_utils.post_request(url, data)
# Expect HTTP 200, a result text containing 'Request processed' and a truthy success flag.
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 self.assertIn('Request processed', res["output"]["result"])
405 self.assertTrue(res["output"]["success"])
def test_22_check_no_interface_ODU4(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-NETWORK1-ODU4",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
# test_23: POST a "delete" operation to the device-renderer service-path RPC for
# node SPDR-SA1 / dest-tp XPDR1-NETWORK1.
411 def test_23_service_path_delete_OCH_OTU4(self):
412 url = "{}/operations/transportpce-device-renderer:service-path"
413 data = {"renderer:input": {
# NOTE(review): this delete request uses service-name "service_OTU4", whereas the
# matching create in test_04 sent "service_ODU4" — confirm which name is intended.
414 "service-name": "service_OTU4",
# NOTE(review): some payload lines are not visible in this excerpt (including the key
# that opens the node list below).
416 "modulation-format": "qpsk",
417 "operation": "delete",
419 {"node-id": "SPDR-SA1",
420 "dest-tp": "XPDR1-NETWORK1"}]}}
421 response = test_utils.post_request(url, data)
# Expect HTTP 200, a result text containing 'Request processed' and a truthy success flag.
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 self.assertIn('Request processed', res["output"]["result"])
426 self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-OTU on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-NETWORK1-OTU",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_25_check_no_interface_OCH(self):
    """Expect HTTP 404 when reading interface XPDR1-NETWORK1-1 on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-NETWORK1-1",
    )
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_26_disconnect_SPDR_SA1(self):
    """Unmount SPDR-SA1 through test_utils.unmount_device and expect HTTP 200."""
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run the suite with per-test verbose output when executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)