outsource flexgrid constants import in functests
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index dd26aba7996af0440a37316cef7b3be9dbe45731..34238bcc52a3edcc3927d80061d1841fd487a6c7 100644 (file)
@@ -31,7 +31,7 @@ SAMPLES_DIRECTORY = simulators.SAMPLES_DIRECTORY
 HONEYNODE_OK_START_MSG = "Netconf SSH endpoint started successfully at 0.0.0.0"
 KARAF_OK_START_MSG = re.escape(
     "Blueprint container for bundle org.opendaylight.netconf.restconf")+".* was successfully created"
-
+LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
 
 RESTCONF_BASE_URL = "http://localhost:8181/restconf"
 ODL_LOGIN = "admin"
@@ -93,17 +93,18 @@ def start_tpce():
     print("starting OpenDaylight...")
     if "USE_LIGHTY" in os.environ and os.environ['USE_LIGHTY'] == 'True':
         process = start_lighty()
-        # TODO: add some sort of health check similar to Karaf below
+        start_msg = LIGHTY_OK_START_MSG
     else:
         process = start_karaf()
-        if wait_until_log_contains(KARAF_LOG, KARAF_OK_START_MSG, time_to_wait=60):
-            print("OpenDaylight started !")
-        else:
-            print("OpenDaylight failed to start !")
-            shutdown_process(process)
-            for pid in process_list:
-                shutdown_process(pid)
-            sys.exit(1)
+        start_msg = KARAF_OK_START_MSG
+    if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=60):
+        print("OpenDaylight started !")
+    else:
+        print("OpenDaylight failed to start !")
+        shutdown_process(process)
+        for pid in process_list:
+            shutdown_process(pid)
+        sys.exit(1)
     process_list.append(process)
     return process_list
 
@@ -202,7 +203,7 @@ def delete_request(url):
 
 
 def mount_device(node_id, sim):
-    url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+    url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
     body = {"node": [{
         "node-id": node_id,
         "netconf-node-topology:username": NODES_LOGIN,
@@ -212,10 +213,10 @@ def mount_device(node_id, sim):
         "netconf-node-topology:tcp-only": "false",
         "netconf-node-topology:pass-through": {}}]}
     response = put_request(url, body)
-    if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node "+node_id), 60):
-        print("Node "+node_id+" correctly added to tpce topology", end='... ', flush=True)
+    if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node " + node_id), 60):
+        print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
     else:
-        print("Node "+node_id+" still not added to tpce topology", end='... ', flush=True)
+        print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
         if response.status_code == requests.codes.ok:
             print("It was probably loaded at start-up", end='... ', flush=True)
         # TODO an else-clause to abort test would probably be nice here
@@ -223,12 +224,12 @@ def mount_device(node_id, sim):
 
 
 def unmount_device(node_id):
-    url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+    url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
     response = delete_request(url)
-    if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: "+node_id), 60):
-        print("Node "+node_id+" correctly deleted from tpce topology", end='... ', flush=True)
+    if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node_id), 60):
+        print("Node " + node_id + " correctly deleted from tpce topology", end='... ', flush=True)
     else:
-        print("Node "+node_id+" still not deleted from tpce topology", end='... ', flush=True)
+        print("Node " + node_id + " still not deleted from tpce topology", end='... ', flush=True)
     return response
 
 
@@ -425,7 +426,7 @@ def wait_until_log_contains(log_file, regexp, time_to_wait=20):
             filelogs = open(log_file, 'r')
             filelogs.seek(0, 2)
             filefound = True
-            print("Searching for pattern '"+regexp+"' in "+os.path.basename(log_file), end='... ', flush=True)
+            print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
             compiled_regexp = re.compile(regexp)
             while True:
                 line = filelogs.readline()
@@ -436,7 +437,7 @@ def wait_until_log_contains(log_file, regexp, time_to_wait=20):
                 if not line:
                     time.sleep(0.1)
     except TimeoutError:
-        print("Pattern not found after "+str(time_to_wait), end=" seconds! ", flush=True)
+        print("Pattern not found after " + str(time_to_wait), end=" seconds! ", flush=True)
     except PermissionError:
         print("Permission Error when trying to access the log file", end=" ... ", flush=True)
     finally:
@@ -462,3 +463,4 @@ class TimeOut:
     def __exit__(self, type, value, traceback):
         # pylint: disable=W0622
         signal.alarm(0)
+