3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportPCERendererTesting(unittest.TestCase):
30 restconf_baseurl = "http://localhost:8181/restconf"
32 # START_IGNORE_XTESTING
36 cls.odl_process = test_utils.start_tpce()
38 cls.sim_process1 = test_utils.start_sim('xpdra')
39 cls.sim_process2 = test_utils.start_sim('roadma')
# Class-level teardown: for the ODL process and then each of the two simulator
# processes, send SIGINT to every child reported by psutil, send SIGINT to the
# process itself and wait for it to exit.
# NOTE(review): some lines of this method (decorator, statements between the
# loops) appear to be missing from this view - confirm against the full file.
42 def tearDownClass(cls):
43 for child in psutil.Process(cls.odl_process.pid).children():
44 child.send_signal(signal.SIGINT)
46 cls.odl_process.send_signal(signal.SIGINT)
47 cls.odl_process.wait()
48 for child in psutil.Process(cls.sim_process1.pid).children():
49 child.send_signal(signal.SIGINT)
51 cls.sim_process1.send_signal(signal.SIGINT)
52 cls.sim_process1.wait()
53 for child in psutil.Process(cls.sim_process2.pid).children():
54 child.send_signal(signal.SIGINT)
56 cls.sim_process2.send_signal(signal.SIGINT)
57 cls.sim_process2.wait()
60 print("execution of {}".format(self.id().split(".")[-1]))
# PUT a topology-netconf node entry for ROADM-A1 through RESTCONF. The payload
# points at 127.0.0.1 on the port taken from test_utils.sims['roadma'] and uses
# admin/admin credentials; the status code is checked with assertIn.
# NOTE(review): the opening of the `data` payload and the end of the assertIn
# call are not visible in this view - confirm against the full file.
65 def test_01_rdm_device_connected(self):
66 url = ("{}/config/network-topology:"
67 "network-topology/topology/topology-netconf/node/ROADM-A1"
68 .format(self.restconf_baseurl))
70 "node-id": "ROADM-A1",
71 "netconf-node-topology:username": "admin",
72 "netconf-node-topology:password": "admin",
73 "netconf-node-topology:host": "127.0.0.1",
74 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
75 "netconf-node-topology:tcp-only": "false",
76 "netconf-node-topology:pass-through": {}}]}
77 headers = {'content-type': 'application/json'}
78 response = requests.request(
79 "PUT", url, data=json.dumps(data), headers=headers,
80 auth=('admin', 'admin'))
81 self.assertIn(response.status_code, [requests.codes.created,
83 # self.assertEqual(response.status_code, requests.codes.created)
# PUT a topology-netconf node entry for XPDR-A1 through RESTCONF. The payload
# points at 127.0.0.1 on the port taken from test_utils.sims['xpdra'] and uses
# admin/admin credentials; the status code is checked with assertIn.
# NOTE(review): the opening of the `data` payload and the end of the assertIn
# call are not visible in this view - confirm against the full file.
86 def test_02_xpdr_device_connected(self):
87 url = ("{}/config/network-topology:"
88 "network-topology/topology/topology-netconf/node/XPDR-A1"
89 .format(self.restconf_baseurl))
92 "netconf-node-topology:username": "admin",
93 "netconf-node-topology:password": "admin",
94 "netconf-node-topology:host": "127.0.0.1",
95 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
96 "netconf-node-topology:tcp-only": "false",
97 "netconf-node-topology:pass-through": {}}]}
98 headers = {'content-type': 'application/json'}
99 response = requests.request(
100 "PUT", url, data=json.dumps(data), headers=headers,
101 auth=('admin', 'admin'))
102 # self.assertEqual(response.status_code, requests.codes.created)
103 self.assertIn(response.status_code, [requests.codes.created,
# GET below transportpce-portmapping:network, expect HTTP 200, and compare two
# expected mapping entries (DEG1-TTP-TXRX on 1/0 port L1, SRG1-PP3-TXRX on 3/0
# port C3) with res['nodes'][0]['mapping'].
# NOTE(review): the tail of the URL and the assertion calls that take the
# expected dicts are not visible in this view - confirm against the full file.
107 def test_03_rdm_portmapping(self):
108 url = ("{}/config/transportpce-portmapping:network/"
110 .format(self.restconf_baseurl))
111 headers = {'content-type': 'application/json'}
112 response = requests.request(
113 "GET", url, headers=headers, auth=('admin', 'admin'))
114 self.assertEqual(response.status_code, requests.codes.ok)
115 res = response.json()
117 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
118 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
119 res['nodes'][0]['mapping'])
121 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
122 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
123 res['nodes'][0]['mapping'])
# GET below transportpce-portmapping:network, expect HTTP 200, and compare two
# expected mapping entries (XPDR1-NETWORK1 and XPDR1-CLIENT1, each naming the
# other as connection-map-lcp) with res['nodes'][0]['mapping'].
# NOTE(review): the tail of the URL and the assertion calls that take the
# expected dicts are not visible in this view - confirm against the full file.
125 def test_04_xpdr_portmapping(self):
126 url = ("{}/config/transportpce-portmapping:network/"
128 .format(self.restconf_baseurl))
129 headers = {'content-type': 'application/json'}
130 response = requests.request(
131 "GET", url, headers=headers, auth=('admin', 'admin'))
132 self.assertEqual(response.status_code, requests.codes.ok)
133 res = response.json()
135 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
136 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
137 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
138 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
139 'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
140 res['nodes'][0]['mapping'])
142 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
143 'supporting-port': 'C1',
144 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
145 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
146 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
147 'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
148 res['nodes'][0]['mapping'])
# POST the transportpce-device-renderer:service-path operation with
# operation "create", wave-number "7" and entries for ROADM-A1 and XPDR-A1.
# Expects HTTP 200 and the ROADM-A1 success message inside res["output"]["result"].
# NOTE(review): one line of the `data` payload (the key holding the node list)
# is not visible in this view - confirm against the full file.
150 def test_05_service_path_create(self):
151 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
152 data = {"renderer:input": {
153 "renderer:service-name": "service_test",
154 "renderer:wave-number": "7",
155 "renderer:modulation-format": "qpsk",
156 "renderer:operation": "create",
158 {"renderer:node-id": "ROADM-A1",
159 "renderer:src-tp": "SRG1-PP3-TXRX",
160 "renderer:dest-tp": "DEG1-TTP-TXRX"},
161 {"renderer:node-id": "XPDR-A1",
162 "renderer:src-tp": "XPDR1-CLIENT1",
163 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "POST", url, data=json.dumps(data),
167 headers=headers, auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
170 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
# GET interface DEG1-TTP-TXRX-nmc-7 from the ROADM-A1 mount point, expect HTTP
# 200, check the listed interface attributes against res['interface'][0] and
# require the nmc-ctp container to equal {frequency: 195.8, width: 40}.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
172 def test_06_service_path_create_rdm_check(self):
173 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
174 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
175 "interface/DEG1-TTP-TXRX-nmc-7"
176 .format(self.restconf_baseurl))
177 headers = {'content-type': 'application/json'}
178 response = requests.request(
179 "GET", url, headers=headers, auth=('admin', 'admin'))
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
183 self.assertDictEqual(
185 'name': 'DEG1-TTP-TXRX-nmc-7',
186 'administrative-state': 'inService',
187 'supporting-circuit-pack-name': '1/0',
188 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
189 'supporting-port': 'L1'
190 }, **res['interface'][0]),
193 self.assertDictEqual(
194 {u'frequency': 195.8, u'width': 40},
195 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
# GET interface DEG1-TTP-TXRX-mc-7 from the ROADM-A1 mount point, expect HTTP
# 200, check the listed interface attributes against res['interface'][0] and
# require the mc-ttp container to equal {min-freq: 195.775, max-freq: 195.825}.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
197 def test_07_service_path_create_rdm_check(self):
198 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
199 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
200 "interface/DEG1-TTP-TXRX-mc-7"
201 .format(self.restconf_baseurl))
202 headers = {'content-type': 'application/json'}
203 response = requests.request(
204 "GET", url, headers=headers, auth=('admin', 'admin'))
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
208 self.assertDictEqual(
210 'name': 'DEG1-TTP-TXRX-mc-7',
211 'administrative-state': 'inService',
212 'supporting-circuit-pack-name': '1/0',
213 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
214 'supporting-port': 'L1'
215 }, **res['interface'][0]),
218 self.assertDictEqual(
219 {u'min-freq': 195.775, u'max-freq': 195.825},
220 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
# GET interface SRG1-PP3-TXRX-nmc-7 from the ROADM-A1 mount point, expect HTTP
# 200, check the listed interface attributes against res['interface'][0] and
# require the nmc-ctp container to equal {frequency: 195.8, width: 40}.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
222 def test_08_service_path_create_rdm_check(self):
223 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
224 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
225 "interface/SRG1-PP3-TXRX-nmc-7"
226 .format(self.restconf_baseurl))
227 headers = {'content-type': 'application/json'}
228 response = requests.request(
229 "GET", url, headers=headers, auth=('admin', 'admin'))
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
232 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
233 self.assertDictEqual(
235 'name': 'SRG1-PP3-TXRX-nmc-7',
236 'administrative-state': 'inService',
237 'supporting-circuit-pack-name': '3/0',
238 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
239 'supporting-port': 'C3'
240 }, **res['interface'][0]),
243 self.assertDictEqual(
244 {u'frequency': 195.8, u'width': 40},
245 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
247 # -mc supporting interfaces must not be created for SRG, only degrees
# GET interface SRG1-PP3-TXRX-mc-7 from the ROADM-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
248 def test_09_service_path_create_rdm_check(self):
249 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
250 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
251 "interface/SRG1-PP3-TXRX-mc-7"
252 .format(self.restconf_baseurl))
253 headers = {'content-type': 'application/json'}
254 response = requests.request(
255 "GET", url, headers=headers, auth=('admin', 'admin'))
256 self.assertEqual(response.status_code, requests.codes.not_found)
257 res = response.json()
259 {"error-type": "application", "error-tag": "data-missing",
260 "error-message": "Request could not be completed because the relevant data model content does not exist"},
261 res['errors']['error'])
# GET roadm-connection SRG1-PP3-TXRX-DEG1-TTP-TXRX-7 from the ROADM-A1 mount
# point, expect HTTP 200, check connection-name / opticalControlMode against
# res['roadm-connections'][0], and require source and destination to reference
# the SRG1-PP3-TXRX-nmc-7 and DEG1-TTP-TXRX-nmc-7 interfaces respectively.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
263 def test_10_service_path_create_rdm_check(self):
264 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
265 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
266 "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7"
267 .format(self.restconf_baseurl))
268 headers = {'content-type': 'application/json'}
269 response = requests.request(
270 "GET", url, headers=headers, auth=('admin', 'admin'))
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
274 self.assertDictEqual(
276 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
277 'opticalControlMode': 'off'
278 }, **res['roadm-connections'][0]),
279 res['roadm-connections'][0]
281 self.assertDictEqual(
282 {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
283 res['roadm-connections'][0]['source'])
284 self.assertDictEqual(
285 {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
286 res['roadm-connections'][0]['destination'])
# GET interface XPDR1-NETWORK1-7 from the XPDR-A1 mount point, expect HTTP 200,
# check the listed interface attributes against res['interface'][0] and require
# the och container to equal {rate: R100G, transmit-power: -5, frequency: 195.8}.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
288 def test_11_service_path_create_xpdr_check(self):
289 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
290 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
291 "interface/XPDR1-NETWORK1-7"
292 .format(self.restconf_baseurl))
293 headers = {'content-type': 'application/json'}
294 response = requests.request(
295 "GET", url, headers=headers, auth=('admin', 'admin'))
296 self.assertEqual(response.status_code, requests.codes.ok)
297 res = response.json()
298 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
299 self.assertDictEqual(
301 'name': 'XPDR1-NETWORK1-7',
302 'administrative-state': 'inService',
303 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
304 'type': 'org-openroadm-interfaces:opticalChannel',
305 'supporting-port': '1'
306 }, **res['interface'][0]),
309 self.assertDictEqual(
310 {u'rate': u'org-openroadm-common-types:R100G',
311 u'transmit-power': -5,
312 u'frequency': 195.8},
313 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
# GET interface XPDR1-NETWORK1-OTU from the XPDR-A1 mount point, expect HTTP
# 200, check the listed interface attributes (including supporting-interface
# XPDR1-NETWORK1-7) against res['interface'][0], then compare the expected otu
# attributes (rate OTU4 is the only one visible here) with the otu container.
# NOTE(review): several lines of both assertDictEqual calls are not visible in
# this view - confirm against the full file.
315 def test_12_service_path_create_xpdr_check(self):
316 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
317 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
318 "interface/XPDR1-NETWORK1-OTU"
319 .format(self.restconf_baseurl))
320 headers = {'content-type': 'application/json'}
321 response = requests.request(
322 "GET", url, headers=headers, auth=('admin', 'admin'))
323 self.assertEqual(response.status_code, requests.codes.ok)
324 res = response.json()
325 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
326 self.assertDictEqual(
328 'name': 'XPDR1-NETWORK1-OTU',
329 'administrative-state': 'inService',
330 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
331 'type': 'org-openroadm-interfaces:otnOtu',
332 'supporting-port': '1',
333 'supporting-interface': 'XPDR1-NETWORK1-7'
334 }, **res['interface'][0]),
337 self.assertDictEqual(
338 {u'rate': u'org-openroadm-otn-common-types:OTU4',
340 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# GET interface XPDR1-NETWORK1-ODU from the XPDR-A1 mount point, expect HTTP
# 200, check the listed interface attributes (including supporting-interface
# XPDR1-NETWORK1-OTU) against res['interface'][0], check rate ODU4 and
# monitoring-mode "terminated" against the odu container, and require its opu
# entry to equal {exp-payload-type: '07', payload-type: '07'}.
# NOTE(review): the openings/closings of the first two assertDictEqual argument
# lists are not visible in this view - confirm against the full file.
342 def test_13_service_path_create_xpdr_check(self):
343 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
344 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
345 "interface/XPDR1-NETWORK1-ODU"
346 .format(self.restconf_baseurl))
347 headers = {'content-type': 'application/json'}
348 response = requests.request(
349 "GET", url, headers=headers, auth=('admin', 'admin'))
350 self.assertEqual(response.status_code, requests.codes.ok)
351 res = response.json()
352 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
353 self.assertDictEqual(
355 'name': 'XPDR1-NETWORK1-ODU',
356 'administrative-state': 'inService',
357 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
358 'type': 'org-openroadm-interfaces:otnOdu',
359 'supporting-port': '1',
360 'supporting-interface': 'XPDR1-NETWORK1-OTU'
361 }, **res['interface'][0]),
364 self.assertDictEqual(
366 'rate': 'org-openroadm-otn-common-types:ODU4',
367 u'monitoring-mode': u'terminated'
368 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
369 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
371 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
372 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# GET interface XPDR1-CLIENT1-ETHERNET from the XPDR-A1 mount point, expect
# HTTP 200, check the listed interface attributes against res['interface'][0]
# and require the ethernet container to equal {fec: 'off', speed: 100000}.
# NOTE(review): the opening and closing of the first assertDictEqual argument
# list are not visible in this view - confirm against the full file.
374 def test_14_service_path_create_xpdr_check(self):
375 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
376 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
377 "interface/XPDR1-CLIENT1-ETHERNET"
378 .format(self.restconf_baseurl))
379 headers = {'content-type': 'application/json'}
380 response = requests.request(
381 "GET", url, headers=headers, auth=('admin', 'admin'))
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
384 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
385 self.assertDictEqual(
387 'name': 'XPDR1-CLIENT1-ETHERNET',
388 'administrative-state': 'inService',
389 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
390 'type': 'org-openroadm-interfaces:ethernetCsmacd',
391 'supporting-port': 'C1'
392 }, **res['interface'][0]),
395 self.assertDictEqual(
396 {u'fec': u'off', u'speed': 100000},
397 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_15_service_path_create_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-NET (slashes URL-encoded as %2F) from the
    # XPDR-A1 mount point and expect HTTP 200 with 'not-reserved-inuse'
    # contained in its equipment-state.
    circuit_pack_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-NET"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        circuit_pack_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    circuit_pack = reply.json()['circuit-packs'][0]
    self.assertIn('not-reserved-inuse', circuit_pack["equipment-state"])
def test_16_service_path_create_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-CLIENT (slashes URL-encoded as %2F) from the
    # XPDR-A1 mount point and expect HTTP 200 with 'not-reserved-inuse'
    # contained in its equipment-state.
    circuit_pack_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-CLIENT"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        circuit_pack_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    circuit_pack = reply.json()['circuit-packs'][0]
    self.assertIn('not-reserved-inuse', circuit_pack["equipment-state"])
# POST the transportpce-device-renderer:service-path operation with
# operation "delete", wave-number "7" and entries for ROADM-A1 and XPDR-A1.
# Expects HTTP 200 and a body equal to
# {'output': {'result': 'Request processed', 'success': True}}.
# NOTE(review): one line of the `data` payload (the key holding the node list)
# is not visible in this view - confirm against the full file.
423 def test_17_service_path_delete(self):
424 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
425 data = {"renderer:input": {
426 "renderer:service-name": "service_test",
427 "renderer:wave-number": "7",
428 "renderer:operation": "delete",
430 {"renderer:node-id": "ROADM-A1",
431 "renderer:src-tp": "SRG1-PP3-TXRX",
432 "renderer:dest-tp": "DEG1-TTP-TXRX"},
433 {"renderer:node-id": "XPDR-A1",
434 "renderer:src-tp": "XPDR1-CLIENT1",
435 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
436 headers = {'content-type': 'application/json'}
437 response = requests.request(
438 "POST", url, data=json.dumps(data),
439 headers=headers, auth=('admin', 'admin'))
440 self.assertEqual(response.status_code, requests.codes.ok)
441 self.assertEqual(response.json(), {
442 'output': {'result': 'Request processed', 'success': True}})
# GET interface DEG1-TTP-TXRX-mc-7 from the ROADM-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
444 def test_18_service_path_delete_rdm_check(self):
445 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
446 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
447 "interface/DEG1-TTP-TXRX-mc-7"
448 .format(self.restconf_baseurl))
449 headers = {'content-type': 'application/json'}
450 response = requests.request(
451 "GET", url, headers=headers, auth=('admin', 'admin'))
452 self.assertEqual(response.status_code, requests.codes.not_found)
453 res = response.json()
455 {"error-type": "application", "error-tag": "data-missing",
456 "error-message": "Request could not be completed because the relevant data model content does not exist"},
457 res['errors']['error'])
# GET interface DEG1-TTP-TXRX-nmc-7 from the ROADM-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
459 def test_19_service_path_delete_rdm_check(self):
460 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
461 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
462 "interface/DEG1-TTP-TXRX-nmc-7"
463 .format(self.restconf_baseurl))
464 headers = {'content-type': 'application/json'}
465 response = requests.request(
466 "GET", url, headers=headers, auth=('admin', 'admin'))
467 self.assertEqual(response.status_code, requests.codes.not_found)
468 res = response.json()
470 {"error-type": "application", "error-tag": "data-missing",
471 "error-message": "Request could not be completed because the relevant data model content does not exist"},
472 res['errors']['error'])
# GET interface SRG1-PP3-TXRX-mc-7 from the ROADM-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
474 def test_20_service_path_delete_rdm_check(self):
475 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
476 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
477 "interface/SRG1-PP3-TXRX-mc-7"
478 .format(self.restconf_baseurl))
479 headers = {'content-type': 'application/json'}
480 response = requests.request(
481 "GET", url, headers=headers, auth=('admin', 'admin'))
482 self.assertEqual(response.status_code, requests.codes.not_found)
483 res = response.json()
485 {"error-type": "application", "error-tag": "data-missing",
486 "error-message": "Request could not be completed because the relevant data model content does not exist"},
487 res['errors']['error'])
# GET interface SRG1-PP3-TXRX-nmc-7 from the ROADM-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
489 def test_21_service_path_delete_rdm_check(self):
490 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
491 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
492 "interface/SRG1-PP3-TXRX-nmc-7"
493 .format(self.restconf_baseurl))
494 headers = {'content-type': 'application/json'}
495 response = requests.request(
496 "GET", url, headers=headers, auth=('admin', 'admin'))
497 self.assertEqual(response.status_code, requests.codes.not_found)
498 res = response.json()
500 {"error-type": "application", "error-tag": "data-missing",
501 "error-message": "Request could not be completed because the relevant data model content does not exist"},
502 res['errors']['error'])
# GET roadm-connection SRG1-PP3-TXRX-DEG1-TTP-TXRX-7 from the ROADM-A1 mount
# point and expect HTTP 404; the expected "data-missing" error entry is
# compared with res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
504 def test_22_service_path_delete_rdm_check(self):
505 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
506 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
507 "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7"
508 .format(self.restconf_baseurl))
509 headers = {'content-type': 'application/json'}
510 response = requests.request(
511 "GET", url, headers=headers, auth=('admin', 'admin'))
512 self.assertEqual(response.status_code, requests.codes.not_found)
513 res = response.json()
515 {"error-type": "application", "error-tag": "data-missing",
516 "error-message": "Request could not be completed because the relevant data model content does not exist"},
517 res['errors']['error'])
# GET interface XPDR1-NETWORK1-7 from the XPDR-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
519 def test_23_service_path_delete_xpdr_check(self):
520 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
521 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
522 "interface/XPDR1-NETWORK1-7"
523 .format(self.restconf_baseurl))
524 headers = {'content-type': 'application/json'}
525 response = requests.request(
526 "GET", url, headers=headers, auth=('admin', 'admin'))
527 self.assertEqual(response.status_code, requests.codes.not_found)
528 res = response.json()
530 {"error-type": "application", "error-tag": "data-missing",
531 "error-message": "Request could not be completed because the relevant data model content does not exist"},
532 res['errors']['error'])
# GET interface XPDR1-NETWORK1-OTU from the XPDR-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
534 def test_24_service_path_delete_xpdr_check(self):
535 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
536 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
537 "interface/XPDR1-NETWORK1-OTU"
538 .format(self.restconf_baseurl))
539 headers = {'content-type': 'application/json'}
540 response = requests.request(
541 "GET", url, headers=headers, auth=('admin', 'admin'))
542 self.assertEqual(response.status_code, requests.codes.not_found)
543 res = response.json()
545 {"error-type": "application", "error-tag": "data-missing",
546 "error-message": "Request could not be completed because the relevant data model content does not exist"},
547 res['errors']['error'])
# GET interface XPDR1-NETWORK1-ODU from the XPDR-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
549 def test_25_service_path_delete_xpdr_check(self):
550 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
551 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
552 "interface/XPDR1-NETWORK1-ODU"
553 .format(self.restconf_baseurl))
554 headers = {'content-type': 'application/json'}
555 response = requests.request(
556 "GET", url, headers=headers, auth=('admin', 'admin'))
557 self.assertEqual(response.status_code, requests.codes.not_found)
558 res = response.json()
560 {"error-type": "application", "error-tag": "data-missing",
561 "error-message": "Request could not be completed because the relevant data model content does not exist"},
562 res['errors']['error'])
# GET interface XPDR1-CLIENT1-ETHERNET from the XPDR-A1 mount point and expect
# HTTP 404; the expected "data-missing" error entry is compared with
# res['errors']['error'].
# NOTE(review): the assertion call that takes the expected error dict is not
# visible in this view - confirm against the full file.
564 def test_26_service_path_delete_xpdr_check(self):
565 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
566 "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
567 "interface/XPDR1-CLIENT1-ETHERNET"
568 .format(self.restconf_baseurl))
569 headers = {'content-type': 'application/json'}
570 response = requests.request(
571 "GET", url, headers=headers, auth=('admin', 'admin'))
572 self.assertEqual(response.status_code, requests.codes.not_found)
573 res = response.json()
575 {"error-type": "application", "error-tag": "data-missing",
576 "error-message": "Request could not be completed because the relevant data model content does not exist"},
577 res['errors']['error'])
def test_27_service_path_delete_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-NET (slashes URL-encoded as %2F) from the
    # XPDR-A1 mount point and expect HTTP 200 with an equipment-state equal
    # to 'not-reserved-available'.
    circuit_pack_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-NET"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        circuit_pack_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    circuit_pack = reply.json()["circuit-packs"][0]
    self.assertEqual('not-reserved-available', circuit_pack['equipment-state'])
def test_28_service_path_delete_xpdr_check(self):
    # Read circuit pack 1/0/1-PLUG-CLIENT (slashes URL-encoded as %2F) from the
    # XPDR-A1 mount point and expect HTTP 200 with an equipment-state equal
    # to 'not-reserved-available'.
    circuit_pack_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDR-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "circuit-packs/1%2F0%2F1-PLUG-CLIENT"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        circuit_pack_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    circuit_pack = reply.json()["circuit-packs"][0]
    self.assertEqual('not-reserved-available', circuit_pack['equipment-state'])
def test_29_rdm_device_disconnected(self):
    # DELETE the ROADM-A1 node entry under topology-netconf through RESTCONF
    # and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-A1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_30_xpdr_device_disconnected(self):
    # DELETE the XPDR-A1 node entry under topology-netconf through RESTCONF
    # and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDR-A1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Script entry point: run this module's tests with verbose reporting and
    # stop the run at the first error or failure.
    runner_options = {"verbosity": 2, "failfast": True}
    unittest.main(**runner_options)