X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2Fcommon%2Ftest_utils.py;h=da746393c0d903d82b00df8ce5ddfb9229a2e10b;hb=64975190383a3682d35b8e434b8be43606f1f81a;hp=33808b424e98c779ec9f4261cc5d0b77e826267a;hpb=802e7005667364a567c19bd50e9c167f63028cee;p=transportpce.git diff --git a/tests/transportpce_tests/common/test_utils.py b/tests/transportpce_tests/common/test_utils.py index 33808b424..da746393c 100644 --- a/tests/transportpce_tests/common/test_utils.py +++ b/tests/transportpce_tests/common/test_utils.py @@ -35,6 +35,17 @@ ODL_PWD = "admin" NODES_LOGIN = "admin" NODES_PWD = "admin" URL_CONFIG_NETCONF_TOPO = "{}/config/network-topology:network-topology/topology/topology-netconf/" +URL_CONFIG_ORDM_TOPO = "{}/config/ietf-network:networks/network/openroadm-topology/" +URL_CONFIG_OTN_TOPO = "{}/config/ietf-network:networks/network/otn-topology/" +URL_CONFIG_CLLI_NET = "{}/config/ietf-network:networks/network/clli-network/" +URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network/" +URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/" +URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/" +URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create" +URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete" +URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path" +URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path" +URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms" TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'} TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'} @@ -263,6 +274,103 @@ def get_netconf_oper_request(node: str): return get_request(url) +def get_ordm_topo_request(suffix: str): + url = URL_CONFIG_ORDM_TOPO + suffix + return get_request(url) + + +def add_oms_attr_request(link: str, attr): + url = URL_CONFIG_ORDM_TOPO + ( + "ietf-network-topology:link/" + link + "/org-openroadm-network-topology:OMS-attributes/span" + ) + return put_request(url, attr) + + +def del_oms_attr_request(link: str): + url = URL_CONFIG_ORDM_TOPO + ( + "ietf-network-topology:link/" + link + "/org-openroadm-network-topology:OMS-attributes/span" + ) + return delete_request(url) + + +def get_clli_net_request(): + return get_request(URL_CONFIG_CLLI_NET) + + +def get_ordm_net_request(): + return get_request(URL_CONFIG_ORDM_NET) + + +def get_otn_topo_request(): + return get_request(URL_CONFIG_OTN_TOPO) + + +def del_link_request(link: str): + url = URL_CONFIG_ORDM_TOPO + ("ietf-network-topology:link/" + link) + return delete_request(url) + + +def del_node_request(node: str): + url = URL_CONFIG_CLLI_NET + ("node/" + node) + return delete_request(url) + + +def portmapping_request(suffix: str): + url = URL_PORTMAPPING + suffix + return get_request(url) + + +def get_service_list_request(suffix: str): + url = URL_OPER_SERV_LIST + suffix + return get_request(url) + + +def service_create_request(attr): + return post_request(URL_SERV_CREATE, attr) + +def service_delete_request(servicename : str, + requestid = "e3028bae-a90f-4ddd-a83f-cf224eba0e58", + notificationurl="http://localhost:8585/NotificationServer/notify"): + attr = {"input": { + "sdnc-request-header": { + "request-id": requestid, + "rpc-action": "service-delete", + "request-system-id": "appname", + "notification-url": notificationurl}, + "service-delete-req-info": { + "service-name": servicename, + "tail-retention": "no"}}} + return post_request(URL_SERV_DELETE, attr) + + +def service_path_request(operation: str, servicename: str, wavenumber: str, nodes): + attr = {"renderer:input": { + "renderer:service-name": servicename, + "renderer:wave-number": wavenumber, + "renderer:modulation-format": "qpsk", + "renderer:operation": operation, + "renderer:nodes": nodes}} + return post_request(URL_SERVICE_PATH, attr) + + +def otn_service_path_request(operation: str, servicename: str, servicerate: str, servicetype: str, nodes, eth_attr=None): + attr = {"service-name": servicename, + "operation": operation, + "service-rate": servicerate, + "service-type": servicetype, + "nodes": nodes} + if eth_attr: + attr.update(eth_attr) + return post_request(URL_OTN_SERVICE_PATH, {"renderer:input": attr}) + + +def create_ots_oms_request(nodeid: str, lcp: str): + attr = {"input": { + "node-id": nodeid, + "logical-connection-point": lcp}} + return post_request(URL_CREATE_OTS_OMS, attr) + + def shutdown_process(process): if process is not None: for child in psutil.Process(process.pid).children():