X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2Fcommon%2Ftest_utils.py;h=da746393c0d903d82b00df8ce5ddfb9229a2e10b;hb=64975190383a3682d35b8e434b8be43606f1f81a;hp=5a063b4d045615f58595448ba8ca1cc5cfd7e3a7;hpb=577378faeb91f60565d60c65504aaaebce197c94;p=transportpce.git diff --git a/tests/transportpce_tests/common/test_utils.py b/tests/transportpce_tests/common/test_utils.py index 5a063b4d0..da746393c 100644 --- a/tests/transportpce_tests/common/test_utils.py +++ b/tests/transportpce_tests/common/test_utils.py @@ -39,6 +39,13 @@ URL_CONFIG_ORDM_TOPO = "{}/config/ietf-network:networks/network/openroadm-topolo URL_CONFIG_OTN_TOPO = "{}/config/ietf-network:networks/network/otn-topology/" URL_CONFIG_CLLI_NET = "{}/config/ietf-network:networks/network/clli-network/" URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network/" +URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/" +URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/" +URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create" +URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete" +URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path" +URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path" +URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms" TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'} TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'} @@ -308,6 +315,62 @@ def del_node_request(node: str): return delete_request(url) +def portmapping_request(suffix: str): + url = URL_PORTMAPPING + suffix + return get_request(url) + + +def get_service_list_request(suffix: str): + url = URL_OPER_SERV_LIST + suffix + return get_request(url) + + +def service_create_request(attr): + return post_request(URL_SERV_CREATE, attr) + +def service_delete_request(servicename : str, + requestid = "e3028bae-a90f-4ddd-a83f-cf224eba0e58", + notificationurl="http://localhost:8585/NotificationServer/notify"): + attr = {"input": { + "sdnc-request-header": { + "request-id": requestid, + "rpc-action": "service-delete", + "request-system-id": "appname", + "notification-url": notificationurl}, + "service-delete-req-info": { + "service-name": servicename, + "tail-retention": "no"}}} + return post_request(URL_SERV_DELETE, attr) + + +def service_path_request(operation: str, servicename: str, wavenumber: str, nodes): + attr = {"renderer:input": { + "renderer:service-name": servicename, + "renderer:wave-number": wavenumber, + "renderer:modulation-format": "qpsk", + "renderer:operation": operation, + "renderer:nodes": nodes}} + return post_request(URL_SERVICE_PATH, attr) + + +def otn_service_path_request(operation: str, servicename: str, servicerate: str, servicetype: str, nodes, eth_attr=None): + attr = {"service-name": servicename, + "operation": operation, + "service-rate": servicerate, + "service-type": servicetype, + "nodes": nodes} + if eth_attr: + attr.update(eth_attr) + return post_request(URL_OTN_SERVICE_PATH, {"renderer:input": attr}) + + +def create_ots_oms_request(nodeid: str, lcp: str): + attr = {"input": { + "node-id": nodeid, + "logical-connection-point": lcp}} + return post_request(URL_CREATE_OTS_OMS, attr) + + def shutdown_process(process): if process is not None: for child in psutil.Process(process.pid).children():