3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
21 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION_221 = '2.2.1'
34 NODE_VERSION_71 = '7.1'
36 cr_serv_sample_data = {"input": {
37 "sdnc-request-header": {
38 "request-id": "request-1",
39 "rpc-action": "service-create",
40 "request-system-id": "appname"
42 "service-name": "service1-OTUC4",
43 "common-id": "commonId",
44 "connection-type": "infrastructure",
46 "service-rate": "400",
48 "service-format": "OTU",
49 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
53 "committed-info-rate": "100000",
54 "committed-burst-size": "64"
59 "port-device-name": "XPDR-A2-XPDR2",
61 "port-name": "XPDR2-NETWORK1",
62 "port-rack": "000000.00",
63 "port-shelf": "Chassis#1"
66 "lgx-device-name": "Some lgx-device-name",
67 "lgx-port-name": "Some lgx-port-name",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
74 "port-device-name": "XPDR-A2-XPDR2",
76 "port-name": "XPDR2-NETWORK1",
77 "port-rack": "000000.00",
78 "port-shelf": "Chassis#1"
81 "lgx-device-name": "Some lgx-device-name",
82 "lgx-port-name": "Some lgx-port-name",
83 "lgx-port-rack": "000000.00",
84 "lgx-port-shelf": "00"
90 "service-rate": "400",
92 "service-format": "OTU",
93 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
97 "committed-info-rate": "100000",
98 "committed-burst-size": "64"
103 "port-device-name": "XPDR-C2-XPDR2",
104 "port-type": "fixed",
105 "port-name": "XPDR2-NETWORK1",
106 "port-rack": "000000.00",
107 "port-shelf": "Chassis#1"
110 "lgx-device-name": "Some lgx-device-name",
111 "lgx-port-name": "Some lgx-port-name",
112 "lgx-port-rack": "000000.00",
113 "lgx-port-shelf": "00"
118 "port-device-name": "XPDR-C2-XPDR2",
119 "port-type": "fixed",
120 "port-name": "XPDR2-NETWORK1",
121 "port-rack": "000000.00",
122 "port-shelf": "Chassis#1"
125 "lgx-device-name": "Some lgx-device-name",
126 "lgx-port-name": "Some lgx-port-name",
127 "lgx-port-rack": "000000.00",
128 "lgx-port-shelf": "00"
133 "due-date": "2018-06-15T00:00:01Z",
134 "operator-contact": "pw1234"
140 cls.processes = test_utils.start_tpce()
141 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142 ('roadma', cls.NODE_VERSION_221),
143 ('roadmc', cls.NODE_VERSION_221),
144 ('xpdrc2', cls.NODE_VERSION_71)])
147 def tearDownClass(cls):
148 # pylint: disable=not-an-iterable
149 for process in cls.processes:
150 test_utils.shutdown_process(process)
151 print("all processes killed")
156 def test_01_connect_xpdra2(self):
157 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158 self.assertEqual(response.status_code,
159 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_02_connect_xpdrc2(self):
162 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163 self.assertEqual(response.status_code,
164 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166 def test_03_connect_rdma(self):
167 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168 self.assertEqual(response.status_code,
169 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
171 def test_04_connect_rdmc(self):
172 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173 self.assertEqual(response.status_code,
174 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
176 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178 "ROADM-A1", "1", "SRG1-PP2-TXRX")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 self.assertIn('Xponder Roadm Link created successfully',
182 res["output"]["result"])
185 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187 "ROADM-A1", "1", "SRG1-PP2-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Roadm Xponder links created successfully',
191 res["output"]["result"])
194 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196 "ROADM-C1", "1", "SRG1-PP2-TXRX")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 self.assertIn('Xponder Roadm Link created successfully',
200 res["output"]["result"])
203 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205 "ROADM-C1", "1", "SRG1-PP2-TXRX")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 self.assertIn('Roadm Xponder links created successfully',
209 res["output"]["result"])
212 def test_09_add_omsAttributes_roadma_roadmc(self):
213 # Config ROADMA-ROADMC oms-attributes
215 "auto-spanloss": "true",
216 "spanloss-base": 11.4,
217 "spanloss-current": 12,
218 "engineered-spanloss": 12.2,
219 "link-concatenation": [{
222 "SRLG-length": 100000,
224 response = test_utils.add_oms_attr_request(
225 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226 self.assertEqual(response.status_code, requests.codes.created)
228 def test_10_add_omsAttributes_roadmc_roadma(self):
229 # Config ROADMC-ROADMA oms-attributes
231 "auto-spanloss": "true",
232 "spanloss-base": 11.4,
233 "spanloss-current": 12,
234 "engineered-spanloss": 12.2,
235 "link-concatenation": [{
238 "SRLG-length": 100000,
240 response = test_utils.add_oms_attr_request(
241 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242 self.assertEqual(response.status_code, requests.codes.created)
244 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
245 def test_11_check_otn_topology(self):
246 response = test_utils.get_otn_topo_request()
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
249 nbNode = len(res['network'][0]['node'])
250 self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251 self.assertNotIn('ietf-network-topology:link', res['network'][0])
253 def test_12_create_OTUC4_service(self):
254 response = test_utils.service_create_request(self.cr_serv_sample_data)
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('PCE calculation in progress',
258 res['output']['configuration-response-common']['response-message'])
259 time.sleep(self.WAITING)
261 def test_13_get_OTUC4_service1(self):
262 response = test_utils.get_service_list_request(
263 "services/service1-OTUC4")
264 self.assertEqual(response.status_code, requests.codes.ok)
265 res = response.json()
267 res['services'][0]['administrative-state'], 'inService')
269 res['services'][0]['service-name'], 'service1-OTUC4')
271 res['services'][0]['connection-type'], 'infrastructure')
273 res['services'][0]['lifecycle-state'], 'planned')
275 res['services'][0]['service-layer'], 'wdm')
277 res['services'][0]['service-a-end']['service-rate'], 400)
279 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
282 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
285 # Check correct configuration of devices
286 def test_14_check_interface_och_xpdra2(self):
287 response = test_utils.check_netconf_node_request(
288 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292 'administrative-state': 'inService',
293 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294 'type': 'org-openroadm-interfaces:otsi',
295 'supporting-port': 'L1'
296 }, **res['interface'][0]),
299 self.assertDictEqual(
300 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
305 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306 response = test_utils.check_netconf_node_request(
307 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311 'administrative-state': 'inService',
312 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314 'type': 'org-openroadm-interfaces:otsi-group',
315 'supporting-port': 'L1'
317 input_dict_2 = {"group-id": 1,
318 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
320 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
322 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323 ['org-openroadm-otsi-group-interfaces:otsi-group']),
325 ['org-openroadm-otsi-group-interfaces:otsi-group'])
327 def test_16_check_interface_OTUC4_xpdra2(self):
328 response = test_utils.check_netconf_node_request(
329 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330 self.assertEqual(response.status_code, requests.codes.ok)
331 res = response.json()
332 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333 'administrative-state': 'inService',
334 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336 'type': 'org-openroadm-interfaces:otnOtu',
337 'supporting-port': 'L1'
339 input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
340 'expected-dapi': 'G54UFNImtOE=',
341 'tx-dapi': 'J/FIUzQc+4M=',
342 'expected-sapi': 'J/FIUzQc+4M=',
343 'rate': 'org-openroadm-otn-common-types:OTUCn',
344 'degthr-percentage': 100,
348 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
350 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
352 ['org-openroadm-otn-otu-interfaces:otu'])
354 def test_17_check_interface_och_xpdrc2(self):
355 response = test_utils.check_netconf_node_request(
356 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
360 'administrative-state': 'inService',
361 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
362 'type': 'org-openroadm-interfaces:otsi',
363 'supporting-port': 'L1'
364 }, **res['interface'][0]),
367 self.assertDictEqual(
368 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
369 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
370 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
371 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
373 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
374 response = test_utils.check_netconf_node_request(
375 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
376 self.assertEqual(response.status_code, requests.codes.ok)
377 res = response.json()
378 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
379 'administrative-state': 'inService',
380 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
381 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
382 'type': 'org-openroadm-interfaces:otsi-group',
383 'supporting-port': 'L1'
385 input_dict_2 = {"group-id": 1,
386 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
388 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
390 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
391 ['org-openroadm-otsi-group-interfaces:otsi-group']),
393 ['org-openroadm-otsi-group-interfaces:otsi-group'])
395 def test_19_check_interface_OTUC4_xpdrc2(self):
396 response = test_utils.check_netconf_node_request(
397 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
401 'administrative-state': 'inService',
402 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
403 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
404 'type': 'org-openroadm-interfaces:otnOtu',
405 'supporting-port': 'L1'
407 input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
408 'expected-sapi': 'G54UFNImtOE=',
409 'tx-sapi': 'J/FIUzQc+4M=',
410 'expected-dapi': 'J/FIUzQc+4M=',
411 'rate': 'org-openroadm-otn-common-types:OTUCn',
412 'degthr-percentage': 100,
417 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
420 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
422 ['org-openroadm-otn-otu-interfaces:otu'])
424 def test_20_check_no_interface_ODUC4_xpdra2(self):
425 response = test_utils.check_netconf_node_request(
426 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
427 self.assertEqual(response.status_code, requests.codes.conflict)
428 res = response.json()
430 {"error-type": "application", "error-tag": "data-missing",
431 "error-message": "Request could not be completed because the relevant data model content does not exist"},
432 res['errors']['error'])
434 def test_21_check_openroadm_topo_xpdra2(self):
435 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
436 self.assertEqual(response.status_code, requests.codes.ok)
437 res = response.json()
438 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
439 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
440 self.assertEqual({'frequency': 196.08125,
442 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
445 def test_22_check_otn_topo_OTUC4_links(self):
446 response = test_utils.get_otn_topo_request()
447 self.assertEqual(response.status_code, requests.codes.ok)
448 res = response.json()
449 nb_links = len(res['network'][0]['ietf-network-topology:link'])
450 self.assertEqual(nb_links, 2)
451 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
452 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
453 for link in res['network'][0]['ietf-network-topology:link']:
454 self.assertIn(link['link-id'], listLinkId)
456 link['transportpce-topology:otn-link-type'], 'OTUC4')
458 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
460 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
462 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
464 link['org-openroadm-common-network:opposite-link'], listLinkId)
466 # test service-create for ODU4 service from xpdra2 to xpdrc2
467 def test_23_create_ODUC4_service(self):
468 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
469 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
470 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
471 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
472 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
473 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
474 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
476 response = test_utils.service_create_request(self.cr_serv_sample_data)
477 self.assertEqual(response.status_code, requests.codes.ok)
478 res = response.json()
479 self.assertIn('PCE calculation in progress',
480 res['output']['configuration-response-common']['response-message'])
481 time.sleep(self.WAITING)
483 def test_24_get_ODUC4_service1(self):
484 response = test_utils.get_service_list_request(
485 "services/service1-ODUC4")
486 self.assertEqual(response.status_code, requests.codes.ok)
487 res = response.json()
489 res['services'][0]['administrative-state'], 'inService')
491 res['services'][0]['service-name'], 'service1-ODUC4')
493 res['services'][0]['connection-type'], 'infrastructure')
495 res['services'][0]['lifecycle-state'], 'planned')
497 res['services'][0]['service-layer'], 'wdm')
499 res['services'][0]['service-a-end']['service-rate'], 400)
501 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
503 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
506 def test_25_check_interface_ODUC4_xpdra2(self):
507 response = test_utils.check_netconf_node_request(
508 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
509 self.assertEqual(response.status_code, requests.codes.ok)
510 res = response.json()
511 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
512 'administrative-state': 'inService',
513 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
514 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
515 'type': 'org-openroadm-interfaces:otnOdu',
516 'supporting-port': 'L1'}
517 # SAPI/DAPI are added in the Otu4 renderer
518 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
519 'rate': 'org-openroadm-otn-common-types:ODUCn',
520 'expected-dapi': 'Nmbu2MNHvc4=',
521 'expected-sapi': 'Nmbu2MNHvc4=',
522 'tx-dapi': 'Nmbu2MNHvc4=',
523 'tx-sapi': 'LY9PxYJqUbw='}
525 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
527 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
528 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
529 self.assertDictEqual(
530 {'payload-type': '22', 'exp-payload-type': '22'},
531 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
533 def test_26_check_interface_ODUC4_xpdrc2(self):
534 response = test_utils.check_netconf_node_request(
535 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
536 self.assertEqual(response.status_code, requests.codes.ok)
537 res = response.json()
538 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
539 'administrative-state': 'inService',
540 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
541 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
542 'type': 'org-openroadm-interfaces:otnOdu',
543 'supporting-port': 'L1'}
544 # SAPI/DAPI are added in the Otu4 renderer
545 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
546 'rate': 'org-openroadm-otn-common-types:ODUCn',
547 'tx-sapi': 'Nmbu2MNHvc4=',
548 'tx-dapi': 'LY9PxYJqUbw=',
549 'expected-sapi': 'LY9PxYJqUbw=',
550 'expected-dapi': 'LY9PxYJqUbw='
552 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
554 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
555 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
556 self.assertDictEqual(
557 {'payload-type': '22', 'exp-payload-type': '22'},
558 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
560 def test_27_check_otn_topo_links(self):
561 response = test_utils.get_otn_topo_request()
562 self.assertEqual(response.status_code, requests.codes.ok)
563 res = response.json()
564 nb_links = len(res['network'][0]['ietf-network-topology:link'])
565 self.assertEqual(nb_links, 4)
566 for link in res['network'][0]['ietf-network-topology:link']:
567 linkId = link['link-id']
568 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
569 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
571 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
573 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
574 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
575 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
577 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
579 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
581 link['transportpce-topology:otn-link-type'], 'ODUC4')
583 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
584 self.assertIn(link['org-openroadm-common-network:opposite-link'],
585 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
586 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
588 self.fail("this link should not exist")
590 def test_28_check_otn_topo_tp(self):
591 response = test_utils.get_otn_topo_request()
592 res = response.json()
593 for node in res['network'][0]['node']:
594 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
595 tpList = node['ietf-network-topology:termination-point']
597 if tp['tp-id'] == 'XPDR2-NETWORK1':
598 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
599 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
601 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
602 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
603 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
605 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
606 def test_29_create_100GE_service_1(self):
607 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
608 self.cr_serv_sample_data["input"]["connection-type"] = "service"
609 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
610 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
611 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
612 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
613 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
614 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
615 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
616 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
617 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
618 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
619 response = test_utils.service_create_request(self.cr_serv_sample_data)
620 self.assertEqual(response.status_code, requests.codes.ok)
621 res = response.json()
622 self.assertIn('PCE calculation in progress',
623 res['output']['configuration-response-common']['response-message'])
624 time.sleep(self.WAITING)
626 def test_30_get_100GE_service_1(self):
627 response = test_utils.get_service_list_request(
628 "services/service-100GE")
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
632 res['services'][0]['administrative-state'], 'inService')
634 res['services'][0]['service-name'], 'service-100GE')
636 res['services'][0]['connection-type'], 'service')
638 res['services'][0]['lifecycle-state'], 'planned')
641 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
642 response = test_utils.check_netconf_node_request(
643 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
646 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
647 'administrative-state': 'inService',
648 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
649 'type': 'org-openroadm-interfaces:ethernetCsmacd',
650 'supporting-port': 'C1'
652 input_dict_2 = {'speed': 100000}
653 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
655 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
656 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
658 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
659 response = test_utils.check_netconf_node_request(
660 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
664 'administrative-state': 'inService',
665 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
666 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
667 'type': 'org-openroadm-interfaces:otnOdu',
668 'supporting-port': 'C1'}
670 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
671 'rate': 'org-openroadm-otn-common-types:ODU4',
672 'monitoring-mode': 'terminated'}
674 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
676 self.assertDictEqual(dict(input_dict_2,
677 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
678 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
679 self.assertDictEqual(
680 {'payload-type': '07', 'exp-payload-type': '07'},
681 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
683 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
684 response = test_utils.check_netconf_node_request(
685 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
686 self.assertEqual(response.status_code, requests.codes.ok)
687 res = response.json()
688 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
689 'administrative-state': 'inService',
690 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
691 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
692 'type': 'org-openroadm-interfaces:otnOdu',
693 'supporting-port': 'L1'}
695 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
696 'rate': 'org-openroadm-otn-common-types:ODU4',
697 'monitoring-mode': 'not-terminated'}
698 input_dict_3 = {'trib-port-number': 1}
700 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
702 self.assertDictEqual(dict(input_dict_2,
703 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
704 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
705 self.assertDictEqual(dict(input_dict_3,
706 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
707 'parent-odu-allocation']),
708 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
709 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
710 ['opucn-trib-slots'])
711 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
712 ['opucn-trib-slots'])
714 def test_34_check_ODU4_connection_xpdra2(self):
715 response = test_utils.check_netconf_node_request(
717 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
718 self.assertEqual(response.status_code, requests.codes.ok)
719 res = response.json()
722 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
723 'direction': 'bidirectional'
726 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
727 res['odu-connection'][0])
728 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
729 res['odu-connection'][0]['destination'])
730 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
731 res['odu-connection'][0]['source'])
733 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
734 response = test_utils.check_netconf_node_request(
735 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
736 self.assertEqual(response.status_code, requests.codes.ok)
737 res = response.json()
738 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
739 'administrative-state': 'inService',
740 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
741 'type': 'org-openroadm-interfaces:ethernetCsmacd',
742 'supporting-port': 'C1'
744 input_dict_2 = {'speed': 100000}
745 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
747 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
748 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
750 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
751 response = test_utils.check_netconf_node_request(
752 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
753 self.assertEqual(response.status_code, requests.codes.ok)
754 res = response.json()
755 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
756 'administrative-state': 'inService',
757 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
758 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
759 'type': 'org-openroadm-interfaces:otnOdu',
760 'supporting-port': 'C1'}
762 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
763 'rate': 'org-openroadm-otn-common-types:ODU4',
764 'monitoring-mode': 'terminated'}
766 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
768 self.assertDictEqual(dict(input_dict_2,
769 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
770 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
771 self.assertDictEqual(
772 {'payload-type': '07', 'exp-payload-type': '07'},
773 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
775 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
776 response = test_utils.check_netconf_node_request(
777 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
778 self.assertEqual(response.status_code, requests.codes.ok)
779 res = response.json()
780 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
781 'administrative-state': 'inService',
782 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
783 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
784 'type': 'org-openroadm-interfaces:otnOdu',
785 'supporting-port': 'C1'}
787 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
788 'rate': 'org-openroadm-otn-common-types:ODU4',
789 'monitoring-mode': 'not-terminated'}
791 input_dict_3 = {'trib-port-number': 1}
793 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
795 self.assertDictEqual(dict(input_dict_2,
796 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
797 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
798 self.assertDictEqual(dict(input_dict_3,
799 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
800 'parent-odu-allocation']),
801 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
802 'parent-odu-allocation'])
805 'org-openroadm-otn-odu-interfaces:odu'][
806 'parent-odu-allocation']['opucn-trib-slots'])
807 self.assertIn('1.20',
809 'org-openroadm-otn-odu-interfaces:odu'][
810 'parent-odu-allocation']['opucn-trib-slots'])
812 def test_38_check_ODU4_connection_xpdrc2(self):
813 response = test_utils.check_netconf_node_request(
815 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
816 self.assertEqual(response.status_code, requests.codes.ok)
817 res = response.json()
820 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
821 'direction': 'bidirectional'
824 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
825 res['odu-connection'][0])
826 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
827 res['odu-connection'][0]['destination'])
828 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
829 res['odu-connection'][0]['source'])
831 def test_39_check_otn_topo_links(self):
832 response = test_utils.get_otn_topo_request()
833 self.assertEqual(response.status_code, requests.codes.ok)
834 res = response.json()
835 nb_links = len(res['network'][0]['ietf-network-topology:link'])
836 self.assertEqual(nb_links, 4)
837 for link in res['network'][0]['ietf-network-topology:link']:
838 linkId = link['link-id']
839 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
840 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
842 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
844 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
846 def test_40_check_otn_topo_tp(self):
847 response = test_utils.get_otn_topo_request()
848 res = response.json()
849 for node in res['network'][0]['node']:
850 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
851 tpList = node['ietf-network-topology:termination-point']
853 if tp['tp-id'] == 'XPDR2-NETWORK1':
854 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
855 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
856 tsPoolList = list(range(1, 20))
858 tsPoolList, xpdrTpPortConAt['ts-pool'])
860 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
862 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
864 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
865 def test_41_create_100GE_service_2(self):
866 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
867 self.cr_serv_sample_data["input"]["connection-type"] = "service"
868 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
869 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
870 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
871 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
872 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
873 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
874 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
875 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
876 response = test_utils.service_create_request(self.cr_serv_sample_data)
877 self.assertEqual(response.status_code, requests.codes.ok)
878 res = response.json()
879 self.assertIn('PCE calculation in progress',
880 res['output']['configuration-response-common']['response-message'])
881 time.sleep(self.WAITING)
883 def test_42_get_100GE_service_2(self):
884 response = test_utils.get_service_list_request(
885 "services/service-100GE2")
886 self.assertEqual(response.status_code, requests.codes.ok)
887 res = response.json()
889 res['services'][0]['administrative-state'], 'inService')
891 res['services'][0]['service-name'], 'service-100GE2')
893 res['services'][0]['connection-type'], 'service')
895 res['services'][0]['lifecycle-state'], 'planned')
898 def test_43_check_service_list(self):
899 response = test_utils.get_service_list_request("")
900 self.assertEqual(response.status_code, requests.codes.ok)
901 res = response.json()
902 self.assertEqual(len(res['service-list']['services']), 4)
905 def test_44_check_otn_topo_links(self):
906 response = test_utils.get_otn_topo_request()
907 self.assertEqual(response.status_code, requests.codes.ok)
908 res = response.json()
909 nb_links = len(res['network'][0]['ietf-network-topology:link'])
910 self.assertEqual(nb_links, 4)
911 for link in res['network'][0]['ietf-network-topology:link']:
912 linkId = link['link-id']
913 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
914 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
916 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
918 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
920 def test_45_check_otn_topo_tp(self):
921 response = test_utils.get_otn_topo_request()
922 res = response.json()
923 for node in res['network'][0]['node']:
924 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
925 tpList = node['ietf-network-topology:termination-point']
927 if tp['tp-id'] == 'XPDR2-NETWORK1':
928 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
929 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
930 tsPoolList = list(range(1, 40))
932 tsPoolList, xpdrTpPortConAt['ts-pool'])
934 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
936 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
938 def test_46_delete_100GE_service_2(self):
939 response = test_utils.service_delete_request("service-100GE2")
940 self.assertEqual(response.status_code, requests.codes.ok)
941 res = response.json()
942 self.assertIn('Renderer service delete in progress',
943 res['output']['configuration-response-common']['response-message'])
944 time.sleep(self.WAITING)
946 def test_47_delete_100GE_service_1(self):
947 response = test_utils.service_delete_request("service-100GE")
948 self.assertEqual(response.status_code, requests.codes.ok)
949 res = response.json()
950 self.assertIn('Renderer service delete in progress',
951 res['output']['configuration-response-common']['response-message'])
952 time.sleep(self.WAITING)
954 def test_48_check_service_list(self):
955 response = test_utils.get_service_list_request("")
956 self.assertEqual(response.status_code, requests.codes.ok)
957 res = response.json()
958 self.assertEqual(len(res['service-list']['services']), 2)
961 def test_49_check_no_ODU4_connection_xpdra2(self):
962 response = test_utils.check_netconf_node_request("XPDR-A2", "")
963 self.assertEqual(response.status_code, requests.codes.ok)
964 res = response.json()
965 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
968 def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
969 response = test_utils.check_netconf_node_request(
970 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
971 self.assertEqual(response.status_code, requests.codes.conflict)
973 def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
974 response = test_utils.check_netconf_node_request(
975 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
976 self.assertEqual(response.status_code, requests.codes.conflict)
978 def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
979 response = test_utils.check_netconf_node_request(
980 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
981 self.assertEqual(response.status_code, requests.codes.conflict)
983 def test_53_check_otn_topo_links(self):
984 response = test_utils.get_otn_topo_request()
985 self.assertEqual(response.status_code, requests.codes.ok)
986 res = response.json()
987 nb_links = len(res['network'][0]['ietf-network-topology:link'])
988 self.assertEqual(nb_links, 4)
989 for link in res['network'][0]['ietf-network-topology:link']:
990 linkId = link['link-id']
991 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
992 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
994 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
996 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
998 def test_54_check_otn_topo_tp(self):
999 response = test_utils.get_otn_topo_request()
1000 res = response.json()
1001 for node in res['network'][0]['node']:
1002 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1003 tpList = node['ietf-network-topology:termination-point']
1005 if tp['tp-id'] == 'XPDR2-NETWORK1':
1006 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1007 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1009 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1011 def test_55_delete_ODUC4_service(self):
1012 response = test_utils.service_delete_request("service1-ODUC4")
1013 self.assertEqual(response.status_code, requests.codes.ok)
1014 res = response.json()
1015 self.assertIn('Renderer service delete in progress',
1016 res['output']['configuration-response-common']['response-message'])
1017 time.sleep(self.WAITING)
1019 def test_56_check_service_list(self):
1020 response = test_utils.get_service_list_request("")
1021 self.assertEqual(response.status_code, requests.codes.ok)
1022 res = response.json()
1023 self.assertEqual(len(res['service-list']['services']), 1)
1026 def test_57_check_no_interface_ODU4_xpdra2(self):
1027 response = test_utils.check_netconf_node_request(
1028 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1029 self.assertEqual(response.status_code, requests.codes.conflict)
1031 def test_58_check_otn_topo_links(self):
1032 self.test_22_check_otn_topo_OTUC4_links()
1034 def test_59_check_otn_topo_tp(self):
1035 response = test_utils.get_otn_topo_request()
1036 res = response.json()
1037 for node in res['network'][0]['node']:
1038 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1039 tpList = node['ietf-network-topology:termination-point']
1041 if tp['tp-id'] == 'XPDR2-NETWORK1':
1042 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1045 def test_60_delete_OTUC4_service(self):
1046 response = test_utils.service_delete_request("service1-OTUC4")
1047 self.assertEqual(response.status_code, requests.codes.ok)
1048 res = response.json()
1049 self.assertIn('Renderer service delete in progress',
1050 res['output']['configuration-response-common']['response-message'])
1051 time.sleep(self.WAITING)
1053 def test_61_get_no_service(self):
1054 response = test_utils.get_service_list_request("")
1055 self.assertEqual(response.status_code, requests.codes.conflict)
1056 res = response.json()
1058 {"error-type": "application", "error-tag": "data-missing",
1059 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1060 res['errors']['error'])
1063 def test_62_check_no_interface_OTUC4_xpdra2(self):
1064 response = test_utils.check_netconf_node_request(
1065 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1066 self.assertEqual(response.status_code, requests.codes.conflict)
1068 def test_63_check_no_interface_OCH_xpdra2(self):
1069 response = test_utils.check_netconf_node_request(
1070 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1071 self.assertEqual(response.status_code, requests.codes.conflict)
1073 def test_64_check_no_interface_OTSI_xpdra2(self):
1074 response = test_utils.check_netconf_node_request(
1075 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1076 self.assertEqual(response.status_code, requests.codes.conflict)
1078 def test_65_getLinks_OtnTopology(self):
1079 response = test_utils.get_otn_topo_request()
1080 self.assertEqual(response.status_code, requests.codes.ok)
1081 res = response.json()
1082 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1084 def test_66_check_openroadm_topo_xpdra2(self):
1085 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1086 self.assertEqual(response.status_code, requests.codes.ok)
1087 res = response.json()
1088 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1089 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1090 self.assertNotIn('wavelength', dict.keys(
1091 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1094 def test_67_check_openroadm_topology(self):
1095 response = test_utils.get_ordm_topo_request("")
1096 self.assertEqual(response.status_code, requests.codes.ok)
1097 res = response.json()
1098 links = res['network'][0]['ietf-network-topology:link']
1099 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1101 def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
1102 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1103 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1104 self.assertEqual(response.status_code, requests.codes.ok)
1105 res = response.json()
1106 self.assertIn('Xponder Roadm Link created successfully',
1107 res["output"]["result"])
1110 def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
1111 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1112 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1113 self.assertEqual(response.status_code, requests.codes.ok)
1114 res = response.json()
1115 self.assertIn('Roadm Xponder links created successfully',
1116 res["output"]["result"])
1119 def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
1120 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1121 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1122 self.assertEqual(response.status_code, requests.codes.ok)
1123 res = response.json()
1124 self.assertIn('Xponder Roadm Link created successfully',
1125 res["output"]["result"])
1128 def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1129 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1130 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1131 self.assertEqual(response.status_code, requests.codes.ok)
1132 res = response.json()
1133 self.assertIn('Roadm Xponder links created successfully',
1134 res["output"]["result"])
1138 # test service-create for 400GE service from xpdra2 to xpdrc2
1140 def test_72_create_400GE_service(self):
1141 self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1142 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1143 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1144 response = test_utils.service_create_request(self.cr_serv_sample_data)
1145 self.assertEqual(response.status_code, requests.codes.ok)
1146 res = response.json()
1147 self.assertIn('PCE calculation in progress',
1148 res['output']['configuration-response-common']['response-message'])
1149 time.sleep(self.WAITING)
1151 def test_73_get_400GE_service(self):
1152 response = test_utils.get_service_list_request(
1153 "services/service-400GE")
1154 self.assertEqual(response.status_code, requests.codes.ok)
1155 res = response.json()
1157 res['services'][0]['administrative-state'], 'inService')
1159 res['services'][0]['service-name'], 'service-400GE')
1161 res['services'][0]['connection-type'], 'service')
1163 res['services'][0]['lifecycle-state'], 'planned')
1166 def test_74_check_xc1_roadma(self):
1167 response = test_utils.check_netconf_node_request(
1168 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1169 self.assertEqual(response.status_code, requests.codes.ok)
1170 res = response.json()
1171 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1172 self.assertDictEqual(
1174 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1175 'opticalControlMode': 'gainLoss',
1176 'target-output-power': -3.0
1177 }, **res['roadm-connections'][0]),
1178 res['roadm-connections'][0]
1180 self.assertDictEqual(
1181 {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1182 res['roadm-connections'][0]['source'])
1183 self.assertDictEqual(
1184 {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1185 res['roadm-connections'][0]['destination'])
1188 def test_75_check_topo_xpdra2(self):
1189 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1190 self.assertEqual(response.status_code, requests.codes.ok)
1191 res = response.json()
1192 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1193 for ele in liste_tp:
1194 if ele['tp-id'] == 'XPDR1-NETWORK1':
1195 self.assertEqual({'frequency': 196.08125,
1197 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1198 if ele['tp-id'] == 'XPDR1-CLIENT1':
1199 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1202 def test_76_check_topo_roadma_SRG1(self):
1203 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1204 self.assertEqual(response.status_code, requests.codes.ok)
1205 res = response.json()
1206 freq_map = base64.b64decode(
1207 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1208 freq_map_array = [int(x) for x in freq_map]
1209 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1210 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1211 for ele in liste_tp:
1212 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1213 freq_map = base64.b64decode(
1214 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1215 freq_map_array = [int(x) for x in freq_map]
1216 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1217 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1218 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1221 def test_77_check_topo_roadma_DEG1(self):
1222 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1223 self.assertEqual(response.status_code, requests.codes.ok)
1224 res = response.json()
1225 freq_map = base64.b64decode(
1226 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1227 freq_map_array = [int(x) for x in freq_map]
1228 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1229 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1230 for ele in liste_tp:
1231 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1232 freq_map = base64.b64decode(
1233 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1234 freq_map_array = [int(x) for x in freq_map]
1235 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1236 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1237 freq_map = base64.b64decode(
1238 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1239 freq_map_array = [int(x) for x in freq_map]
1240 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1243 def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1244 response = test_utils.check_netconf_node_request(
1245 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1246 self.assertEqual(response.status_code, requests.codes.ok)
1247 res = response.json()
1248 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1249 'administrative-state': 'inService',
1250 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1251 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1252 'supporting-port': 'C1'
1254 input_dict_2 = {'speed': 400000}
1255 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1256 res['interface'][0])
1257 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1258 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1260 def test_79_check_interface_OTSI_xpdra2(self):
1261 response = test_utils.check_netconf_node_request(
1262 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1263 self.assertEqual(response.status_code, requests.codes.ok)
1264 res = response.json()
1265 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1266 'administrative-state': 'inService',
1267 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268 'type': 'org-openroadm-interfaces:otsi',
1269 'supporting-port': 'L1'}
1271 "frequency": 196.0812,
1272 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1273 "fec": "org-openroadm-common-types:ofec",
1274 "transmit-power": -5,
1275 "provision-mode": "explicit",
1276 "modulation-format": "dp-qam16"}
1278 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1279 res['interface'][0])
1280 self.assertDictEqual(dict(input_dict_2,
1281 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1282 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1283 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1284 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1286 def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1287 response = test_utils.check_netconf_node_request(
1288 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1289 self.assertEqual(response.status_code, requests.codes.ok)
1290 res = response.json()
1291 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1292 'administrative-state': 'inService',
1293 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1294 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1295 'type': 'org-openroadm-interfaces:otsi-group',
1296 'supporting-port': 'L1'}
1297 input_dict_2 = {"group-id": 1,
1298 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1300 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1301 res['interface'][0])
1302 self.assertDictEqual(dict(input_dict_2,
1303 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1304 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1306 def test_81_check_interface_OTUC4_xpdra2(self):
1307 response = test_utils.check_netconf_node_request(
1308 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1309 self.assertEqual(response.status_code, requests.codes.ok)
1310 res = response.json()
1311 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1312 'administrative-state': 'inService',
1313 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1314 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1315 'type': 'org-openroadm-interfaces:otnOtu',
1316 'supporting-port': 'L1'}
1317 input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1318 "expected-dapi": "ANeUjNzWtDLV",
1319 'tx-dapi': 'AKsqPmWceByv',
1320 'expected-sapi': 'AKsqPmWceByv',
1321 "rate": "org-openroadm-otn-common-types:OTUCn",
1322 "degthr-percentage": 100,
1323 "tim-detect-mode": "Disabled",
1325 "degm-intervals": 2}
1327 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1328 res['interface'][0])
1329 self.assertDictEqual(dict(input_dict_2,
1330 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1331 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1333 def test_82_check_interface_ODUC4_xpdra2(self):
1334 response = test_utils.check_netconf_node_request(
1335 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1336 self.assertEqual(response.status_code, requests.codes.ok)
1337 res = response.json()
1338 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1339 'administrative-state': 'inService',
1340 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1341 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1342 'type': 'org-openroadm-interfaces:otnOdu',
1343 'supporting-port': 'L1',
1344 'circuit-id': 'TBD',
1345 'description': 'TBD'}
1346 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1347 "tim-detect-mode": "Disabled",
1348 "degm-intervals": 2,
1349 "degthr-percentage": 100,
1350 "monitoring-mode": "terminated",
1351 "rate": "org-openroadm-otn-common-types:ODUCn",
1353 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1355 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1356 res['interface'][0])
1357 self.assertDictEqual(dict(input_dict_2,
1358 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1359 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1360 self.assertDictEqual(dict(input_dict_3,
1361 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1362 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1363 self.assertEqual('XPDR1-NETWORK1-OTUC4', res['interface'][0]['supporting-interface-list'][0])
1365 def test_83_delete_400GE_service(self):
1366 response = test_utils.service_delete_request("service-400GE")
1367 self.assertEqual(response.status_code, requests.codes.ok)
1368 res = response.json()
1369 self.assertIn('Renderer service delete in progress',
1370 res['output']['configuration-response-common']['response-message'])
1371 time.sleep(self.WAITING)
1373 def test_84_get_no_service(self):
1374 response = test_utils.get_service_list_request("")
1375 self.assertEqual(response.status_code, requests.codes.conflict)
1376 res = response.json()
1378 {"error-type": "application", "error-tag": "data-missing",
1379 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1380 res['errors']['error'])
1383 def test_85_check_no_interface_ODUC4_xpdra2(self):
1384 response = test_utils.check_netconf_node_request(
1385 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1386 self.assertEqual(response.status_code, requests.codes.conflict)
1388 def test_86_check_no_interface_OTUC4_xpdra2(self):
1389 response = test_utils.check_netconf_node_request(
1390 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1391 self.assertEqual(response.status_code, requests.codes.conflict)
1393 def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1394 response = test_utils.check_netconf_node_request(
1395 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1396 self.assertEqual(response.status_code, requests.codes.conflict)
1398 def test_88_check_no_interface_OTSI_xpdra2(self):
1399 response = test_utils.check_netconf_node_request(
1400 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1401 self.assertEqual(response.status_code, requests.codes.conflict)
1403 def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1404 response = test_utils.check_netconf_node_request(
1405 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1406 self.assertEqual(response.status_code, requests.codes.conflict)
1408 def test_90_disconnect_xponders_from_roadm(self):
1409 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1410 response = test_utils.get_ordm_topo_request("")
1411 self.assertEqual(response.status_code, requests.codes.ok)
1412 res = response.json()
1413 links = res['network'][0]['ietf-network-topology:link']
1415 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1416 link_name = link["link-id"]
1417 response = test_utils.delete_request(url+link_name)
1418 self.assertEqual(response.status_code, requests.codes.ok)
1420 def test_91_disconnect_xpdra2(self):
1421 response = test_utils.unmount_device("XPDR-A2")
1422 self.assertEqual(response.status_code, requests.codes.ok,
1423 test_utils.CODE_SHOULD_BE_200)
1425 def test_92_disconnect_xpdrc2(self):
1426 response = test_utils.unmount_device("XPDR-C2")
1427 self.assertEqual(response.status_code, requests.codes.ok,
1428 test_utils.CODE_SHOULD_BE_200)
1430 def test_93_disconnect_roadmA(self):
1431 response = test_utils.unmount_device("ROADM-A1")
1432 self.assertEqual(response.status_code, requests.codes.ok,
1433 test_utils.CODE_SHOULD_BE_200)
1435 def test_94_disconnect_roadmC(self):
1436 response = test_utils.unmount_device("ROADM-C1")
1437 self.assertEqual(response.status_code, requests.codes.ok,
1438 test_utils.CODE_SHOULD_BE_200)
1441 if __name__ == "__main__":
1442 unittest.main(verbosity=2)