3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
22 from common import test_utils
25 class TransportPCERendererTesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    Each entry is handed to ``test_utils.shutdown_process`` in list order;
    a confirmation line is printed once the loop has finished.
    """
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
42 print("execution of {}".format(self.id().split(".")[-1]))
45 def test_01_rdm_device_connected(self):
46 url = ("{}/config/network-topology:"
47 "network-topology/topology/topology-netconf/node/ROADMA01"
48 .format(self.restconf_baseurl))
50 "node-id": "ROADMA01",
51 "netconf-node-topology:username": "admin",
52 "netconf-node-topology:password": "admin",
53 "netconf-node-topology:host": "127.0.0.1",
54 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
55 "netconf-node-topology:tcp-only": "false",
56 "netconf-node-topology:pass-through": {}}]}
57 headers = {'content-type': 'application/json'}
58 response = requests.request(
59 "PUT", url, data=json.dumps(data), headers=headers,
60 auth=('admin', 'admin'))
61 self.assertEqual(response.status_code, requests.codes.created)
64 def test_02_xpdr_device_connected(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/XPDRA01"
67 .format(self.restconf_baseurl))
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_03_rdm_portmapping(self):
84 url = ("{}/config/transportpce-portmapping:network/"
86 .format(self.restconf_baseurl))
87 headers = {'content-type': 'application/json'}
88 response = requests.request(
89 "GET", url, headers=headers, auth=('admin', 'admin'))
90 self.assertEqual(response.status_code, requests.codes.ok)
93 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
94 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
95 res['nodes'][0]['mapping'])
97 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
98 'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
99 res['nodes'][0]['mapping'])
101 def test_04_xpdr_portmapping(self):
102 url = ("{}/config/transportpce-portmapping:network/"
104 .format(self.restconf_baseurl))
105 headers = {'content-type': 'application/json'}
106 response = requests.request(
107 "GET", url, headers=headers, auth=('admin', 'admin'))
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
111 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
112 'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
113 'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
114 'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7aa'},
115 res['nodes'][0]['mapping'])
117 {'supporting-port': 'C1',
118 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
119 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
120 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
121 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae5'},
122 res['nodes'][0]['mapping'])
124 def test_05_service_path_create(self):
125 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
126 data = {"renderer:input": {
127 "renderer:service-name": "service_test",
128 "renderer:wave-number": "7",
129 "renderer:modulation-format": "qpsk",
130 "renderer:operation": "create",
132 {"renderer:node-id": "ROADMA01",
133 "renderer:src-tp": "SRG1-PP7-TXRX",
134 "renderer:dest-tp": "DEG1-TTP-TXRX"},
135 {"renderer:node-id": "XPDRA01",
136 "renderer:src-tp": "XPDR1-CLIENT1",
137 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
138 headers = {'content-type': 'application/json'}
139 response = requests.request(
140 "POST", url, data=json.dumps(data),
141 headers=headers, auth=('admin', 'admin'))
142 self.assertEqual(response.status_code, requests.codes.ok)
143 res = response.json()
144 self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])
146 def test_06_service_path_create_rdm_check(self):
147 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
148 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
149 "interface/DEG1-TTP-TXRX-7"
150 .format(self.restconf_baseurl))
151 headers = {'content-type': 'application/json'}
152 response = requests.request(
153 "GET", url, headers=headers, auth=('admin', 'admin'))
154 self.assertEqual(response.status_code, requests.codes.ok)
155 res = response.json()
156 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
157 self.assertDictEqual(
159 'name': 'DEG1-TTP-TXRX-7',
160 'administrative-state': 'inService',
161 'supporting-circuit-pack-name': '2/0',
162 'type': 'org-openroadm-interfaces:opticalChannel',
163 'supporting-port': 'L1'
164 }, **res['interface'][0]),
167 self.assertDictEqual(
168 {'wavelength-number': 7},
169 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
171 def test_07_service_path_create_rdm_check(self):
172 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
173 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
174 "interface/SRG1-PP7-TXRX-7"
175 .format(self.restconf_baseurl))
176 headers = {'content-type': 'application/json'}
177 response = requests.request(
178 "GET", url, headers=headers, auth=('admin', 'admin'))
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
182 self.assertDictEqual(
184 'name': 'SRG1-PP7-TXRX-7',
185 'administrative-state': 'inService',
186 'supporting-circuit-pack-name': '4/0',
187 'type': 'org-openroadm-interfaces:opticalChannel',
188 'supporting-port': 'C7'
189 }, **res['interface'][0]),
192 self.assertDictEqual(
193 {'wavelength-number': 7},
194 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
196 def test_08_service_path_create_rdm_check(self):
197 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
198 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
199 "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
200 .format(self.restconf_baseurl))
201 headers = {'content-type': 'application/json'}
202 response = requests.request(
203 "GET", url, headers=headers, auth=('admin', 'admin'))
204 self.assertEqual(response.status_code, requests.codes.ok)
205 res = response.json()
206 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
207 self.assertDictEqual(
209 'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-7',
210 'wavelength-number': 7,
211 'opticalControlMode': 'off'
212 }, **res['roadm-connections'][0]),
213 res['roadm-connections'][0]
215 self.assertDictEqual(
216 {'src-if': 'SRG1-PP7-TXRX-7'},
217 res['roadm-connections'][0]['source'])
218 self.assertDictEqual(
219 {'dst-if': 'DEG1-TTP-TXRX-7'},
220 res['roadm-connections'][0]['destination'])
222 def test_09_service_path_create_xpdr_check(self):
223 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
224 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
225 "interface/XPDR1-NETWORK1-7"
226 .format(self.restconf_baseurl))
227 headers = {'content-type': 'application/json'}
228 response = requests.request(
229 "GET", url, headers=headers, auth=('admin', 'admin'))
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
232 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
233 self.assertDictEqual(
235 'name': 'XPDR1-NETWORK1-7',
236 'administrative-state': 'inService',
237 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
238 'type': 'org-openroadm-interfaces:opticalChannel',
239 'supporting-port': '1'
240 }, **res['interface'][0]),
243 self.assertDictEqual(
244 {u'rate': u'org-openroadm-optical-channel-interfaces:R100G',
245 u'transmit-power': -5,
246 u'wavelength-number': 7,
247 u'modulation-format': u'dp-qpsk'},
248 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
250 def test_10_service_path_create_xpdr_check(self):
251 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
252 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
253 "interface/XPDR1-NETWORK1-OTU"
254 .format(self.restconf_baseurl))
255 headers = {'content-type': 'application/json'}
256 response = requests.request(
257 "GET", url, headers=headers, auth=('admin', 'admin'))
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
261 self.assertDictEqual(
263 'name': 'XPDR1-NETWORK1-OTU',
264 'administrative-state': 'inService',
265 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
266 'type': 'org-openroadm-interfaces:otnOtu',
267 'supporting-port': '1',
268 'supporting-interface': 'XPDR1-NETWORK1-7'
269 }, **res['interface'][0]),
272 self.assertDictEqual(
273 {u'rate': u'org-openroadm-otn-otu-interfaces:OTU4',
275 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
277 def test_11_service_path_create_xpdr_check(self):
278 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
279 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
280 "interface/XPDR1-NETWORK1-ODU"
281 .format(self.restconf_baseurl))
282 headers = {'content-type': 'application/json'}
283 response = requests.request(
284 "GET", url, headers=headers, auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
288 self.assertDictEqual(
290 'name': 'XPDR1-NETWORK1-ODU',
291 'administrative-state': 'inService',
292 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
293 'type': 'org-openroadm-interfaces:otnOdu',
294 'supporting-port': '1',
295 'supporting-interface': 'XPDR1-NETWORK1-OTU'
296 }, **res['interface'][0]),
299 self.assertDictEqual(
301 'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
302 u'monitoring-mode': u'terminated'
303 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
304 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
306 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
307 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
309 def test_12_service_path_create_xpdr_check(self):
310 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
311 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
312 "interface/XPDR1-CLIENT1-ETHERNET"
313 .format(self.restconf_baseurl))
314 headers = {'content-type': 'application/json'}
315 response = requests.request(
316 "GET", url, headers=headers, auth=('admin', 'admin'))
317 self.assertEqual(response.status_code, requests.codes.ok)
318 res = response.json()
319 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
320 self.assertDictEqual(
322 'name': 'XPDR1-CLIENT1-ETHERNET',
323 'administrative-state': 'inService',
324 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
325 'type': 'org-openroadm-interfaces:ethernetCsmacd',
326 'supporting-port': 'C1'
327 }, **res['interface'][0]),
330 self.assertDictEqual(
333 'auto-negotiation': 'enabled',
336 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_13_service_path_create_xpdr_check(self):
    """Check the XPDRA01 network circuit pack state for the created service path.

    Reads circuit pack ``1/0/1-PLUG-NET`` (slashes percent-encoded in the
    URL) through the XPDRA01 mount point, expects an OK status and an
    ``equipment-state`` of exactly ``not-reserved-inuse``.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "circuit-packs/1%2F0%2F1-PLUG-NET"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # assertEqual rather than assertIn: against a string, assertIn is only a
    # substring test and would also accept any longer state value. The
    # matching check after deletion (test_22) compares this field exactly.
    self.assertEqual('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
350 def test_14_service_path_delete(self):
351 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
352 data = {"renderer:input": {
353 "renderer:service-name": "service_test",
354 "renderer:wave-number": "7",
355 "renderer:operation": "delete",
357 {"renderer:node-id": "ROADMA01",
358 "renderer:src-tp": "SRG1-PP7-TXRX",
359 "renderer:dest-tp": "DEG1-TTP-TXRX"},
360 {"renderer:node-id": "XPDRA01",
361 "renderer:src-tp": "XPDR1-CLIENT1",
362 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
363 headers = {'content-type': 'application/json'}
364 response = requests.request(
365 "POST", url, data=json.dumps(data),
366 headers=headers, auth=('admin', 'admin'))
367 self.assertEqual(response.status_code, requests.codes.ok)
368 self.assertEqual(response.json(), {
369 'output': {'result': 'Request processed', 'success': True}})
371 def test_15_service_path_delete_rdm_check(self):
372 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
373 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
374 "interface/DEG1-TTP-TXRX-7"
375 .format(self.restconf_baseurl))
376 headers = {'content-type': 'application/json'}
377 response = requests.request(
378 "GET", url, headers=headers, auth=('admin', 'admin'))
379 self.assertEqual(response.status_code, requests.codes.not_found)
380 res = response.json()
382 {"error-type": "application", "error-tag": "data-missing",
383 "error-message": "Request could not be completed because the relevant data model content does not exist"},
384 res['errors']['error'])
386 def test_16_service_path_delete_rdm_check(self):
387 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
388 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
389 "interface/SRG1-PP7-TXRX-7"
390 .format(self.restconf_baseurl))
391 headers = {'content-type': 'application/json'}
392 response = requests.request(
393 "GET", url, headers=headers, auth=('admin', 'admin'))
394 self.assertEqual(response.status_code, requests.codes.not_found)
395 res = response.json()
397 {"error-type": "application", "error-tag": "data-missing",
398 "error-message": "Request could not be completed because the relevant data model content does not exist"},
399 res['errors']['error'])
401 def test_17_service_path_delete_rdm_check(self):
402 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
403 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
404 "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
405 .format(self.restconf_baseurl))
406 headers = {'content-type': 'application/json'}
407 response = requests.request(
408 "GET", url, headers=headers, auth=('admin', 'admin'))
409 self.assertEqual(response.status_code, requests.codes.not_found)
410 res = response.json()
412 {"error-type": "application", "error-tag": "data-missing",
413 "error-message": "Request could not be completed because the relevant data model content does not exist"},
414 res['errors']['error'])
416 def test_18_service_path_delete_xpdr_check(self):
417 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
418 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
419 "interface/XPDR1-NETWORK1-7"
420 .format(self.restconf_baseurl))
421 headers = {'content-type': 'application/json'}
422 response = requests.request(
423 "GET", url, headers=headers, auth=('admin', 'admin'))
424 self.assertEqual(response.status_code, requests.codes.not_found)
425 res = response.json()
427 {"error-type": "application", "error-tag": "data-missing",
428 "error-message": "Request could not be completed because the relevant data model content does not exist"},
429 res['errors']['error'])
431 def test_19_service_path_delete_xpdr_check(self):
432 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
433 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
434 "interface/XPDR1-NETWORK1-OTU"
435 .format(self.restconf_baseurl))
436 headers = {'content-type': 'application/json'}
437 response = requests.request(
438 "GET", url, headers=headers, auth=('admin', 'admin'))
439 self.assertEqual(response.status_code, requests.codes.not_found)
440 res = response.json()
442 {"error-type": "application", "error-tag": "data-missing",
443 "error-message": "Request could not be completed because the relevant data model content does not exist"},
444 res['errors']['error'])
446 def test_20_service_path_delete_xpdr_check(self):
447 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
448 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
449 "interface/XPDR1-NETWORK1-ODU"
450 .format(self.restconf_baseurl))
451 headers = {'content-type': 'application/json'}
452 response = requests.request(
453 "GET", url, headers=headers, auth=('admin', 'admin'))
454 self.assertEqual(response.status_code, requests.codes.not_found)
455 res = response.json()
457 {"error-type": "application", "error-tag": "data-missing",
458 "error-message": "Request could not be completed because the relevant data model content does not exist"},
459 res['errors']['error'])
461 def test_21_service_path_delete_xpdr_check(self):
462 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
463 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
464 "interface/XPDR1-CLIENT1-ETHERNET"
465 .format(self.restconf_baseurl))
466 headers = {'content-type': 'application/json'}
467 response = requests.request(
468 "GET", url, headers=headers, auth=('admin', 'admin'))
469 self.assertEqual(response.status_code, requests.codes.not_found)
470 res = response.json()
472 {"error-type": "application", "error-tag": "data-missing",
473 "error-message": "Request could not be completed because the relevant data model content does not exist"},
474 res['errors']['error'])
def test_22_service_path_delete_xpdr_check(self):
    """Check the XPDRA01 network circuit pack state for the deleted service path.

    Fetches circuit pack ``1/0/1-PLUG-NET`` through the XPDRA01 mount point,
    expects an OK status and an ``equipment-state`` equal to
    ``not-reserved-available``.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-NET")
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    circuit_pack = payload["circuit-packs"][0]
    self.assertEqual('not-reserved-available', circuit_pack['equipment-state'])
def test_23_rdm_device_disconnected(self):
    """Delete the ROADMA01 node from topology-netconf and expect an OK status."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADMA01")
    reply = requests.request(
        "DELETE",
        node_url.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_24_xpdr_device_disconnected(self):
    """Delete the XPDRA01 node from topology-netconf and expect an OK status."""
    credentials = ('admin', 'admin')
    json_headers = {'content-type': 'application/json'}
    target = ("{}/config/network-topology:"
              "network-topology/topology/topology-netconf/node/XPDRA01")
    target = target.format(self.restconf_baseurl)
    outcome = requests.request(
        "DELETE", target, headers=json_headers, auth=credentials)
    self.assertEqual(outcome.status_code, requests.codes.ok)
# Script entry point: run the suite with verbose output and stop at the
# first failing test.
if __name__ == "__main__":
    unittest.main(failfast=True, verbosity=2)