3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['spdra'])
29 def tearDownClass(cls):
30 for process in cls.processes:
31 test_utils.shutdown_process(process)
32 print("all processes killed")
37 def test_01_connect_SPDR_SA1(self):
38 response = test_utils.mount_device("SPDR-SA1", 'spdra')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 response = test_utils.get_netconf_oper_request("SPDR-SA1")
43 self.assertEqual(response.status_code, requests.codes.ok)
46 res['node'][0]['netconf-node-topology:connection-status'],
49 def test_02_get_portmapping_CLIENT1(self):
50 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT1")
51 self.assertEqual(response.status_code, requests.codes.ok)
54 {'supported-interface-capability': [
55 'org-openroadm-port-types:if-10GE-ODU2e',
56 'org-openroadm-port-types:if-10GE-ODU2',
57 'org-openroadm-port-types:if-10GE'],
58 'supporting-port': 'CP1-SFP4-P1',
59 'supporting-circuit-pack-name': 'CP1-SFP4',
60 'logical-connection-point': 'XPDR1-CLIENT1',
61 'port-direction': 'bidirectional',
62 'port-qual': 'xpdr-client',
63 'lcp-hash-val': 'FqlcrxV7p30='},
66 def test_03_get_portmapping_NETWORK1(self):
67 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
68 self.assertEqual(response.status_code, requests.codes.ok)
71 {"logical-connection-point": "XPDR1-NETWORK1",
72 "supporting-port": "CP1-CFP0-P1",
73 "supported-interface-capability": [
74 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
76 "port-direction": "bidirectional",
77 "port-qual": "xpdr-network",
78 "supporting-circuit-pack-name": "CP1-CFP0",
79 "xponder-type": "mpdr",
80 'lcp-hash-val': 'Swfw02qXGyI='},
83 def test_04_service_path_create_OCH_OTU4(self):
84 url = "{}/operations/transportpce-device-renderer:service-path"
85 data = {"renderer:input": {
86 "service-name": "service_OCH_OTU4",
88 "modulation-format": "qpsk",
89 "operation": "create",
91 {"node-id": "SPDR-SA1",
92 "dest-tp": "XPDR1-NETWORK1"}]}}
93 response = test_utils.post_request(url, data)
95 self.assertEqual(response.status_code, requests.codes.ok)
97 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
98 self.assertTrue(res["output"]["success"])
100 {'node-id': 'SPDR-SA1',
101 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
102 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
104 def test_05_get_portmapping_NETWORK1(self):
105 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
106 self.assertEqual(response.status_code, requests.codes.ok)
107 res = response.json()
109 {"logical-connection-point": "XPDR1-NETWORK1",
110 "supporting-port": "CP1-CFP0-P1",
111 "supported-interface-capability": [
112 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
114 "port-direction": "bidirectional",
115 "port-qual": "xpdr-network",
116 "supporting-circuit-pack-name": "CP1-CFP0",
117 "xponder-type": "mpdr",
118 "lcp-hash-val": "Swfw02qXGyI="},
121 def test_06_check_interface_och(self):
122 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
126 self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
127 'administrative-state': 'inService',
128 'supporting-circuit-pack-name': 'CP1-CFP0',
129 'type': 'org-openroadm-interfaces:opticalChannel',
130 'supporting-port': 'CP1-CFP0-P1'
134 self.assertDictEqual(
135 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
136 u'transmit-power': -5},
137 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
139 def test_07_check_interface_OTU(self):
140 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
141 self.assertEqual(response.status_code, requests.codes.ok)
142 res = response.json()
143 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
144 'administrative-state': 'inService',
145 'supporting-circuit-pack-name': 'CP1-CFP0',
146 'supporting-interface': 'XPDR1-NETWORK1-1',
147 'type': 'org-openroadm-interfaces:otnOtu',
148 'supporting-port': 'CP1-CFP0-P1'
151 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
152 'expected-sapi': 'Swfw02qXGyI=',
153 'tx-sapi': 'Swfw02qXGyI=',
154 'expected-dapi': 'Swfw02qXGyI=',
155 'rate': 'org-openroadm-otn-common-types:OTU4',
159 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
162 self.assertDictEqual(input_dict_2,
163 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
165 def test_08_otn_service_path_create_ODU4(self):
166 url = "{}/operations/transportpce-device-renderer:otn-service-path"
167 data = {"renderer:input": {
168 "service-name": "service_ODU4",
169 "operation": "create",
170 "service-rate": "100G",
171 "service-type": "ODU",
173 {"node-id": "SPDR-SA1",
174 "network-tp": "XPDR1-NETWORK1"}]}}
175 response = test_utils.post_request(url, data)
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
180 self.assertTrue(res["output"]["success"])
182 {'node-id': 'SPDR-SA1',
183 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
185 def test_09_get_portmapping_NETWORK1(self):
186 response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
190 {"logical-connection-point": "XPDR1-NETWORK1",
191 "supporting-port": "CP1-CFP0-P1",
192 "supported-interface-capability": [
193 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
195 "port-direction": "bidirectional",
196 "port-qual": "xpdr-network",
197 "supporting-circuit-pack-name": "CP1-CFP0",
198 "xponder-type": "mpdr",
199 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
200 "lcp-hash-val": "Swfw02qXGyI="
204 def test_10_check_interface_ODU4(self):
205 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
209 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
210 'type': 'org-openroadm-interfaces:otnOdu',
211 'supporting-port': 'CP1-CFP0-P1'}
212 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
213 'rate': 'org-openroadm-otn-common-types:ODU4',
214 'expected-dapi': 'Swfw02qXGyI=',
215 'expected-sapi': 'Swfw02qXGyI=',
216 'tx-dapi': 'Swfw02qXGyI=',
217 'tx-sapi': 'Swfw02qXGyI='}
219 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
221 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
224 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
226 self.assertDictEqual(
227 {u'payload-type': u'21', u'exp-payload-type': u'21'},
228 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
230 def test_11_otn_service_path_create_10GE(self):
231 url = "{}/operations/transportpce-device-renderer:otn-service-path"
232 data = {"renderer:input": {
233 "service-name": "service1",
234 "operation": "create",
235 "service-rate": "10G",
236 "service-type": "Ethernet",
237 "ethernet-encoding": "eth encode",
239 "trib-port-number": "1",
241 {"node-id": "SPDR-SA1",
242 "client-tp": "XPDR1-CLIENT1",
243 "network-tp": "XPDR1-NETWORK1"}]}}
244 response = test_utils.post_request(url, data)
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
248 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
249 self.assertTrue(res["output"]["success"])
251 {'node-id': 'SPDR-SA1',
252 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
253 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
254 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
256 def test_12_check_interface_10GE_CLIENT(self):
257 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
261 'administrative-state': 'inService',
262 'supporting-circuit-pack-name': 'CP1-SFP4',
263 'type': 'org-openroadm-interfaces:ethernetCsmacd',
264 'supporting-port': 'CP1-SFP4-P1'
266 self.assertDictEqual(dict(res['interface'][0], **input_dict),
268 self.assertDictEqual(
270 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
272 def test_13_check_interface_ODU2E_CLIENT(self):
273 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
277 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
278 'administrative-state': 'inService',
279 'supporting-circuit-pack-name': 'CP1-SFP4',
280 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
281 'type': 'org-openroadm-interfaces:otnOdu',
282 'supporting-port': 'CP1-SFP4-P1'}
284 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
285 'rate': 'org-openroadm-otn-common-types:ODU2e',
286 'monitoring-mode': 'terminated'}
288 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
290 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
292 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
293 self.assertDictEqual(
294 {u'payload-type': u'03', u'exp-payload-type': u'03'},
295 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
297 def test_14_check_interface_ODU2E_NETWORK(self):
298 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
301 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
302 'supporting-circuit-pack-name': 'CP1-CFP0',
303 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
304 'type': 'org-openroadm-interfaces:otnOdu',
305 'supporting-port': 'CP1-CFP0-P1'}
307 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
308 'rate': 'org-openroadm-otn-common-types:ODU2e',
309 'monitoring-mode': 'monitored'}
311 input_dict_3 = {'trib-port-number': 1}
313 self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
315 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
317 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
318 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
319 'parent-odu-allocation'], **input_dict_3
321 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
322 'parent-odu-allocation'])
325 'org-openroadm-otn-odu-interfaces:odu'][
326 'parent-odu-allocation']['trib-slots'])
328 def test_15_check_ODU2E_connection(self):
329 response = test_utils.check_netconf_node_request(
331 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
336 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
337 'direction': 'bidirectional'
340 self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
341 res['odu-connection'][0])
342 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
343 res['odu-connection'][0]['destination'])
344 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
345 res['odu-connection'][0]['source'])
347 def test_16_otn_service_path_delete_10GE(self):
348 url = "{}/operations/transportpce-device-renderer:otn-service-path"
349 data = {"renderer:input": {
350 "service-name": "service1",
351 "operation": "delete",
352 "service-rate": "10G",
353 "service-type": "Ethernet",
354 "ethernet-encoding": "eth encode",
356 "trib-port-number": "1",
358 {"node-id": "SPDR-SA1",
359 "client-tp": "XPDR1-CLIENT1",
360 "network-tp": "XPDR1-NETWORK1"}]}}
361 response = test_utils.post_request(url, data)
363 self.assertEqual(response.status_code, requests.codes.ok)
364 res = response.json()
365 self.assertIn('Request processed', res["output"]["result"])
366 self.assertTrue(res["output"]["success"])
368 def test_17_check_no_ODU2E_connection(self):
369 response = test_utils.check_netconf_node_request(
371 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
372 self.assertEqual(response.status_code, requests.codes.not_found)
374 def test_18_check_no_interface_ODU2E_NETWORK(self):
375 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
376 self.assertEqual(response.status_code, requests.codes.not_found)
378 def test_19_check_no_interface_ODU2E_CLIENT(self):
379 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
380 self.assertEqual(response.status_code, requests.codes.not_found)
382 def test_20_check_no_interface_10GE_CLIENT(self):
383 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
384 self.assertEqual(response.status_code, requests.codes.not_found)
386 def test_21_otn_service_path_delete_ODU4(self):
387 url = "{}/operations/transportpce-device-renderer:otn-service-path"
388 data = {"renderer:input": {
389 "service-name": "service_ODU4",
390 "operation": "delete",
391 "service-rate": "100G",
392 "service-type": "ODU",
394 {"node-id": "SPDR-SA1",
395 "network-tp": "XPDR1-NETWORK1"}]}}
396 response = test_utils.post_request(url, data)
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 self.assertIn('Request processed', res["output"]["result"])
401 self.assertTrue(res["output"]["success"])
403 def test_22_check_no_interface_ODU4(self):
404 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
405 self.assertEqual(response.status_code, requests.codes.not_found)
407 def test_23_service_path_delete_OCH_OTU4(self):
408 url = "{}/operations/transportpce-device-renderer:service-path"
409 data = {"renderer:input": {
410 "service-name": "service_OTU4",
412 "modulation-format": "qpsk",
413 "operation": "delete",
415 {"node-id": "SPDR-SA1",
416 "dest-tp": "XPDR1-NETWORK1"}]}}
417 response = test_utils.post_request(url, data)
419 self.assertEqual(response.status_code, requests.codes.ok)
420 res = response.json()
421 self.assertIn('Request processed', res["output"]["result"])
422 self.assertTrue(res["output"]["success"])
424 def test_24_check_no_interface_OTU4(self):
425 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
426 self.assertEqual(response.status_code, requests.codes.not_found)
428 def test_25_check_no_interface_OCH(self):
429 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
430 self.assertEqual(response.status_code, requests.codes.not_found)
432 def test_26_disconnect_SPDR_SA1(self):
433 response = test_utils.unmount_device("SPDR-SA1")
434 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
437 if __name__ == "__main__":
438 unittest.main(verbosity=2)