response['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
self.assertIn('not-reserved-inuse', response['circuit-packs'][0]["equipment-state"])
def test_14_service_path_delete(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
self.assertEqual(response.status_code, requests.codes.ok)
def test_11_get_PM_ROADMA(self):
- response = test_utils_rfc8040.olm_get_pm_request({
- 'node-id': 'ROADMA01',
- 'resource-type': 'interface',
- 'granularity': '15min',
- 'resource-identifier': {
- 'resource-name': 'OTS-DEG1-TTP-TXRX'
- }
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'get-pm',
+ {
+ 'node-id': 'ROADMA01',
+ 'resource-type': 'interface',
+ 'granularity': '15min',
+ 'resource-identifier': {
+ 'resource-name': 'OTS-DEG1-TTP-TXRX'
+ }
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn({
'pmparameter-name': 'OpticalPowerOutput',
}, response['output']['measurements'])
def test_12_get_PM_ROADMC(self):
- response = test_utils_rfc8040.olm_get_pm_request({
- 'node-id': 'ROADMC01',
- 'resource-type': 'interface',
- 'granularity': '15min',
- 'resource-identifier': {
- 'resource-name': 'OTS-DEG2-TTP-TXRX'
- }
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'get-pm',
+ {
+ 'node-id': 'ROADMC01',
+ 'resource-type': 'interface',
+ 'granularity': '15min',
+ 'resource-identifier': {
+ 'resource-name': 'OTS-DEG2-TTP-TXRX'
+ }
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn({
'pmparameter-name': 'OpticalPowerOutput',
}, response['output']['measurements'])
def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
- 'src-type': 'link',
- 'link-id': 'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX'
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-base',
+ {
+ 'src-type': 'link',
+ 'link-id': 'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX'
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response['output']['result'])
time.sleep(5)
def test_14_calculate_span_loss_base_all(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
- 'src-type': 'all'
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-base',
+ {
+ 'src-type': 'all'
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response['output']['result'])
self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive']), 5.7)
def test_17_servicePath_create_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDRA01',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADMA01',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADMC01',
- 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG2-TTP-TXRX'},
- {'node-id': 'XPDRC01',
- 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDRA01',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADMA01',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADMC01',
+ 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG2-TTP-TXRX'},
+ {'node-id': 'XPDRC01',
+ 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
def test_18_servicePath_create_ZToA(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDRC01',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADMC01',
- 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADMA01',
- 'src-tp': 'DEG1-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'XPDRA01',
- 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDRC01',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADMC01',
+ 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADMA01',
+ 'src-tp': 'DEG1-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'XPDRA01',
+ 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
'lower-spectral-slot-number': 761,
'higher-spectral-slot-number': 768
- })
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
def test_19_service_power_setup_XPDRA_XPDRC(self):
- response = test_utils_rfc8040.olm_service_power_setup_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDRA01'
- },
- {
- 'dest-tp': 'DEG1-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADMA01'
- },
- {
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'src-tp': 'DEG2-TTP-TXRX',
- 'node-id': 'ROADMC01'
- },
- {
- 'dest-tp': 'XPDR1-CLIENT1',
- 'src-tp': 'XPDR1-NETWORK1',
- 'node-id': 'XPDRC01'
- }
- ],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-setup',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDRA01'
+ },
+ {
+ 'dest-tp': 'DEG1-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADMA01'
+ },
+ {
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'src-tp': 'DEG2-TTP-TXRX',
+ 'node-id': 'ROADMC01'
+ },
+ {
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'node-id': 'XPDRC01'
+ }
+ ],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual("power", response['roadm-connections'][0]['opticalControlMode'])
def test_23_service_power_setup_XPDRC_XPDRA(self):
- response = test_utils_rfc8040.olm_service_power_setup_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDRC01'
- },
- {
- 'dest-tp': 'DEG2-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADMC01'
- },
- {
- 'src-tp': 'DEG1-TTP-TXRX',
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADMA01'
- },
- {
- 'src-tp': 'XPDR1-NETWORK1',
- 'dest-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDRA01'
- }
- ],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-setup',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDRC01'
+ },
+ {
+ 'dest-tp': 'DEG2-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADMC01'
+ },
+ {
+ 'src-tp': 'DEG1-TTP-TXRX',
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADMA01'
+ },
+ {
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDRA01'
+ }
+ ],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual(float(response['roadm-connections'][0]['target-output-power']), -0.63)
def test_26_service_power_turndown_XPDRA_XPDRC(self):
- response = test_utils_rfc8040.olm_service_power_turndown_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDRA01'
- },
- {
- 'dest-tp': 'DEG1-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADMA01'
- },
- {
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'src-tp': 'DEG2-TTP-TXRX',
- 'node-id': 'ROADMC01'
- },
- {
- 'dest-tp': 'XPDR1-CLIENT1',
- 'src-tp': 'XPDR1-NETWORK1',
- 'node-id': 'XPDRC01'
- }
- ],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-turndown',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDRA01'
+ },
+ {
+ 'dest-tp': 'DEG1-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADMA01'
+ },
+ {
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'src-tp': 'DEG2-TTP-TXRX',
+ 'node-id': 'ROADMC01'
+ },
+ {
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'node-id': 'XPDRC01'
+ }
+ ],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual("off", response['roadm-connections'][0]['opticalControlMode'])
def test_29_servicePath_delete_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDRA01',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADMA01',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADMC01',
- 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG2-TTP-TXRX'},
- {'node-id': 'XPDRC01',
- 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDRA01',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADMA01',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADMC01',
+ 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG2-TTP-TXRX'},
+ {'node-id': 'XPDRC01',
+ 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
def test_30_servicePath_delete_ZToA(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDRC01',
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDRC01',
'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADMC01',
+ {'node-id': 'ROADMC01',
'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADMA01',
+ {'node-id': 'ROADMA01',
'src-tp': 'DEG1-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'XPDRA01',
+ {'node-id': 'XPDRA01',
'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
self.assertEqual(response.status_code, requests.codes.ok)
def test_33_servicePath_create_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test2',
- 'wave-number': '2',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDRA01',
- 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
- {'node-id': 'ROADMA01',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
- 'center-freq': 196.05,
- 'nmc-width': 40,
- 'min-freq': 196.025,
- 'max-freq': 196.075,
- 'lower-spectral-slot-number': 753,
- 'higher-spectral-slot-number': 760
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test2',
+ 'wave-number': '2',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDRA01',
+ 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
+ {'node-id': 'ROADMA01',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
+ 'center-freq': 196.05,
+ 'nmc-width': 40,
+ 'min-freq': 196.025,
+ 'max-freq': 196.075,
+ 'lower-spectral-slot-number': 753,
+ 'higher-spectral-slot-number': 760
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes', response['output']['result'])
time.sleep(10)
self.assertEqual(int(response['org-openroadm-optical-channel-interfaces:och']['wavelength-number']), 2)
def test_35_servicePath_delete_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test2',
- 'wave-number': '2',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDRA01',
- 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
- {'node-id': 'ROADMA01',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
- 'center-freq': 196.05,
- 'nmc-width': 40,
- 'min-freq': 196.025,
- 'max-freq': 196.075,
- 'lower-spectral-slot-number': 753,
- 'higher-spectral-slot-number': 760
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test2',
+ 'wave-number': '2',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDRA01',
+ 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
+ {'node-id': 'ROADMA01',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
+ 'center-freq': 196.05,
+ 'nmc-width': 40,
+ 'min-freq': 196.025,
+ 'max-freq': 196.075,
+ 'lower-spectral-slot-number': 753,
+ 'higher-spectral-slot-number': 760
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_38_calculate_span_loss_current(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_current_request()
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-current',
+ None)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response["output"]["result"])
# Renderer interface creations
def test_07_device_renderer(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'modulation-format': 'dp-qpsk',
'operation': 'create',
response['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
# FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
def test_17_service_path_delete(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
response['mapping'])
def test_04_service_path_create_OCH_OTU4(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_otn_service_path_create_ODU4(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODU4',
'operation': 'create',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service1',
'operation': 'create',
response['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service1',
'operation': 'delete',
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODU4',
'operation': 'delete',
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'wave-number': '7',
self.assertEqual(response['connection-status'], 'connected')
def test_03_service_create_OTU4(self):
- response = test_utils_rfc8040.renderer_service_implementation_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-renderer', 'service-implementation-request',
{
'service-name': 'SPDRA-SPDRC-OTU4-ODU4',
'connection-type': 'infrastructure',
# Test creation of ODU4 service
def test_08_service_create_ODU4(self):
- response = test_utils_rfc8040.renderer_service_implementation_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-renderer', 'service-implementation-request',
{
'service-name':
'SPDRA-SPDRC-OTU4-ODU4',
# Test creation of 10G service
def test_11_service_create_10GE(self):
- response = test_utils_rfc8040.renderer_service_implementation_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-renderer', 'service-implementation-request',
{
'service-name': 'SPDRA-SPDRC-10G',
'connection-type': 'service',
self.assertEqual(response.status_code, requests.codes.ok)
def test_11_get_PM_ROADMA(self):
- response = test_utils_rfc8040.olm_get_pm_request({
- 'node-id': 'ROADM-A1',
- 'resource-type': 'interface',
- 'granularity': '15min',
- 'resource-identifier': {
- 'resource-name': 'OTS-DEG2-TTP-TXRX'
- }
-
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'get-pm',
+ {
+ 'node-id': 'ROADM-A1',
+ 'resource-type': 'interface',
+ 'granularity': '15min',
+ 'resource-identifier': {
+ 'resource-name': 'OTS-DEG2-TTP-TXRX'
+ }
+
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn({
'pmparameter-name': 'OpticalPowerOutput',
}, response['output']['measurements'])
def test_12_get_PM_ROADMC(self):
- response = test_utils_rfc8040.olm_get_pm_request({
- 'node-id': 'ROADM-C1',
- 'resource-type': 'interface',
- 'granularity': '15min',
- 'resource-identifier': {
- 'resource-name': 'OTS-DEG1-TTP-TXRX'
- }
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'get-pm',
+ {
+ 'node-id': 'ROADM-C1',
+ 'resource-type': 'interface',
+ 'granularity': '15min',
+ 'resource-identifier': {
+ 'resource-name': 'OTS-DEG1-TTP-TXRX'
+ }
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn({
'pmparameter-name': 'OpticalPowerOutput',
}, response['output']['measurements'])
def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
- 'src-type': 'link',
- 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-base',
+ {
+ 'src-type': 'link',
+ 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response['output']['result'])
time.sleep(5)
def test_14_calculate_span_loss_base_all(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
- 'src-type': 'all'
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-base',
+ {
+ 'src-type': 'all'
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response['output']['result'])
self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive']), 17.6)
def test_17_servicePath_create_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDR-A1',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADM-A1',
- 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADM-C1',
- 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
- {'node-id': 'XPDR-C1',
- 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDR-A1',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADM-A1',
+ 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADM-C1',
+ 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
+ {'node-id': 'XPDR-C1',
+ 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
def test_18_servicePath_create_ZToA(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDR-C1',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADM-C1',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADM-A1',
- 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'XPDR-A1',
- 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDR-C1',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADM-C1',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADM-A1',
+ 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'XPDR-A1',
+ 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
time.sleep(10)
def test_19_service_power_setup_XPDRA_XPDRC(self):
- response = test_utils_rfc8040.olm_service_power_setup_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDR-A1'
- },
- {
- 'dest-tp': 'DEG2-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADM-A1'
- },
- {
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'src-tp': 'DEG1-TTP-TXRX',
- 'node-id': 'ROADM-C1'
- },
- {
- 'dest-tp': 'XPDR1-CLIENT1',
- 'src-tp': 'XPDR1-NETWORK1',
- 'node-id': 'XPDR-C1'
- }
- ],
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-setup',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDR-A1'
+ },
+ {
+ 'dest-tp': 'DEG2-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADM-A1'
+ },
+ {
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'src-tp': 'DEG1-TTP-TXRX',
+ 'node-id': 'ROADM-C1'
+ },
+ {
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'node-id': 'XPDR-C1'
+ }
+ ],
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual("power", response['roadm-connections'][0]['opticalControlMode'])
def test_23_service_power_setup_XPDRC_XPDRA(self):
- response = test_utils_rfc8040.olm_service_power_setup_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDR-C1'
- },
- {
- 'dest-tp': 'DEG1-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADM-C1'
- },
- {
- 'src-tp': 'DEG2-TTP-TXRX',
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADM-A1'
- },
- {
- 'src-tp': 'XPDR1-NETWORK1',
- 'dest-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDR-A1'
- }
- ],
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-setup',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDR-C1'
+ },
+ {
+ 'dest-tp': 'DEG1-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADM-C1'
+ },
+ {
+ 'src-tp': 'DEG2-TTP-TXRX',
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADM-A1'
+ },
+ {
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDR-A1'
+ }
+ ],
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual(float(response['roadm-connections'][0]['target-output-power']), 2.0)
def test_26_service_power_turndown_XPDRA_XPDRC(self):
- response = test_utils_rfc8040.olm_service_power_turndown_request({
- 'service-name': 'test',
- 'wave-number': 1,
- 'nodes': [
- {
- 'dest-tp': 'XPDR1-NETWORK1',
- 'src-tp': 'XPDR1-CLIENT1',
- 'node-id': 'XPDR-A1'
- },
- {
- 'dest-tp': 'DEG2-TTP-TXRX',
- 'src-tp': 'SRG1-PP1-TXRX',
- 'node-id': 'ROADM-A1'
- },
- {
- 'dest-tp': 'SRG1-PP1-TXRX',
- 'src-tp': 'DEG1-TTP-TXRX',
- 'node-id': 'ROADM-C1'
- },
- {
- 'dest-tp': 'XPDR1-CLIENT1',
- 'src-tp': 'XPDR1-NETWORK1',
- 'node-id': 'XPDR-C1'
- }
- ],
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'service-power-turndown',
+ {
+ 'service-name': 'test',
+ 'wave-number': 1,
+ 'nodes': [
+ {
+ 'dest-tp': 'XPDR1-NETWORK1',
+ 'src-tp': 'XPDR1-CLIENT1',
+ 'node-id': 'XPDR-A1'
+ },
+ {
+ 'dest-tp': 'DEG2-TTP-TXRX',
+ 'src-tp': 'SRG1-PP1-TXRX',
+ 'node-id': 'ROADM-A1'
+ },
+ {
+ 'dest-tp': 'SRG1-PP1-TXRX',
+ 'src-tp': 'DEG1-TTP-TXRX',
+ 'node-id': 'ROADM-C1'
+ },
+ {
+ 'dest-tp': 'XPDR1-CLIENT1',
+ 'src-tp': 'XPDR1-NETWORK1',
+ 'node-id': 'XPDR-C1'
+ }
+ ],
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success', response['output']['result'])
self.assertEqual("off", response['roadm-connections'][0]['opticalControlMode'])
def test_29_servicePath_delete_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDR-A1',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADM-A1',
- 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADM-C1',
- 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
- {'node-id': 'XPDR-C1',
- 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDR-A1',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADM-A1',
+ 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADM-C1',
+ 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
+ {'node-id': 'XPDR-C1',
+ 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
def test_30_servicePath_delete_ZToA(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test',
- 'wave-number': '1',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDR-C1',
- 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
- {'node-id': 'ROADM-C1',
- 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'ROADM-A1',
- 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
- {'node-id': 'XPDR-A1',
- 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 761,
- 'higher-spectral-slot-number': 768
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test',
+ 'wave-number': '1',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDR-C1',
+ 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
+ {'node-id': 'ROADM-C1',
+ 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'ROADM-A1',
+ 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
+ {'node-id': 'XPDR-A1',
+ 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
self.assertEqual(response.status_code, requests.codes.ok)
def test_33_servicePath_create_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test2',
- 'wave-number': '2',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'create',
- 'nodes':
- [{'node-id': 'XPDR-A1',
- 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
- {'node-id': 'ROADM-A1',
- 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 753,
- 'higher-spectral-slot-number': 760
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test2',
+ 'wave-number': '2',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes':
+ [{'node-id': 'XPDR-A1',
+ 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
+ {'node-id': 'ROADM-A1',
+ 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 753,
+ 'higher-spectral-slot-number': 760
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes', response['output']['result'])
time.sleep(10)
# self.assertEqual(2, response['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
def test_35_servicePath_delete_AToZ(self):
- response = test_utils_rfc8040.device_renderer_service_path_request({
- 'service-name': 'test2',
- 'wave-number': '2',
- 'modulation-format': 'dp-qpsk',
- 'operation': 'delete',
- 'nodes':
- [{'node-id': 'XPDR-A1',
- 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
- {'node-id': 'ROADM-A1',
- 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
- 'center-freq': 196.1,
- 'nmc-width': 40,
- 'min-freq': 196.075,
- 'max-freq': 196.125,
- 'lower-spectral-slot-number': 753,
- 'higher-spectral-slot-number': 760
- })
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
+ {
+ 'service-name': 'test2',
+ 'wave-number': '2',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes':
+ [{'node-id': 'XPDR-A1',
+ 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
+ {'node-id': 'ROADM-A1',
+ 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 753,
+ 'higher-spectral-slot-number': 760
+ })
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Request processed', response['output']['result'])
time.sleep(10)
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_38_calculate_span_loss_current(self):
- response = test_utils_rfc8040.olm_calculate_spanloss_current_request()
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-olm', 'calculate-spanloss-current',
+ None)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Success',
response["output"]["result"])
# this test has been removed, since it already exists in port-mapping
# 1a) create a OTUC2 device renderer
def test_02_service_path_create_otuc2(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC2',
'wave-number': '0',
# 1b) create a ODUC2 device renderer
def test_07_otn_service_path_create_oduc2(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC2',
'operation': 'create',
# 1c) create Ethernet device renderer
def test_10_otn_service_path_create_100ge(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_Ethernet',
'operation': 'create',
# 1d) Delete Ethernet device interfaces
def test_15_otn_service_path_delete_100ge(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_Ethernet',
'operation': 'delete',
# 1e) Delete ODUC2 device interfaces
def test_20_otn_service_path_delete_oduc2(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC2',
'operation': 'delete',
# 1f) Delete OTUC2 device interfaces
def test_22_service_path_delete_otuc2(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC2',
'wave-number': '0',
# 2a) create a OTUC3 device renderer
def test_26_service_path_create_otuc3(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC3',
'wave-number': '0',
# 2b) create a ODUC3 device renderer
def test_31_otn_service_path_create_oduc3(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC3',
'operation': 'create',
# 2e) Delete ODUC3 device interfaces
def test_34_otn_service_path_delete_oduc3(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC3',
'operation': 'delete',
# 2f) Delete OTUC3 device interfaces
def test_36_service_path_delete_otuc3(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC3',
'wave-number': '0',
# 3a) create a OTUC4 device renderer
def test_40_service_path_create_otuc4(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC4',
'wave-number': '0',
# 3b) create a ODUC4 device renderer
def test_45_otn_service_path_create_oduc3(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC4',
'operation': 'create',
# 3e) Delete ODUC4 device interfaces
def test_48_otn_service_path_delete_oduc4(self):
- response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC4',
'operation': 'delete',
# 3f) Delete OTUC4 device interfaces
def test_50_service_path_delete_otuc4(self):
- response = test_utils_rfc8040.device_renderer_service_path_request(
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC4',
'wave-number': '0',
return post_request(url, data)
-def device_renderer_service_path_request(payload: dict):
- url = "{}/operations/transportpce-device-renderer:service-path"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-device-renderer:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def device_renderer_otn_service_path_request(payload: dict):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-device-renderer:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def renderer_service_implementation_request(payload: dict):
- url = "{}/operations/transportpce-renderer:service-implementation-request"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-renderer:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-renderer:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def olm_get_pm_request(payload: dict):
- url = "{}/operations/transportpce-olm:get-pm"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-olm:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-olm:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def olm_calculate_spanloss_base_request(payload: dict):
- url = "{}/operations/transportpce-olm:calculate-spanloss-base"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-olm:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-olm:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def olm_service_power_setup_request(payload: dict):
- url = "{}/operations/transportpce-olm:service-power-setup"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-olm:')
+def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
+ # pylint: disable=consider-using-f-string
+ url = "{}/operations/{}:{}".format('{}', api_module, rpc)
+ if payload is None:
+ data = None
+ elif RESTCONF_VERSION == 'draft-bierman02':
+ data = prepend_dict_keys({'input': payload}, api_module + ':')
else:
data = {'input': payload}
response = post_request(url, data)
res = response.json()
- return_key = {'rfc8040': 'transportpce-olm:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def olm_service_power_turndown_request(payload: dict):
- url = "{}/operations/transportpce-olm:service-power-turndown"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-olm:')
- else:
- data = {'input': payload}
- response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-olm:output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
- return {'status_code': response.status_code,
- 'output': return_output}
-
-
-def olm_calculate_spanloss_current_request():
- url = "{}/operations/transportpce-olm:calculate-spanloss-current"
- response = post_request(url, None)
- res = response.json()
- return_key = {'rfc8040': 'transportpce-olm:output',
+ return_key = {'rfc8040': api_module + ':output',
'draft-bierman02': 'output'}
return_output = res[return_key[RESTCONF_VERSION]]
return {'status_code': response.status_code,