WAITING = 20 # nominal value is 300
NODE_VERSION = '2.2.1'
- cr_serv_sample_data = {"input": {
+ cr_serv_input_data = {
"sdnc-request-header": {
"request-id": "request-1",
"rpc-action": "service-create",
"due-date": "2018-06-15T00:00:01Z",
"operator-contact": "pw1234"
}
+
+ del_serv_input_data = {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-delete",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"},
+ "service-delete-req-info": {
+ "service-name": "TBD",
+ "tail-retention": "no"}
}
@classmethod
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_create_OCH_OTU4_service(self):
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_13_get_OCH_OTU4_service1(self):
# test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
-
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-ODU4"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_24_get_ODU4_service1(self):
# test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
- self.cr_serv_sample_data["input"]["connection-type"] = "service"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-10GE"
+ self.cr_serv_input_data["connection-type"] = "service"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_30_get_10GE_service1(self):
1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
- response = test_utils.service_delete_request("service1-10GE")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_42_check_service_list(self):
len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
- response = test_utils.service_delete_request("service1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_50_check_service_list(self):
'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
- response = test_utils.service_delete_request("service1-OCH-OTU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_55_get_no_service(self):
def test_66_create_OCH_OTU4_service_2(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
- self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
+ self.cr_serv_input_data["connection-type"] = "infrastructure"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
def test_68_create_ODU4_service_2(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
- self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
- del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
-
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service2-ODU4"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
+ self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
+ del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
def test_70_create_1GE_service(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
- self.cr_serv_sample_data["input"]["connection-type"] = "service"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
- del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
- self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
- self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.cr_serv_input_data["service-name"] = "service1-1GE"
+ self.cr_serv_input_data["connection-type"] = "service"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
- response = test_utils.service_delete_request("service1-1GE")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_83_check_service_list(self):
len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
- response = test_utils.service_delete_request("service2-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
- response = test_utils.service_delete_request("service2-OCH-OTU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):