3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 def extract_a_from_b(a, b):
26 return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
29 class TransportPCEtesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
36 cls.processes = test_utils.start_tpce()
37 cls.processes = test_utils.start_sims(['spdrav2'])
40 def tearDownClass(cls):
41 for process in cls.processes:
42 test_utils.shutdown_process(process)
43 print("all processes killed")
48 def test_01_connect_SPDR_SA1(self):
49 url = ("{}/config/network-topology:"
50 "network-topology/topology/topology-netconf/node/SPDR-SA1"
51 .format(self.restconf_baseurl))
53 "node-id": "SPDR-SA1",
54 "netconf-node-topology:username": "admin",
55 "netconf-node-topology:password": "admin",
56 "netconf-node-topology:host": "127.0.0.1",
57 "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
58 "netconf-node-topology:tcp-only": "false",
59 "netconf-node-topology:pass-through": {}}]}
60 headers = {'content-type': 'application/json'}
61 response = requests.request(
62 "PUT", url, data=json.dumps(data), headers=headers,
63 auth=('admin', 'admin'))
64 self.assertEqual(response.status_code, requests.codes.created)
66 url = ("{}/operational/network-topology:"
67 "network-topology/topology/topology-netconf/node/SPDR-SA1"
68 .format(self.restconf_baseurl))
69 response = requests.request(
70 "GET", url, headers=headers, auth=('admin', 'admin'))
71 self.assertEqual(response.status_code, requests.codes.ok)
74 res['node'][0]['netconf-node-topology:connection-status'],
77 def test_02_get_portmapping_CLIENT1(self):
78 url = ("{}/config/transportpce-portmapping:network/"
79 "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
80 .format(self.restconf_baseurl))
81 headers = {'content-type': 'application/json'}
82 response = requests.request(
83 "GET", url, headers=headers, auth=('admin', 'admin'))
84 self.assertEqual(response.status_code, requests.codes.ok)
87 {'supported-interface-capability': [
88 'org-openroadm-port-types:if-10GE-ODU2e',
89 'org-openroadm-port-types:if-10GE-ODU2',
90 'org-openroadm-port-types:if-10GE'],
91 'supporting-port': 'CP1-SFP4-P1',
92 'supporting-circuit-pack-name': 'CP1-SFP4',
93 'logical-connection-point': 'XPDR1-CLIENT1',
94 'port-direction': 'bidirectional',
95 'port-qual': 'xpdr-client',
96 'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
99 def test_03_get_portmapping_NETWORK1(self):
100 url = ("{}/config/transportpce-portmapping:network/"
101 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
102 .format(self.restconf_baseurl))
103 headers = {'content-type': 'application/json'}
104 response = requests.request(
105 "GET", url, headers=headers, auth=('admin', 'admin'))
106 self.assertEqual(response.status_code, requests.codes.ok)
107 res = response.json()
109 {"logical-connection-point": "XPDR1-NETWORK1",
110 "supporting-port": "CP1-CFP0-P1",
111 "supported-interface-capability": [
112 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
114 "port-direction": "bidirectional",
115 "port-qual": "xpdr-network",
116 "supporting-circuit-pack-name": "CP1-CFP0",
117 "xponder-type": "mpdr",
118 'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
121 def test_04_service_path_create_OCH_OTU4(self):
122 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
123 data = {"renderer:input": {
124 "service-name": "service_ODU4",
126 "modulation-format": "qpsk",
127 "operation": "create",
129 {"node-id": "SPDR-SA1",
130 "dest-tp": "XPDR1-NETWORK1"}]}}
131 headers = {'content-type': 'application/json'}
132 response = requests.request(
133 "POST", url, data=json.dumps(data),
134 headers=headers, auth=('admin', 'admin'))
136 self.assertEqual(response.status_code, requests.codes.ok)
137 res = response.json()
138 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
139 self.assertTrue(res["output"]["success"])
141 {'node-id': 'SPDR-SA1',
142 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
143 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
145 def test_05_get_portmapping_NETWORK1(self):
146 url = ("{}/config/transportpce-portmapping:network/"
147 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
148 .format(self.restconf_baseurl))
149 headers = {'content-type': 'application/json'}
150 response = requests.request(
151 "GET", url, headers=headers, auth=('admin', 'admin'))
152 self.assertEqual(response.status_code, requests.codes.ok)
153 res = response.json()
155 {"logical-connection-point": "XPDR1-NETWORK1",
156 "supporting-port": "CP1-CFP0-P1",
157 "supported-interface-capability": [
158 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
160 "port-direction": "bidirectional",
161 "port-qual": "xpdr-network",
162 "supporting-circuit-pack-name": "CP1-CFP0",
163 "xponder-type": "mpdr",
164 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
167 def test_06_check_interface_och(self):
168 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
169 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
170 "interface/XPDR1-NETWORK1-1"
171 .format(self.restconf_baseurl))
172 headers = {'content-type': 'application/json'}
173 response = requests.request(
174 "GET", url, headers=headers, auth=('admin', 'admin'))
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
178 input_dict = {'name': 'XPDR1-NETWORK1-1',
179 'administrative-state': 'inService',
180 'supporting-circuit-pack-name': 'CP1-CFP0',
181 'type': 'org-openroadm-interfaces:opticalChannel',
182 'supporting-port': 'CP1-CFP0-P1'
184 # assertDictContainsSubset is deprecated
186 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
187 'supporting-circuit-pack-name': 'CP1-CFP0',
188 'type': 'org-openroadm-interfaces:opticalChannel',
189 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
191 self.assertDictEqual(input_dict,
192 extract_a_from_b(input_dict,
195 self.assertDictEqual(
196 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
197 u'transmit-power': -5},
198 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
200 def test_07_check_interface_OTU(self):
201 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
202 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
203 "interface/XPDR1-NETWORK1-OTU"
204 .format(self.restconf_baseurl))
205 headers = {'content-type': 'application/json'}
206 response = requests.request(
207 "GET", url, headers=headers, auth=('admin', 'admin'))
208 self.assertEqual(response.status_code, requests.codes.ok)
209 res = response.json()
210 input_dict = {'name': 'XPDR1-NETWORK1-OTU',
211 'administrative-state': 'inService',
212 'supporting-circuit-pack-name': 'CP1-CFP0',
213 'supporting-interface': 'XPDR1-NETWORK1-1',
214 'type': 'org-openroadm-interfaces:otnOtu',
215 'supporting-port': 'CP1-CFP0-P1'}
217 # assertDictContainsSubset is deprecated
219 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
220 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
221 'type': 'org-openroadm-interfaces:otnOtu',
222 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
224 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
228 self.assertDictEqual(
229 {u'rate': u'org-openroadm-otn-common-types:OTU4',
231 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
233 def test_08_otn_service_path_create_ODU4(self):
234 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
235 data = {"renderer:input": {
236 "service-name": "service_ODU4",
237 "operation": "create",
238 "service-rate": "100G",
239 "service-type": "ODU",
241 {"node-id": "SPDR-SA1",
242 "network-tp": "XPDR1-NETWORK1"}]}}
243 headers = {'content-type': 'application/json'}
244 response = requests.request(
245 "POST", url, data=json.dumps(data),
246 headers=headers, auth=('admin', 'admin'))
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
250 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
251 self.assertTrue(res["output"]["success"])
253 {'node-id': 'SPDR-SA1',
254 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
256 def test_09_get_portmapping_NETWORK1(self):
257 url = ("{}/config/transportpce-portmapping:network/"
258 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
259 .format(self.restconf_baseurl))
260 headers = {'content-type': 'application/json'}
261 response = requests.request(
262 "GET", url, headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
266 {"logical-connection-point": "XPDR1-NETWORK1",
267 "supporting-port": "CP1-CFP0-P1",
268 "supported-interface-capability": [
269 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
271 "port-direction": "bidirectional",
272 "port-qual": "xpdr-network",
273 "supporting-circuit-pack-name": "CP1-CFP0",
274 "xponder-type": "mpdr",
275 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
276 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
280 def test_10_check_interface_ODU4(self):
281 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
282 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
283 "interface/XPDR1-NETWORK1-ODU4"
284 .format(self.restconf_baseurl))
285 headers = {'content-type': 'application/json'}
286 response = requests.request(
287 "GET", url, headers=headers, auth=('admin', 'admin'))
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
291 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
292 'type': 'org-openroadm-interfaces:otnOdu',
293 'supporting-port': 'CP1-CFP0-P1'}
294 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
295 'rate': 'org-openroadm-otn-common-types:ODU4'}
297 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
300 self.assertDictEqual(input_dict_2,
301 extract_a_from_b(input_dict_2,
303 'org-openroadm-otn-odu-interfaces:odu'])
307 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
308 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
309 'type': 'org-openroadm-interfaces:otnOdu',
310 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
311 self.assertDictContainsSubset(
312 {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
313 'rate': 'org-openroadm-otn-common-types:ODU4'},
314 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
316 self.assertDictEqual(
317 {u'payload-type': u'21', u'exp-payload-type': u'21'},
318 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
320 def test_11_otn_service_path_create_10GE(self):
321 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
322 data = {"renderer:input": {
323 "service-name": "service1",
324 "operation": "create",
325 "service-rate": "10G",
326 "service-type": "Ethernet",
327 "ethernet-encoding": "eth encode",
329 "trib-port-number": "1",
331 {"node-id": "SPDR-SA1",
332 "client-tp": "XPDR1-CLIENT1",
333 "network-tp": "XPDR1-NETWORK1"}]}}
334 headers = {'content-type': 'application/json'}
335 response = requests.request(
336 "POST", url, data=json.dumps(data),
337 headers=headers, auth=('admin', 'admin'))
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
341 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
342 self.assertTrue(res["output"]["success"])
344 {'node-id': 'SPDR-SA1',
345 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
346 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
347 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
349 def test_12_check_interface_10GE_CLIENT(self):
350 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
351 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
352 "interface/XPDR1-CLIENT1-ETHERNET10G"
353 .format(self.restconf_baseurl))
354 headers = {'content-type': 'application/json'}
355 response = requests.request(
356 "GET", url, headers=headers, auth=('admin', 'admin'))
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
360 'administrative-state': 'inService',
361 'supporting-circuit-pack-name': 'CP1-SFP4',
362 'type': 'org-openroadm-interfaces:ethernetCsmacd',
363 'supporting-port': 'CP1-SFP4-P1'
367 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
368 'supporting-circuit-pack-name': 'CP1-SFP4',
369 'type': 'org-openroadm-interfaces:ethernetCsmacd',
370 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
372 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
375 self.assertDictEqual(
377 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
379 def test_13_check_interface_ODU2E_CLIENT(self):
380 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
381 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
382 "interface/XPDR1-CLIENT1-ODU2e-service1"
383 .format(self.restconf_baseurl))
384 headers = {'content-type': 'application/json'}
385 response = requests.request(
386 "GET", url, headers=headers, auth=('admin', 'admin'))
387 self.assertEqual(response.status_code, requests.codes.ok)
388 res = response.json()
390 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
391 'administrative-state': 'inService',
392 'supporting-circuit-pack-name': 'CP1-SFP4',
393 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
394 'type': 'org-openroadm-interfaces:otnOdu',
395 'supporting-port': 'CP1-SFP4-P1'}
397 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
398 'rate': 'org-openroadm-otn-common-types:ODU2e',
399 'monitoring-mode': 'terminated'}
401 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
404 self.assertDictEqual(input_dict_2,
405 extract_a_from_b(input_dict_2, res['interface'][0][
406 'org-openroadm-otn-odu-interfaces:odu'])
410 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
411 'supporting-circuit-pack-name': 'CP1-SFP4',
412 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
413 'type': 'org-openroadm-interfaces:otnOdu',
414 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
415 self.assertDictContainsSubset({
416 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
417 'rate': 'org-openroadm-otn-common-types:ODU2e',
418 'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
420 self.assertDictEqual(
421 {u'payload-type': u'03', u'exp-payload-type': u'03'},
422 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
424 def test_14_check_interface_ODU2E_NETWORK(self):
425 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
426 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
427 "interface/XPDR1-NETWORK1-ODU2e-service1"
428 .format(self.restconf_baseurl))
429 headers = {'content-type': 'application/json'}
430 response = requests.request(
431 "GET", url, headers=headers, auth=('admin', 'admin'))
432 self.assertEqual(response.status_code, requests.codes.ok)
433 res = response.json()
434 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
435 'supporting-circuit-pack-name': 'CP1-CFP0',
436 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
437 'type': 'org-openroadm-interfaces:otnOdu',
438 'supporting-port': 'CP1-CFP0-P1'}
440 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
441 'rate': 'org-openroadm-otn-common-types:ODU2e',
442 'monitoring-mode': 'monitored'}
444 input_dict_3 = {'trib-port-number': 1}
446 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
450 self.assertDictEqual(input_dict_2,
451 extract_a_from_b(input_dict_2,
453 'org-openroadm-otn-odu-interfaces:odu']
456 self.assertDictEqual(input_dict_3,
457 extract_a_from_b(input_dict_3,
459 'org-openroadm-otn-odu-interfaces:odu'][
460 'parent-odu-allocation']))
463 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
464 'supporting-circuit-pack-name': 'CP1-CFP0',
465 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
466 'type': 'org-openroadm-interfaces:otnOdu',
467 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
468 self.assertDictContainsSubset({
469 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
470 'rate': 'org-openroadm-otn-common-types:ODU2e',
471 'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
472 self.assertDictContainsSubset(
473 {'trib-port-number': 1},
474 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
478 'org-openroadm-otn-odu-interfaces:odu'][
479 'parent-odu-allocation']['trib-slots'])
481 def test_15_check_ODU2E_connection(self):
482 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
483 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
484 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
485 .format(self.restconf_baseurl))
486 headers = {'content-type': 'application/json'}
487 response = requests.request(
488 "GET", url, headers=headers, auth=('admin', 'admin'))
489 self.assertEqual(response.status_code, requests.codes.ok)
490 res = response.json()
493 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
494 'direction': 'bidirectional'
497 self.assertDictEqual(input_dict_1,
498 extract_a_from_b(input_dict_1,
499 res['odu-connection'][0]))
501 self.assertDictContainsSubset({
502 'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
503 'direction': 'bidirectional'},
504 res['odu-connection'][0])
506 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
507 res['odu-connection'][0]['destination'])
508 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
509 res['odu-connection'][0]['source'])
511 def test_16_otn_service_path_delete_10GE(self):
512 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
513 data = {"renderer:input": {
514 "service-name": "service1",
515 "operation": "delete",
516 "service-rate": "10G",
517 "service-type": "Ethernet",
518 "ethernet-encoding": "eth encode",
520 "trib-port-number": "1",
522 {"node-id": "SPDR-SA1",
523 "client-tp": "XPDR1-CLIENT1",
524 "network-tp": "XPDR1-NETWORK1"}]}}
525 headers = {'content-type': 'application/json'}
526 response = requests.request(
527 "POST", url, data=json.dumps(data),
528 headers=headers, auth=('admin', 'admin'))
530 self.assertEqual(response.status_code, requests.codes.ok)
531 res = response.json()
532 self.assertIn('Request processed', res["output"]["result"])
533 self.assertTrue(res["output"]["success"])
535 def test_17_check_no_ODU2E_connection(self):
536 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
537 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
538 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
539 .format(self.restconf_baseurl))
540 headers = {'content-type': 'application/json'}
541 response = requests.request(
542 "GET", url, headers=headers, auth=('admin', 'admin'))
543 self.assertEqual(response.status_code, requests.codes.not_found)
545 def test_18_check_no_interface_ODU2E_NETWORK(self):
546 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
547 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
548 "interface/XPDR1-NETWORK1-ODU2e-service1"
549 .format(self.restconf_baseurl))
550 headers = {'content-type': 'application/json'}
551 response = requests.request(
552 "GET", url, headers=headers, auth=('admin', 'admin'))
553 self.assertEqual(response.status_code, requests.codes.not_found)
555 def test_19_check_no_interface_ODU2E_CLIENT(self):
556 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
557 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
558 "interface/XPDR1-CLIENT1-ODU2e-service1"
559 .format(self.restconf_baseurl))
560 headers = {'content-type': 'application/json'}
561 response = requests.request(
562 "GET", url, headers=headers, auth=('admin', 'admin'))
563 self.assertEqual(response.status_code, requests.codes.not_found)
565 def test_20_check_no_interface_10GE_CLIENT(self):
566 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
567 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
568 "interface/XPDR1-CLIENT1-ETHERNET10G"
569 .format(self.restconf_baseurl))
570 headers = {'content-type': 'application/json'}
571 response = requests.request(
572 "GET", url, headers=headers, auth=('admin', 'admin'))
573 self.assertEqual(response.status_code, requests.codes.not_found)
575 def test_21_otn_service_path_delete_ODU4(self):
576 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
577 data = {"renderer:input": {
578 "service-name": "service_ODU4",
579 "operation": "delete",
580 "service-rate": "100G",
581 "service-type": "ODU",
583 {"node-id": "SPDR-SA1",
584 "network-tp": "XPDR1-NETWORK1"}]}}
585 headers = {'content-type': 'application/json'}
586 response = requests.request(
587 "POST", url, data=json.dumps(data),
588 headers=headers, auth=('admin', 'admin'))
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 self.assertIn('Request processed', res["output"]["result"])
593 self.assertTrue(res["output"]["success"])
595 def test_22_check_no_interface_ODU4(self):
596 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
597 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
598 "interface/XPDR1-NETWORK1-ODU4"
599 .format(self.restconf_baseurl))
600 headers = {'content-type': 'application/json'}
601 response = requests.request(
602 "GET", url, headers=headers, auth=('admin', 'admin'))
603 self.assertEqual(response.status_code, requests.codes.not_found)
605 def test_23_service_path_delete_OCH_OTU4(self):
606 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
607 data = {"renderer:input": {
608 "service-name": "service_OTU4",
610 "modulation-format": "qpsk",
611 "operation": "delete",
613 {"node-id": "SPDR-SA1",
614 "dest-tp": "XPDR1-NETWORK1"}]}}
615 headers = {'content-type': 'application/json'}
616 response = requests.request(
617 "POST", url, data=json.dumps(data),
618 headers=headers, auth=('admin', 'admin'))
620 self.assertEqual(response.status_code, requests.codes.ok)
621 res = response.json()
622 self.assertIn('Request processed', res["output"]["result"])
623 self.assertTrue(res["output"]["success"])
625 def test_24_check_no_interface_OTU4(self):
626 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
627 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
628 "interface/XPDR1-NETWORK1-OTU"
629 .format(self.restconf_baseurl))
630 headers = {'content-type': 'application/json'}
631 response = requests.request(
632 "GET", url, headers=headers, auth=('admin', 'admin'))
633 self.assertEqual(response.status_code, requests.codes.not_found)
635 def test_25_check_no_interface_OCH(self):
636 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
637 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
638 "interface/XPDR1-NETWORK1-1"
639 .format(self.restconf_baseurl))
640 headers = {'content-type': 'application/json'}
641 response = requests.request(
642 "GET", url, headers=headers, auth=('admin', 'admin'))
643 self.assertEqual(response.status_code, requests.codes.not_found)
645 def test_26_disconnect_SPDR_SA1(self):
646 url = ("{}/config/network-topology:"
647 "network-topology/topology/topology-netconf/node/SPDR-SA1"
648 .format(self.restconf_baseurl))
650 headers = {'content-type': 'application/json'}
651 response = requests.request(
652 "DELETE", url, data=json.dumps(data), headers=headers,
653 auth=('admin', 'admin'))
654 self.assertEqual(response.status_code, requests.codes.ok)
657 if __name__ == "__main__":
658 unittest.main(verbosity=2)