From dfb6fec75c382709abd91a07bf6200dbd9400268 Mon Sep 17 00:00:00 2001 From: Balagangadhar Bathula Date: Tue, 18 Jan 2022 01:43:24 -0500 Subject: [PATCH] Device+OTN renderer tests 7.1 w/ intermediate rate - add functional tests to perform device and OTN renderer service path. - include ODUC2 (ODUC3, ODUC4), OTUC2 (OTUC3, OTUC4) and Ethernet - even if not needed, update legacy test_utils.service_path_request method to support modulation formats used here different from dp-qpsk - force old draft-bierman02 RESTCONF API use in tox.ini 7.1 tests: with RFC8040 URLs, some new tests return a HTTP code 503 (service unavailable) instead of expected code 409 (conflict). TODO: investigate this problem JIRA: TRNSPRTPCE-528 TRNSPRTPCE-595 Change-Id: I56d33746f6283d5ca23199c0968ee04c15da8299 Co-authored-by: guillaume.lambert Signed-off-by: guillaume.lambert --- .../7.1/test02_otn_renderer.py | 851 ++++++++++++++++++ tox.ini | 2 +- 2 files changed, 852 insertions(+), 1 deletion(-) create mode 100644 tests/transportpce_tests/7.1/test02_otn_renderer.py diff --git a/tests/transportpce_tests/7.1/test02_otn_renderer.py b/tests/transportpce_tests/7.1/test02_otn_renderer.py new file mode 100644 index 000000000..7d77949ce --- /dev/null +++ b/tests/transportpce_tests/7.1/test02_otn_renderer.py @@ -0,0 +1,851 @@ +#!/usr/bin/env python +############################################################################## +# Copyright (c) 2022 AT&T, Inc. and others. All rights reserved. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +# pylint: disable=no-member +# pylint: disable=too-many-public-methods + +import unittest +import time +import requests +# pylint: disable=wrong-import-order +import sys +sys.path.append('transportpce_tests/common') +# pylint: disable=wrong-import-position +# pylint: disable=import-error +import test_utils_rfc8040 # nopep8 + + +class TransportPCE400GPortMappingTesting(unittest.TestCase): + + processes = None + NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1", + "supporting-port": "L1", + "supported-interface-capability": [ + "org-openroadm-port-types:if-otsi-otsigroup" + ], + "port-direction": "bidirectional", + "port-qual": "switch-network", + "supporting-circuit-pack-name": "1/2/2-PLUG-NET", + "xponder-type": "mpdr", + 'lcp-hash-val': 'LY9PxYJqUbw=', + 'port-admin-state': 'InService', + 'port-oper-state': 'InService'} + NODE_VERSION = '7.1' + + @classmethod + def setUpClass(cls): + cls.processes = test_utils_rfc8040.start_tpce() + cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)]) + + @classmethod + def tearDownClass(cls): + # pylint: disable=not-an-iterable + for process in cls.processes: + test_utils_rfc8040.shutdown_process(process) + print("all processes killed") + + def setUp(self): + # pylint: disable=consider-using-f-string + print("execution of {}".format(self.id().split(".")[-1])) + time.sleep(10) + + def test_01_xpdr_device_connection(self): + response = test_utils_rfc8040.mount_device("XPDR-A2", + ('xpdra2', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, + test_utils_rfc8040.CODE_SHOULD_BE_201) + + # Check if the node appears in the ietf-network topology + # this test has been removed, since it already exists in port-mapping + # 1a) create a OTUC2 device renderer + def test_02_service_path_create_otuc2(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC2', + 'wave-number': '0', + 'modulation-format': 'dp-qpsk', + 'operation': 'create', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Interfaces created successfully for nodes: ', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'], + 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface']) + + def test_03_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_04_check_interface_otsi(self): + # pylint: disable=line-too-long + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'type': 'org-openroadm-interfaces:otsi', + 'supporting-port': 'L1'} + input_dict_2 = { + "frequency": 196.0812, + "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi", + "fec": "org-openroadm-common-types:ofec", + "transmit-power": -5, + "provision-mode": "explicit", + "modulation-format": "dp-qpsk"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']), + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']) + self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]}, + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo']) + + def test_05_check_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768', + 'type': 'org-openroadm-interfaces:otsi-group', + 'supporting-port': 'L1'} + input_dict_2 = {"group-id": 1, + "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']), + response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']) + + def test_06_check_interface_otuc2(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G', + 'type': 'org-openroadm-interfaces:otnOtu', + 'supporting-port': 'L1'} + input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn", + "degthr-percentage": 100, + "tim-detect-mode": "Disabled", + "otucn-n-rate": 2, + "degm-intervals": 2} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']), + response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']) + + # 1b) create a ODUC2 device renderer + def test_07_otn_service_path_create_oduc2(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC2', + 'operation': 'create', + 'service-rate': '200', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface']) + + def test_08_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_09_check_interface_oduc2(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2") + self.assertEqual(response['status_code'], requests.codes.ok) + + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'L1'} + + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODUCn', + 'tx-sapi': 'LY9PxYJqUbw=', + 'tx-dapi': 'LY9PxYJqUbw=', + 'expected-sapi': 'LY9PxYJqUbw=', + 'expected-dapi': 'LY9PxYJqUbw=', + "degm-intervals": 2, + "degthr-percentage": 100, + "monitoring-mode": "terminated", + "oducn-n-rate": 2 + } + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '22', 'exp-payload-type': '22'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + + # 1c) create Ethernet device renderer + def test_10_otn_service_path_create_100ge(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_Ethernet', + 'operation': 'create', + 'service-rate': '100', + 'service-format': 'Ethernet', + 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}], + 'ethernet-encoding': 'eth encode', + 'opucn-trib-slots': ['1.1', '1.20'] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result']) + self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id']) + self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet', + response['output']['node-interface'][0]['connection-id']) + self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id']) + self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet', + response['output']['node-interface'][0]['odu-interface-id']) + self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet', + response['output']['node-interface'][0]['odu-interface-id']) + + def test_11_check_interface_100ge_client(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT', + 'type': 'org-openroadm-interfaces:ethernetCsmacd', + 'supporting-port': 'C1' + } + input_dict_2 = {'speed': 100000} + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']), + response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']) + + def test_12_check_interface_odu4_client(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT', + 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'C1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'monitoring-mode': 'terminated'} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '07', 'exp-payload-type': '07'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + + def test_13_check_interface_odu4_network(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'L1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'monitoring-mode': 'not-terminated'} + input_dict_3 = {'trib-port-number': 1} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(input_dict_3, + **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']) + self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'] + ['opucn-trib-slots']) + self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'] + ['opucn-trib-slots']) + + def test_14_check_odu_connection_xpdra2(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", + "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = { + 'connection-name': + 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet', + 'direction': 'bidirectional' + } + + self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]), + response['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'}, + response['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'}, + response['odu-connection'][0]['source']) + + # 1d) Delete Ethernet device interfaces + def test_15_otn_service_path_delete_100ge(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_Ethernet', + 'operation': 'delete', + 'service-rate': '100', + 'service-format': 'Ethernet', + 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}], + 'ethernet-encoding': 'eth encode', + 'trib-slot': ['1'], + 'trib-port-number': '1' + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_16_check_no_odu_connection(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", + "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_17_check_no_interface_odu_network(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_18_check_no_interface_odu_client(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_19_check_no_interface_100ge_client(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 1e) Delete ODUC2 device interfaces + def test_20_otn_service_path_delete_oduc2(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC2', + 'operation': 'delete', + 'service-rate': '200', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_21_check_no_interface_oduc2(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 1f) Delete OTUC2 device interfaces + def test_22_service_path_delete_otuc2(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC2', + 'wave-number': '0', + 'modulation-format': 'dp-qpsk', + 'operation': 'delete', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_23_check_no_interface_otuc2(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_24_check_no_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_25_check_no_interface_otsi(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 2a) create a OTUC3 device renderer + def test_26_service_path_create_otuc3(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC3', + 'wave-number': '0', + 'modulation-format': 'dp-qam8', + 'operation': 'create', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Interfaces created successfully for nodes: ', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'], + 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface']) + + def test_27_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_28_check_interface_otsi(self): + # pylint: disable=line-too-long + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.ok) + + input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'type': 'org-openroadm-interfaces:otsi', + 'supporting-port': 'L1'} + input_dict_2 = { + "frequency": 196.0812, + "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi", + "fec": "org-openroadm-common-types:ofec", + "transmit-power": -5, + "provision-mode": "explicit", + "modulation-format": "dp-qam8"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']), + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']) + self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]}, + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo']) + + def test_29_check_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768', + 'type': 'org-openroadm-interfaces:otsi-group', + 'supporting-port': 'L1'} + input_dict_2 = {"group-id": 1, + "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']), + response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']) + + def test_30_check_interface_otuc3(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G', + 'type': 'org-openroadm-interfaces:otnOtu', + 'supporting-port': 'L1'} + input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn", + "degthr-percentage": 100, + "tim-detect-mode": "Disabled", + "otucn-n-rate": 3, + "degm-intervals": 2} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']), + response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']) + + # 2b) create a ODUC3 device renderer + def test_31_otn_service_path_create_oduc3(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC3', + 'operation': 'create', + 'service-rate': '300', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface']) + + def test_32_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_33_check_interface_oduc3(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3") + self.assertEqual(response['status_code'], requests.codes.ok) + + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'L1'} + + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODUCn', + 'tx-sapi': 'LY9PxYJqUbw=', + 'tx-dapi': 'LY9PxYJqUbw=', + 'expected-sapi': 'LY9PxYJqUbw=', + 'expected-dapi': 'LY9PxYJqUbw=', + "degm-intervals": 2, + "degthr-percentage": 100, + "monitoring-mode": "terminated", + "oducn-n-rate": 3 + } + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '22', 'exp-payload-type': '22'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + + # 2c) create Ethernet device renderer + # No change in the ethernet device renderer so skipping those tests + # 2d) Delete Ethernet device interfaces + # No change in the ethernet device renderer so skipping those tests + + # 2e) Delete ODUC3 device interfaces + def test_34_otn_service_path_delete_oduc3(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC3', + 'operation': 'delete', + 'service-rate': '300', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_35_check_no_interface_oduc3(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 2f) Delete OTUC3 device interfaces + def test_36_service_path_delete_otuc3(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC3', + 'wave-number': '0', + 'modulation-format': 'dp-qam8', + 'operation': 'delete', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_37_check_no_interface_otuc3(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_38_check_no_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_39_check_no_interface_otsi(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 3a) create a OTUC4 device renderer + def test_40_service_path_create_otuc3(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC4', + 'wave-number': '0', + 'modulation-format': 'dp-qam16', + 'operation': 'create', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Interfaces created successfully for nodes: ', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'], + 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface']) + + def test_41_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_42_check_interface_otsi(self): + # pylint: disable=line-too-long + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.ok) + + input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'type': 'org-openroadm-interfaces:otsi', + 'supporting-port': 'L1'} + input_dict_2 = { + "frequency": 196.0812, + "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi", + "fec": "org-openroadm-common-types:ofec", + "transmit-power": -5, + "provision-mode": "explicit", + "modulation-format": "dp-qam16"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']), + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']) + self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]}, + response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo']) + + def test_43_check_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768', + 'type': 'org-openroadm-interfaces:otsi-group', + 'supporting-port': 'L1'} + input_dict_2 = {"group-id": 1, + "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']), + response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']) + + def test_44_check_interface_otuc4(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4") + self.assertEqual(response['status_code'], requests.codes.ok) + input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G', + 'type': 'org-openroadm-interfaces:otnOtu', + 'supporting-port': 'L1'} + input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn", + "degthr-percentage": 100, + "tim-detect-mode": "Disabled", + "otucn-n-rate": 4, + "degm-intervals": 2} + + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']), + response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']) + + # 3b) create a ODUC4 device renderer + def test_45_otn_service_path_create_oduc3(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC4', + 'operation': 'create', + 'service-rate': '400', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result']) + self.assertIn( + {'node-id': 'XPDR-A2', + 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface']) + + def test_46_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response['mapping']) + + def test_47_check_interface_oduc4(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4") + self.assertEqual(response['status_code'], requests.codes.ok) + + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': '1/2/2-PLUG-NET', + 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'L1'} + + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODUCn', + 'tx-sapi': 'LY9PxYJqUbw=', + 'tx-dapi': 'LY9PxYJqUbw=', + 'expected-sapi': 'LY9PxYJqUbw=', + 'expected-dapi': 'LY9PxYJqUbw=', + "degm-intervals": 2, + "degthr-percentage": 100, + "monitoring-mode": "terminated", + "oducn-n-rate": 4 + } + self.assertDictEqual(dict(input_dict_1, **response['interface'][0]), + response['interface'][0]) + self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '22', 'exp-payload-type': '22'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + + # 3c) create Ethernet device renderer + # No change in the ethernet device renderer so skipping those tests + # 3d) Delete Ethernet device interfaces + # No change in the ethernet device renderer so skipping those tests + + # 3e) Delete ODUC4 device interfaces + def test_48_otn_service_path_delete_oduc4(self): + response = test_utils_rfc8040.device_renderer_otn_service_path_request( + {'input': { + 'service-name': 'service_ODUC4', + 'operation': 'delete', + 'service-rate': '400', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}] + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_49_check_no_interface_oduc4(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # 3f) Delete OTUC4 device interfaces + def test_50_service_path_delete_otuc4(self): + response = test_utils_rfc8040.device_renderer_service_path_request( + {'input': { + 'service-name': 'service_OTUC4', + 'wave-number': '0', + 'modulation-format': 'dp-qam16', + 'operation': 'delete', + 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}], + 'center-freq': 196.1, + 'nmc-width': 75, + 'min-freq': 196.0375, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 755, + 'higher-spectral-slot-number': 768 + }}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + + def test_51_check_no_interface_otuc4(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_52_check_no_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G") + self.assertEqual(response['status_code'], requests.codes.conflict) + + def test_53_check_no_interface_otsi(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response['status_code'], requests.codes.conflict) + + # Disconnect the XPDR + def test_54_xpdr_device_disconnection(self): + response = test_utils_rfc8040.unmount_device("XPDR-A2") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) + + def test_55_xpdr_device_disconnected(self): + response = test_utils_rfc8040.check_device_connection("XPDR-A2") + self.assertEqual(response['status_code'], requests.codes.conflict) + self.assertIn(response['connection-status']['error-type'], ('protocol', 'application')) + self.assertEqual(response['connection-status']['error-tag'], 'data-missing') + self.assertEqual(response['connection-status']['error-message'], + 'Request could not be completed because the relevant data model content does not exist') + + def test_56_xpdr_device_not_connected(self): + response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2") + self.assertEqual(response['status_code'], requests.codes.conflict) + self.assertIn(response['node-info']['error-type'], ('protocol', 'application')) + self.assertEqual(response['node-info']['error-tag'], 'data-missing') + self.assertEqual(response['node-info']['error-message'], + 'Request could not be completed because the relevant data model content does not exist') + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tox.ini b/tox.ini index 41bc8670a..db710a348 100644 --- a/tox.ini +++ b/tox.ini @@ -148,7 +148,7 @@ whitelist_externals = launch_tests.sh passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION setenv = # USE_LIGHTY=True -# USE_ODL_RESTCONF_VERSION=draft-bierman02 + USE_ODL_RESTCONF_VERSION=draft-bierman02 USE_ODL_ALT_KARAF_ENV=./karaf71.env USE_ODL_ALT_KARAF_INSTALL_DIR=karaf71 commands = -- 2.36.6