3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportPCERendererTesting(unittest.TestCase):
30 restconf_baseurl = "http://localhost:8181/restconf"
32 # START_IGNORE_XTESTING
36 cls.sim_process1 = test_utils.start_sim('xpdra')
39 cls.sim_process2 = test_utils.start_sim('roadma')
42 cls.odl_process = test_utils.start_tpce()
44 print("opendaylight started")
def tearDownClass(cls):
    """Interrupt the processes held on the class and wait for them to exit.

    Handles ``odl_process``, ``sim_process1`` and ``sim_process2`` in that
    order. For each one, SIGINT goes to every direct child reported by
    psutil, then to the process itself, and the call blocks until it ends.
    """
    for attr_name in ("odl_process", "sim_process1", "sim_process2"):
        # Looked up lazily so attributes are read in the original order.
        proc = getattr(cls, attr_name)
        for sub in psutil.Process(proc.pid).children():
            sub.send_signal(signal.SIGINT)
        proc.send_signal(signal.SIGINT)
        proc.wait()
65 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_rdm_device_connected(self):
    """PUT the ROADMA01 NETCONF node configuration and expect HTTP created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this payload was missing; the
    # {"node": [{...}]} wrapper is rebuilt from the closing "}]}" -- confirm.
    data = {"node": [{
        "node-id": "ROADMA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_xpdr_device_connected(self):
    """PUT the XPDRA01 NETCONF node configuration and expect HTTP created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the payload opener and the "node-id" entry were missing;
    # both are rebuilt to mirror the ROADMA01 mount request -- confirm.
    data = {"node": [{
        "node-id": "XPDRA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_03_rdm_portmapping(self):
    """GET the port mapping and check the DEG1-TTP and SRG1-PP7 entries."""
    # NOTE(review): the URL's last segment was missing; "nodes/ROADMA01" is
    # inferred from the res['nodes'][0] access and the ROADM mappings below.
    url = ("{}/config/transportpce-portmapping:network/"
           "nodes/ROADMA01"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # NOTE(review): both assertion openers were missing; assertIn is the
    # reconstruction for "expected dict, list of mappings" -- confirm.
    self.assertIn(
        {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
         'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
        res['nodes'][0]['mapping'])
    self.assertIn(
        {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
         'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
        res['nodes'][0]['mapping'])
def test_04_xpdr_portmapping(self):
    """GET the port mapping and check the XPDR1 network and client entries."""
    # NOTE(review): the URL's last segment was missing; "nodes/XPDRA01" is
    # inferred from the res['nodes'][0] access and the XPDR mappings below.
    url = ("{}/config/transportpce-portmapping:network/"
           "nodes/XPDRA01"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # NOTE(review): both assertion openers were missing; assertIn is the
    # reconstruction for "expected dict, list of mappings" -- confirm.
    self.assertIn(
        {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
         'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
         'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
         'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7aa'},
        res['nodes'][0]['mapping'])
    self.assertIn(
        {'supporting-port': 'C1',
         'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
         'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
         'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
         'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae5'},
        res['nodes'][0]['mapping'])
def test_05_service_path_create(self):
    """POST a service-path 'create' for service_test on wave 7 and check the result text."""
    url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
    data = {"renderer:input": {
        "renderer:service-name": "service_test",
        "renderer:wave-number": "7",
        "renderer:modulation-format": "qpsk",
        "renderer:operation": "create",
        # NOTE(review): the line carrying this list's key was missing;
        # "renderer:nodes" is reconstructed -- confirm against the RPC model.
        "renderer:nodes": [
            {"renderer:node-id": "ROADMA01",
             "renderer:src-tp": "SRG1-PP7-TXRX",
             "renderer:dest-tp": "DEG1-TTP-TXRX"},
            {"renderer:node-id": "XPDRA01",
             "renderer:src-tp": "XPDR1-CLIENT1",
             "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Substring match: the result text only has to contain this sentence.
    self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])
def test_06_service_path_create_rdm_check(self):
    """GET interface DEG1-TTP-TXRX-7 on ROADMA01 and check its leaves and och container."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the "dict(" opener and the closing argument were missing and are
    # restored to the form used by the roadm-connections check in this file)
    self.assertDictEqual(
        dict({
            'name': 'DEG1-TTP-TXRX-7',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '2/0',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': 'L1'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    self.assertDictEqual(
        {'wavelength-number': 7},
        res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_07_service_path_create_rdm_check(self):
    """GET interface SRG1-PP7-TXRX-7 on ROADMA01 and check its leaves and och container."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/SRG1-PP7-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the "dict(" opener and the closing argument were missing and are
    # restored to the form used by the roadm-connections check in this file)
    self.assertDictEqual(
        dict({
            'name': 'SRG1-PP7-TXRX-7',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '4/0',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': 'C7'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    self.assertDictEqual(
        {'wavelength-number': 7},
        res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_08_service_path_create_rdm_check(self):
    """GET roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-7 and check its leaves and endpoints."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the missing "dict(" opener and closing parenthesis are restored)
    self.assertDictEqual(
        dict({
            'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-7',
            'wavelength-number': 7,
            'opticalControlMode': 'off'
        }, **res['roadm-connections'][0]),
        res['roadm-connections'][0]
    )
    self.assertDictEqual(
        {'src-if': 'SRG1-PP7-TXRX-7'},
        res['roadm-connections'][0]['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-7'},
        res['roadm-connections'][0]['destination'])
def test_09_service_path_create_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-7 on XPDRA01 and check its leaves and och container."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the "dict(" opener and the closing argument were missing and are
    # restored to the form used by the roadm-connections check in this file)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-7',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': '1'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    self.assertDictEqual(
        {u'rate': u'org-openroadm-optical-channel-interfaces:R100G',
         u'transmit-power': -5,
         u'wavelength-number': 7,
         u'modulation-format': u'dp-qpsk'},
        res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_10_service_path_create_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-OTU on XPDRA01 and check its leaves and otu rate."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-OTU"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the "dict(" opener and the closing argument were missing and are
    # restored to the form used by the roadm-connections check in this file)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-OTU',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:otnOtu',
            'supporting-port': '1',
            'supporting-interface': 'XPDR1-NETWORK1-7'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    # NOTE(review): the line after the 'rate' entry was missing, so any
    # further expected leaves of the otu container are unknown. Only 'rate'
    # is checked here, as a subset; restore the full expected dict (and the
    # exact comparison) once the lost line is recovered.
    otu = res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']
    self.assertDictEqual(
        dict({u'rate': u'org-openroadm-otn-otu-interfaces:OTU4'}, **otu),
        otu)
def test_11_service_path_create_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-ODU on XPDRA01 and check its leaves, odu and opu containers."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-ODU"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
    # (the missing "dict(" openers, closing argument and parentheses are restored)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-ODU',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:otnOdu',
            'supporting-port': '1',
            'supporting-interface': 'XPDR1-NETWORK1-OTU'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    self.assertDictEqual(
        dict({
            'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
            u'monitoring-mode': u'terminated'
        }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    )
    self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_12_service_path_create_xpdr_check(self):
    """GET interface XPDR1-CLIENT1-ETHERNET on XPDRA01 and check its leaves and ethernet container."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-CLIENT1-ETHERNET"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (the "dict(" opener and the closing argument were missing and are
    # restored to the form used by the roadm-connections check in this file)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-CLIENT1-ETHERNET',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
            'type': 'org-openroadm-interfaces:ethernetCsmacd',
            'supporting-port': 'C1'
        }, **res['interface'][0]),
        res['interface'][0]
    )
    # NOTE(review): four of the five lines of the expected ethernet dict
    # were missing; only the surviving 'auto-negotiation' entry is checked
    # here, as a subset. Restore the full expected dict (and the exact
    # comparison) once the lost lines are recovered.
    ethernet = res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']
    self.assertDictEqual(
        dict({'auto-negotiation': 'enabled'}, **ethernet),
        ethernet)
def test_13_service_path_create_xpdr_check(self):
    """GET circuit pack 1/0/1-PLUG-NET on XPDRA01 and expect state not-reserved-inuse."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "circuit-packs/1%2F0%2F1-PLUG-NET"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Exact comparison: assertIn on a string is only a substring test and
    # would accept any value that merely contains the expected state. This
    # also matches how the 'not-reserved-available' state is checked.
    self.assertEqual('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
def test_14_service_path_delete(self):
    """POST a service-path 'delete' for service_test on wave 7 and check the RPC output."""
    url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
    data = {"renderer:input": {
        "renderer:service-name": "service_test",
        "renderer:wave-number": "7",
        "renderer:operation": "delete",
        # NOTE(review): the line carrying this list's key was missing;
        # "renderer:nodes" is reconstructed -- confirm against the RPC model.
        "renderer:nodes": [
            {"renderer:node-id": "ROADMA01",
             "renderer:src-tp": "SRG1-PP7-TXRX",
             "renderer:dest-tp": "DEG1-TTP-TXRX"},
            {"renderer:node-id": "XPDRA01",
             "renderer:src-tp": "XPDR1-CLIENT1",
             "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    self.assertEqual(response.json(), {
        'output': {'result': 'Request processed', 'success': True}})
def test_15_service_path_delete_rdm_check(self):
    """GET interface DEG1-TTP-TXRX-7 on ROADMA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_16_service_path_delete_rdm_check(self):
    """GET interface SRG1-PP7-TXRX-7 on ROADMA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/SRG1-PP7-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_17_service_path_delete_rdm_check(self):
    """GET roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-7 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_18_service_path_delete_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-7 on XPDRA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_19_service_path_delete_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-OTU on XPDRA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-OTU"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_20_service_path_delete_xpdr_check(self):
    """GET interface XPDR1-NETWORK1-ODU on XPDRA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-NETWORK1-ODU"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_21_service_path_delete_xpdr_check(self):
    """GET interface XPDR1-CLIENT1-ETHERNET on XPDRA01 and expect 404 with a data-missing error."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/XPDR1-CLIENT1-ETHERNET"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the assertion opener was missing; assertIn is the
    # reconstruction for "expected error dict, error list" -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_22_service_path_delete_xpdr_check(self):
    """Read circuit pack 1/0/1-PLUG-NET on XPDRA01; its state must be not-reserved-available."""
    device_root = (
        self.restconf_baseurl
        + "/config/network-topology:network-topology/topology/topology-netconf/"
          "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
    )
    reply = requests.get(
        device_root + "circuit-packs/1%2F0%2F1-PLUG-NET",
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply body is a one-element list under "circuit-packs".
    circuit_pack = reply.json()["circuit-packs"][0]
    self.assertEqual('not-reserved-available', circuit_pack['equipment-state'])
def test_23_rdm_device_disconnected(self):
    """Delete the ROADMA01 NETCONF node configuration; the controller must answer OK."""
    node_url = (
        self.restconf_baseurl
        + "/config/network-topology:"
          "network-topology/topology/topology-netconf/node/ROADMA01"
    )
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_24_xpdr_device_disconnected(self):
    """Delete the XPDRA01 NETCONF node configuration; the controller must answer OK."""
    node_url = (
        self.restconf_baseurl
        + "/config/network-topology:"
          "network-topology/topology/topology-netconf/node/XPDRA01"
    )
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Script entry point: verbose output, stop at the first failure or error.
    unittest.main(failfast=True, verbosity=2)