Fix new pylint issues in functional tests
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index 136991f444e90232cc44efba8cb782dad34d9cf0..4b2b7f1d01c76d2730376038716b17f8aad875b3 100644 (file)
 ##############################################################################
 
 # pylint: disable=no-member
+# pylint: disable=too-many-arguments
 
 import json
 import os
+# pylint: disable=wrong-import-order
 import sys
 import re
 import signal
@@ -22,6 +24,7 @@ import time
 import psutil
 import requests
 
+# pylint: disable=import-error
 import simulators
 
 SIMS = simulators.SIMS
@@ -42,19 +45,32 @@ URL_CONFIG_CLLI_NET = "{}/config/ietf-network:networks/network/clli-network/"
 URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network/"
 URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
 URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
+URL_OPER_SERV_PATH_LIST = "{}/operational/transportpce-service-path:service-path-list/"
 URL_GET_NBINOTIFICATIONS_PROCESS_SERV = "{}/operations/nbi-notifications:get-notifications-process-service/"
 URL_GET_NBINOTIFICATIONS_ALARM_SERV = "{}/operations/nbi-notifications:get-notifications-alarm-service/"
 URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
 URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete"
 URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path"
 URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path"
+URL_TAPI_CREATE_CONNECTIVITY = "{}/operations/tapi-connectivity:create-connectivity-service"
+URL_TAPI_DELETE_CONNECTIVITY = "{}/operations/tapi-connectivity:delete-connectivity-service"
+URL_TAPI_GET_CONNECTIVITY = "{}/operations/tapi-connectivity:get-connectivity-service-details"
 URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms"
 URL_PATH_COMPUTATION_REQUEST = "{}/operations/transportpce-pce:path-computation-request"
 URL_FULL_PORTMAPPING = "{}/config/transportpce-portmapping:network"
+URL_TAPI_TOPOLOGY_DETAILS = "{}/operations/tapi-topology:get-topology-details"
+URL_TAPI_NODE_DETAILS = "{}/operations/tapi-topology:get-node-details"
+URL_TAPI_NEP_DETAILS = "{}/operations/tapi-topology:get-node-edge-point-details"
+URL_TAPI_SIP_LIST = "{}/operations/tapi-common:get-service-interface-point-list"
+URL_TAPI_SERVICE_LIST = "{}/operations/tapi-connectivity:get-connectivity-service-list"
+URL_TAPI_NOTIFICATION_SUBS_SERVICE = "{}/operations/tapi-notification:create-notification-subscription-service"
+URL_TAPI_GET_NOTIFICATION_LIST = "{}/operations/tapi-notification:get-notification-list"
 
 TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
 TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
 
+REQUEST_TIMEOUT = 10
+
 CODE_SHOULD_BE_200 = 'Http status code should be 200'
 CODE_SHOULD_BE_201 = 'Http status code should be 201'
 
@@ -125,7 +141,7 @@ def start_karaf():
     executable = os.path.join(
         os.path.dirname(os.path.realpath(__file__)),
         "..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "karaf")
-    with open('odl.log', 'w') as outfile:
+    with open('odl.log', 'w', encoding='utf-8') as outfile:
         return subprocess.Popen(
             ["sh", executable, "server"], stdout=outfile, stderr=outfile, stdin=None)
 
@@ -136,7 +152,7 @@ def start_lighty():
         os.path.dirname(os.path.realpath(__file__)),
         "..", "..", "..", "lighty", "target", "tpce",
         "clean-start-controller.sh")
-    with open(TPCE_LOG, 'w') as outfile:
+    with open(TPCE_LOG, 'w', encoding='utf-8') as outfile:
         return subprocess.Popen(
             ["sh", executable], stdout=outfile, stderr=outfile, stdin=None)
 
@@ -156,7 +172,8 @@ def get_request(url):
     return requests.request(
         "GET", url.format(RESTCONF_BASE_URL),
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def post_request(url, data):
@@ -166,12 +183,14 @@ def post_request(url, data):
             "POST", url.format(RESTCONF_BASE_URL),
             data=json.dumps(data),
             headers=TYPE_APPLICATION_JSON,
-            auth=(ODL_LOGIN, ODL_PWD))
+            auth=(ODL_LOGIN, ODL_PWD),
+            timeout=REQUEST_TIMEOUT)
 
     return requests.request(
         "POST", url.format(RESTCONF_BASE_URL),
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def post_xmlrequest(url, data):
@@ -180,7 +199,8 @@ def post_xmlrequest(url, data):
             "POST", url.format(RESTCONF_BASE_URL),
             data=data,
             headers=TYPE_APPLICATION_XML,
-            auth=(ODL_LOGIN, ODL_PWD))
+            auth=(ODL_LOGIN, ODL_PWD),
+            timeout=REQUEST_TIMEOUT)
     return None
 
 
@@ -189,7 +209,8 @@ def put_request(url, data):
         "PUT", url.format(RESTCONF_BASE_URL),
         data=json.dumps(data),
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def put_xmlrequest(url, data):
@@ -197,7 +218,8 @@ def put_xmlrequest(url, data):
         "PUT", url.format(RESTCONF_BASE_URL),
         data=data,
         headers=TYPE_APPLICATION_XML,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def put_jsonrequest(url, data):
@@ -205,7 +227,8 @@ def put_jsonrequest(url, data):
         "PUT", url.format(RESTCONF_BASE_URL),
         data=data,
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def rawput_request(url, data):
@@ -213,7 +236,8 @@ def rawput_request(url, data):
         "PUT", url.format(RESTCONF_BASE_URL),
         data=data,
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def rawpost_request(url, data):
@@ -221,14 +245,16 @@ def rawpost_request(url, data):
         "POST", url.format(RESTCONF_BASE_URL),
         data=data,
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def delete_request(url):
     return requests.request(
         "DELETE", url.format(RESTCONF_BASE_URL),
         headers=TYPE_APPLICATION_JSON,
-        auth=(ODL_LOGIN, ODL_PWD))
+        auth=(ODL_LOGIN, ODL_PWD),
+        timeout=REQUEST_TIMEOUT)
 
 
 def mount_device(node_id, sim):
@@ -252,6 +278,27 @@ def mount_device(node_id, sim):
     return response
 
 
+def mount_tapi_device(node_id, sim):
+    url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
+    body = {"node": [{
+        "node-id": node_id,
+        "netconf-node-topology:username": NODES_LOGIN,
+        "netconf-node-topology:password": NODES_PWD,
+        "netconf-node-topology:host": "127.0.0.1",
+        "netconf-node-topology:port": SIMS[sim]['port'],
+        "netconf-node-topology:tcp-only": "false",
+        "netconf-node-topology:pass-through": {}}]}
+    response = put_request(url, body)
+    if wait_until_log_contains(TPCE_LOG, re.escape(f"TAPI node for or node {node_id} successfully merged"), 200):
+        print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
+    else:
+        print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
+        if response.status_code == requests.codes.ok:
+            print("It was probably loaded at start-up", end='... ', flush=True)
+        # TODO an else-clause to abort test would probably be nice here
+    return response
+
+
 def unmount_device(node_id):
     url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
     response = delete_request(url)
@@ -298,6 +345,19 @@ def connect_rdm_to_xpdr_request(xpdr_node: str, xpdr_num: str, network_num: str,
     return post_request(url, data)
 
 
+def connect_xpdr_to_rdm_tapi_request(xpdr_node: str, xpdr_num: str, rdm_node: str, srg_num: str):
+    url = "{}/operations/transportpce-tapinetworkutils:init-xpdr-rdm-tapi-link"
+    data = {
+        "input": {
+            "xpdr-node": xpdr_node,
+            "network-tp": xpdr_num,
+            "rdm-node": rdm_node,
+            "add-drop-tp": srg_num
+        }
+    }
+    return post_request(url, data)
+
+
 def check_netconf_node_request(node: str, suffix: str):
     url = URL_CONFIG_NETCONF_TOPO + (
         "node/" + node + "/yang-ext:mount/org-openroadm-device:org-openroadm-device/" + suffix
@@ -365,8 +425,11 @@ def get_notifications_alarm_service_request(attr):
 
 
 def get_service_list_request(suffix: str):
-    url = URL_OPER_SERV_LIST + suffix
-    return get_request(url)
+    return get_request(URL_OPER_SERV_LIST + suffix)
+
+
+def get_service_path_list_request(suffix: str):
+    return get_request(URL_OPER_SERV_PATH_LIST + suffix)
 
 
 def service_create_request(attr):
@@ -390,15 +453,15 @@ def service_delete_request(servicename: str,
 
 def service_path_request(operation: str, servicename: str, wavenumber: str, nodes, centerfreq: str,
                          slotwidth: int, minfreq: float, maxfreq: float, lowerslotnumber: int,
-                         higherslotnumber: int):
+                         higherslotnumber: int, modulation_format="dp-qpsk"):
     attr = {"renderer:input": {
         "renderer:service-name": servicename,
         "renderer:wave-number": wavenumber,
-        "renderer:modulation-format": "dp-qpsk",
+        "renderer:modulation-format": modulation_format,
         "renderer:operation": operation,
         "renderer:nodes": nodes,
         "renderer:center-freq": centerfreq,
-        "renderer:width": slotwidth,
+        "renderer:nmc-width": slotwidth,
         "renderer:min-freq": minfreq,
         "renderer:max-freq": maxfreq,
         "renderer:lower-spectral-slot-number": lowerslotnumber,
@@ -432,7 +495,7 @@ def path_computation_request(requestid: str, servicename: str, serviceaend, serv
             "service-handler-header": {"request-id": requestid},
             "service-a-end": serviceaend,
             "service-z-end": servicezend,
-            "pce-metric": metric}
+            "pce-routing-metric": metric}
     if hardconstraints:
         attr.update({"hard-constraints": hardconstraints})
     if softconstraints:
@@ -442,6 +505,64 @@ def path_computation_request(requestid: str, servicename: str, serviceaend, serv
     return post_request(URL_PATH_COMPUTATION_REQUEST, {"input": attr})
 
 
+def tapi_create_connectivity_request(topologyidorname):
+    return post_request(URL_TAPI_CREATE_CONNECTIVITY, topologyidorname)
+
+
+def tapi_get_connectivity_request(serviceidorname):
+    attr = {
+        "input": {
+            "service-id-or-name": serviceidorname}}
+    return post_request(URL_TAPI_GET_CONNECTIVITY, attr)
+
+
+def tapi_delete_connectivity_request(serviceidorname):
+    attr = {
+        "input": {
+            "service-id-or-name": serviceidorname}}
+    return post_request(URL_TAPI_DELETE_CONNECTIVITY, attr)
+
+
+def tapi_get_topology_details_request(topologyidorname):
+    attr = {
+        "input": {
+            "topology-id-or-name": topologyidorname}}
+    return post_request(URL_TAPI_TOPOLOGY_DETAILS, attr)
+
+
+def tapi_get_node_details_request(topologyidorname, nodeidorname):
+    attr = {
+        "input": {
+            "topology-id-or-name": topologyidorname,
+            "node-id-or-name": nodeidorname}}
+    return post_request(URL_TAPI_NODE_DETAILS, attr)
+
+
+def tapi_get_node_edge_point_details_request(topologyidorname, nodeidorname, nepidorname):
+    attr = {
+        "input": {
+            "topology-id-or-name": topologyidorname,
+            "node-id-or-name": nodeidorname,
+            "ep-id-or-name": nepidorname}}
+    return post_request(URL_TAPI_NEP_DETAILS, attr)
+
+
+def tapi_get_sip_details_request():
+    return post_request(URL_TAPI_SIP_LIST, "")
+
+
+def tapi_get_service_list_request():
+    return post_request(URL_TAPI_SERVICE_LIST, "")
+
+
+def tapi_create_notification_subscription_service_request(attr):
+    return post_request(URL_TAPI_NOTIFICATION_SUBS_SERVICE, attr)
+
+
+def tapi_get_notifications_list_request(attr):
+    return post_request(URL_TAPI_GET_NOTIFICATION_LIST, attr)
+
+
 def shutdown_process(process):
     if process is not None:
         for child in psutil.Process(process.pid).children():
@@ -456,7 +577,7 @@ def start_honeynode(log_file: str, sim):
     sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                     "..", "..", "sample_configs", "openroadm", sim[1])
     if os.path.isfile(executable):
-        with open(log_file, 'w') as outfile:
+        with open(log_file, 'w', encoding='utf-8') as outfile:
             return subprocess.Popen(
                 [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
                 stdout=outfile, stderr=outfile)
@@ -465,6 +586,7 @@ def start_honeynode(log_file: str, sim):
 
 def wait_until_log_contains(log_file, regexp, time_to_wait=60):
     # pylint: disable=lost-exception
+    # pylint: disable=consider-using-with
     stringfound = False
     filefound = False
     line = None
@@ -472,7 +594,7 @@ def wait_until_log_contains(log_file, regexp, time_to_wait=60):
         with TimeOut(seconds=time_to_wait):
             while not os.path.exists(log_file):
                 time.sleep(0.2)
-            filelogs = open(log_file, 'r')
+            filelogs = open(log_file, 'r', encoding='utf-8')
             filelogs.seek(0, 2)
             filefound = True
             print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)