3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
22 from common import test_utils
25 class TransportPCERendererTesting(unittest.TestCase):
# Test case that drives the transportpce-device-renderer service-path RPC
# (a "create" request followed by a "delete" request) and inspects the
# resulting device configuration on nodes ROADMA01 and XPDRA01.
# NOTE(review): no `def setUpClass(cls)` header (or decorator) is visible
# before the next two statements in this view - confirm against the full file.
31 cls.processes = test_utils.start_tpce()
# NOTE(review): this rebinds cls.processes and drops the reference stored by
# the previous line. Presumably start_sims() returns a collection that still
# contains whatever start_tpce() started - verify in test_utils, otherwise
# tearDownClass never shuts those processes down.
32 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    Each entry is handed to ``test_utils.shutdown_process`` and a
    confirmation line is printed once all of them have been processed.

    NOTE(review): unittest invokes this hook on the class itself, so it
    must be a classmethod; no decorator is visible directly above this
    definition in this view - confirm one exists in the complete file.
    """
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
# Prints the last dot-separated component of the running test's id.
# NOTE(review): the enclosing `def` line is not visible in this view;
# presumably this statement is the body of setUp(self) - confirm.
41 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_rdm_device_connected(self):
    """Mount device ROADMA01 and expect the mount request to answer 201."""
    mount_reply = test_utils.mount_device("ROADMA01", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_xpdr_device_connected(self):
    """Mount device XPDRA01 and expect the mount request to answer 201."""
    mount_reply = test_utils.mount_device("XPDRA01", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
52 def test_03_rdm_portmapping(self):
# Reads the transportpce-portmapping:network config data over RESTCONF,
# expects HTTP 200, and lists two expected ROADM mapping entries:
# DEG1-TTP-TXRX (circuit pack 2/0, port L1) and SRG1-PP7-TXRX (4/0, port C7).
# NOTE(review): as visible here the URL ends at "network/" with no node
# segment - a path fragment may be missing; confirm against the full file.
53 url = ("{}/config/transportpce-portmapping:network/"
55 .format(test_utils.RESTCONF_BASE_URL))
56 response = requests.request(
57 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
58 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): `res` is read below but never assigned in the visible lines,
# and neither dict literal is wrapped in an assertion call here. Presumably
# `res = response.json()` and `self.assertIn(` lines are missing - confirm.
61 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
62 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
63 res['nodes'][0]['mapping'])
65 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
66 'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
67 res['nodes'][0]['mapping'])
69 def test_04_xpdr_portmapping(self):
# Reads the transportpce-portmapping:network config data over RESTCONF,
# expects HTTP 200, and lists two expected transponder mapping entries:
# XPDR1-NETWORK1 and XPDR1-CLIENT1, which name each other through
# 'connection-map-lcp'.
# NOTE(review): as visible here the URL ends at "network/" with no node
# segment - a path fragment may be missing; confirm against the full file.
70 url = ("{}/config/transportpce-portmapping:network/"
72 .format(test_utils.RESTCONF_BASE_URL))
73 response = requests.request(
74 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
75 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): `res` is read below but never assigned in the visible lines,
# and neither dict literal is wrapped in an assertion call here. Presumably
# `res = response.json()` and `self.assertIn(` lines are missing - confirm.
78 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
79 'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
80 'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
81 'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7aa'},
82 res['nodes'][0]['mapping'])
84 {'supporting-port': 'C1',
85 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
86 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
87 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
88 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae5'},
89 res['nodes'][0]['mapping'])
91 def test_05_service_path_create(self):
# POSTs a "create" request for service "service_test" (wave number 7, qpsk)
# to the transportpce-device-renderer:service-path RPC, listing ROADMA01 and
# XPDRA01 with their source/destination termination points. Expects HTTP 200
# and a result text that mentions successful connection creation on ROADMA01.
92 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
93 data = {"renderer:input": {
94 "renderer:service-name": "service_test",
95 "renderer:wave-number": "7",
96 "renderer:modulation-format": "qpsk",
97 "renderer:operation": "create",
# NOTE(review): the key and opening bracket of the node list that follows are
# not visible in this view (the list is closed by `]` further down) -
# confirm against the complete file.
99 {"renderer:node-id": "ROADMA01",
100 "renderer:src-tp": "SRG1-PP7-TXRX",
101 "renderer:dest-tp": "DEG1-TTP-TXRX"},
102 {"renderer:node-id": "XPDRA01",
103 "renderer:src-tp": "XPDR1-CLIENT1",
104 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
105 response = requests.request(
106 "POST", url, data=json.dumps(data),
107 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
# On a string, assertIn is a substring test: the result text only has to
# contain this sentence.
110 self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])
112 def test_06_service_path_create_rdm_check(self):
# Reads interface DEG1-TTP-TXRX-7 from the mounted ROADMA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes and that
# its optical-channel container is exactly {'wavelength-number': 7}.
113 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
114 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
115 "interface/DEG1-TTP-TXRX-7"
116 .format(test_utils.RESTCONF_BASE_URL))
117 response = requests.request(
118 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
119 self.assertEqual(response.status_code, requests.codes.ok)
120 res = response.json()
121 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
122 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
124 'name': 'DEG1-TTP-TXRX-7',
125 'administrative-state': 'inService',
126 'supporting-circuit-pack-name': '2/0',
127 'type': 'org-openroadm-interfaces:opticalChannel',
128 'supporting-port': 'L1'
129 }, **res['interface'][0]),
132 self.assertDictEqual(
133 {'wavelength-number': 7},
134 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
136 def test_07_service_path_create_rdm_check(self):
# Reads interface SRG1-PP7-TXRX-7 from the mounted ROADMA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes and that
# its optical-channel container is exactly {'wavelength-number': 7}.
137 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
138 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
139 "interface/SRG1-PP7-TXRX-7"
140 .format(test_utils.RESTCONF_BASE_URL))
141 response = requests.request(
142 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
143 self.assertEqual(response.status_code, requests.codes.ok)
144 res = response.json()
145 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
146 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
148 'name': 'SRG1-PP7-TXRX-7',
149 'administrative-state': 'inService',
150 'supporting-circuit-pack-name': '4/0',
151 'type': 'org-openroadm-interfaces:opticalChannel',
152 'supporting-port': 'C7'
153 }, **res['interface'][0]),
156 self.assertDictEqual(
157 {'wavelength-number': 7},
158 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
160 def test_08_service_path_create_rdm_check(self):
# Reads roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-7 from the mounted
# ROADMA01 device over RESTCONF, expects HTTP 200, and checks the connection
# attributes plus its source ('src-if') and destination ('dst-if') interfaces.
161 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
162 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
163 "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
164 .format(test_utils.RESTCONF_BASE_URL))
165 response = requests.request(
166 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
170 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the closing
# parenthesis of this call are not visible in this view. The
# `}, **res[...])` below suggests a `dict({...}, **actual)` merge compared
# against the actual dict (a subset check) - confirm against the full file.
172 'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-7',
173 'wavelength-number': 7,
174 'opticalControlMode': 'off'
175 }, **res['roadm-connections'][0]),
176 res['roadm-connections'][0]
# The two comparisons below are exact: each container must hold only the
# single listed key.
178 self.assertDictEqual(
179 {'src-if': 'SRG1-PP7-TXRX-7'},
180 res['roadm-connections'][0]['source'])
181 self.assertDictEqual(
182 {'dst-if': 'DEG1-TTP-TXRX-7'},
183 res['roadm-connections'][0]['destination'])
185 def test_09_service_path_create_xpdr_check(self):
# Reads interface XPDR1-NETWORK1-7 from the mounted XPDRA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes and the
# exact content of its optical-channel container (rate, transmit power,
# wavelength number, modulation format).
186 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
187 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
188 "interface/XPDR1-NETWORK1-7"
189 .format(test_utils.RESTCONF_BASE_URL))
190 response = requests.request(
191 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
192 self.assertEqual(response.status_code, requests.codes.ok)
193 res = response.json()
194 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
195 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
197 'name': 'XPDR1-NETWORK1-7',
198 'administrative-state': 'inService',
199 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
200 'type': 'org-openroadm-interfaces:opticalChannel',
201 'supporting-port': '1'
202 }, **res['interface'][0]),
# The request in test_05 sends modulation format "qpsk"; the value expected
# back here is 'dp-qpsk'.
205 self.assertDictEqual(
206 {u'rate': u'org-openroadm-optical-channel-interfaces:R100G',
207 u'transmit-power': -5,
208 u'wavelength-number': 7,
209 u'modulation-format': u'dp-qpsk'},
210 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
212 def test_10_service_path_create_xpdr_check(self):
# Reads interface XPDR1-NETWORK1-OTU from the mounted XPDRA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes (including
# that it is supported by XPDR1-NETWORK1-7) and its OTU container.
213 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
214 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
215 "interface/XPDR1-NETWORK1-OTU"
216 .format(test_utils.RESTCONF_BASE_URL))
217 response = requests.request(
218 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
222 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
224 'name': 'XPDR1-NETWORK1-OTU',
225 'administrative-state': 'inService',
226 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
227 'type': 'org-openroadm-interfaces:otnOtu',
228 'supporting-port': '1',
229 'supporting-interface': 'XPDR1-NETWORK1-7'
230 }, **res['interface'][0]),
233 self.assertDictEqual(
# NOTE(review): the expected dict below is not closed in the visible lines;
# at least one further entry and the closing brace appear to be missing -
# confirm against the complete file.
234 {u'rate': u'org-openroadm-otn-otu-interfaces:OTU4',
236 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
238 def test_11_service_path_create_xpdr_check(self):
# Reads interface XPDR1-NETWORK1-ODU from the mounted XPDRA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes (including
# that it is supported by XPDR1-NETWORK1-OTU), its ODU container, and that
# the nested 'opu' container is exactly the two payload-type entries.
239 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
240 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
241 "interface/XPDR1-NETWORK1-ODU"
242 .format(test_utils.RESTCONF_BASE_URL))
243 response = requests.request(
244 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
248 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
250 'name': 'XPDR1-NETWORK1-ODU',
251 'administrative-state': 'inService',
252 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
253 'type': 'org-openroadm-interfaces:otnOdu',
254 'supporting-port': '1',
255 'supporting-interface': 'XPDR1-NETWORK1-OTU'
256 }, **res['interface'][0]),
259 self.assertDictEqual(
# NOTE(review): same situation for this second comparison - the expected-dict
# opener and the call's closing parenthesis are not visible here.
261 'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
262 u'monitoring-mode': u'terminated'
263 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
264 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
266 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
267 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
269 def test_12_service_path_create_xpdr_check(self):
# Reads interface XPDR1-CLIENT1-ETHERNET from the mounted XPDRA01 device over
# RESTCONF, expects HTTP 200, and checks the interface attributes and its
# ethernet container.
270 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
271 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
272 "interface/XPDR1-CLIENT1-ETHERNET"
273 .format(test_utils.RESTCONF_BASE_URL))
274 response = requests.request(
275 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
276 self.assertEqual(response.status_code, requests.codes.ok)
277 res = response.json()
278 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
279 self.assertDictEqual(
# NOTE(review): the opening of the expected-dict expression and the tail of
# this call are not visible in this view. The `}, **res[...])` below suggests
# a `dict({...}, **actual)` merge compared against the actual dict (a subset
# check) - confirm against the complete file.
281 'name': 'XPDR1-CLIENT1-ETHERNET',
282 'administrative-state': 'inService',
283 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
284 'type': 'org-openroadm-interfaces:ethernetCsmacd',
285 'supporting-port': 'C1'
286 }, **res['interface'][0]),
289 self.assertDictEqual(
# NOTE(review): only one entry of the expected ethernet dict is visible and
# the literal has neither an opening nor a closing brace here; the remaining
# entries cannot be determined from this view - confirm against the full file.
292 'auto-negotiation': 'enabled',
295 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_13_service_path_create_xpdr_check(self):
    """Check that circuit pack 1/0/1-PLUG-NET on XPDRA01 is reported in use.

    Reads the circuit pack from the mounted XPDRA01 device over RESTCONF
    (the slashes of its name are percent-encoded as %2F in the URL) and
    expects HTTP 200 with an ``equipment-state`` of ``not-reserved-inuse``.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "circuit-packs/1%2F0%2F1-PLUG-NET"
           .format(test_utils.RESTCONF_BASE_URL))
    response = requests.request(
        "GET", url, headers=test_utils.TYPE_APPLICATION_JSON,
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Exact comparison instead of assertIn: on a string, assertIn is only a
    # substring test, so any state merely containing the expected text would
    # pass. test_22 already compares the same field with assertEqual.
    self.assertEqual('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
308 def test_14_service_path_delete(self):
# POSTs a "delete" request for service "service_test" (wave number 7) to the
# transportpce-device-renderer:service-path RPC, listing the same two nodes
# and termination points as the create request in test_05. Expects HTTP 200
# and exactly {'output': {'result': 'Request processed', 'success': True}}.
309 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
310 data = {"renderer:input": {
311 "renderer:service-name": "service_test",
312 "renderer:wave-number": "7",
313 "renderer:operation": "delete",
# NOTE(review): the key and opening bracket of the node list that follows are
# not visible in this view (the list is closed by `]` further down) -
# confirm against the complete file.
315 {"renderer:node-id": "ROADMA01",
316 "renderer:src-tp": "SRG1-PP7-TXRX",
317 "renderer:dest-tp": "DEG1-TTP-TXRX"},
318 {"renderer:node-id": "XPDRA01",
319 "renderer:src-tp": "XPDR1-CLIENT1",
320 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
321 response = requests.request(
322 "POST", url, data=json.dumps(data),
323 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
324 self.assertEqual(response.status_code, requests.codes.ok)
325 self.assertEqual(response.json(), {
326 'output': {'result': 'Request processed', 'success': True}})
328 def test_15_service_path_delete_rdm_check(self):
# Post-delete check: reads interface DEG1-TTP-TXRX-7 on the mounted ROADMA01
# device over RESTCONF and expects HTTP 404 with a "data-missing" error entry.
329 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
330 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
331 "interface/DEG1-TTP-TXRX-7"
332 .format(test_utils.RESTCONF_BASE_URL))
333 response = requests.request(
334 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
335 self.assertEqual(response.status_code, requests.codes.not_found)
336 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
338 {"error-type": "application", "error-tag": "data-missing",
339 "error-message": "Request could not be completed because the relevant data model content does not exist"},
340 res['errors']['error'])
342 def test_16_service_path_delete_rdm_check(self):
# Post-delete check: reads interface SRG1-PP7-TXRX-7 on the mounted ROADMA01
# device over RESTCONF and expects HTTP 404 with a "data-missing" error entry.
343 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
344 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
345 "interface/SRG1-PP7-TXRX-7"
346 .format(test_utils.RESTCONF_BASE_URL))
347 response = requests.request(
348 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
349 self.assertEqual(response.status_code, requests.codes.not_found)
350 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
352 {"error-type": "application", "error-tag": "data-missing",
353 "error-message": "Request could not be completed because the relevant data model content does not exist"},
354 res['errors']['error'])
356 def test_17_service_path_delete_rdm_check(self):
# Post-delete check: reads roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-7 on
# the mounted ROADMA01 device over RESTCONF and expects HTTP 404 with a
# "data-missing" error entry.
357 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
358 "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
359 "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
360 .format(test_utils.RESTCONF_BASE_URL))
361 response = requests.request(
362 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
363 self.assertEqual(response.status_code, requests.codes.not_found)
364 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
366 {"error-type": "application", "error-tag": "data-missing",
367 "error-message": "Request could not be completed because the relevant data model content does not exist"},
368 res['errors']['error'])
370 def test_18_service_path_delete_xpdr_check(self):
# Post-delete check: reads interface XPDR1-NETWORK1-7 on the mounted XPDRA01
# device over RESTCONF and expects HTTP 404 with a "data-missing" error entry.
371 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
372 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
373 "interface/XPDR1-NETWORK1-7"
374 .format(test_utils.RESTCONF_BASE_URL))
375 response = requests.request(
376 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
377 self.assertEqual(response.status_code, requests.codes.not_found)
378 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
380 {"error-type": "application", "error-tag": "data-missing",
381 "error-message": "Request could not be completed because the relevant data model content does not exist"},
382 res['errors']['error'])
384 def test_19_service_path_delete_xpdr_check(self):
# Post-delete check: reads interface XPDR1-NETWORK1-OTU on the mounted XPDRA01
# device over RESTCONF and expects HTTP 404 with a "data-missing" error entry.
385 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
386 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
387 "interface/XPDR1-NETWORK1-OTU"
388 .format(test_utils.RESTCONF_BASE_URL))
389 response = requests.request(
390 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
391 self.assertEqual(response.status_code, requests.codes.not_found)
392 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
394 {"error-type": "application", "error-tag": "data-missing",
395 "error-message": "Request could not be completed because the relevant data model content does not exist"},
396 res['errors']['error'])
398 def test_20_service_path_delete_xpdr_check(self):
# Post-delete check: reads interface XPDR1-NETWORK1-ODU on the mounted XPDRA01
# device over RESTCONF and expects HTTP 404 with a "data-missing" error entry.
399 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
400 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
401 "interface/XPDR1-NETWORK1-ODU"
402 .format(test_utils.RESTCONF_BASE_URL))
403 response = requests.request(
404 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
405 self.assertEqual(response.status_code, requests.codes.not_found)
406 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
408 {"error-type": "application", "error-tag": "data-missing",
409 "error-message": "Request could not be completed because the relevant data model content does not exist"},
410 res['errors']['error'])
412 def test_21_service_path_delete_xpdr_check(self):
# Post-delete check: reads interface XPDR1-CLIENT1-ETHERNET on the mounted
# XPDRA01 device over RESTCONF and expects HTTP 404 with a "data-missing"
# error entry.
413 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
414 "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
415 "interface/XPDR1-CLIENT1-ETHERNET"
416 .format(test_utils.RESTCONF_BASE_URL))
417 response = requests.request(
418 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
419 self.assertEqual(response.status_code, requests.codes.not_found)
420 res = response.json()
# NOTE(review): the call wrapping the expected error dict and
# res['errors']['error'] is not visible in this view (presumably
# `self.assertIn(`) - confirm against the complete file.
422 {"error-type": "application", "error-tag": "data-missing",
423 "error-message": "Request could not be completed because the relevant data model content does not exist"},
424 res['errors']['error'])
def test_22_service_path_delete_xpdr_check(self):
    """Check that circuit pack 1/0/1-PLUG-NET on XPDRA01 is available again.

    Reads the circuit pack from the mounted XPDRA01 device over RESTCONF
    and expects HTTP 200 with an ``equipment-state`` equal to
    ``not-reserved-available``.
    """
    pack_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-NET"
    ).format(test_utils.RESTCONF_BASE_URL)
    reply = requests.request(
        "GET",
        pack_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    first_pack = payload["circuit-packs"][0]
    self.assertEqual('not-reserved-available', first_pack['equipment-state'])
def test_23_rdm_device_disconnected(self):
    """Unmount device ROADMA01 and expect the request to answer 200."""
    unmount_reply = test_utils.unmount_device("ROADMA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_24_xpdr_device_disconnected(self):
    """Unmount device XPDRA01 and expect the request to answer 200."""
    unmount_reply = test_utils.unmount_device("XPDRA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Script entry point: run this module's tests with verbose output and
    # stop at the first failure or error.
    unittest.main(failfast=True, verbosity=2)