3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
15 #from unittest.result import failfast
17 from common import test_utils
class TransportPCERendererTesting(unittest.TestCase):
    """Functional test of the device renderer ``service-path`` RPC.

    The test methods depend on each other and rely on unittest running them
    in name order (hence the numeric prefixes): mount a ROADM and a
    transponder, check their port mappings, create a service path and inspect
    the interfaces/connections it produced on both devices, delete the service
    path, check that everything was removed, then unmount the devices.
    """

    processes = None

    # Path (below the "config" datastore) of a resource on a mounted device;
    # formatted with the node id and the device-relative resource.
    _DEVICE_CONFIG_PATH = ("network-topology:network-topology/topology/topology-netconf/"
                           "node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}")

    # Nodes and termination points crossed by the service path under test.
    _SERVICE_PATH_NODES = [
        {"renderer:node-id": "ROADMA01",
         "renderer:src-tp": "SRG1-PP7-TXRX",
         "renderer:dest-tp": "DEG1-TTP-TXRX"},
        {"renderer:node-id": "XPDRA01",
         "renderer:src-tp": "XPDR1-CLIENT1",
         "renderer:dest-tp": "XPDR1-NETWORK1"}]

    # Error entry expected in the body of a 404 answer for a removed resource.
    _DATA_MISSING_ERROR = {
        "error-type": "application", "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist"}

    @classmethod
    def setUpClass(cls):
        """Start the controller and the 'xpdra' / 'roadma' simulators once for the whole class."""
        # NOTE(review): the second assignment replaces the first one, so only
        # what start_sims() returns is shut down in tearDownClass. Presumably
        # both helpers return the same shared process list - confirm in test_utils.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['xpdra', 'roadma'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass."""
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Print the name of the test method that is about to run."""
        print("execution of {}".format(self.id().split(".")[-1]))

    # ------------------------------------------------------------------
    # private helpers (names do not start with "test", so they are not collected)
    # ------------------------------------------------------------------

    @staticmethod
    def _get_config(path):
        """Send a GET for ``path`` below the RESTCONF ``config`` datastore and return the response."""
        url = "{}/config/{}".format(test_utils.RESTCONF_BASE_URL, path)
        return requests.request(
            "GET", url, headers=test_utils.TYPE_APPLICATION_JSON,
            auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))

    @classmethod
    def _get_device_config(cls, node_id, resource):
        """Send a GET for ``resource`` of the mounted device ``node_id`` and return the response."""
        return cls._get_config(cls._DEVICE_CONFIG_PATH.format(node_id, resource))

    @staticmethod
    def _post_service_path(data):
        """POST ``data`` (JSON-encoded) to the device renderer service-path RPC and return the response."""
        url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
        return requests.request(
            "POST", url, data=json.dumps(data),
            headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))

    def _assert_contains_subset(self, expected, actual):
        """Fail unless every key/value pair of ``expected`` is present in ``actual``.

        Replacement for assertDictContainsSubset (deprecated in python 3.2).
        ``expected`` is laid over a copy of ``actual``: the result equals
        ``actual`` only when no key is added and no value is changed. Doing it
        the other way round (``actual`` over ``expected``) would let the actual
        values win and a wrong value could never be detected.
        """
        merged = dict(actual)
        merged.update(expected)
        self.assertDictEqual(merged, actual)

    def _assert_data_missing(self, response):
        """Fail unless ``response`` is a 404 carrying the RESTCONF 'data-missing' error."""
        self.assertEqual(response.status_code, requests.codes.not_found)
        res = response.json()
        self.assertIn(self._DATA_MISSING_ERROR, res['errors']['error'])

    # ------------------------------------------------------------------
    # device connection and port mapping
    # ------------------------------------------------------------------

    def test_01_rdm_device_connected(self):
        """Mounting the ROADM simulator must answer 201."""
        response = test_utils.mount_device("ROADMA01", 'roadma')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_02_xpdr_device_connected(self):
        """Mounting the transponder simulator must answer 201."""
        response = test_utils.mount_device("XPDRA01", 'xpdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_03_rdm_portmapping(self):
        """The ROADM port mapping must expose the degree TTP and the SRG PP7 logical connection points."""
        # NOTE(review): node segment chosen to match the `res['nodes'][0]`
        # lookups below - confirm against the port-mapping model.
        response = self._get_config("transportpce-portmapping:network/nodes/ROADMA01")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
             'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
            res['nodes'][0]['mapping'])
        self.assertIn(
            {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
             'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
            res['nodes'][0]['mapping'])

    def test_04_xpdr_portmapping(self):
        """The transponder port mapping must expose its network and client logical connection points."""
        # NOTE(review): node segment chosen to match the `res['nodes'][0]`
        # lookups below - confirm against the port-mapping model.
        response = self._get_config("transportpce-portmapping:network/nodes/XPDRA01")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
             'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
             'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7aa'},
            res['nodes'][0]['mapping'])
        self.assertIn(
            {'supporting-port': 'C1',
             'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
             'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
             'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae5'},
            res['nodes'][0]['mapping'])

    # ------------------------------------------------------------------
    # service path creation
    # ------------------------------------------------------------------

    def test_05_service_path_create(self):
        """Creating the service path on wavelength 7 must report the ROADM connection as created."""
        # NOTE(review): "renderer:nodes" is the list key expected by the
        # service-path RPC input - confirm against the renderer YANG model.
        data = {"renderer:input": {
            "renderer:service-name": "service_test",
            "renderer:wave-number": "7",
            "renderer:modulation-format": "qpsk",
            "renderer:operation": "create",
            "renderer:nodes": self._SERVICE_PATH_NODES}}
        response = self._post_service_path(data)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])

    def test_06_service_path_create_rdm_check(self):
        """The ROADM must hold an optical channel interface on its degree TTP for wavelength 7."""
        response = self._get_device_config("ROADMA01", "interface/DEG1-TTP-TXRX-7")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'DEG1-TTP-TXRX-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '2/0',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': 'L1'
            },
            res['interface'][0])
        self.assertDictEqual(
            {'wavelength-number': 7},
            res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])

    def test_07_service_path_create_rdm_check(self):
        """The ROADM must hold an optical channel interface on SRG1 PP7 for wavelength 7."""
        response = self._get_device_config("ROADMA01", "interface/SRG1-PP7-TXRX-7")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'SRG1-PP7-TXRX-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '4/0',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': 'C7'
            },
            res['interface'][0])
        self.assertDictEqual(
            {'wavelength-number': 7},
            res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])

    def test_08_service_path_create_rdm_check(self):
        """The ROADM must hold the SRG-to-degree connection linking the two new interfaces."""
        response = self._get_device_config("ROADMA01", "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-7',
                'wavelength-number': 7,
                'opticalControlMode': 'off'
            },
            res['roadm-connections'][0])
        self.assertDictEqual(
            {'src-if': 'SRG1-PP7-TXRX-7'},
            res['roadm-connections'][0]['source'])
        self.assertDictEqual(
            {'dst-if': 'DEG1-TTP-TXRX-7'},
            res['roadm-connections'][0]['destination'])

    def test_09_service_path_create_xpdr_check(self):
        """The transponder must hold the optical channel interface on its network port."""
        response = self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-7")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': '1'
            },
            res['interface'][0])
        self.assertDictEqual(
            {'rate': 'org-openroadm-optical-channel-interfaces:R100G',
             'transmit-power': -5,
             'wavelength-number': 7,
             'modulation-format': 'dp-qpsk'},
            res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])

    def test_10_service_path_create_xpdr_check(self):
        """The transponder must hold the OTU interface stacked on the optical channel interface."""
        response = self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-OTU")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-OTU',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOtu',
                'supporting-port': '1',
                'supporting-interface': 'XPDR1-NETWORK1-7'
            },
            res['interface'][0])
        # NOTE(review): only the rate is pinned here; the OTU container may
        # carry further attributes. Switch to assertDictEqual with the complete
        # expected content once it is confirmed.
        self._assert_contains_subset(
            {'rate': 'org-openroadm-otn-otu-interfaces:OTU4'},
            res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])

    def test_11_service_path_create_xpdr_check(self):
        """The transponder must hold the ODU interface stacked on the OTU interface."""
        response = self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-ODU")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-ODU',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOdu',
                'supporting-port': '1',
                'supporting-interface': 'XPDR1-NETWORK1-OTU'
            },
            res['interface'][0])
        self._assert_contains_subset(
            {
                'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
                'monitoring-mode': 'terminated'
            },
            res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
        self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
                             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])

    def test_12_service_path_create_xpdr_check(self):
        """The transponder must hold the Ethernet interface on its client port."""
        response = self._get_device_config("XPDRA01", "interface/XPDR1-CLIENT1-ETHERNET")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self._assert_contains_subset(
            {
                'name': 'XPDR1-CLIENT1-ETHERNET',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
                'type': 'org-openroadm-interfaces:ethernetCsmacd',
                'supporting-port': 'C1'
            },
            res['interface'][0])
        # NOTE(review): only auto-negotiation is pinned here; the Ethernet
        # container may carry further attributes. Switch to assertDictEqual
        # with the complete expected content once it is confirmed.
        self._assert_contains_subset(
            {'auto-negotiation': 'enabled'},
            res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_service_path_create_xpdr_check(self):
        """The network circuit pack of the transponder must be flagged as in use."""
        response = self._get_device_config("XPDRA01", "circuit-packs/1%2F0%2F1-PLUG-NET")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])

    # ------------------------------------------------------------------
    # service path deletion
    # ------------------------------------------------------------------

    def test_14_service_path_delete(self):
        """Deleting the service path must be acknowledged as processed and successful."""
        # NOTE(review): "renderer:nodes" is the list key expected by the
        # service-path RPC input - confirm against the renderer YANG model.
        data = {"renderer:input": {
            "renderer:service-name": "service_test",
            "renderer:wave-number": "7",
            "renderer:operation": "delete",
            "renderer:nodes": self._SERVICE_PATH_NODES}}
        response = self._post_service_path(data)
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(), {
            'output': {'result': 'Request processed', 'success': True}})

    def test_15_service_path_delete_rdm_check(self):
        """The degree TTP interface must be gone from the ROADM."""
        self._assert_data_missing(
            self._get_device_config("ROADMA01", "interface/DEG1-TTP-TXRX-7"))

    def test_16_service_path_delete_rdm_check(self):
        """The SRG1 PP7 interface must be gone from the ROADM."""
        self._assert_data_missing(
            self._get_device_config("ROADMA01", "interface/SRG1-PP7-TXRX-7"))

    def test_17_service_path_delete_rdm_check(self):
        """The SRG-to-degree connection must be gone from the ROADM."""
        self._assert_data_missing(
            self._get_device_config("ROADMA01", "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"))

    def test_18_service_path_delete_xpdr_check(self):
        """The optical channel interface must be gone from the transponder."""
        self._assert_data_missing(
            self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-7"))

    def test_19_service_path_delete_xpdr_check(self):
        """The OTU interface must be gone from the transponder."""
        self._assert_data_missing(
            self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-OTU"))

    def test_20_service_path_delete_xpdr_check(self):
        """The ODU interface must be gone from the transponder."""
        self._assert_data_missing(
            self._get_device_config("XPDRA01", "interface/XPDR1-NETWORK1-ODU"))

    def test_21_service_path_delete_xpdr_check(self):
        """The client Ethernet interface must be gone from the transponder."""
        self._assert_data_missing(
            self._get_device_config("XPDRA01", "interface/XPDR1-CLIENT1-ETHERNET"))

    def test_22_service_path_delete_xpdr_check(self):
        """The network circuit pack of the transponder must be flagged as available again."""
        response = self._get_device_config("XPDRA01", "circuit-packs/1%2F0%2F1-PLUG-NET")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])

    # ------------------------------------------------------------------
    # device disconnection
    # ------------------------------------------------------------------

    def test_23_rdm_device_disconnected(self):
        """Unmounting the ROADM must answer 200."""
        response = test_utils.unmount_device("ROADMA01")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)

    def test_24_xpdr_device_disconnected(self):
        """Unmounting the transponder must answer 200."""
        response = test_utils.unmount_device("XPDRA01")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # The test methods build on each other (mount -> create -> check -> delete),
    # so stop at the first failure instead of reporting a cascade of errors.
    unittest.main(failfast=True, verbosity=2)