WAITING = 20 # nominal value is 300
NODE_VERSION = '2.2.1'
- cr_serv_sample_data = {"input": {
+ cr_serv_input_data = {
"sdnc-request-header": {
"request-id": "request-1",
"rpc-action": "service-create",
"due-date": "2018-06-15T00:00:01Z",
"operator-contact": "pw1234"
}
+
+ del_serv_input_data = {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-delete",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"},
+ "service-delete-req-info": {
+ "service-name": "TBD",
+ "tail-retention": "no"}
}
@classmethod
# test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nbNode = len(res['network'][0]['node'])
- self.assertEqual(nbNode, 6, 'There should be 6 nodes')
- self.assertNotIn('ietf-network-topology:link', res['network'][0])
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['node']), 6)
+ self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_create_OCH_OTU4_service(self):
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_13_get_OCH_OTU4_service1(self):
self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_22_check_otn_topo_otu4_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 2)
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
- for link in res['network'][0]['ietf-network-topology:link']:
+ for link in response['network'][0]['ietf-network-topology:link']:
self.assertIn(link['link-id'], listLinkId)
self.assertEqual(
link['transportpce-networkutils:otn-link-type'], 'OTU4')
# test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
-
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-ODU4"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_24_get_ODU4_service1(self):
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 4)
- for link in res['network'][0]['ietf-network-topology:link']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
# test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
- self.cr_serv_sample_data["input"]["connection-type"] = "service"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-10GE"
+ self.cr_serv_input_data["connection-type"] = "service"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_30_get_10GE_service1(self):
response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 4)
- for link in res['network'][0]['ietf-network-topology:link']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
- response = test_utils.service_delete_request("service1-10GE")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_42_check_service_list(self):
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_47_check_otn_topo_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 4)
- for link in res['network'][0]['ietf-network-topology:link']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
- response = test_utils.service_delete_request("service1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_50_check_service_list(self):
self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
- response = test_utils.service_delete_request("service1-OCH-OTU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_55_get_no_service(self):
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertNotIn('ietf-network-topology:link', res['network'][0])
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_59_check_openroadm_topo_spdra(self):
response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
def test_66_create_OCH_OTU4_service_2(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
- self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
+ self.cr_serv_input_data["connection-type"] = "infrastructure"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
def test_68_create_ODU4_service_2(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
- self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
-
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service2-ODU4"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
+ self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
def test_70_create_1GE_service(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
- self.cr_serv_sample_data["input"]["connection-type"] = "service"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-1GE"
+ self.cr_serv_input_data["connection-type"] = "service"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
response['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 4)
- for link in res['network'][0]['ietf-network-topology:link']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
- response = test_utils.service_delete_request("service1-1GE")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_83_check_service_list(self):
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_88_check_otn_topo_links(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nb_links = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nb_links, 4)
- for link in res['network'][0]['ietf-network-topology:link']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
linkId = link['link-id']
if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
- response = test_utils.get_otn_topo_request()
- res = response.json()
- for node in res['network'][0]['node']:
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
tpList = node['ietf-network-topology:termination-point']
for tp in tpList:
len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
- response = test_utils.service_delete_request("service2-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
- response = test_utils.service_delete_request("service2-OCH-OTU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):