3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 def extract_a_from_b(a, b):
26 return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
29 class TransportPCEtesting(unittest.TestCase):
33 restconf_baseurl = "http://localhost:8181/restconf"
37 cls.sim_process1 = test_utils.start_sim('spdrav2')
39 cls.odl_process = test_utils.start_tpce()
41 print("opendaylight started")
44 def tearDownClass(cls):
45 for child in psutil.Process(cls.odl_process.pid).children():
46 child.send_signal(signal.SIGINT)
48 cls.odl_process.send_signal(signal.SIGINT)
49 cls.odl_process.wait()
50 for child in psutil.Process(cls.sim_process1.pid).children():
51 child.send_signal(signal.SIGINT)
53 cls.sim_process1.send_signal(signal.SIGINT)
54 cls.sim_process1.wait()
59 def test_01_connect_SPDR_SA1(self):
60 url = ("{}/config/network-topology:"
61 "network-topology/topology/topology-netconf/node/SPDR-SA1"
62 .format(self.restconf_baseurl))
64 "node-id": "SPDR-SA1",
65 "netconf-node-topology:username": "admin",
66 "netconf-node-topology:password": "admin",
67 "netconf-node-topology:host": "127.0.0.1",
68 "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
69 "netconf-node-topology:tcp-only": "false",
70 "netconf-node-topology:pass-through": {}}]}
71 headers = {'content-type': 'application/json'}
72 response = requests.request(
73 "PUT", url, data=json.dumps(data), headers=headers,
74 auth=('admin', 'admin'))
75 self.assertEqual(response.status_code, requests.codes.created)
77 url = ("{}/operational/network-topology:"
78 "network-topology/topology/topology-netconf/node/SPDR-SA1"
79 .format(self.restconf_baseurl))
80 response = requests.request(
81 "GET", url, headers=headers, auth=('admin', 'admin'))
82 self.assertEqual(response.status_code, requests.codes.ok)
85 res['node'][0]['netconf-node-topology:connection-status'],
88 def test_02_get_portmapping_CLIENT1(self):
89 url = ("{}/config/transportpce-portmapping:network/"
90 "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
91 .format(self.restconf_baseurl))
92 headers = {'content-type': 'application/json'}
93 response = requests.request(
94 "GET", url, headers=headers, auth=('admin', 'admin'))
95 self.assertEqual(response.status_code, requests.codes.ok)
98 {'supported-interface-capability': [
99 'org-openroadm-port-types:if-10GE-ODU2e',
100 'org-openroadm-port-types:if-10GE-ODU2',
101 'org-openroadm-port-types:if-10GE'],
102 'supporting-port': 'CP1-SFP4-P1',
103 'supporting-circuit-pack-name': 'CP1-SFP4',
104 'logical-connection-point': 'XPDR1-CLIENT1',
105 'port-direction': 'bidirectional',
106 'port-qual': 'xpdr-client',
107 'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
110 def test_03_get_portmapping_NETWORK1(self):
111 url = ("{}/config/transportpce-portmapping:network/"
112 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
113 .format(self.restconf_baseurl))
114 headers = {'content-type': 'application/json'}
115 response = requests.request(
116 "GET", url, headers=headers, auth=('admin', 'admin'))
117 self.assertEqual(response.status_code, requests.codes.ok)
118 res = response.json()
120 {"logical-connection-point": "XPDR1-NETWORK1",
121 "supporting-port": "CP1-CFP0-P1",
122 "supported-interface-capability": [
123 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
125 "port-direction": "bidirectional",
126 "port-qual": "xpdr-network",
127 "supporting-circuit-pack-name": "CP1-CFP0",
128 "xponder-type": "mpdr",
129 'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
132 def test_04_service_path_create_OCH_OTU4(self):
133 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
134 data = {"renderer:input": {
135 "service-name": "service_ODU4",
137 "modulation-format": "qpsk",
138 "operation": "create",
140 {"node-id": "SPDR-SA1",
141 "dest-tp": "XPDR1-NETWORK1"}]}}
142 headers = {'content-type': 'application/json'}
143 response = requests.request(
144 "POST", url, data=json.dumps(data),
145 headers=headers, auth=('admin', 'admin'))
147 self.assertEqual(response.status_code, requests.codes.ok)
148 res = response.json()
149 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
150 self.assertTrue(res["output"]["success"])
152 {'node-id': 'SPDR-SA1',
153 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
154 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
156 def test_05_get_portmapping_NETWORK1(self):
157 url = ("{}/config/transportpce-portmapping:network/"
158 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
159 .format(self.restconf_baseurl))
160 headers = {'content-type': 'application/json'}
161 response = requests.request(
162 "GET", url, headers=headers, auth=('admin', 'admin'))
163 self.assertEqual(response.status_code, requests.codes.ok)
164 res = response.json()
166 {"logical-connection-point": "XPDR1-NETWORK1",
167 "supporting-port": "CP1-CFP0-P1",
168 "supported-interface-capability": [
169 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
171 "port-direction": "bidirectional",
172 "port-qual": "xpdr-network",
173 "supporting-circuit-pack-name": "CP1-CFP0",
174 "xponder-type": "mpdr",
175 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
178 def test_06_check_interface_och(self):
179 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
180 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
181 "interface/XPDR1-NETWORK1-1"
182 .format(self.restconf_baseurl))
183 headers = {'content-type': 'application/json'}
184 response = requests.request(
185 "GET", url, headers=headers, auth=('admin', 'admin'))
186 self.assertEqual(response.status_code, requests.codes.ok)
187 res = response.json()
189 input_dict = {'name': 'XPDR1-NETWORK1-1',
190 'administrative-state': 'inService',
191 'supporting-circuit-pack-name': 'CP1-CFP0',
192 'type': 'org-openroadm-interfaces:opticalChannel',
193 'supporting-port': 'CP1-CFP0-P1'
195 # assertDictContainsSubset is deprecated
197 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
198 'supporting-circuit-pack-name': 'CP1-CFP0',
199 'type': 'org-openroadm-interfaces:opticalChannel',
200 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
202 self.assertDictEqual(input_dict,
203 extract_a_from_b(input_dict,
206 self.assertDictEqual(
207 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
208 u'transmit-power': -5},
209 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
211 def test_07_check_interface_OTU(self):
212 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
213 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
214 "interface/XPDR1-NETWORK1-OTU"
215 .format(self.restconf_baseurl))
216 headers = {'content-type': 'application/json'}
217 response = requests.request(
218 "GET", url, headers=headers, auth=('admin', 'admin'))
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 input_dict = {'name': 'XPDR1-NETWORK1-OTU',
222 'administrative-state': 'inService',
223 'supporting-circuit-pack-name': 'CP1-CFP0',
224 'supporting-interface': 'XPDR1-NETWORK1-1',
225 'type': 'org-openroadm-interfaces:otnOtu',
226 'supporting-port': 'CP1-CFP0-P1'}
228 # assertDictContainsSubset is deprecated
230 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
231 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
232 'type': 'org-openroadm-interfaces:otnOtu',
233 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
235 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
239 self.assertDictEqual(
240 {u'rate': u'org-openroadm-otn-common-types:OTU4',
242 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
244 def test_08_otn_service_path_create_ODU4(self):
245 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
246 data = {"renderer:input": {
247 "service-name": "service_ODU4",
248 "operation": "create",
249 "service-rate": "100G",
250 "service-type": "ODU",
252 {"node-id": "SPDR-SA1",
253 "network-tp": "XPDR1-NETWORK1"}]}}
254 headers = {'content-type': 'application/json'}
255 response = requests.request(
256 "POST", url, data=json.dumps(data),
257 headers=headers, auth=('admin', 'admin'))
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
261 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
262 self.assertTrue(res["output"]["success"])
264 {'node-id': 'SPDR-SA1',
265 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
267 def test_09_get_portmapping_NETWORK1(self):
268 url = ("{}/config/transportpce-portmapping:network/"
269 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
270 .format(self.restconf_baseurl))
271 headers = {'content-type': 'application/json'}
272 response = requests.request(
273 "GET", url, headers=headers, auth=('admin', 'admin'))
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
277 {"logical-connection-point": "XPDR1-NETWORK1",
278 "supporting-port": "CP1-CFP0-P1",
279 "supported-interface-capability": [
280 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
282 "port-direction": "bidirectional",
283 "port-qual": "xpdr-network",
284 "supporting-circuit-pack-name": "CP1-CFP0",
285 "xponder-type": "mpdr",
286 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
287 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
291 def test_10_check_interface_ODU4(self):
292 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
293 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
294 "interface/XPDR1-NETWORK1-ODU4"
295 .format(self.restconf_baseurl))
296 headers = {'content-type': 'application/json'}
297 response = requests.request(
298 "GET", url, headers=headers, auth=('admin', 'admin'))
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
301 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
302 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
303 'type': 'org-openroadm-interfaces:otnOdu',
304 'supporting-port': 'CP1-CFP0-P1'}
305 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
306 'rate': 'org-openroadm-otn-common-types:ODU4'}
308 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
311 self.assertDictEqual(input_dict_2,
312 extract_a_from_b(input_dict_2,
314 'org-openroadm-otn-odu-interfaces:odu'])
318 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
319 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
320 'type': 'org-openroadm-interfaces:otnOdu',
321 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
322 self.assertDictContainsSubset(
323 {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
324 'rate': 'org-openroadm-otn-common-types:ODU4'},
325 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
327 self.assertDictEqual(
328 {u'payload-type': u'21', u'exp-payload-type': u'21'},
329 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
331 def test_11_otn_service_path_create_10GE(self):
332 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
333 data = {"renderer:input": {
334 "service-name": "service1",
335 "operation": "create",
336 "service-rate": "10G",
337 "service-type": "Ethernet",
338 "ethernet-encoding": "eth encode",
340 "trib-port-number": "1",
342 {"node-id": "SPDR-SA1",
343 "client-tp": "XPDR1-CLIENT1",
344 "network-tp": "XPDR1-NETWORK1"}]}}
345 headers = {'content-type': 'application/json'}
346 response = requests.request(
347 "POST", url, data=json.dumps(data),
348 headers=headers, auth=('admin', 'admin'))
350 self.assertEqual(response.status_code, requests.codes.ok)
351 res = response.json()
352 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
353 self.assertTrue(res["output"]["success"])
355 {'node-id': 'SPDR-SA1',
356 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
357 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
358 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
360 def test_12_check_interface_10GE_CLIENT(self):
361 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
362 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
363 "interface/XPDR1-CLIENT1-ETHERNET10G"
364 .format(self.restconf_baseurl))
365 headers = {'content-type': 'application/json'}
366 response = requests.request(
367 "GET", url, headers=headers, auth=('admin', 'admin'))
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
371 'administrative-state': 'inService',
372 'supporting-circuit-pack-name': 'CP1-SFP4',
373 'type': 'org-openroadm-interfaces:ethernetCsmacd',
374 'supporting-port': 'CP1-SFP4-P1'
378 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
379 'supporting-circuit-pack-name': 'CP1-SFP4',
380 'type': 'org-openroadm-interfaces:ethernetCsmacd',
381 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
383 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
386 self.assertDictEqual(
388 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
390 def test_13_check_interface_ODU2E_CLIENT(self):
391 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
392 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
393 "interface/XPDR1-CLIENT1-ODU2e-service1"
394 .format(self.restconf_baseurl))
395 headers = {'content-type': 'application/json'}
396 response = requests.request(
397 "GET", url, headers=headers, auth=('admin', 'admin'))
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
401 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
402 'administrative-state': 'inService',
403 'supporting-circuit-pack-name': 'CP1-SFP4',
404 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
405 'type': 'org-openroadm-interfaces:otnOdu',
406 'supporting-port': 'CP1-SFP4-P1'}
408 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
409 'rate': 'org-openroadm-otn-common-types:ODU2e',
410 'monitoring-mode': 'terminated'}
412 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
415 self.assertDictEqual(input_dict_2,
416 extract_a_from_b(input_dict_2, res['interface'][0][
417 'org-openroadm-otn-odu-interfaces:odu'])
421 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
422 'supporting-circuit-pack-name': 'CP1-SFP4',
423 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
424 'type': 'org-openroadm-interfaces:otnOdu',
425 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
426 self.assertDictContainsSubset({
427 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
428 'rate': 'org-openroadm-otn-common-types:ODU2e',
429 'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
431 self.assertDictEqual(
432 {u'payload-type': u'03', u'exp-payload-type': u'03'},
433 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
435 def test_14_check_interface_ODU2E_NETWORK(self):
436 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
437 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
438 "interface/XPDR1-NETWORK1-ODU2e-service1"
439 .format(self.restconf_baseurl))
440 headers = {'content-type': 'application/json'}
441 response = requests.request(
442 "GET", url, headers=headers, auth=('admin', 'admin'))
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
446 'supporting-circuit-pack-name': 'CP1-CFP0',
447 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
448 'type': 'org-openroadm-interfaces:otnOdu',
449 'supporting-port': 'CP1-CFP0-P1'}
451 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
452 'rate': 'org-openroadm-otn-common-types:ODU2e',
453 'monitoring-mode': 'monitored'}
455 input_dict_3 = {'trib-port-number': 1}
457 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
461 self.assertDictEqual(input_dict_2,
462 extract_a_from_b(input_dict_2,
464 'org-openroadm-otn-odu-interfaces:odu']
467 self.assertDictEqual(input_dict_3,
468 extract_a_from_b(input_dict_3,
470 'org-openroadm-otn-odu-interfaces:odu'][
471 'parent-odu-allocation']))
474 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
475 'supporting-circuit-pack-name': 'CP1-CFP0',
476 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
477 'type': 'org-openroadm-interfaces:otnOdu',
478 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
479 self.assertDictContainsSubset({
480 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
481 'rate': 'org-openroadm-otn-common-types:ODU2e',
482 'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
483 self.assertDictContainsSubset(
484 {'trib-port-number': 1},
485 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
489 'org-openroadm-otn-odu-interfaces:odu'][
490 'parent-odu-allocation']['trib-slots'])
492 def test_15_check_ODU2E_connection(self):
493 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
494 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
495 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
496 .format(self.restconf_baseurl))
497 headers = {'content-type': 'application/json'}
498 response = requests.request(
499 "GET", url, headers=headers, auth=('admin', 'admin'))
500 self.assertEqual(response.status_code, requests.codes.ok)
501 res = response.json()
504 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
505 'direction': 'bidirectional'
508 self.assertDictEqual(input_dict_1,
509 extract_a_from_b(input_dict_1,
510 res['odu-connection'][0]))
512 self.assertDictContainsSubset({
513 'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
514 'direction': 'bidirectional'},
515 res['odu-connection'][0])
517 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
518 res['odu-connection'][0]['destination'])
519 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
520 res['odu-connection'][0]['source'])
522 def test_16_otn_service_path_delete_10GE(self):
523 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
524 data = {"renderer:input": {
525 "service-name": "service1",
526 "operation": "delete",
527 "service-rate": "10G",
528 "service-type": "Ethernet",
529 "ethernet-encoding": "eth encode",
531 "trib-port-number": "1",
533 {"node-id": "SPDR-SA1",
534 "client-tp": "XPDR1-CLIENT1",
535 "network-tp": "XPDR1-NETWORK1"}]}}
536 headers = {'content-type': 'application/json'}
537 response = requests.request(
538 "POST", url, data=json.dumps(data),
539 headers=headers, auth=('admin', 'admin'))
541 self.assertEqual(response.status_code, requests.codes.ok)
542 res = response.json()
543 self.assertIn('Request processed', res["output"]["result"])
544 self.assertTrue(res["output"]["success"])
546 def test_17_check_no_ODU2E_connection(self):
547 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
548 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
549 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
550 .format(self.restconf_baseurl))
551 headers = {'content-type': 'application/json'}
552 response = requests.request(
553 "GET", url, headers=headers, auth=('admin', 'admin'))
554 self.assertEqual(response.status_code, requests.codes.not_found)
556 def test_18_check_no_interface_ODU2E_NETWORK(self):
557 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
558 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
559 "interface/XPDR1-NETWORK1-ODU2e-service1"
560 .format(self.restconf_baseurl))
561 headers = {'content-type': 'application/json'}
562 response = requests.request(
563 "GET", url, headers=headers, auth=('admin', 'admin'))
564 self.assertEqual(response.status_code, requests.codes.not_found)
566 def test_19_check_no_interface_ODU2E_CLIENT(self):
567 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
568 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
569 "interface/XPDR1-CLIENT1-ODU2e-service1"
570 .format(self.restconf_baseurl))
571 headers = {'content-type': 'application/json'}
572 response = requests.request(
573 "GET", url, headers=headers, auth=('admin', 'admin'))
574 self.assertEqual(response.status_code, requests.codes.not_found)
576 def test_20_check_no_interface_10GE_CLIENT(self):
577 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
578 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
579 "interface/XPDR1-CLIENT1-ETHERNET10G"
580 .format(self.restconf_baseurl))
581 headers = {'content-type': 'application/json'}
582 response = requests.request(
583 "GET", url, headers=headers, auth=('admin', 'admin'))
584 self.assertEqual(response.status_code, requests.codes.not_found)
586 def test_21_otn_service_path_delete_ODU4(self):
587 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
588 data = {"renderer:input": {
589 "service-name": "service_ODU4",
590 "operation": "delete",
591 "service-rate": "100G",
592 "service-type": "ODU",
594 {"node-id": "SPDR-SA1",
595 "network-tp": "XPDR1-NETWORK1"}]}}
596 headers = {'content-type': 'application/json'}
597 response = requests.request(
598 "POST", url, data=json.dumps(data),
599 headers=headers, auth=('admin', 'admin'))
601 self.assertEqual(response.status_code, requests.codes.ok)
602 res = response.json()
603 self.assertIn('Request processed', res["output"]["result"])
604 self.assertTrue(res["output"]["success"])
606 def test_22_check_no_interface_ODU4(self):
607 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
608 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
609 "interface/XPDR1-NETWORK1-ODU4"
610 .format(self.restconf_baseurl))
611 headers = {'content-type': 'application/json'}
612 response = requests.request(
613 "GET", url, headers=headers, auth=('admin', 'admin'))
614 self.assertEqual(response.status_code, requests.codes.not_found)
616 def test_23_service_path_delete_OCH_OTU4(self):
617 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
618 data = {"renderer:input": {
619 "service-name": "service_OTU4",
621 "modulation-format": "qpsk",
622 "operation": "delete",
624 {"node-id": "SPDR-SA1",
625 "dest-tp": "XPDR1-NETWORK1"}]}}
626 headers = {'content-type': 'application/json'}
627 response = requests.request(
628 "POST", url, data=json.dumps(data),
629 headers=headers, auth=('admin', 'admin'))
631 self.assertEqual(response.status_code, requests.codes.ok)
632 res = response.json()
633 self.assertIn('Request processed', res["output"]["result"])
634 self.assertTrue(res["output"]["success"])
636 def test_24_check_no_interface_OTU4(self):
637 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
638 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
639 "interface/XPDR1-NETWORK1-OTU"
640 .format(self.restconf_baseurl))
641 headers = {'content-type': 'application/json'}
642 response = requests.request(
643 "GET", url, headers=headers, auth=('admin', 'admin'))
644 self.assertEqual(response.status_code, requests.codes.not_found)
646 def test_25_check_no_interface_OCH(self):
647 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
648 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
649 "interface/XPDR1-NETWORK1-1"
650 .format(self.restconf_baseurl))
651 headers = {'content-type': 'application/json'}
652 response = requests.request(
653 "GET", url, headers=headers, auth=('admin', 'admin'))
654 self.assertEqual(response.status_code, requests.codes.not_found)
656 def test_26_disconnect_SPDR_SA1(self):
657 url = ("{}/config/network-topology:"
658 "network-topology/topology/topology-netconf/node/SPDR-SA1"
659 .format(self.restconf_baseurl))
661 headers = {'content-type': 'application/json'}
662 response = requests.request(
663 "DELETE", url, data=json.dumps(data), headers=headers,
664 auth=('admin', 'admin'))
665 self.assertEqual(response.status_code, requests.codes.ok)
668 if __name__ == "__main__":
669 unittest.main(verbosity=2)