# Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
# Connect the tail: XPDRA to ROADMA
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
# Connect the tail: ROADMA to XPDRA
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_06_connect_roadmA_to_xpdrA(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_07_connect_xprdC_to_roadmC(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_08_connect_roadmC_to_xpdrC(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_09_create_OTS_ROADMA(self):
- response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADMA01', 'DEG1-TTP-TXRX')
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'create-ots-oms',
+ {
+ 'node-id': 'ROADMA01',
+ 'logical-connection-point': 'DEG1-TTP-TXRX'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_10_create_OTS_ROADMC(self):
- response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADMC01', 'DEG2-TTP-TXRX')
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'create-ots-oms',
+ {
+ 'node-id': 'ROADMC01',
+ 'logical-connection-point': 'DEG2-TTP-TXRX'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_11_get_PM_ROADMA(self):
response = test_utils_rfc8040.transportpce_api_rpc_request(
#"""to test case where SRG where the xpdr is connected to has no optical range data"""
def test_31_connect_xprdA_to_roadmA(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_32_connect_roadmA_to_xpdrA(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
- 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
+ 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_33_servicePath_create_AToZ(self):
response = test_utils_rfc8040.transportpce_api_rpc_request(
# Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
# Connect the tail: XPDRA to ROADMA
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
# pylint: disable=redundant-unittest-assert
self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_06_connect_roadmA_to_xpdrA(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_07_connect_xprdC_to_roadmC(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_08_connect_roadmC_to_xpdrC(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
- 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_09_create_OTS_ROADMA(self):
- response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADM-A1', 'DEG1-TTP-TXRX')
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'create-ots-oms',
+ {
+ 'node-id': 'ROADM-A1',
+ 'logical-connection-point': 'DEG1-TTP-TXRX'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_10_create_OTS_ROADMC(self):
- response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADM-C1', 'DEG2-TTP-TXRX')
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-device-renderer', 'create-ots-oms',
+ {
+ 'node-id': 'ROADM-C1',
+ 'logical-connection-point': 'DEG2-TTP-TXRX'
+ })
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_11_get_PM_ROADMA(self):
response = test_utils_rfc8040.transportpce_api_rpc_request(
#"""to test case where SRG where the xpdr is connected to has no optical range data"""
def test_31_connect_xprdA_to_roadmA(self):
- response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_32_connect_roadmA_to_xpdrA(self):
- response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
- {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
- 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_33_servicePath_create_AToZ(self):
response = test_utils_rfc8040.transportpce_api_rpc_request(
return return_dict
-def connect_xpdr_to_rdm_request(payload: dict):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': {'links-input': payload}}, 'networkutils:')
- else:
- data = {'input': {'links-input': payload}}
- return post_request(url, data)
-
-
-def connect_rdm_to_xpdr_request(payload: dict):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': {'links-input': payload}}, 'networkutils:')
- else:
- data = {'input': {'links-input': payload}}
- return post_request(url, data)
-
-
-def device_renderer_create_ots_oms_request(nodeid: str, lcp: str):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms"
- payload = {
- 'node-id': nodeid,
- 'logical-connection-point': lcp}
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
- else:
- data = {'input': payload}
- return post_request(url, data)
-
-
def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
# pylint: disable=consider-using-f-string
url = "{}/operations/{}:{}".format('{}', api_module, rpc)