3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
class TransportPCERendererTesting(unittest.TestCase):
    """Functional tests driving the device-renderer service-path RPC.

    The tests talk to a RESTCONF endpoint and exercise two simulated
    devices, named ROADMA and XPDRA in the request URLs.
    """

    # Handles of the two honeynode processes; they stay None until the
    # class-level fixture assigns the started processes.
    honeynode_process1 = honeynode_process2 = None

    # Prefix used to build every RESTCONF URL requested by the tests.
    restconf_baseurl = "http://localhost:8181/restconf"
32 #START_IGNORE_XTESTING
36 print ("starting honeynode1...")
37 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
40 print ("starting honeynode2...")
41 cls.honeynode_process2 = test_utils.start_roadma_honeynode()
44 print ("starting opendaylight...")
45 cls.odl_process = test_utils.start_tpce()
47 print ("opendaylight started")
def tearDownClass(cls):
    """Interrupt OpenDaylight and both honeynodes, then wait for each to exit.

    The processes are handled one after the other, OpenDaylight first.
    For each one, SIGINT is sent to the children reported by psutil,
    then to the process itself, and finally the process is waited on.
    """
    # Attribute names are resolved lazily, one per iteration, so each
    # process handle is read only after the previous process has exited.
    for attribute in ("odl_process", "honeynode_process1", "honeynode_process2"):
        process = getattr(cls, attribute)
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
68 print ("execution of {}".format(self.id().split(".")[-1]))
73 def test_01_rdm_device_connected(self):
74 url = ("{}/config/network-topology:"
75 "network-topology/topology/topology-netconf/node/ROADMA"
76 .format(self.restconf_baseurl))
79 "netconf-node-topology:username": "admin",
80 "netconf-node-topology:password": "admin",
81 "netconf-node-topology:host": "127.0.0.1",
82 "netconf-node-topology:port": "17831",
83 "netconf-node-topology:tcp-only": "false",
84 "netconf-node-topology:pass-through": {}}]}
85 headers = {'content-type': 'application/json'}
86 response = requests.request(
87 "PUT", url, data=json.dumps(data), headers=headers,
88 auth=('admin', 'admin'))
89 self.assertEqual(response.status_code, requests.codes.created)
92 def test_02_xpdr_device_connected(self):
93 url = ("{}/config/network-topology:"
94 "network-topology/topology/topology-netconf/node/XPDRA"
95 .format(self.restconf_baseurl))
98 "netconf-node-topology:username": "admin",
99 "netconf-node-topology:password": "admin",
100 "netconf-node-topology:host": "127.0.0.1",
101 "netconf-node-topology:port": "17830",
102 "netconf-node-topology:tcp-only": "false",
103 "netconf-node-topology:pass-through": {}}]}
104 headers = {'content-type': 'application/json'}
105 response = requests.request(
106 "PUT", url, data=json.dumps(data), headers=headers,
107 auth=('admin', 'admin'))
108 self.assertEqual(response.status_code, requests.codes.created)
111 def test_03_rdm_portmapping(self):
112 url = ("{}/config/transportpce-portmapping:network/"
114 .format(self.restconf_baseurl))
115 headers = {'content-type': 'application/json'}
116 response = requests.request(
117 "GET", url, headers=headers, auth=('admin', 'admin'))
118 self.assertEqual(response.status_code, requests.codes.ok)
119 res = response.json()
121 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
122 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
123 res['nodes'][0]['mapping'])
125 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
126 'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
127 res['nodes'][0]['mapping'])
129 def test_04_xpdr_portmapping(self):
130 url = ("{}/config/transportpce-portmapping:network/"
132 .format(self.restconf_baseurl))
133 headers = {'content-type': 'application/json'}
134 response = requests.request(
135 "GET", url, headers=headers, auth=('admin', 'admin'))
136 self.assertEqual(response.status_code, requests.codes.ok)
137 res = response.json()
139 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
140 'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
141 'associated-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network'},
142 res['nodes'][0]['mapping'])
144 {'supporting-port': 'C1',
145 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
146 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
147 'associated-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client'},
148 res['nodes'][0]['mapping'])
150 def test_05_service_path_create(self):
151 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
152 data = {"renderer:input": {
153 "renderer:service-name": "service_test",
154 "renderer:wave-number": "7",
155 "renderer:modulation-format": "qpsk",
156 "renderer:operation": "create",
158 {"renderer:node-id": "ROADMA",
159 "renderer:src-tp": "SRG1-PP7-TXRX",
160 "renderer:dest-tp": "DEG1-TTP-TXRX"},
161 {"renderer:node-id": "XPDRA",
162 "renderer:src-tp": "XPDR1-CLIENT1",
163 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "POST", url, data=json.dumps(data),
167 headers=headers, auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
170 self.assertIn('Roadm-connection successfully created for nodes: ROADMA', res["output"]["result"])
def test_06_service_path_create_rdm_check(self):
    # Read interface DEG1-TTP-TXRX-7 from the mounted ROADMA device
    # configuration and check its main attributes and wavelength number.
    # (Plain comments are used instead of a docstring so that the verbose
    # unittest output of this test is unchanged.)
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "interface/DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected = {'name': 'DEG1-TTP-TXRX-7', 'administrative-state': 'inService',
                'supporting-circuit-pack-name': '2/0',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': 'L1'}
    # assertDictContainsSubset is deprecated since Python 3.2 and removed
    # in 3.12. Comparing the expected entries with the matching slice of
    # the reply checks the same thing: every expected key must be present
    # with the expected value, extra keys in the reply are ignored.
    self.assertEqual(
        expected,
        {key: interface[key] for key in expected if key in interface})
    self.assertDictEqual(
        {'wavelength-number': 7},
        interface['org-openroadm-optical-channel-interfaces:och'])
190 def test_07_service_path_create_rdm_check(self):
191 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
192 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
193 "interface/SRG1-PP7-TXRX-7"
194 .format(self.restconf_baseurl))
195 headers = {'content-type': 'application/json'}
196 response = requests.request(
197 "GET", url, headers=headers, auth=('admin', 'admin'))
198 self.assertEqual(response.status_code, requests.codes.ok)
199 res = response.json()
200 self.assertDictContainsSubset(
201 {'name': 'SRG1-PP7-TXRX-7', 'administrative-state': 'inService',
202 'supporting-circuit-pack-name': '4/0',
203 'type': 'org-openroadm-interfaces:opticalChannel',
204 'supporting-port': 'C7'},
206 self.assertDictEqual(
207 {'wavelength-number': 7},
208 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_08_service_path_create_rdm_check(self):
    # Read roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-7 from the mounted
    # ROADMA device configuration and check its attributes and both of
    # its end points.
    # (Plain comments are used instead of a docstring so that the verbose
    # unittest output of this test is unchanged.)
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-7',
                'wavelength-number': 7,
                'opticalControlMode': 'off'}
    # assertDictContainsSubset is deprecated since Python 3.2 and removed
    # in 3.12. Comparing the expected entries with the matching slice of
    # the reply checks the same thing: every expected key must be present
    # with the expected value, extra keys in the reply are ignored.
    self.assertEqual(
        expected,
        {key: connection[key] for key in expected if key in connection})
    self.assertDictEqual(
        {'src-if': 'SRG1-PP7-TXRX-7'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-7'},
        connection['destination'])
232 def test_09_service_path_create_xpdr_check(self):
233 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
234 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
235 "interface/XPDR1-NETWORK1-7"
236 .format(self.restconf_baseurl))
237 headers = {'content-type': 'application/json'}
238 response = requests.request(
239 "GET", url, headers=headers, auth=('admin', 'admin'))
240 self.assertEqual(response.status_code, requests.codes.ok)
241 res = response.json()
242 self.assertDictContainsSubset(
243 {'name': 'XPDR1-NETWORK1-7', 'administrative-state': 'inService',
244 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
245 'type': 'org-openroadm-interfaces:opticalChannel',
246 'supporting-port': '1'},
248 self.assertDictEqual(
249 {u'rate': u'org-openroadm-optical-channel-interfaces:R100G',
250 u'transmit-power':-5,
251 u'wavelength-number': 7,
252 u'modulation-format': u'dp-qpsk'},
253 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
255 def test_10_service_path_create_xpdr_check(self):
256 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
257 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
258 "interface/XPDR1-NETWORK1-OTU"
259 .format(self.restconf_baseurl))
260 headers = {'content-type': 'application/json'}
261 response = requests.request(
262 "GET", url, headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 self.assertDictContainsSubset(
266 {'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
267 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
268 'type': 'org-openroadm-interfaces:otnOtu',
269 'supporting-port': '1',
270 'supporting-interface': 'XPDR1-NETWORK1-7'},
272 self.assertDictEqual(
273 {u'rate': u'org-openroadm-otn-otu-interfaces:OTU4',
275 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
277 def test_11_service_path_create_xpdr_check(self):
278 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
279 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
280 "interface/XPDR1-NETWORK1-ODU"
281 .format(self.restconf_baseurl))
282 headers = {'content-type': 'application/json'}
283 response = requests.request(
284 "GET", url, headers=headers, auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 self.assertDictContainsSubset(
288 {'name': 'XPDR1-NETWORK1-ODU', 'administrative-state': 'inService',
289 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
290 'type': 'org-openroadm-interfaces:otnOdu',
291 'supporting-port': '1',
292 'supporting-interface': 'XPDR1-NETWORK1-OTU'},
294 self.assertDictContainsSubset(
295 {'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
296 u'monitoring-mode': u'terminated'},
297 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
298 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
299 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
301 def test_12_service_path_create_xpdr_check(self):
302 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
303 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
304 "interface/XPDR1-CLIENT1-ETHERNET"
305 .format(self.restconf_baseurl))
306 headers = {'content-type': 'application/json'}
307 response = requests.request(
308 "GET", url, headers=headers, auth=('admin', 'admin'))
309 self.assertEqual(response.status_code, requests.codes.ok)
310 res = response.json()
311 self.assertDictContainsSubset(
312 {'name': 'XPDR1-CLIENT1-ETHERNET', 'administrative-state': 'inService',
313 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
314 'type': 'org-openroadm-interfaces:ethernetCsmacd',
315 'supporting-port': 'C1'},
317 self.assertDictEqual(
320 'auto-negotiation': 'enabled',
323 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_13_service_path_create_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-NET (slashes URL-encoded) from the
    # mounted XPDRA device configuration and check that its
    # equipment-state contains 'not-reserved-inuse'.
    resource = ("config/network-topology:network-topology/topology/topology-netconf/"
                "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
                "circuit-packs/1%2F0%2F1-PLUG-NET")
    target = "{}/{}".format(self.restconf_baseurl, resource)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    equipment_state = reply.json()['circuit-packs'][0]["equipment-state"]
    self.assertIn('not-reserved-inuse', equipment_state)
337 def test_14_service_path_delete(self):
338 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
339 data = {"renderer:input": {
340 "renderer:service-name": "service_test",
341 "renderer:wave-number": "7",
342 "renderer:operation": "delete",
344 {"renderer:node-id": "ROADMA",
345 "renderer:src-tp": "SRG1-PP7-TXRX",
346 "renderer:dest-tp": "DEG1-TTP-TXRX"},
347 {"renderer:node-id": "XPDRA",
348 "renderer:src-tp": "XPDR1-CLIENT1",
349 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
350 headers = {'content-type': 'application/json'}
351 response = requests.request(
352 "POST", url, data=json.dumps(data),
353 headers=headers, auth=('admin', 'admin'))
354 self.assertEqual(response.status_code, requests.codes.ok)
355 self.assertEqual(response.json(), {
356 'output': {'result': 'Request processed', 'success': True}})
358 def test_15_service_path_delete_rdm_check(self):
359 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
360 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
361 "interface/DEG1-TTP-TXRX-7"
362 .format(self.restconf_baseurl))
363 headers = {'content-type': 'application/json'}
364 response = requests.request(
365 "GET", url, headers=headers, auth=('admin', 'admin'))
366 self.assertEqual(response.status_code, requests.codes.not_found)
367 res = response.json()
369 {"error-type":"application", "error-tag":"data-missing",
370 "error-message":"Request could not be completed because the relevant data model content does not exist"},
371 res['errors']['error'])
373 def test_16_service_path_delete_rdm_check(self):
374 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
375 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
376 "interface/SRG1-PP7-TXRX-7"
377 .format(self.restconf_baseurl))
378 headers = {'content-type': 'application/json'}
379 response = requests.request(
380 "GET", url, headers=headers, auth=('admin', 'admin'))
381 self.assertEqual(response.status_code, requests.codes.not_found)
382 res = response.json()
384 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
385 res['errors']['error'])
387 def test_17_service_path_delete_rdm_check(self):
388 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
389 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
390 "roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
391 .format(self.restconf_baseurl))
392 headers = {'content-type': 'application/json'}
393 response = requests.request(
394 "GET", url, headers=headers, auth=('admin', 'admin'))
395 self.assertEqual(response.status_code, requests.codes.not_found)
396 res = response.json()
398 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
399 res['errors']['error'])
401 def test_18_service_path_delete_xpdr_check(self):
402 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
403 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
404 "interface/XPDR1-NETWORK1-7"
405 .format(self.restconf_baseurl))
406 headers = {'content-type': 'application/json'}
407 response = requests.request(
408 "GET", url, headers=headers, auth=('admin', 'admin'))
409 self.assertEqual(response.status_code, requests.codes.not_found)
410 res = response.json()
412 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
413 res['errors']['error'])
415 def test_19_service_path_delete_xpdr_check(self):
416 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
417 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
418 "interface/XPDR1-NETWORK1-OTU"
419 .format(self.restconf_baseurl))
420 headers = {'content-type': 'application/json'}
421 response = requests.request(
422 "GET", url, headers=headers, auth=('admin', 'admin'))
423 self.assertEqual(response.status_code, requests.codes.not_found)
424 res = response.json()
426 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
427 res['errors']['error'])
429 def test_20_service_path_delete_xpdr_check(self):
430 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
431 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
432 "interface/XPDR1-NETWORK1-ODU"
433 .format(self.restconf_baseurl))
434 headers = {'content-type': 'application/json'}
435 response = requests.request(
436 "GET", url, headers=headers, auth=('admin', 'admin'))
437 self.assertEqual(response.status_code, requests.codes.not_found)
438 res = response.json()
440 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
441 res['errors']['error'])
443 def test_21_service_path_delete_xpdr_check(self):
444 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
445 "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
446 "interface/XPDR1-CLIENT1-ETHERNET"
447 .format(self.restconf_baseurl))
448 headers = {'content-type': 'application/json'}
449 response = requests.request(
450 "GET", url, headers=headers, auth=('admin', 'admin'))
451 self.assertEqual(response.status_code, requests.codes.not_found)
452 res = response.json()
454 {"error-type":"application", "error-tag":"data-missing", "error-message":"Request could not be completed because the relevant data model content does not exist"},
455 res['errors']['error'])
def test_22_service_path_delete_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-NET (slashes URL-encoded) from the
    # mounted XPDRA device configuration and check that its
    # equipment-state is exactly 'not-reserved-available'.
    resource = ("config/network-topology:network-topology/topology/topology-netconf/"
                "node/XPDRA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
                "circuit-packs/1%2F0%2F1-PLUG-NET")
    target = "{}/{}".format(self.restconf_baseurl, resource)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    equipment_state = reply.json()["circuit-packs"][0]['equipment-state']
    self.assertEqual('not-reserved-available', equipment_state)
def test_23_rdm_device_disconnected(self):
    # Delete the ROADMA node entry from the topology-netconf configuration
    # and expect the controller to answer with HTTP 200.
    node_path = ("config/network-topology:network-topology/"
                 "topology/topology-netconf/node/ROADMA")
    target = "{}/{}".format(self.restconf_baseurl, node_path)
    reply = requests.request(
        "DELETE",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_24_xpdr_device_disconnected(self):
    # Delete the XPDRA node entry from the topology-netconf configuration
    # and expect the controller to answer with HTTP 200.
    node_path = ("config/network-topology:network-topology/"
                 "topology/topology-netconf/node/XPDRA")
    target = "{}/{}".format(self.restconf_baseurl, node_path)
    reply = requests.request(
        "DELETE",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Script entry point: run the suite with per-test verbose output and
    # stop at the first failure.
    unittest.main(failfast=True, verbosity=2)