Fix new pylint-3.0.0 detected issues
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index 4911c2a3ff9d10595d78548afb5742ec2aaf65dc..888a8e91fc228751b85ecd386c41bd49cd2f4537 100644 (file)
@@ -22,6 +22,11 @@ import time
 
 import psutil
 import requests
+import urllib.parse
+
+from dict2xml import dict2xml
+from netconf_client.connect import connect_ssh
+from netconf_client.ncclient import Manager
 
 # pylint: disable=import-error
 import simulators
@@ -29,6 +34,7 @@ import simulators
 SIMS = simulators.SIMS
 
 HONEYNODE_OK_START_MSG = 'Netconf SSH endpoint started successfully at 0.0.0.0'
+LIGHTYNODE_OK_START_MSG = 'Data tree change listeners registered'
 KARAF_OK_START_MSG = "Transportpce controller started"
 LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
 
@@ -84,6 +90,19 @@ if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
 else:
     TPCE_LOG = KARAF_LOG
 
+if 'USE_SIMS' not in os.environ:
+    SIMS_TO_USE = 'lightynode'
+    SIMS_TYPE = 'lightynode'
+else:
+    SIMS_TO_USE = os.environ['USE_SIMS']
+    print("Forcing to use SIMS " + SIMS_TO_USE)
+    if SIMS_TO_USE != 'None' or 'SIMS_TYPE' not in os.environ:
+        SIMS_TYPE = SIMS_TO_USE
+    else:
+        SIMS_TYPE = os.environ['SIMS_TYPE']
+        print("Forcing to use SIMS type" + SIMS_TYPE)
+
+
 #
 # Basic HTTP operations
 #
@@ -133,12 +152,47 @@ def post_request(url, data):
 #
 
 
+def start_honeynode(log_file: str, sim):
+    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                              '..', '..', 'honeynode', sim[1], 'honeynode-simulator', 'honeycomb-tpce')
+    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
+    if os.path.isfile(executable):
+        with open(log_file, 'w', encoding='utf-8') as outfile:
+            return subprocess.Popen(
+                [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
+                stdout=outfile, stderr=outfile)
+    return None
+
+
+def start_lightynode(log_file: str, sim):
+    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                              '..', '..', 'lightynode', 'lightynode-openroadm-device', 'start-device.sh')
+    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
+    if os.path.isfile(executable):
+        with open(log_file, 'w', encoding='utf-8') as outfile:
+            return subprocess.Popen(
+                [executable, "-v" + sim[1], "-p" + SIMS[sim]['port'], "-f" + os.path.join(sample_directory,
+                                                                                          SIMS[sim]['configfile'])],
+                stdout=outfile, stderr=outfile)
+    return None
+
+
 def start_sims(sims_list):
+    if SIMS_TO_USE == 'None':
+        return None
+    if SIMS_TO_USE == 'honeynode':
+        start_msg = HONEYNODE_OK_START_MSG
+        start_method = start_honeynode
+    else:
+        start_msg = LIGHTYNODE_OK_START_MSG
+        start_method = start_lightynode
     for sim in sims_list:
         print('starting simulator ' + sim[0] + ' in OpenROADM device version ' + sim[1] + '...')
         log_file = os.path.join(SIM_LOG_DIRECTORY, SIMS[sim]['logfile'])
-        process = start_honeynode(log_file, sim)
-        if wait_until_log_contains(log_file, HONEYNODE_OK_START_MSG, 100):
+        process = start_method(log_file, sim)
+        if wait_until_log_contains(log_file, start_msg, 100):
             print('simulator for ' + sim[0] + ' started')
         else:
             print('simulator for ' + sim[0] + ' failed to start')
@@ -215,51 +269,33 @@ def shutdown_process(process):
         process.send_signal(signal.SIGINT)
 
 
-def start_honeynode(log_file: str, sim):
-    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
-                              '..', '..', 'honeynode', sim[1], 'honeynode-simulator', 'honeycomb-tpce')
-    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
-                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
-    if os.path.isfile(executable):
-        with open(log_file, 'w', encoding='utf-8') as outfile:
-            return subprocess.Popen(
-                [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
-                stdout=outfile, stderr=outfile)
-    return None
-
-
 def wait_until_log_contains(log_file, regexp, time_to_wait=60):
     # pylint: disable=lost-exception
     # pylint: disable=consider-using-with
     stringfound = False
-    filefound = False
     line = None
     try:
         with TimeOut(seconds=time_to_wait):
             while not os.path.exists(log_file):
                 time.sleep(0.2)
-            filelogs = open(log_file, 'r', encoding='utf-8')
-            filelogs.seek(0, 2)
-            filefound = True
-            print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
-            compiled_regexp = re.compile(regexp)
-            while True:
-                line = filelogs.readline()
-                if compiled_regexp.search(line):
-                    print('Pattern found!', end=' ')
-                    stringfound = True
-                    break
-                if not line:
-                    time.sleep(0.1)
+            with open(log_file, 'r', encoding='utf-8') as filelogs:
+                filelogs.seek(0, 2)
+                print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
+                compiled_regexp = re.compile(regexp)
+                while True:
+                    line = filelogs.readline()
+                    if compiled_regexp.search(line):
+                        print('Pattern found!', end=' ')
+                        stringfound = True
+                        break
+                    if not line:
+                        time.sleep(0.1)
+        return stringfound
     except TimeoutError:
         print('Pattern not found after ' + str(time_to_wait), end=' seconds! ', flush=True)
+        return stringfound
     except PermissionError:
         print('Permission Error when trying to access the log file', end=' ... ', flush=True)
-    finally:
-        if filefound:
-            filelogs.close()
-        else:
-            print('log file does not exist or is not accessible... ', flush=True)
         return stringfound
 
 
@@ -358,8 +394,12 @@ def check_node_attribute_request(node: str, attribute: str, attribute_value: str
                   'draft-bierman02': attribute}
     if return_key[RESTCONF_VERSION] in res.keys():
         response_attribute = res[return_key[RESTCONF_VERSION]]
-    else:
+    elif 'errors' in res.keys():
         response_attribute = res['errors']['error'][0]
+    else:
+        # status code 400 invalid request
+        response_attribute = res['message'] + ' ' + res['url']
+        print(response_attribute)
     return {'status_code': response.status_code,
             attribute: response_attribute}
 
@@ -644,3 +684,47 @@ def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
             return_output = res[return_key[RESTCONF_VERSION]]
     return {'status_code': response.status_code,
             'output': return_output}
+
+#
+# simulators datastore operations
+#
+
+
+def sims_update_cp_port(sim: tuple, circuitpack: str, port: str, payload: dict):
+    if SIMS_TYPE == 'lightynode':
+        return sims_update_cp_port_ntcf(sim, circuitpack, payload)
+    if SIMS_TYPE == 'honeynode':
+        return sims_update_cp_port_rest(sim, circuitpack, port, payload)
+    return False
+
+
+def sims_update_cp_port_rest(sim: tuple, circuitpack: str, port: str, payload: dict):
+    # pylint: disable=consider-using-f-string
+    url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/{}/ports/{}".format(
+        SIMS[sim]['restconf_baseurl'],
+        urllib.parse.quote(circuitpack, safe=''),
+        urllib.parse.quote(port, safe=''))
+    body = {"ports": [payload]}
+    response = requests.request("PUT",
+                                url,
+                                data=json.dumps(body),
+                                headers=TYPE_APPLICATION_JSON,
+                                auth=(ODL_LOGIN, ODL_PWD),
+                                timeout=REQUEST_TIMEOUT)
+    return response.status_code == requests.codes.ok
+
+
+def sims_update_cp_port_ntcf(sim: tuple, circuitpack: str, payload: dict):
+    body = {"circuit-packs": {"circuit-pack-name": circuitpack, "ports": payload}}
+    xml_body = '<config><org-openroadm-device xmlns="http://org/openroadm/device">'
+    xml_body += dict2xml(body, indent="  ")
+    xml_body += '</org-openroadm-device></config>'
+    with connect_ssh(host='127.0.0.1',
+                     port=int(SIMS[sim]['port']),
+                     username=NODES_LOGIN,
+                     password=NODES_PWD) as session:
+        mgr = Manager(session, timeout=120)
+        reply = mgr.edit_config(xml_body, target="candidate", default_operation="merge")
+    if "None" in str(reply):
+        return True
+    return False