Migrate OTN SH renderer func. tests to RFC8040
[transportpce.git] / tests / transportpce_tests / common / test_utils_rfc8040.py
index 1a5ea6265adcf49706d7c4a099b9a8396f779cde..a5c901fb9de9823f49aa2cb2ff9c8b97293d7d09 100644 (file)
@@ -51,19 +51,19 @@ process_list = []
 if 'USE_ODL_ALT_RESTCONF_PORT' in os.environ:
     RESTCONF_PORT = os.environ['USE_ODL_ALT_RESTCONF_PORT']
 else:
-    RESTCONF_PORT = 8181
+    RESTCONF_PORT = str(8181)
 
 RESTCONF_PATH_PREFIX = {'rfc8040': '/rests',
                         'draft-bierman02': '/restconf'}
 if 'USE_ODL_RESTCONF_VERSION' in os.environ:
     RESTCONF_VERSION = os.environ['USE_ODL_RESTCONF_VERSION']
-    if RESTCONF_VERSION not in RESTCONF_PATH_PREFIX.keys():
+    if RESTCONF_VERSION not in RESTCONF_PATH_PREFIX:
         print('unsupported RESTCONF version ' + RESTCONF_VERSION)
         sys.exit(3)
 else:
     RESTCONF_VERSION = 'rfc8040'
 
-RESTCONF_BASE_URL = 'http://localhost:' + RESTCONF_PORT + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
+RESTCONF_BASE_URL = 'http://localhost:' + str(RESTCONF_PORT) + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
 
 if 'USE_ODL_ALT_KARAF_INSTALL_DIR' in os.environ:
     KARAF_INSTALLDIR = os.environ['USE_ODL_ALT_KARAF_INSTALL_DIR']
@@ -105,6 +105,20 @@ def delete_request(url):
         headers=TYPE_APPLICATION_JSON,
         auth=(ODL_LOGIN, ODL_PWD))
 
+
+def post_request(url, data):
+    if data:
+        print(json.dumps(data))
+        return requests.request(
+            "POST", url.format(RESTCONF_BASE_URL),
+            data=json.dumps(data),
+            headers=TYPE_APPLICATION_JSON,
+            auth=(ODL_LOGIN, ODL_PWD))
+    return requests.request(
+        "POST", url.format(RESTCONF_BASE_URL),
+        headers=TYPE_APPLICATION_JSON,
+        auth=(ODL_LOGIN, ODL_PWD))
+
 #
 # Process management
 #
@@ -290,7 +304,7 @@ def unmount_device(node: str):
 
 
 def check_device_connection(node: str):
-    url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
+    url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}?content=nonconfig',
            'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}'}
     response = get_request(url[RESTCONF_VERSION].format('{}', node))
     res = response.json()
@@ -303,6 +317,30 @@ def check_device_connection(node: str):
     return {'status_code': response.status_code,
             'connection-status': connection_status}
 
+
+def check_node_attribute_request(node: str, attribute: str, attribute_value: str):
+    # pylint: disable=line-too-long
+    url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}?content=nonconfig',  # nopep8
+           'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'}  # nopep8
+    response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
+    res = response.json()
+    return_key = {'rfc8040': 'org-openroadm-device:' + attribute,
+                  'draft-bierman02': attribute}
+    if return_key[RESTCONF_VERSION] in res.keys():
+        response_attribute = res[return_key[RESTCONF_VERSION]]
+    else:
+        response_attribute = res['errors']['error'][0]
+    return {'status_code': response.status_code,
+            attribute: response_attribute}
+
+
+def del_node_attribute_request(node: str, attribute: str, attribute_value: str):
+    # pylint: disable=line-too-long
+    url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}',  # nopep8
+           'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'}  # nopep8
+    response = delete_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
+    return response
+
 #
 # Portmapping operations
 #
@@ -379,3 +417,146 @@ def get_ietf_network_request(network: str, content: str):
     networks = res[return_key[RESTCONF_VERSION]]
     return {'status_code': response.status_code,
             'network': networks}
+
+
+def get_ietf_network_link_request(network: str, link: str, content: str):
+    url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
+           'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
+    if RESTCONF_VERSION == 'rfc8040':
+        format_args = ('{}', network, link, content)
+    elif content == 'config':
+        format_args = ('{}', content, network, link)
+    else:
+        format_args = ('{}', 'operational', network, link)
+    response = get_request(url[RESTCONF_VERSION].format(*format_args))
+    res = response.json()
+    return_key = {'rfc8040': 'ietf-network-topology:link',
+                  'draft-bierman02': 'ietf-network-topology:link'}
+    link = res[return_key[RESTCONF_VERSION]][0]
+    return {'status_code': response.status_code,
+            'link': link}
+
+
+def del_ietf_network_link_request(network: str, link: str, content: str):
+    url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
+           'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
+    if RESTCONF_VERSION == 'rfc8040':
+        format_args = ('{}', network, link, content)
+    elif content == 'config':
+        format_args = ('{}', content, network, link)
+    else:
+        format_args = ('{}', 'operational', network, link)
+    response = delete_request(url[RESTCONF_VERSION].format(*format_args))
+    return response
+
+
+def add_oms_attr_request(link: str, oms_attr: str):
+    url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
+           'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
+    url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
+    network = 'openroadm-topology'
+    response = put_request(url2.format('{}', network, link), oms_attr)
+    return response
+
+
+def del_oms_attr_request(link: str,):
+    url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
+           'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
+    url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
+    network = 'openroadm-topology'
+    response = delete_request(url2.format('{}', network, link))
+    return response
+
+
+def del_ietf_network_node_request(network: str, node: str, content: str):
+    url = {'rfc8040': '{}/data/ietf-network:networks/network={}/node={}?content={}',
+           'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/node/{}'}
+    if RESTCONF_VERSION == 'rfc8040':
+        format_args = ('{}', network, node, content)
+    elif content == 'config':
+        format_args = ('{}', content, network, node)
+    else:
+        format_args = ('{}', 'operational', network, node)
+    response = delete_request(url[RESTCONF_VERSION].format(*format_args))
+    return response
+
+#
+# TransportPCE network-utils and service-path and service-implementation operations
+#
+
+
+def prepend_dict_keys(input_dict: dict, prefix: str):
+    return_dict = {}
+    for key, value in input_dict.items():
+        newkey = prefix + key
+        if isinstance(value, dict):
+            return_dict[newkey] = prepend_dict_keys(value, prefix)
+            # TODO: perhaps some recursion depth limit or another solution has to be considered here
+            # even if recursion depth is given by the input_dict argument
+            # direct (self-)recursive functions may carry unwanted side-effects such as ressource consumptions
+        else:
+            return_dict[newkey] = value
+    return return_dict
+
+
+def connect_xpdr_to_rdm_request(payload: dict):
+    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
+    if RESTCONF_VERSION == 'draft-bierman02':
+        data = prepend_dict_keys({'input': {'links-input': payload}}, 'networkutils:')
+    else:
+        data = {'input': {'links-input': payload}}
+    return post_request(url, data)
+
+
+def connect_rdm_to_xpdr_request(payload: dict):
+    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
+    if RESTCONF_VERSION == 'draft-bierman02':
+        data = prepend_dict_keys({'input': {'links-input': payload}}, 'networkutils:')
+    else:
+        data = {'input': {'links-input': payload}}
+    return post_request(url, data)
+
+
+def device_renderer_service_path_request(payload: dict):
+    url = "{}/operations/transportpce-device-renderer:service-path"
+    if RESTCONF_VERSION == 'draft-bierman02':
+        data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
+    else:
+        data = {'input': payload}
+    response = post_request(url, data)
+    res = response.json()
+    return_key = {'rfc8040': 'transportpce-device-renderer:output',
+                  'draft-bierman02': 'output'}
+    return_output = res[return_key[RESTCONF_VERSION]]
+    return {'status_code': response.status_code,
+            'output': return_output}
+
+
+def device_renderer_otn_service_path_request(payload: dict):
+    url = "{}/operations/transportpce-device-renderer:otn-service-path"
+    if RESTCONF_VERSION == 'draft-bierman02':
+        data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
+    else:
+        data = {'input': payload}
+    response = post_request(url, data)
+    res = response.json()
+    return_key = {'rfc8040': 'transportpce-device-renderer:output',
+                  'draft-bierman02': 'output'}
+    return_output = res[return_key[RESTCONF_VERSION]]
+    return {'status_code': response.status_code,
+            'output': return_output}
+
+
+def device_renderer_service_implementation_request(payload: dict):
+    url = "{}/operations/transportpce-renderer:service-implementation-request"
+    if RESTCONF_VERSION == 'draft-bierman02':
+        data = prepend_dict_keys({'input': payload}, 'transportpce-renderer:')
+    else:
+        data = {'input': payload}
+    response = post_request(url, data)
+    res = response.json()
+    return_key = {'rfc8040': 'transportpce-renderer:output',
+                  'draft-bierman02': 'output'}
+    return_output = res[return_key[RESTCONF_VERSION]]
+    return {'status_code': response.status_code,
+            'output': return_output}