3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 def extract_a_from_b(a, b):
26 return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
29 class TransportPCEtesting(unittest.TestCase):
33 restconf_baseurl = "http://localhost:8181/restconf"
37 cls.odl_process = test_utils.start_tpce()
38 cls.sim_process1 = test_utils.start_sim('spdrav2')
41 def tearDownClass(cls):
42 for child in psutil.Process(cls.odl_process.pid).children():
43 child.send_signal(signal.SIGINT)
45 cls.odl_process.send_signal(signal.SIGINT)
46 cls.odl_process.wait()
47 for child in psutil.Process(cls.sim_process1.pid).children():
48 child.send_signal(signal.SIGINT)
50 cls.sim_process1.send_signal(signal.SIGINT)
51 cls.sim_process1.wait()
56 def test_01_connect_SPDR_SA1(self):
57 url = ("{}/config/network-topology:"
58 "network-topology/topology/topology-netconf/node/SPDR-SA1"
59 .format(self.restconf_baseurl))
61 "node-id": "SPDR-SA1",
62 "netconf-node-topology:username": "admin",
63 "netconf-node-topology:password": "admin",
64 "netconf-node-topology:host": "127.0.0.1",
65 "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
66 "netconf-node-topology:tcp-only": "false",
67 "netconf-node-topology:pass-through": {}}]}
68 headers = {'content-type': 'application/json'}
69 response = requests.request(
70 "PUT", url, data=json.dumps(data), headers=headers,
71 auth=('admin', 'admin'))
72 self.assertEqual(response.status_code, requests.codes.created)
74 url = ("{}/operational/network-topology:"
75 "network-topology/topology/topology-netconf/node/SPDR-SA1"
76 .format(self.restconf_baseurl))
77 response = requests.request(
78 "GET", url, headers=headers, auth=('admin', 'admin'))
79 self.assertEqual(response.status_code, requests.codes.ok)
82 res['node'][0]['netconf-node-topology:connection-status'],
85 def test_02_get_portmapping_CLIENT1(self):
86 url = ("{}/config/transportpce-portmapping:network/"
87 "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
88 .format(self.restconf_baseurl))
89 headers = {'content-type': 'application/json'}
90 response = requests.request(
91 "GET", url, headers=headers, auth=('admin', 'admin'))
92 self.assertEqual(response.status_code, requests.codes.ok)
95 {'supported-interface-capability': [
96 'org-openroadm-port-types:if-10GE-ODU2e',
97 'org-openroadm-port-types:if-10GE-ODU2',
98 'org-openroadm-port-types:if-10GE'],
99 'supporting-port': 'CP1-SFP4-P1',
100 'supporting-circuit-pack-name': 'CP1-SFP4',
101 'logical-connection-point': 'XPDR1-CLIENT1',
102 'port-direction': 'bidirectional',
103 'port-qual': 'xpdr-client',
104 'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
107 def test_03_get_portmapping_NETWORK1(self):
108 url = ("{}/config/transportpce-portmapping:network/"
109 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
110 .format(self.restconf_baseurl))
111 headers = {'content-type': 'application/json'}
112 response = requests.request(
113 "GET", url, headers=headers, auth=('admin', 'admin'))
114 self.assertEqual(response.status_code, requests.codes.ok)
115 res = response.json()
117 {"logical-connection-point": "XPDR1-NETWORK1",
118 "supporting-port": "CP1-CFP0-P1",
119 "supported-interface-capability": [
120 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
122 "port-direction": "bidirectional",
123 "port-qual": "xpdr-network",
124 "supporting-circuit-pack-name": "CP1-CFP0",
125 "xponder-type": "mpdr",
126 'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
129 def test_04_service_path_create_OCH_OTU4(self):
130 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
131 data = {"renderer:input": {
132 "service-name": "service_ODU4",
134 "modulation-format": "qpsk",
135 "operation": "create",
137 {"node-id": "SPDR-SA1",
138 "dest-tp": "XPDR1-NETWORK1"}]}}
139 headers = {'content-type': 'application/json'}
140 response = requests.request(
141 "POST", url, data=json.dumps(data),
142 headers=headers, auth=('admin', 'admin'))
144 self.assertEqual(response.status_code, requests.codes.ok)
145 res = response.json()
146 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
147 self.assertTrue(res["output"]["success"])
149 {'node-id': 'SPDR-SA1',
150 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
151 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
153 def test_05_get_portmapping_NETWORK1(self):
154 url = ("{}/config/transportpce-portmapping:network/"
155 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
156 .format(self.restconf_baseurl))
157 headers = {'content-type': 'application/json'}
158 response = requests.request(
159 "GET", url, headers=headers, auth=('admin', 'admin'))
160 self.assertEqual(response.status_code, requests.codes.ok)
161 res = response.json()
163 {"logical-connection-point": "XPDR1-NETWORK1",
164 "supporting-port": "CP1-CFP0-P1",
165 "supported-interface-capability": [
166 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
168 "port-direction": "bidirectional",
169 "port-qual": "xpdr-network",
170 "supporting-circuit-pack-name": "CP1-CFP0",
171 "xponder-type": "mpdr",
172 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
175 def test_06_check_interface_och(self):
176 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
177 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
178 "interface/XPDR1-NETWORK1-1"
179 .format(self.restconf_baseurl))
180 headers = {'content-type': 'application/json'}
181 response = requests.request(
182 "GET", url, headers=headers, auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
186 input_dict = {'name': 'XPDR1-NETWORK1-1',
187 'administrative-state': 'inService',
188 'supporting-circuit-pack-name': 'CP1-CFP0',
189 'type': 'org-openroadm-interfaces:opticalChannel',
190 'supporting-port': 'CP1-CFP0-P1'
192 # assertDictContainsSubset is deprecated
194 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
195 'supporting-circuit-pack-name': 'CP1-CFP0',
196 'type': 'org-openroadm-interfaces:opticalChannel',
197 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
199 self.assertDictEqual(input_dict,
200 extract_a_from_b(input_dict,
203 self.assertDictEqual(
204 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
205 u'transmit-power': -5},
206 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
208 def test_07_check_interface_OTU(self):
209 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
210 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
211 "interface/XPDR1-NETWORK1-OTU"
212 .format(self.restconf_baseurl))
213 headers = {'content-type': 'application/json'}
214 response = requests.request(
215 "GET", url, headers=headers, auth=('admin', 'admin'))
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
218 input_dict = {'name': 'XPDR1-NETWORK1-OTU',
219 'administrative-state': 'inService',
220 'supporting-circuit-pack-name': 'CP1-CFP0',
221 'supporting-interface': 'XPDR1-NETWORK1-1',
222 'type': 'org-openroadm-interfaces:otnOtu',
223 'supporting-port': 'CP1-CFP0-P1'}
225 # assertDictContainsSubset is deprecated
227 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
228 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
229 'type': 'org-openroadm-interfaces:otnOtu',
230 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
232 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
236 self.assertDictEqual(
237 {u'rate': u'org-openroadm-otn-common-types:OTU4',
239 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
241 def test_08_otn_service_path_create_ODU4(self):
242 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
243 data = {"renderer:input": {
244 "service-name": "service_ODU4",
245 "operation": "create",
246 "service-rate": "100G",
247 "service-type": "ODU",
249 {"node-id": "SPDR-SA1",
250 "network-tp": "XPDR1-NETWORK1"}]}}
251 headers = {'content-type': 'application/json'}
252 response = requests.request(
253 "POST", url, data=json.dumps(data),
254 headers=headers, auth=('admin', 'admin'))
256 self.assertEqual(response.status_code, requests.codes.ok)
257 res = response.json()
258 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
259 self.assertTrue(res["output"]["success"])
261 {'node-id': 'SPDR-SA1',
262 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
264 def test_09_get_portmapping_NETWORK1(self):
265 url = ("{}/config/transportpce-portmapping:network/"
266 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
267 .format(self.restconf_baseurl))
268 headers = {'content-type': 'application/json'}
269 response = requests.request(
270 "GET", url, headers=headers, auth=('admin', 'admin'))
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
274 {"logical-connection-point": "XPDR1-NETWORK1",
275 "supporting-port": "CP1-CFP0-P1",
276 "supported-interface-capability": [
277 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
279 "port-direction": "bidirectional",
280 "port-qual": "xpdr-network",
281 "supporting-circuit-pack-name": "CP1-CFP0",
282 "xponder-type": "mpdr",
283 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
284 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
288 def test_10_check_interface_ODU4(self):
289 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
290 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
291 "interface/XPDR1-NETWORK1-ODU4"
292 .format(self.restconf_baseurl))
293 headers = {'content-type': 'application/json'}
294 response = requests.request(
295 "GET", url, headers=headers, auth=('admin', 'admin'))
296 self.assertEqual(response.status_code, requests.codes.ok)
297 res = response.json()
298 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
299 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
300 'type': 'org-openroadm-interfaces:otnOdu',
301 'supporting-port': 'CP1-CFP0-P1'}
302 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
303 'rate': 'org-openroadm-otn-common-types:ODU4'}
305 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
308 self.assertDictEqual(input_dict_2,
309 extract_a_from_b(input_dict_2,
311 'org-openroadm-otn-odu-interfaces:odu'])
315 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
316 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
317 'type': 'org-openroadm-interfaces:otnOdu',
318 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
319 self.assertDictContainsSubset(
320 {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
321 'rate': 'org-openroadm-otn-common-types:ODU4'},
322 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
324 self.assertDictEqual(
325 {u'payload-type': u'21', u'exp-payload-type': u'21'},
326 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
328 def test_11_otn_service_path_create_10GE(self):
329 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
330 data = {"renderer:input": {
331 "service-name": "service1",
332 "operation": "create",
333 "service-rate": "10G",
334 "service-type": "Ethernet",
335 "ethernet-encoding": "eth encode",
337 "trib-port-number": "1",
339 {"node-id": "SPDR-SA1",
340 "client-tp": "XPDR1-CLIENT1",
341 "network-tp": "XPDR1-NETWORK1"}]}}
342 headers = {'content-type': 'application/json'}
343 response = requests.request(
344 "POST", url, data=json.dumps(data),
345 headers=headers, auth=('admin', 'admin'))
347 self.assertEqual(response.status_code, requests.codes.ok)
348 res = response.json()
349 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
350 self.assertTrue(res["output"]["success"])
352 {'node-id': 'SPDR-SA1',
353 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
354 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
355 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
357 def test_12_check_interface_10GE_CLIENT(self):
358 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
359 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
360 "interface/XPDR1-CLIENT1-ETHERNET10G"
361 .format(self.restconf_baseurl))
362 headers = {'content-type': 'application/json'}
363 response = requests.request(
364 "GET", url, headers=headers, auth=('admin', 'admin'))
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
368 'administrative-state': 'inService',
369 'supporting-circuit-pack-name': 'CP1-SFP4',
370 'type': 'org-openroadm-interfaces:ethernetCsmacd',
371 'supporting-port': 'CP1-SFP4-P1'
375 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
376 'supporting-circuit-pack-name': 'CP1-SFP4',
377 'type': 'org-openroadm-interfaces:ethernetCsmacd',
378 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
380 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
383 self.assertDictEqual(
385 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
387 def test_13_check_interface_ODU2E_CLIENT(self):
388 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
389 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
390 "interface/XPDR1-CLIENT1-ODU2e-service1"
391 .format(self.restconf_baseurl))
392 headers = {'content-type': 'application/json'}
393 response = requests.request(
394 "GET", url, headers=headers, auth=('admin', 'admin'))
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
398 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
399 'administrative-state': 'inService',
400 'supporting-circuit-pack-name': 'CP1-SFP4',
401 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
402 'type': 'org-openroadm-interfaces:otnOdu',
403 'supporting-port': 'CP1-SFP4-P1'}
405 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
406 'rate': 'org-openroadm-otn-common-types:ODU2e',
407 'monitoring-mode': 'terminated'}
409 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
412 self.assertDictEqual(input_dict_2,
413 extract_a_from_b(input_dict_2, res['interface'][0][
414 'org-openroadm-otn-odu-interfaces:odu'])
418 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
419 'supporting-circuit-pack-name': 'CP1-SFP4',
420 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
421 'type': 'org-openroadm-interfaces:otnOdu',
422 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
423 self.assertDictContainsSubset({
424 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
425 'rate': 'org-openroadm-otn-common-types:ODU2e',
426 'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
428 self.assertDictEqual(
429 {u'payload-type': u'03', u'exp-payload-type': u'03'},
430 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
432 def test_14_check_interface_ODU2E_NETWORK(self):
433 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
434 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
435 "interface/XPDR1-NETWORK1-ODU2e-service1"
436 .format(self.restconf_baseurl))
437 headers = {'content-type': 'application/json'}
438 response = requests.request(
439 "GET", url, headers=headers, auth=('admin', 'admin'))
440 self.assertEqual(response.status_code, requests.codes.ok)
441 res = response.json()
442 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
443 'supporting-circuit-pack-name': 'CP1-CFP0',
444 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
445 'type': 'org-openroadm-interfaces:otnOdu',
446 'supporting-port': 'CP1-CFP0-P1'}
448 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
449 'rate': 'org-openroadm-otn-common-types:ODU2e',
450 'monitoring-mode': 'monitored'}
452 input_dict_3 = {'trib-port-number': 1}
454 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
458 self.assertDictEqual(input_dict_2,
459 extract_a_from_b(input_dict_2,
461 'org-openroadm-otn-odu-interfaces:odu']
464 self.assertDictEqual(input_dict_3,
465 extract_a_from_b(input_dict_3,
467 'org-openroadm-otn-odu-interfaces:odu'][
468 'parent-odu-allocation']))
471 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
472 'supporting-circuit-pack-name': 'CP1-CFP0',
473 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
474 'type': 'org-openroadm-interfaces:otnOdu',
475 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
476 self.assertDictContainsSubset({
477 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
478 'rate': 'org-openroadm-otn-common-types:ODU2e',
479 'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
480 self.assertDictContainsSubset(
481 {'trib-port-number': 1},
482 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
486 'org-openroadm-otn-odu-interfaces:odu'][
487 'parent-odu-allocation']['trib-slots'])
489 def test_15_check_ODU2E_connection(self):
490 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
491 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
492 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
493 .format(self.restconf_baseurl))
494 headers = {'content-type': 'application/json'}
495 response = requests.request(
496 "GET", url, headers=headers, auth=('admin', 'admin'))
497 self.assertEqual(response.status_code, requests.codes.ok)
498 res = response.json()
501 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
502 'direction': 'bidirectional'
505 self.assertDictEqual(input_dict_1,
506 extract_a_from_b(input_dict_1,
507 res['odu-connection'][0]))
509 self.assertDictContainsSubset({
510 'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
511 'direction': 'bidirectional'},
512 res['odu-connection'][0])
514 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
515 res['odu-connection'][0]['destination'])
516 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
517 res['odu-connection'][0]['source'])
519 def test_16_otn_service_path_delete_10GE(self):
520 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
521 data = {"renderer:input": {
522 "service-name": "service1",
523 "operation": "delete",
524 "service-rate": "10G",
525 "service-type": "Ethernet",
526 "ethernet-encoding": "eth encode",
528 "trib-port-number": "1",
530 {"node-id": "SPDR-SA1",
531 "client-tp": "XPDR1-CLIENT1",
532 "network-tp": "XPDR1-NETWORK1"}]}}
533 headers = {'content-type': 'application/json'}
534 response = requests.request(
535 "POST", url, data=json.dumps(data),
536 headers=headers, auth=('admin', 'admin'))
538 self.assertEqual(response.status_code, requests.codes.ok)
539 res = response.json()
540 self.assertIn('Request processed', res["output"]["result"])
541 self.assertTrue(res["output"]["success"])
543 def test_17_check_no_ODU2E_connection(self):
544 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
545 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
546 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
547 .format(self.restconf_baseurl))
548 headers = {'content-type': 'application/json'}
549 response = requests.request(
550 "GET", url, headers=headers, auth=('admin', 'admin'))
551 self.assertEqual(response.status_code, requests.codes.not_found)
553 def test_18_check_no_interface_ODU2E_NETWORK(self):
554 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
555 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
556 "interface/XPDR1-NETWORK1-ODU2e-service1"
557 .format(self.restconf_baseurl))
558 headers = {'content-type': 'application/json'}
559 response = requests.request(
560 "GET", url, headers=headers, auth=('admin', 'admin'))
561 self.assertEqual(response.status_code, requests.codes.not_found)
563 def test_19_check_no_interface_ODU2E_CLIENT(self):
564 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
565 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
566 "interface/XPDR1-CLIENT1-ODU2e-service1"
567 .format(self.restconf_baseurl))
568 headers = {'content-type': 'application/json'}
569 response = requests.request(
570 "GET", url, headers=headers, auth=('admin', 'admin'))
571 self.assertEqual(response.status_code, requests.codes.not_found)
573 def test_20_check_no_interface_10GE_CLIENT(self):
574 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
575 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
576 "interface/XPDR1-CLIENT1-ETHERNET10G"
577 .format(self.restconf_baseurl))
578 headers = {'content-type': 'application/json'}
579 response = requests.request(
580 "GET", url, headers=headers, auth=('admin', 'admin'))
581 self.assertEqual(response.status_code, requests.codes.not_found)
583 def test_21_otn_service_path_delete_ODU4(self):
584 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
585 data = {"renderer:input": {
586 "service-name": "service_ODU4",
587 "operation": "delete",
588 "service-rate": "100G",
589 "service-type": "ODU",
591 {"node-id": "SPDR-SA1",
592 "network-tp": "XPDR1-NETWORK1"}]}}
593 headers = {'content-type': 'application/json'}
594 response = requests.request(
595 "POST", url, data=json.dumps(data),
596 headers=headers, auth=('admin', 'admin'))
598 self.assertEqual(response.status_code, requests.codes.ok)
599 res = response.json()
600 self.assertIn('Request processed', res["output"]["result"])
601 self.assertTrue(res["output"]["success"])
603 def test_22_check_no_interface_ODU4(self):
604 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
605 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
606 "interface/XPDR1-NETWORK1-ODU4"
607 .format(self.restconf_baseurl))
608 headers = {'content-type': 'application/json'}
609 response = requests.request(
610 "GET", url, headers=headers, auth=('admin', 'admin'))
611 self.assertEqual(response.status_code, requests.codes.not_found)
613 def test_23_service_path_delete_OCH_OTU4(self):
614 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
615 data = {"renderer:input": {
616 "service-name": "service_OTU4",
618 "modulation-format": "qpsk",
619 "operation": "delete",
621 {"node-id": "SPDR-SA1",
622 "dest-tp": "XPDR1-NETWORK1"}]}}
623 headers = {'content-type': 'application/json'}
624 response = requests.request(
625 "POST", url, data=json.dumps(data),
626 headers=headers, auth=('admin', 'admin'))
628 self.assertEqual(response.status_code, requests.codes.ok)
629 res = response.json()
630 self.assertIn('Request processed', res["output"]["result"])
631 self.assertTrue(res["output"]["success"])
633 def test_24_check_no_interface_OTU4(self):
634 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
635 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
636 "interface/XPDR1-NETWORK1-OTU"
637 .format(self.restconf_baseurl))
638 headers = {'content-type': 'application/json'}
639 response = requests.request(
640 "GET", url, headers=headers, auth=('admin', 'admin'))
641 self.assertEqual(response.status_code, requests.codes.not_found)
643 def test_25_check_no_interface_OCH(self):
644 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
645 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
646 "interface/XPDR1-NETWORK1-1"
647 .format(self.restconf_baseurl))
648 headers = {'content-type': 'application/json'}
649 response = requests.request(
650 "GET", url, headers=headers, auth=('admin', 'admin'))
651 self.assertEqual(response.status_code, requests.codes.not_found)
653 def test_26_disconnect_SPDR_SA1(self):
654 url = ("{}/config/network-topology:"
655 "network-topology/topology/topology-netconf/node/SPDR-SA1"
656 .format(self.restconf_baseurl))
658 headers = {'content-type': 'application/json'}
659 response = requests.request(
660 "DELETE", url, data=json.dumps(data), headers=headers,
661 auth=('admin', 'admin'))
662 self.assertEqual(response.status_code, requests.codes.ok)
665 if __name__ == "__main__":
666 unittest.main(verbosity=2)