3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 def extract_a_from_b(a, b):
26 return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
29 class TransportPCEtesting(unittest.TestCase):
31 honeynode_process1 = None
33 restconf_baseurl = "http://localhost:8181/restconf"
37 print("starting honeynode1...")
38 cls.honeynode_process1 = test_utils.start_spdra_honeynode()
41 print("starting opendaylight...")
42 cls.odl_process = test_utils.start_tpce()
44 print("opendaylight started")
47 def tearDownClass(cls):
48 for child in psutil.Process(cls.odl_process.pid).children():
49 child.send_signal(signal.SIGINT)
51 cls.odl_process.send_signal(signal.SIGINT)
52 cls.odl_process.wait()
53 for child in psutil.Process(cls.honeynode_process1.pid).children():
54 child.send_signal(signal.SIGINT)
56 cls.honeynode_process1.send_signal(signal.SIGINT)
57 cls.honeynode_process1.wait()
62 def test_01_connect_SPDR_SA1(self):
63 url = ("{}/config/network-topology:"
64 "network-topology/topology/topology-netconf/node/SPDR-SA1"
65 .format(self.restconf_baseurl))
67 "node-id": "SPDR-SA1",
68 "netconf-node-topology:username": "admin",
69 "netconf-node-topology:password": "admin",
70 "netconf-node-topology:host": "127.0.0.1",
71 "netconf-node-topology:port": "17845",
72 "netconf-node-topology:tcp-only": "false",
73 "netconf-node-topology:pass-through": {}}]}
74 headers = {'content-type': 'application/json'}
75 response = requests.request(
76 "PUT", url, data=json.dumps(data), headers=headers,
77 auth=('admin', 'admin'))
78 self.assertEqual(response.status_code, requests.codes.created)
80 url = ("{}/operational/network-topology:"
81 "network-topology/topology/topology-netconf/node/SPDR-SA1"
82 .format(self.restconf_baseurl))
83 response = requests.request(
84 "GET", url, headers=headers, auth=('admin', 'admin'))
85 self.assertEqual(response.status_code, requests.codes.ok)
88 res['node'][0]['netconf-node-topology:connection-status'],
91 def test_02_get_portmapping_CLIENT1(self):
92 url = ("{}/config/transportpce-portmapping:network/"
93 "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
94 .format(self.restconf_baseurl))
95 headers = {'content-type': 'application/json'}
96 response = requests.request(
97 "GET", url, headers=headers, auth=('admin', 'admin'))
98 self.assertEqual(response.status_code, requests.codes.ok)
101 {'supported-interface-capability': [
102 'org-openroadm-port-types:if-10GE-ODU2e',
103 'org-openroadm-port-types:if-10GE-ODU2',
104 'org-openroadm-port-types:if-10GE'],
105 'supporting-port': 'CP1-SFP4-P1',
106 'supporting-circuit-pack-name': 'CP1-SFP4',
107 'logical-connection-point': 'XPDR1-CLIENT1',
108 'port-direction': 'bidirectional',
109 'port-qual': 'xpdr-client',
110 'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
113 def test_03_get_portmapping_NETWORK1(self):
114 url = ("{}/config/transportpce-portmapping:network/"
115 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
116 .format(self.restconf_baseurl))
117 headers = {'content-type': 'application/json'}
118 response = requests.request(
119 "GET", url, headers=headers, auth=('admin', 'admin'))
120 self.assertEqual(response.status_code, requests.codes.ok)
121 res = response.json()
123 {"logical-connection-point": "XPDR1-NETWORK1",
124 "supporting-port": "CP1-CFP0-P1",
125 "supported-interface-capability": [
126 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
128 "port-direction": "bidirectional",
129 "port-qual": "xpdr-network",
130 "supporting-circuit-pack-name": "CP1-CFP0",
131 "xponder-type": "mpdr",
132 'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
135 def test_04_service_path_create_OCH_OTU4(self):
136 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
137 data = {"renderer:input": {
138 "service-name": "service_ODU4",
140 "modulation-format": "qpsk",
141 "operation": "create",
143 {"node-id": "SPDR-SA1",
144 "dest-tp": "XPDR1-NETWORK1"}]}}
145 headers = {'content-type': 'application/json'}
146 response = requests.request(
147 "POST", url, data=json.dumps(data),
148 headers=headers, auth=('admin', 'admin'))
150 self.assertEqual(response.status_code, requests.codes.ok)
151 res = response.json()
152 self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
153 self.assertTrue(res["output"]["success"])
155 {'node-id': 'SPDR-SA1',
156 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
157 'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
159 def test_05_get_portmapping_NETWORK1(self):
160 url = ("{}/config/transportpce-portmapping:network/"
161 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
162 .format(self.restconf_baseurl))
163 headers = {'content-type': 'application/json'}
164 response = requests.request(
165 "GET", url, headers=headers, auth=('admin', 'admin'))
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
169 {"logical-connection-point": "XPDR1-NETWORK1",
170 "supporting-port": "CP1-CFP0-P1",
171 "supported-interface-capability": [
172 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
174 "port-direction": "bidirectional",
175 "port-qual": "xpdr-network",
176 "supporting-circuit-pack-name": "CP1-CFP0",
177 "xponder-type": "mpdr",
178 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
181 def test_06_check_interface_och(self):
182 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
183 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
184 "interface/XPDR1-NETWORK1-1"
185 .format(self.restconf_baseurl))
186 headers = {'content-type': 'application/json'}
187 response = requests.request(
188 "GET", url, headers=headers, auth=('admin', 'admin'))
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
192 input_dict = {'name': 'XPDR1-NETWORK1-1',
193 'administrative-state': 'inService',
194 'supporting-circuit-pack-name': 'CP1-CFP0',
195 'type': 'org-openroadm-interfaces:opticalChannel',
196 'supporting-port': 'CP1-CFP0-P1'
198 # assertDictContainsSubset is deprecated
200 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
201 'supporting-circuit-pack-name': 'CP1-CFP0',
202 'type': 'org-openroadm-interfaces:opticalChannel',
203 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
205 self.assertDictEqual(input_dict,
206 extract_a_from_b(input_dict,
209 self.assertDictEqual(
210 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
211 u'transmit-power': -5},
212 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
214 def test_07_check_interface_OTU(self):
215 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
216 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
217 "interface/XPDR1-NETWORK1-OTU"
218 .format(self.restconf_baseurl))
219 headers = {'content-type': 'application/json'}
220 response = requests.request(
221 "GET", url, headers=headers, auth=('admin', 'admin'))
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 input_dict = {'name': 'XPDR1-NETWORK1-OTU',
225 'administrative-state': 'inService',
226 'supporting-circuit-pack-name': 'CP1-CFP0',
227 'supporting-interface': 'XPDR1-NETWORK1-1',
228 'type': 'org-openroadm-interfaces:otnOtu',
229 'supporting-port': 'CP1-CFP0-P1'}
231 # assertDictContainsSubset is deprecated
233 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
234 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
235 'type': 'org-openroadm-interfaces:otnOtu',
236 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
238 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
242 self.assertDictEqual(
243 {u'rate': u'org-openroadm-otn-common-types:OTU4',
245 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
247 def test_08_otn_service_path_create_ODU4(self):
248 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
249 data = {"renderer:input": {
250 "service-name": "service_ODU4",
251 "operation": "create",
252 "service-rate": "100G",
253 "service-type": "ODU",
255 {"node-id": "SPDR-SA1",
256 "network-tp": "XPDR1-NETWORK1"}]}}
257 headers = {'content-type': 'application/json'}
258 response = requests.request(
259 "POST", url, data=json.dumps(data),
260 headers=headers, auth=('admin', 'admin'))
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
264 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
265 self.assertTrue(res["output"]["success"])
267 {'node-id': 'SPDR-SA1',
268 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
270 def test_09_get_portmapping_NETWORK1(self):
271 url = ("{}/config/transportpce-portmapping:network/"
272 "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
273 .format(self.restconf_baseurl))
274 headers = {'content-type': 'application/json'}
275 response = requests.request(
276 "GET", url, headers=headers, auth=('admin', 'admin'))
277 self.assertEqual(response.status_code, requests.codes.ok)
278 res = response.json()
280 {"logical-connection-point": "XPDR1-NETWORK1",
281 "supporting-port": "CP1-CFP0-P1",
282 "supported-interface-capability": [
283 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
285 "port-direction": "bidirectional",
286 "port-qual": "xpdr-network",
287 "supporting-circuit-pack-name": "CP1-CFP0",
288 "xponder-type": "mpdr",
289 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
290 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
294 def test_10_check_interface_ODU4(self):
295 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
296 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
297 "interface/XPDR1-NETWORK1-ODU4"
298 .format(self.restconf_baseurl))
299 headers = {'content-type': 'application/json'}
300 response = requests.request(
301 "GET", url, headers=headers, auth=('admin', 'admin'))
302 self.assertEqual(response.status_code, requests.codes.ok)
303 res = response.json()
304 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
305 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
306 'type': 'org-openroadm-interfaces:otnOdu',
307 'supporting-port': 'CP1-CFP0-P1'}
308 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
309 'rate': 'org-openroadm-otn-common-types:ODU4'}
311 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
314 self.assertDictEqual(input_dict_2,
315 extract_a_from_b(input_dict_2,
317 'org-openroadm-otn-odu-interfaces:odu'])
321 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
322 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
323 'type': 'org-openroadm-interfaces:otnOdu',
324 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
325 self.assertDictContainsSubset(
326 {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
327 'rate': 'org-openroadm-otn-common-types:ODU4'},
328 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
330 self.assertDictEqual(
331 {u'payload-type': u'21', u'exp-payload-type': u'21'},
332 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
334 def test_11_otn_service_path_create_10GE(self):
335 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
336 data = {"renderer:input": {
337 "service-name": "service1",
338 "operation": "create",
339 "service-rate": "10G",
340 "service-type": "Ethernet",
341 "ethernet-encoding": "eth encode",
343 "trib-port-number": "1",
345 {"node-id": "SPDR-SA1",
346 "client-tp": "XPDR1-CLIENT1",
347 "network-tp": "XPDR1-NETWORK1"}]}}
348 headers = {'content-type': 'application/json'}
349 response = requests.request(
350 "POST", url, data=json.dumps(data),
351 headers=headers, auth=('admin', 'admin'))
353 self.assertEqual(response.status_code, requests.codes.ok)
354 res = response.json()
355 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
356 self.assertTrue(res["output"]["success"])
358 {'node-id': 'SPDR-SA1',
359 'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
360 'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
361 'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
363 def test_12_check_interface_10GE_CLIENT(self):
364 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
365 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
366 "interface/XPDR1-CLIENT1-ETHERNET10G"
367 .format(self.restconf_baseurl))
368 headers = {'content-type': 'application/json'}
369 response = requests.request(
370 "GET", url, headers=headers, auth=('admin', 'admin'))
371 self.assertEqual(response.status_code, requests.codes.ok)
372 res = response.json()
373 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
374 'administrative-state': 'inService',
375 'supporting-circuit-pack-name': 'CP1-SFP4',
376 'type': 'org-openroadm-interfaces:ethernetCsmacd',
377 'supporting-port': 'CP1-SFP4-P1'
381 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
382 'supporting-circuit-pack-name': 'CP1-SFP4',
383 'type': 'org-openroadm-interfaces:ethernetCsmacd',
384 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
386 self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
389 self.assertDictEqual(
391 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
393 def test_13_check_interface_ODU2E_CLIENT(self):
394 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
395 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
396 "interface/XPDR1-CLIENT1-ODU2e-service1"
397 .format(self.restconf_baseurl))
398 headers = {'content-type': 'application/json'}
399 response = requests.request(
400 "GET", url, headers=headers, auth=('admin', 'admin'))
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
404 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
405 'administrative-state': 'inService',
406 'supporting-circuit-pack-name': 'CP1-SFP4',
407 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
408 'type': 'org-openroadm-interfaces:otnOdu',
409 'supporting-port': 'CP1-SFP4-P1'}
411 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
412 'rate': 'org-openroadm-otn-common-types:ODU2e',
413 'monitoring-mode': 'terminated'}
415 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
418 self.assertDictEqual(input_dict_2,
419 extract_a_from_b(input_dict_2, res['interface'][0][
420 'org-openroadm-otn-odu-interfaces:odu'])
424 self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
425 'supporting-circuit-pack-name': 'CP1-SFP4',
426 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
427 'type': 'org-openroadm-interfaces:otnOdu',
428 'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
429 self.assertDictContainsSubset({
430 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
431 'rate': 'org-openroadm-otn-common-types:ODU2e',
432 'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
434 self.assertDictEqual(
435 {u'payload-type': u'03', u'exp-payload-type': u'03'},
436 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
438 def test_14_check_interface_ODU2E_NETWORK(self):
439 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
440 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
441 "interface/XPDR1-NETWORK1-ODU2e-service1"
442 .format(self.restconf_baseurl))
443 headers = {'content-type': 'application/json'}
444 response = requests.request(
445 "GET", url, headers=headers, auth=('admin', 'admin'))
446 self.assertEqual(response.status_code, requests.codes.ok)
447 res = response.json()
448 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
449 'supporting-circuit-pack-name': 'CP1-CFP0',
450 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
451 'type': 'org-openroadm-interfaces:otnOdu',
452 'supporting-port': 'CP1-CFP0-P1'}
454 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
455 'rate': 'org-openroadm-otn-common-types:ODU2e',
456 'monitoring-mode': 'monitored'}
458 input_dict_3 = {'trib-port-number': 1}
460 self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
464 self.assertDictEqual(input_dict_2,
465 extract_a_from_b(input_dict_2,
467 'org-openroadm-otn-odu-interfaces:odu']
470 self.assertDictEqual(input_dict_3,
471 extract_a_from_b(input_dict_3,
473 'org-openroadm-otn-odu-interfaces:odu'][
474 'parent-odu-allocation']))
477 self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
478 'supporting-circuit-pack-name': 'CP1-CFP0',
479 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
480 'type': 'org-openroadm-interfaces:otnOdu',
481 'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
482 self.assertDictContainsSubset({
483 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
484 'rate': 'org-openroadm-otn-common-types:ODU2e',
485 'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
486 self.assertDictContainsSubset(
487 {'trib-port-number': 1},
488 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
492 'org-openroadm-otn-odu-interfaces:odu'][
493 'parent-odu-allocation']['trib-slots'])
495 def test_15_check_ODU2E_connection(self):
496 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
497 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
498 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
499 .format(self.restconf_baseurl))
500 headers = {'content-type': 'application/json'}
501 response = requests.request(
502 "GET", url, headers=headers, auth=('admin', 'admin'))
503 self.assertEqual(response.status_code, requests.codes.ok)
504 res = response.json()
507 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
508 'direction': 'bidirectional'
511 self.assertDictEqual(input_dict_1,
512 extract_a_from_b(input_dict_1,
513 res['odu-connection'][0]))
515 self.assertDictContainsSubset({
516 'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
517 'direction': 'bidirectional'},
518 res['odu-connection'][0])
520 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
521 res['odu-connection'][0]['destination'])
522 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
523 res['odu-connection'][0]['source'])
525 def test_16_otn_service_path_delete_10GE(self):
526 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
527 data = {"renderer:input": {
528 "service-name": "service1",
529 "operation": "delete",
530 "service-rate": "10G",
531 "service-type": "Ethernet",
532 "ethernet-encoding": "eth encode",
534 "trib-port-number": "1",
536 {"node-id": "SPDR-SA1",
537 "client-tp": "XPDR1-CLIENT1",
538 "network-tp": "XPDR1-NETWORK1"}]}}
539 headers = {'content-type': 'application/json'}
540 response = requests.request(
541 "POST", url, data=json.dumps(data),
542 headers=headers, auth=('admin', 'admin'))
544 self.assertEqual(response.status_code, requests.codes.ok)
545 res = response.json()
546 self.assertIn('Request processed', res["output"]["result"])
547 self.assertTrue(res["output"]["success"])
549 def test_17_check_no_ODU2E_connection(self):
550 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
551 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
552 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
553 .format(self.restconf_baseurl))
554 headers = {'content-type': 'application/json'}
555 response = requests.request(
556 "GET", url, headers=headers, auth=('admin', 'admin'))
557 self.assertEqual(response.status_code, requests.codes.not_found)
559 def test_18_check_no_interface_ODU2E_NETWORK(self):
560 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
561 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
562 "interface/XPDR1-NETWORK1-ODU2e-service1"
563 .format(self.restconf_baseurl))
564 headers = {'content-type': 'application/json'}
565 response = requests.request(
566 "GET", url, headers=headers, auth=('admin', 'admin'))
567 self.assertEqual(response.status_code, requests.codes.not_found)
569 def test_19_check_no_interface_ODU2E_CLIENT(self):
570 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
571 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
572 "interface/XPDR1-CLIENT1-ODU2e-service1"
573 .format(self.restconf_baseurl))
574 headers = {'content-type': 'application/json'}
575 response = requests.request(
576 "GET", url, headers=headers, auth=('admin', 'admin'))
577 self.assertEqual(response.status_code, requests.codes.not_found)
579 def test_20_check_no_interface_10GE_CLIENT(self):
580 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
581 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
582 "interface/XPDR1-CLIENT1-ETHERNET10G"
583 .format(self.restconf_baseurl))
584 headers = {'content-type': 'application/json'}
585 response = requests.request(
586 "GET", url, headers=headers, auth=('admin', 'admin'))
587 self.assertEqual(response.status_code, requests.codes.not_found)
589 def test_21_otn_service_path_delete_ODU4(self):
590 url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
591 data = {"renderer:input": {
592 "service-name": "service_ODU4",
593 "operation": "delete",
594 "service-rate": "100G",
595 "service-type": "ODU",
597 {"node-id": "SPDR-SA1",
598 "network-tp": "XPDR1-NETWORK1"}]}}
599 headers = {'content-type': 'application/json'}
600 response = requests.request(
601 "POST", url, data=json.dumps(data),
602 headers=headers, auth=('admin', 'admin'))
604 self.assertEqual(response.status_code, requests.codes.ok)
605 res = response.json()
606 self.assertIn('Request processed', res["output"]["result"])
607 self.assertTrue(res["output"]["success"])
609 def test_22_check_no_interface_ODU4(self):
610 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
611 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
612 "interface/XPDR1-NETWORK1-ODU4"
613 .format(self.restconf_baseurl))
614 headers = {'content-type': 'application/json'}
615 response = requests.request(
616 "GET", url, headers=headers, auth=('admin', 'admin'))
617 self.assertEqual(response.status_code, requests.codes.not_found)
619 def test_23_service_path_delete_OCH_OTU4(self):
620 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
621 data = {"renderer:input": {
622 "service-name": "service_OTU4",
624 "modulation-format": "qpsk",
625 "operation": "delete",
627 {"node-id": "SPDR-SA1",
628 "dest-tp": "XPDR1-NETWORK1"}]}}
629 headers = {'content-type': 'application/json'}
630 response = requests.request(
631 "POST", url, data=json.dumps(data),
632 headers=headers, auth=('admin', 'admin'))
634 self.assertEqual(response.status_code, requests.codes.ok)
635 res = response.json()
636 self.assertIn('Request processed', res["output"]["result"])
637 self.assertTrue(res["output"]["success"])
639 def test_24_check_no_interface_OTU4(self):
640 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
641 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
642 "interface/XPDR1-NETWORK1-OTU"
643 .format(self.restconf_baseurl))
644 headers = {'content-type': 'application/json'}
645 response = requests.request(
646 "GET", url, headers=headers, auth=('admin', 'admin'))
647 self.assertEqual(response.status_code, requests.codes.not_found)
649 def test_25_check_no_interface_OCH(self):
650 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
651 "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
652 "interface/XPDR1-NETWORK1-1"
653 .format(self.restconf_baseurl))
654 headers = {'content-type': 'application/json'}
655 response = requests.request(
656 "GET", url, headers=headers, auth=('admin', 'admin'))
657 self.assertEqual(response.status_code, requests.codes.not_found)
659 def test_26_disconnect_SPDR_SA1(self):
660 url = ("{}/config/network-topology:"
661 "network-topology/topology/topology-netconf/node/SPDR-SA1"
662 .format(self.restconf_baseurl))
664 headers = {'content-type': 'application/json'}
665 response = requests.request(
666 "DELETE", url, data=json.dumps(data), headers=headers,
667 auth=('admin', 'admin'))
668 self.assertEqual(response.status_code, requests.codes.ok)
671 if __name__ == "__main__":
672 unittest.main(verbosity=2)