3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
21 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION_221 = '2.2.1'
34 NODE_VERSION_71 = '7.1'
36 cr_serv_sample_data = {"input": {
37 "sdnc-request-header": {
38 "request-id": "request-1",
39 "rpc-action": "service-create",
40 "request-system-id": "appname"
42 "service-name": "service1-OTUC4",
43 "common-id": "commonId",
44 "connection-type": "infrastructure",
46 "service-rate": "400",
48 "service-format": "OTU",
49 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
53 "committed-info-rate": "100000",
54 "committed-burst-size": "64"
59 "port-device-name": "XPDR-A2-XPDR2",
61 "port-name": "XPDR2-NETWORK1",
62 "port-rack": "000000.00",
63 "port-shelf": "Chassis#1"
66 "lgx-device-name": "Some lgx-device-name",
67 "lgx-port-name": "Some lgx-port-name",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
74 "port-device-name": "XPDR-A2-XPDR2",
76 "port-name": "XPDR2-NETWORK1",
77 "port-rack": "000000.00",
78 "port-shelf": "Chassis#1"
81 "lgx-device-name": "Some lgx-device-name",
82 "lgx-port-name": "Some lgx-port-name",
83 "lgx-port-rack": "000000.00",
84 "lgx-port-shelf": "00"
90 "service-rate": "400",
92 "service-format": "OTU",
93 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
97 "committed-info-rate": "100000",
98 "committed-burst-size": "64"
103 "port-device-name": "XPDR-C2-XPDR2",
104 "port-type": "fixed",
105 "port-name": "XPDR2-NETWORK1",
106 "port-rack": "000000.00",
107 "port-shelf": "Chassis#1"
110 "lgx-device-name": "Some lgx-device-name",
111 "lgx-port-name": "Some lgx-port-name",
112 "lgx-port-rack": "000000.00",
113 "lgx-port-shelf": "00"
118 "port-device-name": "XPDR-C2-XPDR2",
119 "port-type": "fixed",
120 "port-name": "XPDR2-NETWORK1",
121 "port-rack": "000000.00",
122 "port-shelf": "Chassis#1"
125 "lgx-device-name": "Some lgx-device-name",
126 "lgx-port-name": "Some lgx-port-name",
127 "lgx-port-rack": "000000.00",
128 "lgx-port-shelf": "00"
133 "due-date": "2018-06-15T00:00:01Z",
134 "operator-contact": "pw1234"
140 cls.processes = test_utils.start_tpce()
141 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142 ('roadma', cls.NODE_VERSION_221),
143 ('roadmc', cls.NODE_VERSION_221),
144 ('xpdrc2', cls.NODE_VERSION_71)])
147 def tearDownClass(cls):
148 # pylint: disable=not-an-iterable
149 for process in cls.processes:
150 test_utils.shutdown_process(process)
151 print("all processes killed")
156 def test_01_connect_xpdra2(self):
157 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158 self.assertEqual(response.status_code,
159 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_02_connect_xpdrc2(self):
162 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163 self.assertEqual(response.status_code,
164 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166 def test_03_connect_rdma(self):
167 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168 self.assertEqual(response.status_code,
169 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
171 def test_04_connect_rdmc(self):
172 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173 self.assertEqual(response.status_code,
174 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
176 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178 "ROADM-A1", "1", "SRG1-PP2-TXRX")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 self.assertIn('Xponder Roadm Link created successfully',
182 res["output"]["result"])
185 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187 "ROADM-A1", "1", "SRG1-PP2-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Roadm Xponder links created successfully',
191 res["output"]["result"])
194 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196 "ROADM-C1", "1", "SRG1-PP2-TXRX")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 self.assertIn('Xponder Roadm Link created successfully',
200 res["output"]["result"])
203 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205 "ROADM-C1", "1", "SRG1-PP2-TXRX")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 self.assertIn('Roadm Xponder links created successfully',
209 res["output"]["result"])
212 def test_09_add_omsAttributes_roadma_roadmc(self):
213 # Config ROADMA-ROADMC oms-attributes
215 "auto-spanloss": "true",
216 "spanloss-base": 11.4,
217 "spanloss-current": 12,
218 "engineered-spanloss": 12.2,
219 "link-concatenation": [{
222 "SRLG-length": 100000,
224 response = test_utils.add_oms_attr_request(
225 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226 self.assertEqual(response.status_code, requests.codes.created)
228 def test_10_add_omsAttributes_roadmc_roadma(self):
229 # Config ROADMC-ROADMA oms-attributes
231 "auto-spanloss": "true",
232 "spanloss-base": 11.4,
233 "spanloss-current": 12,
234 "engineered-spanloss": 12.2,
235 "link-concatenation": [{
238 "SRLG-length": 100000,
240 response = test_utils.add_oms_attr_request(
241 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242 self.assertEqual(response.status_code, requests.codes.created)
244 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
245 def test_11_check_otn_topology(self):
246 response = test_utils.get_otn_topo_request()
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
249 nbNode = len(res['network'][0]['node'])
250 self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251 self.assertNotIn('ietf-network-topology:link', res['network'][0])
253 def test_12_create_OTUC4_service(self):
254 response = test_utils.service_create_request(self.cr_serv_sample_data)
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('PCE calculation in progress',
258 res['output']['configuration-response-common']['response-message'])
259 time.sleep(self.WAITING)
261 def test_13_get_OTUC4_service1(self):
262 response = test_utils.get_service_list_request(
263 "services/service1-OTUC4")
264 self.assertEqual(response.status_code, requests.codes.ok)
265 res = response.json()
267 res['services'][0]['administrative-state'], 'inService')
269 res['services'][0]['service-name'], 'service1-OTUC4')
271 res['services'][0]['connection-type'], 'infrastructure')
273 res['services'][0]['lifecycle-state'], 'planned')
275 res['services'][0]['service-layer'], 'wdm')
277 res['services'][0]['service-a-end']['service-rate'], 400)
279 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
282 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
285 # Check correct configuration of devices
286 def test_14_check_interface_och_xpdra2(self):
287 response = test_utils.check_netconf_node_request(
288 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292 'administrative-state': 'inService',
293 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294 'type': 'org-openroadm-interfaces:otsi',
295 'supporting-port': 'L1'
296 }, **res['interface'][0]),
299 self.assertDictEqual(
300 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
305 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306 response = test_utils.check_netconf_node_request(
307 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311 'administrative-state': 'inService',
312 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314 'type': 'org-openroadm-interfaces:otsi-group',
315 'supporting-port': 'L1'
317 input_dict_2 = {"group-id": 1,
318 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
320 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
322 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323 ['org-openroadm-otsi-group-interfaces:otsi-group']),
325 ['org-openroadm-otsi-group-interfaces:otsi-group'])
327 def test_16_check_interface_OTUC4_xpdra2(self):
328 response = test_utils.check_netconf_node_request(
329 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330 self.assertEqual(response.status_code, requests.codes.ok)
331 res = response.json()
332 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333 'administrative-state': 'inService',
334 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336 'type': 'org-openroadm-interfaces:otnOtu',
337 'supporting-port': 'L1'
339 input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
340 'expected-dapi': 'LY9PxYJqUbw=',
341 'rate': 'org-openroadm-otn-common-types:OTUCn',
342 'degthr-percentage': 100,
346 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
348 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
350 ['org-openroadm-otn-otu-interfaces:otu'])
352 def test_17_check_interface_och_xpdrc2(self):
353 response = test_utils.check_netconf_node_request(
354 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
355 self.assertEqual(response.status_code, requests.codes.ok)
356 res = response.json()
357 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
358 'administrative-state': 'inService',
359 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
360 'type': 'org-openroadm-interfaces:otsi',
361 'supporting-port': 'L1'
362 }, **res['interface'][0]),
365 self.assertDictEqual(
366 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
367 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
368 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
369 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
371 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
372 response = test_utils.check_netconf_node_request(
373 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
374 self.assertEqual(response.status_code, requests.codes.ok)
375 res = response.json()
376 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
377 'administrative-state': 'inService',
378 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
379 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
380 'type': 'org-openroadm-interfaces:otsi-group',
381 'supporting-port': 'L1'
383 input_dict_2 = {"group-id": 1,
384 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
386 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
388 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
389 ['org-openroadm-otsi-group-interfaces:otsi-group']),
391 ['org-openroadm-otsi-group-interfaces:otsi-group'])
393 def test_19_check_interface_OTUC4_xpdrc2(self):
394 response = test_utils.check_netconf_node_request(
395 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
396 self.assertEqual(response.status_code, requests.codes.ok)
397 res = response.json()
398 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
399 'administrative-state': 'inService',
400 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
401 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
402 'type': 'org-openroadm-interfaces:otnOtu',
403 'supporting-port': 'L1'
405 input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
406 'expected-sapi': 'LY9PxYJqUbw=',
407 'tx-sapi': 'Nmbu2MNHvc4=',
408 'expected-dapi': 'Nmbu2MNHvc4=',
409 'rate': 'org-openroadm-otn-common-types:OTUCn',
410 'degthr-percentage': 100,
415 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
418 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
420 ['org-openroadm-otn-otu-interfaces:otu'])
422 def test_20_check_no_interface_ODUC4_xpdra2(self):
423 response = test_utils.check_netconf_node_request(
424 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
425 self.assertEqual(response.status_code, requests.codes.conflict)
426 res = response.json()
428 {"error-type": "application", "error-tag": "data-missing",
429 "error-message": "Request could not be completed because the relevant data model content does not exist"},
430 res['errors']['error'])
432 def test_21_check_openroadm_topo_xpdra2(self):
433 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
434 self.assertEqual(response.status_code, requests.codes.ok)
435 res = response.json()
436 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
437 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
438 self.assertEqual({'frequency': 196.08125,
440 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
443 def test_22_check_otn_topo_OTUC4_links(self):
444 response = test_utils.get_otn_topo_request()
445 self.assertEqual(response.status_code, requests.codes.ok)
446 res = response.json()
447 nb_links = len(res['network'][0]['ietf-network-topology:link'])
448 self.assertEqual(nb_links, 2)
449 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
450 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
451 for link in res['network'][0]['ietf-network-topology:link']:
452 self.assertIn(link['link-id'], listLinkId)
454 link['transportpce-topology:otn-link-type'], 'OTUC4')
456 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
458 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
460 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
462 link['org-openroadm-common-network:opposite-link'], listLinkId)
464 # test service-create for ODU4 service from xpdra2 to xpdrc2
465 def test_23_create_ODUC4_service(self):
466 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
467 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
468 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
469 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
470 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
471 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
472 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
474 response = test_utils.service_create_request(self.cr_serv_sample_data)
475 self.assertEqual(response.status_code, requests.codes.ok)
476 res = response.json()
477 self.assertIn('PCE calculation in progress',
478 res['output']['configuration-response-common']['response-message'])
479 time.sleep(self.WAITING)
481 def test_24_get_ODUC4_service1(self):
482 response = test_utils.get_service_list_request(
483 "services/service1-ODUC4")
484 self.assertEqual(response.status_code, requests.codes.ok)
485 res = response.json()
487 res['services'][0]['administrative-state'], 'inService')
489 res['services'][0]['service-name'], 'service1-ODUC4')
491 res['services'][0]['connection-type'], 'infrastructure')
493 res['services'][0]['lifecycle-state'], 'planned')
495 res['services'][0]['service-layer'], 'wdm')
497 res['services'][0]['service-a-end']['service-rate'], 400)
499 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
501 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
504 def test_25_check_interface_ODUC4_xpdra2(self):
505 response = test_utils.check_netconf_node_request(
506 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
510 'administrative-state': 'inService',
511 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
512 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
513 'type': 'org-openroadm-interfaces:otnOdu',
514 'supporting-port': 'L1'}
515 # SAPI/DAPI are added in the Otu4 renderer
516 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
517 'rate': 'org-openroadm-otn-common-types:ODUCn',
518 'expected-dapi': 'Nmbu2MNHvc4=',
519 'expected-sapi': 'Nmbu2MNHvc4=',
520 'tx-dapi': 'Nmbu2MNHvc4=',
521 'tx-sapi': 'LY9PxYJqUbw='}
523 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
525 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
526 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
527 self.assertDictEqual(
528 {'payload-type': '22', 'exp-payload-type': '22'},
529 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
531 def test_26_check_interface_ODUC4_xpdrc2(self):
532 response = test_utils.check_netconf_node_request(
533 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
534 self.assertEqual(response.status_code, requests.codes.ok)
535 res = response.json()
536 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
537 'administrative-state': 'inService',
538 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
539 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
540 'type': 'org-openroadm-interfaces:otnOdu',
541 'supporting-port': 'L1'}
542 # SAPI/DAPI are added in the Otu4 renderer
543 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
544 'rate': 'org-openroadm-otn-common-types:ODUCn',
545 'tx-sapi': 'Nmbu2MNHvc4=',
546 'tx-dapi': 'LY9PxYJqUbw=',
547 'expected-sapi': 'LY9PxYJqUbw=',
548 'expected-dapi': 'LY9PxYJqUbw='
550 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
552 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
553 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
554 self.assertDictEqual(
555 {'payload-type': '22', 'exp-payload-type': '22'},
556 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
558 def test_27_check_otn_topo_links(self):
559 response = test_utils.get_otn_topo_request()
560 self.assertEqual(response.status_code, requests.codes.ok)
561 res = response.json()
562 nb_links = len(res['network'][0]['ietf-network-topology:link'])
563 self.assertEqual(nb_links, 4)
564 for link in res['network'][0]['ietf-network-topology:link']:
565 linkId = link['link-id']
566 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
569 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
571 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
572 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
573 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
575 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
577 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
579 link['transportpce-topology:otn-link-type'], 'ODUC4')
581 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
582 self.assertIn(link['org-openroadm-common-network:opposite-link'],
583 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
584 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
586 self.fail("this link should not exist")
588 def test_28_check_otn_topo_tp(self):
589 response = test_utils.get_otn_topo_request()
590 res = response.json()
591 for node in res['network'][0]['node']:
592 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
593 tpList = node['ietf-network-topology:termination-point']
595 if tp['tp-id'] == 'XPDR2-NETWORK1':
596 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
597 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
599 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
600 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
601 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
603 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
604 def test_29_create_100GE_service_1(self):
605 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
606 self.cr_serv_sample_data["input"]["connection-type"] = "service"
607 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
608 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
609 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
610 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
612 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
613 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
614 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
615 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
616 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
617 response = test_utils.service_create_request(self.cr_serv_sample_data)
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 self.assertIn('PCE calculation in progress',
621 res['output']['configuration-response-common']['response-message'])
622 time.sleep(self.WAITING)
624 def test_30_get_100GE_service_1(self):
625 response = test_utils.get_service_list_request(
626 "services/service-100GE")
627 self.assertEqual(response.status_code, requests.codes.ok)
628 res = response.json()
630 res['services'][0]['administrative-state'], 'inService')
632 res['services'][0]['service-name'], 'service-100GE')
634 res['services'][0]['connection-type'], 'service')
636 res['services'][0]['lifecycle-state'], 'planned')
639 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
640 response = test_utils.check_netconf_node_request(
641 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
642 self.assertEqual(response.status_code, requests.codes.ok)
643 res = response.json()
644 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
645 'administrative-state': 'inService',
646 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
647 'type': 'org-openroadm-interfaces:ethernetCsmacd',
648 'supporting-port': 'C1'
650 input_dict_2 = {'speed': 100000}
651 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
653 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
654 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
656 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
657 response = test_utils.check_netconf_node_request(
658 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
659 self.assertEqual(response.status_code, requests.codes.ok)
660 res = response.json()
661 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
662 'administrative-state': 'inService',
663 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
664 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
665 'type': 'org-openroadm-interfaces:otnOdu',
666 'supporting-port': 'C1'}
668 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
669 'rate': 'org-openroadm-otn-common-types:ODU4',
670 'monitoring-mode': 'terminated'}
672 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
674 self.assertDictEqual(dict(input_dict_2,
675 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
676 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
677 self.assertDictEqual(
678 {'payload-type': '07', 'exp-payload-type': '07'},
679 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
681 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
682 response = test_utils.check_netconf_node_request(
683 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
684 self.assertEqual(response.status_code, requests.codes.ok)
685 res = response.json()
686 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
687 'administrative-state': 'inService',
688 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
689 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
690 'type': 'org-openroadm-interfaces:otnOdu',
691 'supporting-port': 'L1'}
693 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
694 'rate': 'org-openroadm-otn-common-types:ODU4',
695 'monitoring-mode': 'not-terminated'}
696 input_dict_3 = {'trib-port-number': 1}
698 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
700 self.assertDictEqual(dict(input_dict_2,
701 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
702 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
703 self.assertDictEqual(dict(input_dict_3,
704 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
705 'parent-odu-allocation']),
706 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
707 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
708 ['opucn-trib-slots'])
709 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
710 ['opucn-trib-slots'])
712 def test_34_check_ODU4_connection_xpdra2(self):
713 response = test_utils.check_netconf_node_request(
715 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
716 self.assertEqual(response.status_code, requests.codes.ok)
717 res = response.json()
720 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
721 'direction': 'bidirectional'
724 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
725 res['odu-connection'][0])
726 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
727 res['odu-connection'][0]['destination'])
728 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
729 res['odu-connection'][0]['source'])
731 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
732 response = test_utils.check_netconf_node_request(
733 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
736 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
737 'administrative-state': 'inService',
738 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
739 'type': 'org-openroadm-interfaces:ethernetCsmacd',
740 'supporting-port': 'C1'
742 input_dict_2 = {'speed': 100000}
743 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
745 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
746 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
748 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
749 response = test_utils.check_netconf_node_request(
750 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
751 self.assertEqual(response.status_code, requests.codes.ok)
752 res = response.json()
753 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
754 'administrative-state': 'inService',
755 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
756 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
757 'type': 'org-openroadm-interfaces:otnOdu',
758 'supporting-port': 'C1'}
760 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
761 'rate': 'org-openroadm-otn-common-types:ODU4',
762 'monitoring-mode': 'terminated'}
764 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
766 self.assertDictEqual(dict(input_dict_2,
767 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
768 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
769 self.assertDictEqual(
770 {'payload-type': '07', 'exp-payload-type': '07'},
771 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
773 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
774 response = test_utils.check_netconf_node_request(
775 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
776 self.assertEqual(response.status_code, requests.codes.ok)
777 res = response.json()
778 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
779 'administrative-state': 'inService',
780 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
781 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
782 'type': 'org-openroadm-interfaces:otnOdu',
783 'supporting-port': 'C1'}
785 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
786 'rate': 'org-openroadm-otn-common-types:ODU4',
787 'monitoring-mode': 'not-terminated'}
789 input_dict_3 = {'trib-port-number': 1}
791 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
793 self.assertDictEqual(dict(input_dict_2,
794 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
795 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
796 self.assertDictEqual(dict(input_dict_3,
797 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
798 'parent-odu-allocation']),
799 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
800 'parent-odu-allocation'])
803 'org-openroadm-otn-odu-interfaces:odu'][
804 'parent-odu-allocation']['opucn-trib-slots'])
805 self.assertIn('1.20',
807 'org-openroadm-otn-odu-interfaces:odu'][
808 'parent-odu-allocation']['opucn-trib-slots'])
810 def test_38_check_ODU4_connection_xpdrc2(self):
811 response = test_utils.check_netconf_node_request(
813 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
814 self.assertEqual(response.status_code, requests.codes.ok)
815 res = response.json()
818 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
819 'direction': 'bidirectional'
822 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
823 res['odu-connection'][0])
824 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
825 res['odu-connection'][0]['destination'])
826 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
827 res['odu-connection'][0]['source'])
829 def test_39_check_otn_topo_links(self):
830 response = test_utils.get_otn_topo_request()
831 self.assertEqual(response.status_code, requests.codes.ok)
832 res = response.json()
833 nb_links = len(res['network'][0]['ietf-network-topology:link'])
834 self.assertEqual(nb_links, 4)
835 for link in res['network'][0]['ietf-network-topology:link']:
836 linkId = link['link-id']
837 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
838 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
840 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
842 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
844 def test_40_check_otn_topo_tp(self):
845 response = test_utils.get_otn_topo_request()
846 res = response.json()
847 for node in res['network'][0]['node']:
848 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
849 tpList = node['ietf-network-topology:termination-point']
851 if tp['tp-id'] == 'XPDR2-NETWORK1':
852 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
853 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
854 tsPoolList = list(range(1, 20))
856 tsPoolList, xpdrTpPortConAt['ts-pool'])
858 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
860 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
862 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
863 def test_41_create_100GE_service_2(self):
864 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
865 self.cr_serv_sample_data["input"]["connection-type"] = "service"
866 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
867 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
868 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
869 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
870 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
871 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
872 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
873 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
874 response = test_utils.service_create_request(self.cr_serv_sample_data)
875 self.assertEqual(response.status_code, requests.codes.ok)
876 res = response.json()
877 self.assertIn('PCE calculation in progress',
878 res['output']['configuration-response-common']['response-message'])
879 time.sleep(self.WAITING)
881 def test_42_get_100GE_service_2(self):
882 response = test_utils.get_service_list_request(
883 "services/service-100GE2")
884 self.assertEqual(response.status_code, requests.codes.ok)
885 res = response.json()
887 res['services'][0]['administrative-state'], 'inService')
889 res['services'][0]['service-name'], 'service-100GE2')
891 res['services'][0]['connection-type'], 'service')
893 res['services'][0]['lifecycle-state'], 'planned')
896 def test_43_check_service_list(self):
897 response = test_utils.get_service_list_request("")
898 self.assertEqual(response.status_code, requests.codes.ok)
899 res = response.json()
900 self.assertEqual(len(res['service-list']['services']), 4)
903 def test_44_check_otn_topo_links(self):
904 response = test_utils.get_otn_topo_request()
905 self.assertEqual(response.status_code, requests.codes.ok)
906 res = response.json()
907 nb_links = len(res['network'][0]['ietf-network-topology:link'])
908 self.assertEqual(nb_links, 4)
909 for link in res['network'][0]['ietf-network-topology:link']:
910 linkId = link['link-id']
911 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
912 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
914 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
916 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
918 def test_45_check_otn_topo_tp(self):
919 response = test_utils.get_otn_topo_request()
920 res = response.json()
921 for node in res['network'][0]['node']:
922 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
923 tpList = node['ietf-network-topology:termination-point']
925 if tp['tp-id'] == 'XPDR2-NETWORK1':
926 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
927 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
928 tsPoolList = list(range(1, 40))
930 tsPoolList, xpdrTpPortConAt['ts-pool'])
932 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
934 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
936 def test_46_delete_100GE_service_2(self):
937 response = test_utils.service_delete_request("service-100GE2")
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
940 self.assertIn('Renderer service delete in progress',
941 res['output']['configuration-response-common']['response-message'])
942 time.sleep(self.WAITING)
944 def test_47_delete_100GE_service_1(self):
945 response = test_utils.service_delete_request("service-100GE")
946 self.assertEqual(response.status_code, requests.codes.ok)
947 res = response.json()
948 self.assertIn('Renderer service delete in progress',
949 res['output']['configuration-response-common']['response-message'])
950 time.sleep(self.WAITING)
952 def test_48_check_service_list(self):
953 response = test_utils.get_service_list_request("")
954 self.assertEqual(response.status_code, requests.codes.ok)
955 res = response.json()
956 self.assertEqual(len(res['service-list']['services']), 2)
959 def test_49_check_no_ODU4_connection_xpdra2(self):
960 response = test_utils.check_netconf_node_request("XPDR-A2", "")
961 self.assertEqual(response.status_code, requests.codes.ok)
962 res = response.json()
963 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
966 def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
967 response = test_utils.check_netconf_node_request(
968 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
969 self.assertEqual(response.status_code, requests.codes.conflict)
971 def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
972 response = test_utils.check_netconf_node_request(
973 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
974 self.assertEqual(response.status_code, requests.codes.conflict)
976 def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
977 response = test_utils.check_netconf_node_request(
978 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
979 self.assertEqual(response.status_code, requests.codes.conflict)
981 def test_53_check_otn_topo_links(self):
982 response = test_utils.get_otn_topo_request()
983 self.assertEqual(response.status_code, requests.codes.ok)
984 res = response.json()
985 nb_links = len(res['network'][0]['ietf-network-topology:link'])
986 self.assertEqual(nb_links, 4)
987 for link in res['network'][0]['ietf-network-topology:link']:
988 linkId = link['link-id']
989 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
990 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
992 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
994 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
996 def test_54_check_otn_topo_tp(self):
997 response = test_utils.get_otn_topo_request()
998 res = response.json()
999 for node in res['network'][0]['node']:
1000 if (node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'):
1001 tpList = node['ietf-network-topology:termination-point']
1003 if tp['tp-id'] == 'XPDR2-NETWORK1':
1004 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1005 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1007 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1009 def test_55_delete_ODUC4_service(self):
1010 response = test_utils.service_delete_request("service1-ODUC4")
1011 self.assertEqual(response.status_code, requests.codes.ok)
1012 res = response.json()
1013 self.assertIn('Renderer service delete in progress',
1014 res['output']['configuration-response-common']['response-message'])
1015 time.sleep(self.WAITING)
1017 def test_56_check_service_list(self):
1018 response = test_utils.get_service_list_request("")
1019 self.assertEqual(response.status_code, requests.codes.ok)
1020 res = response.json()
1021 self.assertEqual(len(res['service-list']['services']), 1)
1024 def test_57_check_no_interface_ODU4_xpdra2(self):
1025 response = test_utils.check_netconf_node_request(
1026 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1027 self.assertEqual(response.status_code, requests.codes.conflict)
1029 def test_58_check_otn_topo_links(self):
1030 self.test_22_check_otn_topo_OTUC4_links()
1032 def test_59_check_otn_topo_tp(self):
1033 response = test_utils.get_otn_topo_request()
1034 res = response.json()
1035 for node in res['network'][0]['node']:
1036 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
1037 tpList = node['ietf-network-topology:termination-point']
1039 if tp['tp-id'] == 'XPDR2-NETWORK1':
1040 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1043 def test_60_delete_OTUC4_service(self):
1044 response = test_utils.service_delete_request("service1-OTUC4")
1045 self.assertEqual(response.status_code, requests.codes.ok)
1046 res = response.json()
1047 self.assertIn('Renderer service delete in progress',
1048 res['output']['configuration-response-common']['response-message'])
1049 time.sleep(self.WAITING)
1051 def test_61_get_no_service(self):
1052 response = test_utils.get_service_list_request("")
1053 self.assertEqual(response.status_code, requests.codes.conflict)
1054 res = response.json()
1056 {"error-type": "application", "error-tag": "data-missing",
1057 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1058 res['errors']['error'])
1061 def test_62_check_no_interface_OTUC4_xpdra2(self):
1062 response = test_utils.check_netconf_node_request(
1063 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1064 self.assertEqual(response.status_code, requests.codes.conflict)
1066 def test_63_check_no_interface_OCH_xpdra2(self):
1067 response = test_utils.check_netconf_node_request(
1068 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1069 self.assertEqual(response.status_code, requests.codes.conflict)
1071 def test_64_check_no_interface_OTSI_xpdra2(self):
1072 response = test_utils.check_netconf_node_request(
1073 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1074 self.assertEqual(response.status_code, requests.codes.conflict)
1076 def test_65_getLinks_OtnTopology(self):
1077 response = test_utils.get_otn_topo_request()
1078 self.assertEqual(response.status_code, requests.codes.ok)
1079 res = response.json()
1080 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1082 def test_66_check_openroadm_topo_xpdra2(self):
1083 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1084 self.assertEqual(response.status_code, requests.codes.ok)
1085 res = response.json()
1086 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1087 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1088 self.assertNotIn('wavelength', dict.keys(
1089 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1092 def test_67_check_openroadm_topology(self):
1093 response = test_utils.get_ordm_topo_request("")
1094 self.assertEqual(response.status_code, requests.codes.ok)
1095 res = response.json()
1096 links = res['network'][0]['ietf-network-topology:link']
1097 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1099 def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1100 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1101 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1102 self.assertEqual(response.status_code, requests.codes.ok)
1103 res = response.json()
1104 self.assertIn('Xponder Roadm Link created successfully',
1105 res["output"]["result"])
1108 def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1109 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1110 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1111 self.assertEqual(response.status_code, requests.codes.ok)
1112 res = response.json()
1113 self.assertIn('Roadm Xponder links created successfully',
1114 res["output"]["result"])
1117 def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1118 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1119 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1120 self.assertEqual(response.status_code, requests.codes.ok)
1121 res = response.json()
1122 self.assertIn('Xponder Roadm Link created successfully',
1123 res["output"]["result"])
1126 def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1127 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1128 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1129 self.assertEqual(response.status_code, requests.codes.ok)
1130 res = response.json()
1131 self.assertIn('Roadm Xponder links created successfully',
1132 res["output"]["result"])
1135 # test service-create for 400GE service from xpdra2 to xpdrc2
1136 def test_72_create_400GE_service(self):
1137 self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1138 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1139 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1140 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1141 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1142 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1143 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1144 response = test_utils.service_create_request(self.cr_serv_sample_data)
1145 self.assertEqual(response.status_code, requests.codes.ok)
1146 res = response.json()
1147 self.assertIn('PCE calculation in progress',
1148 res['output']['configuration-response-common']['response-message'])
1149 time.sleep(self.WAITING)
1151 def test_73_get_400GE_service(self):
1152 response = test_utils.get_service_list_request(
1153 "services/service-400GE")
1154 self.assertEqual(response.status_code, requests.codes.ok)
1155 res = response.json()
1157 res['services'][0]['administrative-state'], 'inService')
1159 res['services'][0]['service-name'], 'service-400GE')
1161 res['services'][0]['connection-type'], 'service')
1163 res['services'][0]['lifecycle-state'], 'planned')
1166 def test_74_check_xc1_roadma(self):
1167 response = test_utils.check_netconf_node_request(
1168 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1169 self.assertEqual(response.status_code, requests.codes.ok)
1170 res = response.json()
1171 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1172 self.assertDictEqual(
1174 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1175 'opticalControlMode': 'gainLoss',
1176 'target-output-power': -3.0
1177 }, **res['roadm-connections'][0]),
1178 res['roadm-connections'][0]
1180 self.assertDictEqual(
1181 {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1182 res['roadm-connections'][0]['source'])
1183 self.assertDictEqual(
1184 {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1185 res['roadm-connections'][0]['destination'])
1188 def test_75_check_topo_xpdra2(self):
1189 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1190 self.assertEqual(response.status_code, requests.codes.ok)
1191 res = response.json()
1192 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1193 for ele in liste_tp:
1194 if ele['tp-id'] == 'XPDR1-NETWORK1':
1195 self.assertEqual({'frequency': 196.08125,
1197 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1198 if ele['tp-id'] == 'XPDR1-CLIENT1':
1199 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1202 def test_76_check_topo_roadma_SRG1(self):
1203 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1204 self.assertEqual(response.status_code, requests.codes.ok)
1205 res = response.json()
1206 freq_map = base64.b64decode(
1207 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1208 freq_map_array = [int(x) for x in freq_map]
1209 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1210 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1211 for ele in liste_tp:
1212 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1213 freq_map = base64.b64decode(
1214 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1215 freq_map_array = [int(x) for x in freq_map]
1216 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1217 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1218 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1221 def test_77_check_topo_roadma_DEG1(self):
1222 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1223 self.assertEqual(response.status_code, requests.codes.ok)
1224 res = response.json()
1225 freq_map = base64.b64decode(
1226 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1227 freq_map_array = [int(x) for x in freq_map]
1228 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1229 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1230 for ele in liste_tp:
1231 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1232 freq_map = base64.b64decode(
1233 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1234 freq_map_array = [int(x) for x in freq_map]
1235 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1236 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1237 freq_map = base64.b64decode(
1238 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1239 freq_map_array = [int(x) for x in freq_map]
1240 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1243 def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1244 response = test_utils.check_netconf_node_request(
1245 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1246 self.assertEqual(response.status_code, requests.codes.ok)
1247 res = response.json()
1248 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1249 'administrative-state': 'inService',
1250 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1251 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1252 'supporting-port': 'C1'
1254 input_dict_2 = {'speed': 400000}
1255 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1256 res['interface'][0])
1257 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1258 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1260 def test_79_check_interface_OTSI_xpdra2(self):
1261 response = test_utils.check_netconf_node_request(
1262 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1263 self.assertEqual(response.status_code, requests.codes.ok)
1264 res = response.json()
1265 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1266 'administrative-state': 'inService',
1267 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268 'type': 'org-openroadm-interfaces:otsi',
1269 'supporting-port': 'L1'}
1271 "frequency": 196.0812,
1272 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1273 "fec": "org-openroadm-common-types:ofec",
1274 "transmit-power": -5,
1275 "provision-mode": "explicit",
1276 "modulation-format": "dp-qam16"}
1278 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1279 res['interface'][0])
1280 self.assertDictEqual(dict(input_dict_2,
1281 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1282 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1283 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1284 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1286 def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1287 response = test_utils.check_netconf_node_request(
1288 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1289 self.assertEqual(response.status_code, requests.codes.ok)
1290 res = response.json()
1291 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1292 'administrative-state': 'inService',
1293 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1294 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1295 'type': 'org-openroadm-interfaces:otsi-group',
1296 'supporting-port': 'L1'}
1297 input_dict_2 = {"group-id": 1,
1298 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1300 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1301 res['interface'][0])
1302 self.assertDictEqual(dict(input_dict_2,
1303 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1304 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1306 def test_81_check_interface_OTUC4_xpdra2(self):
1307 response = test_utils.check_netconf_node_request(
1308 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1309 self.assertEqual(response.status_code, requests.codes.ok)
1310 res = response.json()
1311 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1312 'administrative-state': 'inService',
1313 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1314 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1315 'type': 'org-openroadm-interfaces:otnOtu',
1316 'supporting-port': 'L1'}
1317 input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1318 "degthr-percentage": 100,
1319 "tim-detect-mode": "Disabled",
1321 "degm-intervals": 2,
1322 "expected-dapi": "AIGiVAQ4gDil"}
1324 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1325 res['interface'][0])
1326 self.assertDictEqual(dict(input_dict_2,
1327 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1328 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1330 def test_82_check_interface_ODUC4_xpdra2(self):
1331 response = test_utils.check_netconf_node_request(
1332 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1333 self.assertEqual(response.status_code, requests.codes.ok)
1334 res = response.json()
1335 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1336 'administrative-state': 'inService',
1337 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1338 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1339 'type': 'org-openroadm-interfaces:otnOdu',
1340 'supporting-port': 'L1'}
1341 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1342 "tim-detect-mode": "Disabled",
1343 "degm-intervals": 2,
1344 "degthr-percentage": 100,
1345 "monitoring-mode": "terminated",
1346 "rate": "org-openroadm-otn-common-types:ODUCn",
1348 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1350 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1351 res['interface'][0])
1352 self.assertDictEqual(dict(input_dict_2,
1353 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1354 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1355 self.assertDictEqual(dict(input_dict_3,
1356 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1357 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1359 def test_83_delete_400GE_service(self):
1360 response = test_utils.service_delete_request("service-400GE")
1361 self.assertEqual(response.status_code, requests.codes.ok)
1362 res = response.json()
1363 self.assertIn('Renderer service delete in progress',
1364 res['output']['configuration-response-common']['response-message'])
1365 time.sleep(self.WAITING)
1367 def test_84_get_no_service(self):
1368 response = test_utils.get_service_list_request("")
1369 self.assertEqual(response.status_code, requests.codes.conflict)
1370 res = response.json()
1372 {"error-type": "application", "error-tag": "data-missing",
1373 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1374 res['errors']['error'])
1377 def test_85_check_no_interface_ODUC4_xpdra2(self):
1378 response = test_utils.check_netconf_node_request(
1379 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1380 self.assertEqual(response.status_code, requests.codes.conflict)
1382 def test_86_check_no_interface_OTUC4_xpdra2(self):
1383 response = test_utils.check_netconf_node_request(
1384 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1385 self.assertEqual(response.status_code, requests.codes.conflict)
1387 def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1388 response = test_utils.check_netconf_node_request(
1389 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1390 self.assertEqual(response.status_code, requests.codes.conflict)
1392 def test_88_check_no_interface_OTSI_xpdra2(self):
1393 response = test_utils.check_netconf_node_request(
1394 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1395 self.assertEqual(response.status_code, requests.codes.conflict)
1397 def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1398 response = test_utils.check_netconf_node_request(
1399 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1400 self.assertEqual(response.status_code, requests.codes.conflict)
1402 def test_90_disconnect_xponders_from_roadm(self):
1403 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1404 response = test_utils.get_ordm_topo_request("")
1405 self.assertEqual(response.status_code, requests.codes.ok)
1406 res = response.json()
1407 links = res['network'][0]['ietf-network-topology:link']
1409 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1410 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1411 link_name = link["link-id"]
1412 response = test_utils.delete_request(url+link_name)
1413 self.assertEqual(response.status_code, requests.codes.ok)
1415 def test_91_disconnect_xpdra2(self):
1416 response = test_utils.unmount_device("XPDR-A2")
1417 self.assertEqual(response.status_code, requests.codes.ok,
1418 test_utils.CODE_SHOULD_BE_200)
1420 def test_92_disconnect_xpdrc2(self):
1421 response = test_utils.unmount_device("XPDR-C2")
1422 self.assertEqual(response.status_code, requests.codes.ok,
1423 test_utils.CODE_SHOULD_BE_200)
1425 def test_93_disconnect_roadmA(self):
1426 response = test_utils.unmount_device("ROADM-A1")
1427 self.assertEqual(response.status_code, requests.codes.ok,
1428 test_utils.CODE_SHOULD_BE_200)
1430 def test_94_disconnect_roadmC(self):
1431 response = test_utils.unmount_device("ROADM-C1")
1432 self.assertEqual(response.status_code, requests.codes.ok,
1433 test_utils.CODE_SHOULD_BE_200)
1436 if __name__ == "__main__":
1437 unittest.main(verbosity=2)