import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.If1GEODU0;
import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.If400GE;
import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.IfOCH;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.IfOCHOTU2EODU2E;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.IfOCHOTU2ODU2;
import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.IfOCHOTU4ODU4;
import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.IfOTUCnODUCn;
import org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327.SupportedIfCapability;
put("If100GE", If100GE.class);
put("If10GE", If10GE.class);
put("If1GE", If1GE.class);
+ put("IfOCHOTU2EODU2E", IfOCHOTU2EODU2E.class);
+ put("IfOCHOTU2ODU2", IfOCHOTU2ODU2.class);
}
};
nodesList.put(nodes.key(),nodes);
Network network = new NetworkBuilder().setNodes(nodesList).build();
-
final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
InstanceIdentifier<Network> nodesIID = InstanceIdentifier.builder(Network.class).build();
writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, nodesIID, network);
List<Class<? extends org.opendaylight.yang.gen.v1.http.org.openroadm.port.types.rev200327
.SupportedIfCapability>> supportedIntf = new ArrayList<>();
for (Class<? extends SupportedIfCapability> sup: port.getSupportedInterfaceCapability()) {
- supportedIntf.add(MappingUtilsImpl.convertSupIfCapa(sup.getSimpleName()));
+ if (MappingUtilsImpl.convertSupIfCapa(sup.getSimpleName()) != null) {
+ supportedIntf.add(MappingUtilsImpl.convertSupIfCapa(sup.getSimpleName()));
+ }
}
mpBldr.setSupportedInterfaceCapability(supportedIntf);
}
response['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- data = {
- 'input': {
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }
- }
- response = test_utils_rfc8040.device_renderer_service_path_request(data)
- self.assertEqual(response.status_code, requests.codes.ok)
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ROADMA01', response['output']['result'])
def test_06_service_path_create_rdm_check(self):
response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
self.assertIn('not-reserved-inuse', response['circuit-packs'][0]["equipment-state"])
def test_14_service_path_delete(self):
- data = {
- 'input': {
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }
- }
- response = test_utils_rfc8040.device_renderer_service_path_request(data)
- self.assertEqual(response.status_code, requests.codes.ok)
- self.assertIn(response.json(),
- ({'output': {'result': 'Request processed', 'success': True}},
- {'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
def test_15_service_path_delete_rdm_check(self):
response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
# Renderer interface creations
def test_07_device_renderer(self):
- data = {
- "input": {
- "modulation-format": "dp-qpsk",
- "operation": "create",
- "service-name": "testNMC-MC",
- "wave-number": "0",
- "center-freq": "196.05",
- "nmc-width": "80",
- "nodes": [
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'service-name': 'testNMC-MC',
+ 'wave-number': '0',
+ 'center-freq': '196.05',
+ 'nmc-width': '80',
+ 'nodes': [
{
- "node-id": "ROADM-D1",
- "src-tp": "SRG1-PP1-TXRX",
- "dest-tp": "DEG1-TTP-TXRX"
+ 'node-id': 'ROADM-D1',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'dest-tp': 'DEG1-TTP-TXRX'
}
],
- "min-freq": 196.00625,
- "max-freq": 196.09375,
- "lower-spectral-slot-number": 749,
- "higher-spectral-slot-number": 763
- }
- }
- response = test_utils_rfc8040.device_renderer_service_path_request(data)
- self.assertEqual(response.status_code, requests.codes.ok)
+ 'min-freq': 196.00625,
+ 'max-freq': 196.09375,
+ 'lower-spectral-slot-number': 749,
+ 'higher-spectral-slot-number': 763
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
# Get Degree MC interface and check
response['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- data = {
- 'input': {
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }
- }
- response = test_utils_rfc8040.device_renderer_service_path_request(data)
- self.assertEqual(response.status_code, requests.codes.ok)
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ROADM-A1', response['output']['result'])
def test_06_service_path_create_rdm_check(self):
response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
# FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
def test_17_service_path_delete(self):
- data = {
- 'input': {
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }
- }
- response = test_utils_rfc8040.device_renderer_service_path_request(data)
- self.assertEqual(response.status_code, requests.codes.ok)
- self.assertIn(response.json(),
- ({'output': {'result': 'Request processed', 'success': True}},
- {'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
def test_18_service_path_delete_rdm_check(self):
response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_02_get_portmapping_CLIENT4(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res_mapping = (response.json())['mapping'][0]
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-CLIENT4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ res_mapping = response['mapping'][0]
self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
def test_03_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_04_service_path_create_OCH_OTU4(self):
- response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
- [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
- 196.1, 40, 196.075, 196.125, 761,
- 768)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_test',
+ 'wave-number': '7',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertTrue(response['output']['success'])
self.assertIn(
{'node-id': 'SPDR-SA1',
'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
- 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
+ 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface'])
def test_05_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_06_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
-
- self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
- 'administrative-state': 'inService',
- 'supporting-circuit-pack-name': 'CP1-CFP0',
- 'type': 'org-openroadm-interfaces:opticalChannel',
- 'supporting-port': 'CP1-CFP0-P1'
- }),
- res['interface'][0])
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP1-CFP0',
+ 'type': 'org-openroadm-interfaces:opticalChannel',
+ 'supporting-port': 'CP1-CFP0-P1'
+ }),
+ response['interface'][0])
+
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_07_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'fec': 'scfec'
}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
- self.assertDictEqual(input_dict_2,
- res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_otn_service_path_create_ODU4(self):
- response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU",
- [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODU4',
+ 'operation': 'create',
+ 'service-rate': '100',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
+ self.assertTrue(response['output']['success'])
self.assertIn(
{'node-id': 'SPDR-SA1',
- 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
+ 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
def test_09_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_10_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
'type': 'org-openroadm-interfaces:otnOdu',
input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
'rate': 'org-openroadm-otn-common-types:ODU4'}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2
),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
- response = test_utils.otn_service_path_request("create", "service1", "10", "Ethernet",
- [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
- "network-tp": "XPDR1-NETWORK1"}],
- {"ethernet-encoding": "eth encode",
- "trib-slot": ["1"], "trib-port-number": "1"})
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
- self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id'])
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service1',
+ 'operation': 'create',
+ 'service-rate': '10',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
+ self.assertTrue(response['output']['success'])
+ self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id'])
self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
- res["output"]['node-interface'][0]['connection-id'])
- self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id'])
- self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
- self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
+ response['output']['node-interface'][0]['connection-id'])
+ self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id'])
+ self.assertIn('XPDR1-NETWORK1-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
+ self.assertIn('XPDR1-CLIENT4-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
def test_12_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(res['interface'][0], **input_dict),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(response['interface'][0], **input_dict),
+ response['interface'][0])
+ self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_13_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
'administrative-state': 'inService',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'supporting-interface': 'XPDR1-NETWORK1-ODU4',
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'], **input_dict_3
),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_15_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
'direction': 'bidirectional'
}
- self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
- response = test_utils.otn_service_path_request("delete", "service1", "10", "Ethernet",
- [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
- "network-tp": "XPDR1-NETWORK1"}],
- {"ethernet-encoding": "eth encode",
- "trib-slot": ["1"], "trib-port-number": "1"})
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service1',
+ 'operation': 'delete',
+ 'service-rate': '10',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+ self.assertTrue(response['output']['success'])
def test_17_check_no_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
- response = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU",
- [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODU4',
+ 'operation': 'delete',
+ 'service-rate': '100',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+ self.assertTrue(response['output']['success'])
def test_22_check_no_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
- response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
- [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
- 196.1, 40, 196.075, 196.125, 761,
- 768)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_test',
+ 'wave-number': '7',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+ self.assertTrue(response['output']['success'])
def test_24_check_no_interface_OTU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils.CODE_SHOULD_BE_201)
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_02_connect_SPDR_SC1(self):
- response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils.CODE_SHOULD_BE_201)
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SC1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_03_service_create_OTU4(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name": "SPDRA-SPDRC-OTU4-ODU4",
- "transportpce-renderer:connection-type": "infrastructure",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name': 'SPDRA-SPDRC-OTU4-ODU4',
+ 'connection-type': 'infrastructure',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "OTU",
- "transportpce-renderer:otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'OTU',
+ 'otu-service-rate': 'org-openroadm-otn-common-types:OTU4',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "OTU",
- "transportpce-renderer:otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'OTU',
+ 'otu-service-rate': 'org-openroadm-otn-common-types:OTU4',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 100,
- "transportpce-renderer:modulation-format": "dp-qpsk",
- "aToZ-wavelength-number": 1,
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 100,
+ 'modulation-format': 'dp-qpsk',
+ 'aToZ-wavelength-number': 1,
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
}
],
- "transportpce-renderer:aToZ-min-frequency": 196.075,
- "transportpce-renderer:aToZ-max-frequency": 196.125
+ 'aToZ-min-frequency': 196.075,
+ 'aToZ-max-frequency': 196.125
},
- "transportpce-renderer:zToA-direction": {
- "transportpce-renderer:zToA-wavelength-number": "1",
- "transportpce-renderer:rate": "100",
- "transportpce-renderer:modulation-format": "dp-qpsk",
- "zToA": [
+ 'zToA-direction': {
+ 'zToA-wavelength-number': '1',
+ 'rate': '100',
+ 'modulation-format': 'dp-qpsk',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
}
],
- "transportpce-renderer:zToA-min-frequency": 196.075,
- "transportpce-renderer:zToA-max-frequency": 196.125
+ 'zToA-min-frequency': 196.075,
+ 'zToA-max-frequency': 196.125
}
}
- }
- }
- response = test_utils.post_request(url, data)
- time.sleep(3)
- print(response.json())
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test OCH-OTU interfaces on SPDR-A1
def test_04_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_05_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Test OCH-OTU interfaces on SPDR-C1
def test_06_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_07_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Test creation of ODU4 service
def test_08_service_create_ODU4(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
-
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name":
- "SPDRA-SPDRC-OTU4-ODU4",
- "transportpce-renderer:connection-type": "infrastructure",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name':
+ 'SPDRA-SPDRC-OTU4-ODU4',
+ 'connection-type': 'infrastructure',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "ODU",
- "transportpce-renderer:odu-service-rate":
- "org-openroadm-otn-common-types:ODU4",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'ODU',
+ 'odu-service-rate':
+ 'org-openroadm-otn-common-types:ODU4',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "ODU",
- "transportpce-renderer:odu-service-rate":
- "org-openroadm-otn-common-types:ODU4",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'ODU',
+ 'odu-service-rate':
+ 'org-openroadm-otn-common-types:ODU4',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 100,
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 100,
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
}
]
},
- "transportpce-renderer:zToA-direction": {
- "transportpce-renderer:rate": "100",
- "zToA": [
+ 'zToA-direction': {
+ 'rate': '100',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
}
]
}
}
- }
- }
- response = test_utils.post_request(url, data)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]
- ["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test ODU4 interfaces on SPDR-A1 and SPDR-C1
def test_09_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'tx-dapi': 'AMf1n5hK6Xkk',
'tx-sapi': 'H/OelLynehI='}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_10_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'expected-sapi': 'H/OelLynehI=',
'expected-dapi': 'AMf1n5hK6Xkk'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Test creation of 10G service
def test_11_service_create_10GE(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
-
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name": "SPDRA-SPDRC-10G",
- "transportpce-renderer:connection-type": "service",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name': 'SPDRA-SPDRC-10G',
+ 'connection-type': 'service',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "Ethernet",
- "transportpce-renderer:service-rate": "10",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'Ethernet',
+ 'service-rate': '10',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "Ethernet",
- "transportpce-renderer:service-rate": "10",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'Ethernet',
+ 'service-rate': '10',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 10,
- "min-trib-slot": "1.1",
- "max-trib-slot": "1.8",
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 10,
+ 'min-trib-slot': '1.1',
+ 'max-trib-slot': '1.8',
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
}
]
},
- "transportpce-renderer:zToA-direction": {
- "rate": "10",
- "min-trib-slot": "1.1",
- "max-trib-slot": "1.8",
- "zToA": [
+ 'zToA-direction': {
+ 'rate': '10',
+ 'min-trib-slot': '1.1',
+ 'max-trib-slot': '1.8',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
}
]
}
}
- }
- }
-
- response = test_utils.post_request(url, data)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]
- ["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test the interfaces on SPDR-A1
def test_12_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
def test_13_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
+ response = test_utils_rfc8040.check_node_attribute_request(
"SPDR-SA1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_15_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'monitored'}
-
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
# Test the interfaces on SPDR-C1
def test_16_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'monitored'}
-
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_17_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
def test_18_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_19_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
+ response = test_utils_rfc8040.check_node_attribute_request(
"SPDR-SC1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_20_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
# TODO: Delete interfaces (SPDR-A1, SPDR-C1)
def test_21_disconnect_SPDR_SA1(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_22_disconnect_SPDR_SC1(self):
- response = test_utils.unmount_device("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
--- /dev/null
+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
+import unittest
+import time
+import requests
+# pylint: disable=wrong-import-order
+import sys
+sys.path.append('transportpce_tests/common')
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040 # nopep8
+
+
+class TransportPCE400GPortMappingTesting(unittest.TestCase):
+
+ processes = None
+ NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
+ "supporting-port": "L1",
+ "supported-interface-capability": [
+ "org-openroadm-port-types:if-otsi-otsigroup"
+ ],
+ "port-direction": "bidirectional",
+ "port-qual": "switch-network",
+ "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+ "xponder-type": "mpdr",
+ 'lcp-hash-val': 'LY9PxYJqUbw=',
+ 'port-admin-state': 'InService',
+ 'port-oper-state': 'InService'}
+ NODE_VERSION = '7.1'
+
+ @classmethod
+ def setUpClass(cls):
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
+
+ @classmethod
+ def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
+ for process in cls.processes:
+ test_utils_rfc8040.shutdown_process(process)
+ print("all processes killed")
+
+ def setUp(self):
+ # pylint: disable=consider-using-f-string
+ print("execution of {}".format(self.id().split(".")[-1]))
+ time.sleep(10)
+
+ def test_01_xpdr_device_connection(self):
+ response = test_utils_rfc8040.mount_device("XPDR-A2",
+ ('xpdra2', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created,
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
+
+ # Check if the node appears in the ietf-network topology
+ # this test has been removed, since it already exists in port-mapping
+ # 1a) create a OTUC2 device renderer
+ def test_02_service_path_create_otuc2(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC2',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
+
+ def test_03_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_04_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qpsk"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_05_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_06_check_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 2,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 1b) create a ODUC2 device renderer
+ def test_07_otn_service_path_create_oduc2(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC2',
+ 'operation': 'create',
+ 'service-rate': '200',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
+
+ def test_08_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_09_check_interface_oduc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 2
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 1c) create Ethernet device renderer
+ def test_10_otn_service_path_create_100ge(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_Ethernet',
+ 'operation': 'create',
+ 'service-rate': '100',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'opucn-trib-slots': ['1.1', '1.20']
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
+ self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['connection-id'])
+ self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
+ self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['odu-interface-id'])
+ self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['odu-interface-id'])
+
+ def test_11_check_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'C1'
+ }
+ input_dict_2 = {'speed': 100000}
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_12_check_interface_odu4_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'C1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '07', 'exp-payload-type': '07'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_13_check_interface_odu4_network(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'not-terminated'}
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+ self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+
+ def test_14_check_odu_connection_xpdra2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2",
+ "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
+ self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
+ response['odu-connection'][0]['destination'])
+ self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
+ response['odu-connection'][0]['source'])
+
+ # 1d) Delete Ethernet device interfaces
+ def test_15_otn_service_path_delete_100ge(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_Ethernet',
+ 'operation': 'delete',
+ 'service-rate': '100',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_16_check_no_odu_connection(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2",
+ "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_17_check_no_interface_odu_network(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_18_check_no_interface_odu_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_19_check_no_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 1e) Delete ODUC2 device interfaces
+ def test_20_otn_service_path_delete_oduc2(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC2',
+ 'operation': 'delete',
+ 'service-rate': '200',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_21_check_no_interface_oduc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 1f) Delete OTUC2 device interfaces
+ def test_22_service_path_delete_otuc2(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC2',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_23_check_no_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_24_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_25_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 2a) create a OTUC3 device renderer
+ def test_26_service_path_create_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC3',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam8',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
+
+ def test_27_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_28_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qam8"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_29_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_30_check_interface_otuc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 3,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 2b) create a ODUC3 device renderer
+ def test_31_otn_service_path_create_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC3',
+ 'operation': 'create',
+ 'service-rate': '300',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
+
+ def test_32_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_33_check_interface_oduc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 3
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 2c) create Ethernet device renderer
+ # No change in the ethernet device renderer so skipping those tests
+ # 2d) Delete Ethernet device interfaces
+ # No change in the ethernet device renderer so skipping those tests
+
+ # 2e) Delete ODUC3 device interfaces
+ def test_34_otn_service_path_delete_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC3',
+ 'operation': 'delete',
+ 'service-rate': '300',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_35_check_no_interface_oduc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 2f) Delete OTUC3 device interfaces
+ def test_36_service_path_delete_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC3',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam8',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_37_check_no_interface_otuc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_38_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_39_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 3a) create a OTUC4 device renderer
+ def test_40_service_path_create_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC4',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam16',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
+
+ def test_41_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_42_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qam16"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_43_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_44_check_interface_otuc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 4,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 3b) create a ODUC4 device renderer
+ def test_45_otn_service_path_create_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC4',
+ 'operation': 'create',
+ 'service-rate': '400',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
+
+ def test_46_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_47_check_interface_oduc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 4
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 3c) create Ethernet device renderer
+ # No change in the ethernet device renderer so skipping those tests
+ # 3d) Delete Ethernet device interfaces
+ # No change in the ethernet device renderer so skipping those tests
+
+ # 3e) Delete ODUC4 device interfaces
+ def test_48_otn_service_path_delete_oduc4(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC4',
+ 'operation': 'delete',
+ 'service-rate': '400',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_49_check_no_interface_oduc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 3f) Delete OTUC4 device interfaces
+ def test_50_service_path_delete_otuc4(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC4',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam16',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_51_check_no_interface_otuc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_52_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_53_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # Disconnect the XPDR
+ def test_54_xpdr_device_disconnection(self):
+ response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
+ def test_55_xpdr_device_disconnected(self):
+ response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
+ self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
+ self.assertEqual(response['connection-status']['error-message'],
+ 'Request could not be completed because the relevant data model content does not exist')
+
+ def test_56_xpdr_device_not_connected(self):
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
+ self.assertEqual(response['node-info']['error-tag'], 'data-missing')
+ self.assertEqual(response['node-info']['error-message'],
+ 'Request could not be completed because the relevant data model content does not exist')
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
def service_path_request(operation: str, servicename: str, wavenumber: str, nodes, centerfreq: str,
slotwidth: int, minfreq: float, maxfreq: float, lowerslotnumber: int,
- higherslotnumber: int):
+ higherslotnumber: int, modulation_format="dp-qpsk"):
attr = {"renderer:input": {
"renderer:service-name": servicename,
"renderer:wave-number": wavenumber,
- "renderer:modulation-format": "dp-qpsk",
+ "renderer:modulation-format": modulation_format,
"renderer:operation": operation,
"renderer:nodes": nodes,
"renderer:center-freq": centerfreq,
else:
RESTCONF_VERSION = 'rfc8040'
-RESTCONF_BASE_URL = 'http://localhost:' + RESTCONF_PORT + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
+RESTCONF_BASE_URL = 'http://localhost:' + str(RESTCONF_PORT) + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
if 'USE_ODL_ALT_KARAF_INSTALL_DIR' in os.environ:
KARAF_INSTALLDIR = os.environ['USE_ODL_ALT_KARAF_INSTALL_DIR']
def check_device_connection(node: str):
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
+ url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}?content=nonconfig',
'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}'}
response = get_request(url[RESTCONF_VERSION].format('{}', node))
res = response.json()
def check_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
- 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
+ url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}?content=nonconfig', # nopep8
+ 'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
res = response.json()
return_key = {'rfc8040': 'org-openroadm-device:' + attribute,
return response
#
-# TransportPCE network-utils and service-path operations
+# TransportPCE network-utils and service-path and service-implementation operations
#
def device_renderer_service_path_request(payload: dict):
url = "{}/operations/transportpce-device-renderer:service-path"
if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys(payload, 'transportpce-device-renderer:')
+ data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
else:
- data = payload
- return post_request(url, data)
+ data = {'input': payload}
+ response = post_request(url, data)
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-device-renderer:output',
+ 'draft-bierman02': 'output'}
+ return_output = res[return_key[RESTCONF_VERSION]]
+ return {'status_code': response.status_code,
+ 'output': return_output}
+
+
+def device_renderer_otn_service_path_request(payload: dict):
+ url = "{}/operations/transportpce-device-renderer:otn-service-path"
+ if RESTCONF_VERSION == 'draft-bierman02':
+ data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
+ else:
+ data = {'input': payload}
+ response = post_request(url, data)
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-device-renderer:output',
+ 'draft-bierman02': 'output'}
+ return_output = res[return_key[RESTCONF_VERSION]]
+ return {'status_code': response.status_code,
+ 'output': return_output}
+
+
+def device_renderer_service_implementation_request(payload: dict):
+ url = "{}/operations/transportpce-renderer:service-implementation-request"
+ if RESTCONF_VERSION == 'draft-bierman02':
+ data = prepend_dict_keys({'input': payload}, 'transportpce-renderer:')
+ else:
+ data = {'input': payload}
+ response = post_request(url, data)
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-renderer:output',
+ 'draft-bierman02': 'output'}
+ return_output = res[return_key[RESTCONF_VERSION]]
+ return {'status_code': response.status_code,
+ 'output': return_output}
-r{toxinidir}/tests/requirements.txt
-r{toxinidir}/tests/test-requirements.txt
setuptools>=7.0
- gnpy4tpce
+ gnpy4tpce==1.2.1
whitelist_externals = launch_tests.sh
passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION
#setenv =
passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION
setenv =
# USE_LIGHTY=True
-# USE_ODL_RESTCONF_VERSION=draft-bierman02
+ USE_ODL_RESTCONF_VERSION=draft-bierman02
USE_ODL_ALT_KARAF_ENV=./karaf71.env
USE_ODL_ALT_KARAF_INSTALL_DIR=karaf71
commands =
basepython = python3
deps = gitlint
commands =
- gitlint
+ gitlint --config ../.gitlint