3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
# unittest.TestCase whose test methods issue HTTP requests against the
# RESTCONF endpoint named by restconf_baseurl.
25 class TransportPCEtesting(unittest.TestCase):
# Class-level handle, filled in below from test_utils.start_spdra_honeynode().
27 honeynode_process1 = None
# Prefix of every request URL built by the test methods of this class.
29 restconf_baseurl = "http://localhost:8181/restconf"
# NOTE(review): the statements below assign to `cls`, so they presumably form
# the body of a class-level setup hook whose `def` line is not visible in this
# view - confirm against the complete file.
33 print ("starting honeynode1...")
34 cls.honeynode_process1 = test_utils.start_spdra_honeynode()
37 print ("starting opendaylight...")
# cls.odl_process is the handle that tearDownClass later signals and waits on.
38 cls.odl_process = test_utils.start_tpce()
40 print ("opendaylight started")
def tearDownClass(cls):
    # Shuts down the controller first, then the honeynode. For each of them
    # SIGINT is sent to the processes returned by psutil's children(), then to
    # the process itself, and finally the process is waited on.
    # NOTE(review): this takes `cls`, but no @classmethod decorator is visible
    # above the def in this view - confirm against the complete file.
    for proc in (cls.odl_process, cls.honeynode_process1):
        for offspring in psutil.Process(proc.pid).children():
            offspring.send_signal(signal.SIGINT)
        proc.send_signal(signal.SIGINT)
        proc.wait()
# PUTs a netconf node entry for SPDR-SA1 (host 127.0.0.1, port 17845,
# credentials admin/admin) to the config topology-netconf path and expects
# requests.codes.created, then GETs the same node under the operational path
# and expects requests.codes.ok.
# NOTE(review): the statement that opens the `data` payload and the assertion
# wrapped around the connection-status lookup are not visible in this view -
# confirm against the complete file.
58 def test_01_connect_SPDR_SA1(self):
59 url = ("{}/config/network-topology:"
60 "network-topology/topology/topology-netconf/node/SPDR-SA1"
61 .format(self.restconf_baseurl))
# Entries of the node description sent as the JSON body of the PUT.
63 "node-id": "SPDR-SA1",
64 "netconf-node-topology:username": "admin",
65 "netconf-node-topology:password": "admin",
66 "netconf-node-topology:host": "127.0.0.1",
67 "netconf-node-topology:port": "17845",
68 "netconf-node-topology:tcp-only": "false",
69 "netconf-node-topology:pass-through": {}}]}
70 headers = {'content-type': 'application/json'}
71 response = requests.request(
72 "PUT", url, data=json.dumps(data), headers=headers,
73 auth=('admin', 'admin'))
74 self.assertEqual(response.status_code, requests.codes.created)
# Same node, read back from the operational tree instead of config.
76 url = ("{}/operational/network-topology:"
77 "network-topology/topology/topology-netconf/node/SPDR-SA1"
78 .format(self.restconf_baseurl))
79 response = requests.request(
80 "GET", url, headers=headers, auth=('admin', 'admin'))
81 self.assertEqual(response.status_code, requests.codes.ok)
# Connection status of the first returned node; the value it is compared with
# is not visible here.
84 res['node'][0]['netconf-node-topology:connection-status'],
# GETs the port mapping XPDR1-CLIENT1 of node SPDR-SA1 from the
# transportpce-portmapping config tree and expects requests.codes.ok.
# NOTE(review): the call that compares the dict literal below with the
# response body is not visible in this view - confirm against the full file.
87 def test_02_get_portmapping_CLIENT1(self):
88 url = ("{}/config/transportpce-portmapping:network/"
89 "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
90 .format(self.restconf_baseurl))
91 headers = {'content-type': 'application/json'}
92 response = requests.request(
93 "GET", url, headers=headers, auth=('admin', 'admin'))
94 self.assertEqual(response.status_code, requests.codes.ok)
# Mapping attributes listed for the client port (circuit pack CP1-SFP4).
97 {'supported-interface-capability': [
98 'org-openroadm-port-types:if-10GE-ODU2e',
99 'org-openroadm-port-types:if-10GE-ODU2',
100 'org-openroadm-port-types:if-10GE'],
101 'supporting-port': 'CP1-SFP4-P1',
102 'supporting-circuit-pack-name': 'CP1-SFP4',
103 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
104 'port-qual': 'xpdr-client'},
# GETs the port mapping XPDR1-NETWORK1 of node SPDR-SA1 from the
# transportpce-portmapping config tree, expects requests.codes.ok and decodes
# the JSON body into `res`.
# NOTE(review): the assertion that wraps the dict literal below, and the
# closing of its capability list, are not visible in this view - confirm
# against the complete file.
107 def test_03_get_portmapping_NETWORK1(self):
108 url = ("{}/config/transportpce-portmapping:network/"
109 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
110 .format(self.restconf_baseurl))
111 headers = {'content-type': 'application/json'}
112 response = requests.request(
113 "GET", url, headers=headers, auth=('admin', 'admin'))
114 self.assertEqual(response.status_code, requests.codes.ok)
115 res = response.json()
# Mapping attributes listed for the network port (circuit pack CP1-CFP0).
117 {"logical-connection-point": "XPDR1-NETWORK1",
118 "supporting-port": "CP1-CFP0-P1",
119 "supported-interface-capability": [
120 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
122 "port-direction": "bidirectional",
123 "port-qual": "xpdr-network",
124 "supporting-circuit-pack-name": "CP1-CFP0",
125 "xponder-type": "mpdr"},
# POSTs a "create" request for service_ODU4 to the
# transportpce-device-renderer:service-path operation and checks the reply:
# status requests.codes.ok, a result text containing
# 'Roadm-connection successfully created for nodes: ' and a truthy success
# flag.
# NOTE(review): parts of the `data` payload and the assertion that wraps the
# node-interface dict are not visible in this view - confirm against the
# complete file.
128 def test_04_service_path_create_ODU4(self):
129 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
130 data = {"renderer:input": {
131 "service-name": "service_ODU4",
133 "modulation-format": "qpsk",
134 "operation": "create",
136 {"node-id": "SPDR-SA1",
137 "dest-tp": "XPDR1-NETWORK1"}]}}
138 headers = {'content-type': 'application/json'}
139 response = requests.request(
140 "POST", url, data=json.dumps(data),
141 headers=headers, auth=('admin', 'admin'))
143 self.assertEqual(response.status_code, requests.codes.ok)
144 res = response.json()
145 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
146 self.assertTrue(res["output"]["success"])
# Interface identifiers listed for SPDR-SA1, related to the
# 'node-interface' entry of the reply output.
148 {'node-id': 'SPDR-SA1',
149 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
150 'odu-interface-id': ['XPDR1-NETWORK1-ODU4'],
151 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
# GETs the port mapping XPDR1-NETWORK1 of node SPDR-SA1 again and expects
# requests.codes.ok. Compared with the dict in test_03, the one below carries
# one more entry: 'supporting-odu4': 'XPDR1-NETWORK1-ODU4'.
# NOTE(review): the assertion that wraps the dict literal below, and the
# closing of its capability list, are not visible in this view - confirm
# against the complete file.
153 def test_05_get_portmapping_NETWORK1(self):
154 url = ("{}/config/transportpce-portmapping:network/"
155 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
156 .format(self.restconf_baseurl))
157 headers = {'content-type': 'application/json'}
158 response = requests.request(
159 "GET", url, headers=headers, auth=('admin', 'admin'))
160 self.assertEqual(response.status_code, requests.codes.ok)
161 res = response.json()
163 {"logical-connection-point": "XPDR1-NETWORK1",
164 "supporting-port": "CP1-CFP0-P1",
165 "supported-interface-capability": [
166 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
168 "port-direction": "bidirectional",
169 "port-qual": "xpdr-network",
170 "supporting-circuit-pack-name": "CP1-CFP0",
171 "xponder-type": "mpdr",
172 "supporting-odu4": "XPDR1-NETWORK1-ODU4"},
def test_06_check_interface_och(self):
    # Reads interface XPDR1-NETWORK1-1 of node SPDR-SA1 through the
    # yang-ext:mount config path, then checks its generic attributes and the
    # exact content of its optical-channel (och) container.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected_attributes = {
        'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP1-CFP0',
        'type': 'org-openroadm-interfaces:opticalChannel',
        'supporting-port': 'CP1-CFP0-P1'}
    # assertDictContainsSubset was deprecated in Python 3.2 and removed in
    # 3.12. Overlaying the expected pairs on the actual dict leaves it
    # unchanged only if every expected key is present with an equal value,
    # which is the same subset condition.
    self.assertEqual(interface, dict(interface, **expected_attributes))
    self.assertDictEqual(
        {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
         u'transmit-power': -5},
        interface['org-openroadm-optical-channel-interfaces:och'])
# GETs interface XPDR1-NETWORK1-OTU of node SPDR-SA1 through the
# yang-ext:mount config path, expects requests.codes.ok, and checks that the
# first returned interface contains the listed attributes (supported by
# interface XPDR1-NETWORK1-1 on circuit pack CP1-CFP0).
# NOTE(review): the expected OTU dict passed to assertDictEqual is only partly
# visible in this view - confirm against the complete file.
# NOTE(review): assertDictContainsSubset is deprecated since Python 3.2 and
# was removed in Python 3.12.
194 def test_07_check_interface_OTU(self):
195 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
196 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
197 "interface/XPDR1-NETWORK1-OTU"
198 .format(self.restconf_baseurl))
199 headers = {'content-type': 'application/json'}
200 response = requests.request(
201 "GET", url, headers=headers, auth=('admin', 'admin'))
202 self.assertEqual(response.status_code, requests.codes.ok)
203 res = response.json()
204 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
205 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
206 'type': 'org-openroadm-interfaces:otnOtu',
207 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
208 self.assertDictEqual(
209 {u'rate': u'org-openroadm-otn-common-types:OTU4',
211 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_check_interface_ODU4(self):
    # Reads interface XPDR1-NETWORK1-ODU4 of node SPDR-SA1 through the
    # yang-ext:mount config path, then checks its generic attributes, a
    # subset of its odu container and the exact content of the nested opu.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-ODU4"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected_attributes = {
        'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP1-CFP0-P1'}
    # assertDictContainsSubset was deprecated in Python 3.2 and removed in
    # 3.12. Overlaying the expected pairs on the actual dict leaves it
    # unchanged only if every expected key is present with an equal value,
    # which is the same subset condition.
    self.assertEqual(interface, dict(interface, **expected_attributes))
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
        'rate': 'org-openroadm-otn-common-types:ODU4'}
    self.assertEqual(odu, dict(odu, **expected_odu))
    self.assertDictEqual(
        {u'payload-type': u'21', u'exp-payload-type': u'21'},
        odu['opu'])
# POSTs a "create" request for the 10G Ethernet service "service1" to the
# transportpce-device-renderer:otn-service-path operation and checks the
# reply: status requests.codes.ok, a result text containing
# 'Otn Service path was set up successfully for node :service1-SPDR-SA1' and
# a truthy success flag.
# NOTE(review): parts of the `data` payload (including the opening of the
# node list) are not visible in this view - confirm against the complete file.
235 def test_09_otn_service_path_create_10GE(self):
236 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
237 data = {"renderer:input": {
238 "service-name": "service1",
239 "operation": "create",
240 "service-rate": "10G",
241 "service-type": "Ethernet",
242 "ethernet-encoding": "eth encode",
244 "trib-port-number": "1",
245 "opucn-trib-slots": ["1"],
# Termination points on SPDR-SA1 named in the request.
247 {"node-id": "SPDR-SA1",
248 "client-tp": "XPDR1-CLIENT1",
249 "network-tp": "XPDR1-NETWORK1"}]}}
250 headers = {'content-type': 'application/json'}
251 response = requests.request(
252 "POST", url, data=json.dumps(data),
253 headers=headers, auth=('admin', 'admin'))
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 self.assertIn('Otn Service path was set up successfully for node :service1-SPDR-SA1', res["output"]["result"])
257 self.assertTrue(res["output"]["success"])
# GETs interface XPDR1-CLIENT1-ETHERNET10G of node SPDR-SA1 through the
# yang-ext:mount config path, expects requests.codes.ok, and checks that the
# first returned interface contains the listed attributes (circuit pack
# CP1-SFP4, type ethernetCsmacd).
# NOTE(review): the expected dict passed to assertDictEqual for the ethernet
# container is not visible in this view - confirm against the complete file.
# NOTE(review): assertDictContainsSubset is deprecated since Python 3.2 and
# was removed in Python 3.12.
259 def test_10_check_interface_10GE_CLIENT(self):
260 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
261 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
262 "interface/XPDR1-CLIENT1-ETHERNET10G"
263 .format(self.restconf_baseurl))
264 headers = {'content-type': 'application/json'}
265 response = requests.request(
266 "GET", url, headers=headers, auth=('admin', 'admin'))
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
270 'supporting-circuit-pack-name': 'CP1-SFP4',
271 'type': 'org-openroadm-interfaces:ethernetCsmacd',
272 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
273 self.assertDictEqual(
275 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_11_check_interface_ODU2E_CLIENT(self):
    # Reads interface XPDR1-CLIENT1-ODU2e-service1 of node SPDR-SA1 through
    # the yang-ext:mount config path, then checks its generic attributes, a
    # subset of its odu container and the exact content of the nested opu.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-CLIENT1-ODU2e-service1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected_attributes = {
        'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP1-SFP4',
        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP1-SFP4-P1'}
    # assertDictContainsSubset was deprecated in Python 3.2 and removed in
    # 3.12. Overlaying the expected pairs on the actual dict leaves it
    # unchanged only if every expected key is present with an equal value,
    # which is the same subset condition.
    self.assertEqual(interface, dict(interface, **expected_attributes))
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'terminated'}
    self.assertEqual(odu, dict(odu, **expected_odu))
    self.assertDictEqual(
        {u'payload-type': u'03', u'exp-payload-type': u'03'},
        odu['opu'])
# GETs interface XPDR1-NETWORK1-ODU2e-service1 of node SPDR-SA1 through the
# yang-ext:mount config path, expects requests.codes.ok, and checks that the
# first returned interface, its odu container and the odu's
# parent-odu-allocation each contain the listed key/value pairs.
# NOTE(review): the start of the final assertion on 'trib-slots' is not
# visible in this view - confirm against the complete file.
# NOTE(review): assertDictContainsSubset is deprecated since Python 3.2 and
# was removed in Python 3.12.
300 def test_12_check_interface_ODU2E_NETWORK(self):
301 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
302 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
303 "interface/XPDR1-NETWORK1-ODU2e-service1"
304 .format(self.restconf_baseurl))
305 headers = {'content-type': 'application/json'}
306 response = requests.request(
307 "GET", url, headers=headers, auth=('admin', 'admin'))
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
311 'supporting-circuit-pack-name': 'CP1-CFP0',
312 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
313 'type': 'org-openroadm-interfaces:otnOdu',
314 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
315 self.assertDictContainsSubset({
316 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
317 'rate': 'org-openroadm-otn-common-types:ODU2e',
318 'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
319 self.assertDictContainsSubset(
320 {'trib-port-number': 1},
321 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
323 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']['trib-slots'])
def test_13_check_ODU2E_connection(self):
    # Reads the odu-connection between XPDR1-CLIENT1-ODU2e-service1 and
    # XPDR1-NETWORK1-ODU2e-service1 on node SPDR-SA1 through the
    # yang-ext:mount config path, then checks its name, direction and both
    # end points.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['odu-connection'][0]
    expected_attributes = {
        'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
        'direction': 'bidirectional'}
    # assertDictContainsSubset was deprecated in Python 3.2 and removed in
    # 3.12. Overlaying the expected pairs on the actual dict leaves it
    # unchanged only if every expected key is present with an equal value,
    # which is the same subset condition.
    self.assertEqual(connection, dict(connection, **expected_attributes))
    self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
                         connection['destination'])
    self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
                         connection['source'])
# POSTs a "delete" request for the 10G Ethernet service "service1" to the
# transportpce-device-renderer:otn-service-path operation and checks the
# reply: status requests.codes.ok, a result text containing
# 'Request processed' and a truthy success flag.
# NOTE(review): parts of the `data` payload (including the opening of the
# node list) are not visible in this view - confirm against the complete file.
344 def test_14_otn_service_path_delete_10GE(self):
345 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
346 data = {"renderer:input": {
347 "service-name": "service1",
348 "operation": "delete",
349 "service-rate": "10G",
350 "service-type": "Ethernet",
351 "ethernet-encoding": "eth encode",
353 "trib-port-number": "1",
354 "opucn-trib-slots": ["1"],
# Termination points on SPDR-SA1 named in the request.
356 {"node-id": "SPDR-SA1",
357 "client-tp": "XPDR1-CLIENT1",
358 "network-tp": "XPDR1-NETWORK1"}]}}
359 headers = {'content-type': 'application/json'}
360 response = requests.request(
361 "POST", url, data=json.dumps(data),
362 headers=headers, auth=('admin', 'admin'))
363 self.assertEqual(response.status_code, requests.codes.ok)
364 res = response.json()
365 self.assertIn('Request processed', res["output"]["result"])
366 self.assertTrue(res["output"]["success"])
def test_15_check_no_ODU2E_connection(self):
    # Expects requests.codes.not_found when reading the odu-connection
    # XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1 of node
    # SPDR-SA1 through the yang-ext:mount config path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_16_check_no_interface_ODU2E_NETWORK(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-NETWORK1-ODU2e-service1 of node SPDR-SA1 through the
    # yang-ext:mount config path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-NETWORK1-ODU2e-service1")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_17_check_no_interface_ODU2E_CLIENT(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-CLIENT1-ODU2e-service1 of node SPDR-SA1 through the
    # yang-ext:mount config path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-CLIENT1-ODU2e-service1")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_18_check_no_interface_10GE_CLIENT(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-CLIENT1-ETHERNET10G of node SPDR-SA1 through the yang-ext:mount
    # config path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-CLIENT1-ETHERNET10G")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
# POSTs a "delete" request for service_ODU4 to the
# transportpce-device-renderer:service-path operation and checks the reply:
# status requests.codes.ok, a result text containing 'Request processed' and
# a truthy success flag.
# NOTE(review): parts of the `data` payload (including the opening of the
# node list) are not visible in this view - confirm against the complete file.
408 def test_19_service_path_delete_ODU4(self):
409 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
410 data = {"renderer:input": {
411 "service-name": "service_ODU4",
413 "modulation-format": "qpsk",
414 "operation": "delete",
416 {"node-id": "SPDR-SA1",
417 "dest-tp": "XPDR1-NETWORK1"}]}}
418 headers = {'content-type': 'application/json'}
419 response = requests.request(
420 "POST", url, data=json.dumps(data),
421 headers=headers, auth=('admin', 'admin'))
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 self.assertIn('Request processed', res["output"]["result"])
426 self.assertTrue(res["output"]["success"])
def test_20_check_no_interface_ODU4(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-NETWORK1-ODU4 of node SPDR-SA1 through the yang-ext:mount config
    # path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-NETWORK1-ODU4")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_21_check_no_interface_OTU(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-NETWORK1-OTU of node SPDR-SA1 through the yang-ext:mount config
    # path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-NETWORK1-OTU")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
def test_22_check_no_interface_och(self):
    # Expects requests.codes.not_found when reading interface
    # XPDR1-NETWORK1-1 of node SPDR-SA1 through the yang-ext:mount config
    # path.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/XPDR1-NETWORK1-1")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
# Sends a DELETE for node SPDR-SA1 on the config topology-netconf path (the
# same URL test_01 PUTs to) and expects requests.codes.ok.
# NOTE(review): `data` is serialized into the request body below but is not
# assigned anywhere in the visible lines of this method - confirm against the
# complete file that it is defined before use.
458 def test_23_disconnect_SPDR_SA1(self):
459 url = ("{}/config/network-topology:"
460 "network-topology/topology/topology-netconf/node/SPDR-SA1"
461 .format(self.restconf_baseurl))
463 headers = {'content-type': 'application/json'}
464 response = requests.request(
465 "DELETE", url, data=json.dumps(data), headers=headers,
466 auth=('admin', 'admin'))
467 self.assertEqual(response.status_code, requests.codes.ok)
# Script entry point: run this module's tests with verbosity level 2.
if __name__ == "__main__":
    unittest.main(
        verbosity=2)