def test_05_service_path_create(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ROADMA01', response['output']['result'])
def test_14_service_path_delete(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
# Renderer interface creations
def test_07_device_renderer(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'modulation-format': 'dp-qpsk',
'operation': 'create',
'service-name': 'testNMC-MC',
'max-freq': 196.09375,
'lower-spectral-slot-number': 749,
'higher-spectral-slot-number': 763
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
def test_05_service_path_create(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ROADM-A1', response['output']['result'])
def test_17_service_path_delete(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 195.825,
'lower-spectral-slot-number': 713,
'higher-spectral-slot-number': 720
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
def test_04_service_path_create_OCH_OTU4(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 196.125,
'lower-spectral-slot-number': 761,
'higher-spectral-slot-number': 768
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
self.assertTrue(response['output']['success'])
def test_08_otn_service_path_create_ODU4(self):
response = test_utils_rfc8040.device_renderer_otn_service_path_request(
- {'input': {
+ {
'service-name': 'service_ODU4',
'operation': 'create',
'service-rate': '100',
'service-format': 'ODU',
'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
self.assertTrue(response['output']['success'])
def test_11_otn_service_path_create_10GE(self):
response = test_utils_rfc8040.device_renderer_otn_service_path_request(
- {'input': {
+ {
'service-name': 'service1',
'operation': 'create',
'service-rate': '10',
'ethernet-encoding': 'eth encode',
'trib-slot': ['1'],
'trib-port-number': '1'
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
self.assertTrue(response['output']['success'])
def test_16_otn_service_path_delete_10GE(self):
response = test_utils_rfc8040.device_renderer_otn_service_path_request(
- {'input': {
+ {
'service-name': 'service1',
'operation': 'delete',
'service-rate': '10',
'ethernet-encoding': 'eth encode',
'trib-slot': ['1'],
'trib-port-number': '1'
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
self.assertTrue(response['output']['success'])
def test_21_otn_service_path_delete_ODU4(self):
response = test_utils_rfc8040.device_renderer_otn_service_path_request(
- {'input': {
+ {
'service-name': 'service_ODU4',
'operation': 'delete',
'service-rate': '100',
'service-format': 'ODU',
'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
self.assertTrue(response['output']['success'])
def test_23_service_path_delete_OCH_OTU4(self):
response = test_utils_rfc8040.device_renderer_service_path_request(
- {'input': {
+ {
'service-name': 'service_test',
'wave-number': '7',
'modulation-format': 'dp-qpsk',
'max-freq': 196.125,
'lower-spectral-slot-number': 761,
'higher-spectral-slot-number': 768
- }})
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
self.assertTrue(response['output']['success'])
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils.CODE_SHOULD_BE_201)
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_02_connect_SPDR_SC1(self):
- response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils.CODE_SHOULD_BE_201)
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SC1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_03_service_create_OTU4(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name": "SPDRA-SPDRC-OTU4-ODU4",
- "transportpce-renderer:connection-type": "infrastructure",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name': 'SPDRA-SPDRC-OTU4-ODU4',
+ 'connection-type': 'infrastructure',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "OTU",
- "transportpce-renderer:otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'OTU',
+ 'otu-service-rate': 'org-openroadm-otn-common-types:OTU4',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "OTU",
- "transportpce-renderer:otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'OTU',
+ 'otu-service-rate': 'org-openroadm-otn-common-types:OTU4',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 100,
- "transportpce-renderer:modulation-format": "dp-qpsk",
- "aToZ-wavelength-number": 1,
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 100,
+ 'modulation-format': 'dp-qpsk',
+ 'aToZ-wavelength-number': 1,
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
}
],
- "transportpce-renderer:aToZ-min-frequency": 196.075,
- "transportpce-renderer:aToZ-max-frequency": 196.125
+ 'aToZ-min-frequency': 196.075,
+ 'aToZ-max-frequency': 196.125
},
- "transportpce-renderer:zToA-direction": {
- "transportpce-renderer:zToA-wavelength-number": "1",
- "transportpce-renderer:rate": "100",
- "transportpce-renderer:modulation-format": "dp-qpsk",
- "zToA": [
+ 'zToA-direction': {
+ 'zToA-wavelength-number': '1',
+ 'rate': '100',
+ 'modulation-format': 'dp-qpsk',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
}
],
- "transportpce-renderer:zToA-min-frequency": 196.075,
- "transportpce-renderer:zToA-max-frequency": 196.125
+ 'zToA-min-frequency': 196.075,
+ 'zToA-max-frequency': 196.125
}
}
- }
- }
- response = test_utils.post_request(url, data)
- time.sleep(3)
- print(response.json())
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test OCH-OTU interfaces on SPDR-A1
def test_04_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_05_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Test OCH-OTU interfaces on SPDR-C1
def test_06_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_07_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Test creation of ODU4 service
def test_08_service_create_ODU4(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
-
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name":
- "SPDRA-SPDRC-OTU4-ODU4",
- "transportpce-renderer:connection-type": "infrastructure",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name':
+ 'SPDRA-SPDRC-OTU4-ODU4',
+ 'connection-type': 'infrastructure',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "ODU",
- "transportpce-renderer:odu-service-rate":
- "org-openroadm-otn-common-types:ODU4",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'ODU',
+ 'odu-service-rate':
+ 'org-openroadm-otn-common-types:ODU4',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "ODU",
- "transportpce-renderer:odu-service-rate":
- "org-openroadm-otn-common-types:ODU4",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'ODU',
+ 'odu-service-rate':
+ 'org-openroadm-otn-common-types:ODU4',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 100,
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 100,
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
}
]
},
- "transportpce-renderer:zToA-direction": {
- "transportpce-renderer:rate": "100",
- "zToA": [
+ 'zToA-direction': {
+ 'rate': '100',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": ""
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': ''
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": ""
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': ''
}
}
]
}
}
- }
- }
- response = test_utils.post_request(url, data)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]
- ["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test ODU4 interfaces on SPDR-A1 and SPDR-C1
def test_09_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'tx-dapi': 'AMf1n5hK6Xkk',
'tx-sapi': 'H/OelLynehI='}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_10_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'expected-sapi': 'H/OelLynehI=',
'expected-dapi': 'AMf1n5hK6Xkk'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Test creation of 10G service
def test_11_service_create_10GE(self):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
-
- data = {
- "transportpce-renderer:input": {
- "transportpce-renderer:service-name": "SPDRA-SPDRC-10G",
- "transportpce-renderer:connection-type": "service",
- "transportpce-renderer:service-handler-header": {
- "transportpce-renderer:request-id": "abcd12-efgh34"
+ response = test_utils_rfc8040.device_renderer_service_implementation_request(
+ {
+ 'service-name': 'SPDRA-SPDRC-10G',
+ 'connection-type': 'service',
+ 'service-handler-header': {
+ 'request-id': 'abcd12-efgh34'
},
- "transportpce-renderer:service-a-end": {
- "transportpce-renderer:service-format": "Ethernet",
- "transportpce-renderer:service-rate": "10",
- "transportpce-renderer:clli": "nodeSA",
- "transportpce-renderer:node-id": "SPDR-SA1"
+ 'service-a-end': {
+ 'service-format': 'Ethernet',
+ 'service-rate': '10',
+ 'clli': 'nodeSA',
+ 'node-id': 'SPDR-SA1'
},
- "transportpce-renderer:service-z-end": {
- "transportpce-renderer:service-format": "Ethernet",
- "transportpce-renderer:service-rate": "10",
- "transportpce-renderer:clli": "nodeSC",
- "transportpce-renderer:node-id": "SPDR-SC1"
+ 'service-z-end': {
+ 'service-format': 'Ethernet',
+ 'service-rate': '10',
+ 'clli': 'nodeSC',
+ 'node-id': 'SPDR-SC1'
},
- "transportpce-renderer:path-description": {
- "aToZ-direction": {
- "rate": 10,
- "min-trib-slot": "1.1",
- "max-trib-slot": "1.8",
- "aToZ": [
+ 'path-description': {
+ 'aToZ-direction': {
+ 'rate': 10,
+ 'min-trib-slot': '1.1',
+ 'max-trib-slot': '1.8',
+ 'aToZ': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
}
]
},
- "transportpce-renderer:zToA-direction": {
- "rate": "10",
- "min-trib-slot": "1.1",
- "max-trib-slot": "1.8",
- "zToA": [
+ 'zToA-direction': {
+ 'rate': '10',
+ 'min-trib-slot': '1.1',
+ 'max-trib-slot': '1.8',
+ 'zToA': [
{
- "id": "0",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '0',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
},
{
- "id": "1",
- "resource": {
- "tp-node-id": "SPDR-SC1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '1',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SC1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "2",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-NETWORK1"
+ 'id': '2',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-NETWORK1'
}
},
{
- "id": "3",
- "resource": {
- "tp-node-id": "SPDR-SA1-XPDR1",
- "tp-id": "XPDR1-CLIENT1"
+ 'id': '3',
+ 'resource': {
+ 'tp-node-id': 'SPDR-SA1-XPDR1',
+ 'tp-id': 'XPDR1-CLIENT1'
}
}
]
}
}
- }
- }
-
- response = test_utils.post_request(url, data)
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Operation Successful',
- res["output"]["configuration-response-common"]
- ["response-message"])
+ response['output']['configuration-response-common']['response-message'])
# Test the interfaces on SPDR-A1
def test_12_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
def test_13_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
+ response = test_utils_rfc8040.check_node_attribute_request(
"SPDR-SA1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_15_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'monitored'}
-
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
# Test the interfaces on SPDR-C1
def test_16_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'monitored'}
-
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_17_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SC1", "interface", "XPDR1-CLIENT1-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'], 10000)
def test_18_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_19_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
+ response = test_utils_rfc8040.check_node_attribute_request(
"SPDR-SC1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ "odu-connection", "XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G-x-XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-SPDRA-SPDRC-10G'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_20_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SC1", "interface", "XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-SPDRA-SPDRC-10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
# TODO: Delete interfaces (SPDR-A1, SPDR-C1)
def test_21_disconnect_SPDR_SA1(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_22_disconnect_SPDR_SC1(self):
- response = test_utils.unmount_device("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
--- /dev/null
+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
+import unittest
+import time
+import requests
+# pylint: disable=wrong-import-order
+import sys
+sys.path.append('transportpce_tests/common')
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040 # nopep8
+
+
+class TransportPCE400GPortMappingTesting(unittest.TestCase):
+
+ processes = None
+ NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
+ "supporting-port": "L1",
+ "supported-interface-capability": [
+ "org-openroadm-port-types:if-otsi-otsigroup"
+ ],
+ "port-direction": "bidirectional",
+ "port-qual": "switch-network",
+ "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+ "xponder-type": "mpdr",
+ 'lcp-hash-val': 'LY9PxYJqUbw=',
+ 'port-admin-state': 'InService',
+ 'port-oper-state': 'InService'}
+ NODE_VERSION = '7.1'
+
+ @classmethod
+ def setUpClass(cls):
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
+
+ @classmethod
+ def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
+ for process in cls.processes:
+ test_utils_rfc8040.shutdown_process(process)
+ print("all processes killed")
+
+ def setUp(self):
+ # pylint: disable=consider-using-f-string
+ print("execution of {}".format(self.id().split(".")[-1]))
+ time.sleep(10)
+
+ def test_01_xpdr_device_connection(self):
+ response = test_utils_rfc8040.mount_device("XPDR-A2",
+ ('xpdra2', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created,
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
+
+ # Check if the node appears in the ietf-network topology
+ # this test has been removed, since it already exists in port-mapping
+ # 1a) create a OTUC2 device renderer
+ def test_02_service_path_create_otuc2(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC2',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
+
+ def test_03_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_04_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qpsk"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_05_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_06_check_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 2,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 1b) create a ODUC2 device renderer
+ def test_07_otn_service_path_create_oduc2(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC2',
+ 'operation': 'create',
+ 'service-rate': '200',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
+
+ def test_08_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_09_check_interface_oduc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 2
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 1c) create Ethernet device renderer
+ def test_10_otn_service_path_create_100ge(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_Ethernet',
+ 'operation': 'create',
+ 'service-rate': '100',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'opucn-trib-slots': ['1.1', '1.20']
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
+ self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['connection-id'])
+ self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
+ self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['odu-interface-id'])
+ self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
+ response['output']['node-interface'][0]['odu-interface-id'])
+
+ def test_11_check_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'C1'
+ }
+ input_dict_2 = {'speed': 100000}
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_12_check_interface_odu4_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'C1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '07', 'exp-payload-type': '07'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_13_check_interface_odu4_network(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'not-terminated'}
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+ self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+
+ def test_14_check_odu_connection_xpdra2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2",
+ "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
+ self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
+ response['odu-connection'][0]['destination'])
+ self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
+ response['odu-connection'][0]['source'])
+
+ # 1d) Delete Ethernet device interfaces
+ def test_15_otn_service_path_delete_100ge(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_Ethernet',
+ 'operation': 'delete',
+ 'service-rate': '100',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_16_check_no_odu_connection(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2",
+ "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_17_check_no_interface_odu_network(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_18_check_no_interface_odu_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_19_check_no_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 1e) Delete ODUC2 device interfaces
+ def test_20_otn_service_path_delete_oduc2(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC2',
+ 'operation': 'delete',
+ 'service-rate': '200',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_21_check_no_interface_oduc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 1f) Delete OTUC2 device interfaces
+ def test_22_service_path_delete_otuc2(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC2',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_23_check_no_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_24_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_25_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 2a) create a OTUC3 device renderer
+ def test_26_service_path_create_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC3',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam8',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
+
+ def test_27_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_28_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qam8"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_29_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_30_check_interface_otuc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 3,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 2b) create a ODUC3 device renderer
+ def test_31_otn_service_path_create_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC3',
+ 'operation': 'create',
+ 'service-rate': '300',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
+
+ def test_32_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_33_check_interface_oduc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 3
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 2c) create Ethernet device renderer
+ # No change in the ethernet device renderer so skipping those tests
+ # 2d) Delete Ethernet device interfaces
+ # No change in the ethernet device renderer so skipping those tests
+
+ # 2e) Delete ODUC3 device interfaces
+ def test_34_otn_service_path_delete_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC3',
+ 'operation': 'delete',
+ 'service-rate': '300',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_35_check_no_interface_oduc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 2f) Delete OTUC3 device interfaces
+ def test_36_service_path_delete_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC3',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam8',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_37_check_no_interface_otuc3(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_38_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_39_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 3a) create a OTUC4 device renderer
+ def test_40_service_path_create_otuc3(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC4',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam16',
+ 'operation': 'create',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
+ 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
+
+ def test_41_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_42_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'type': 'org-openroadm-interfaces:otsi',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ "frequency": 196.0812,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qam16"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
+ response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+ def test_43_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+ 'type': 'org-openroadm-interfaces:otsi-group',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+ response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+ def test_44_check_interface_otuc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
+ 'type': 'org-openroadm-interfaces:otnOtu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 4,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+ # 3b) create a ODUC4 device renderer
+ def test_45_otn_service_path_create_oduc3(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC4',
+ 'operation': 'create',
+ 'service-rate': '400',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+ self.assertIn(
+ {'node-id': 'XPDR-A2',
+ 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
+
+ def test_46_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response['mapping'])
+
+ def test_47_check_interface_oduc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+
+ input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+ 'rate': 'org-openroadm-otn-common-types:ODUCn',
+ 'tx-sapi': 'LY9PxYJqUbw=',
+ 'tx-dapi': 'LY9PxYJqUbw=',
+ 'expected-sapi': 'LY9PxYJqUbw=',
+ 'expected-dapi': 'LY9PxYJqUbw=',
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 4
+ }
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '22', 'exp-payload-type': '22'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ # 3c) create Ethernet device renderer
+ # No change in the ethernet device renderer so skipping those tests
+ # 3d) Delete Ethernet device interfaces
+ # No change in the ethernet device renderer so skipping those tests
+
+ # 3e) Delete ODUC4 device interfaces
+ def test_48_otn_service_path_delete_oduc4(self):
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ {
+ 'service-name': 'service_ODUC4',
+ 'operation': 'delete',
+ 'service-rate': '400',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_49_check_no_interface_oduc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # 3f) Delete OTUC4 device interfaces
+ def test_50_service_path_delete_otuc4(self):
+ response = test_utils_rfc8040.device_renderer_service_path_request(
+ {
+ 'service-name': 'service_OTUC4',
+ 'wave-number': '0',
+ 'modulation-format': 'dp-qam16',
+ 'operation': 'delete',
+ 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 75,
+ 'min-freq': 196.0375,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 755,
+ 'higher-spectral-slot-number': 768
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+
+ def test_51_check_no_interface_otuc4(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_52_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ def test_53_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+
+ # Disconnect the XPDR
+ def test_54_xpdr_device_disconnection(self):
+ response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
+ def test_55_xpdr_device_disconnected(self):
+ response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
+ self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
+ self.assertEqual(response['connection-status']['error-message'],
+ 'Request could not be completed because the relevant data model content does not exist')
+
+ def test_56_xpdr_device_not_connected(self):
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
+ self.assertEqual(response['node-info']['error-tag'], 'data-missing')
+ self.assertEqual(response['node-info']['error-message'],
+ 'Request could not be completed because the relevant data model content does not exist')
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
def service_path_request(operation: str, servicename: str, wavenumber: str, nodes, centerfreq: str,
slotwidth: int, minfreq: float, maxfreq: float, lowerslotnumber: int,
- higherslotnumber: int):
+ higherslotnumber: int, modulation_format="dp-qpsk"):
attr = {"renderer:input": {
"renderer:service-name": servicename,
"renderer:wave-number": wavenumber,
- "renderer:modulation-format": "dp-qpsk",
+ "renderer:modulation-format": modulation_format,
"renderer:operation": operation,
"renderer:nodes": nodes,
"renderer:center-freq": centerfreq,
return response
#
-# TransportPCE network-utils and service-path operations
+# TransportPCE network-utils and service-path and service-implementation operations
#
def device_renderer_service_path_request(payload: dict):
url = "{}/operations/transportpce-device-renderer:service-path"
if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys(payload, 'transportpce-device-renderer:')
+ data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
else:
- data = payload
+ data = {'input': payload}
response = post_request(url, data)
res = response.json()
return_key = {'rfc8040': 'transportpce-device-renderer:output',
def device_renderer_otn_service_path_request(payload: dict):
url = "{}/operations/transportpce-device-renderer:otn-service-path"
if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys(payload, 'transportpce-device-renderer:')
+ data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
else:
- data = payload
+ data = {'input': payload}
response = post_request(url, data)
res = response.json()
return_key = {'rfc8040': 'transportpce-device-renderer:output',
return_output = res[return_key[RESTCONF_VERSION]]
return {'status_code': response.status_code,
'output': return_output}
+
+
+def device_renderer_service_implementation_request(payload: dict):
+ url = "{}/operations/transportpce-renderer:service-implementation-request"
+ if RESTCONF_VERSION == 'draft-bierman02':
+ data = prepend_dict_keys({'input': payload}, 'transportpce-renderer:')
+ else:
+ data = {'input': payload}
+ response = post_request(url, data)
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-renderer:output',
+ 'draft-bierman02': 'output'}
+ return_output = res[return_key[RESTCONF_VERSION]]
+ return {'status_code': response.status_code,
+ 'output': return_output}
-r{toxinidir}/tests/requirements.txt
-r{toxinidir}/tests/test-requirements.txt
setuptools>=7.0
- gnpy4tpce
+ gnpy4tpce==1.2.1
whitelist_externals = launch_tests.sh
passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION
#setenv =
passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION
setenv =
# USE_LIGHTY=True
-# USE_ODL_RESTCONF_VERSION=draft-bierman02
+ USE_ODL_RESTCONF_VERSION=draft-bierman02
USE_ODL_ALT_KARAF_ENV=./karaf71.env
USE_ODL_ALT_KARAF_INSTALL_DIR=karaf71
commands =