Upgrade to OpenROADM 7.1.0 network models
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index b541abb09df8b2ac431759b5cc2827ee911ee832..9d3bfec3eb62c86a8518b42363a68fb906905df0 100644 (file)
@@ -11,6 +11,7 @@
 
 # pylint: disable=no-member
 
+import base64
 import json
 import os
 import sys
@@ -97,7 +98,7 @@ def start_tpce():
     else:
         process = start_karaf()
         start_msg = KARAF_OK_START_MSG
-    if wait_until_log_contains(TPCE_LOG, start_msg , time_to_wait=60):
+    if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=60):
         print("OpenDaylight started !")
     else:
         print("OpenDaylight failed to start !")
@@ -203,7 +204,7 @@ def delete_request(url):
 
 
 def mount_device(node_id, sim):
-    url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+    url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
     body = {"node": [{
         "node-id": node_id,
         "netconf-node-topology:username": NODES_LOGIN,
@@ -213,10 +214,10 @@ def mount_device(node_id, sim):
         "netconf-node-topology:tcp-only": "false",
         "netconf-node-topology:pass-through": {}}]}
     response = put_request(url, body)
-    if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node "+node_id), 60):
-        print("Node "+node_id+" correctly added to tpce topology", end='... ', flush=True)
+    if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node " + node_id), 60):
+        print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
     else:
-        print("Node "+node_id+" still not added to tpce topology", end='... ', flush=True)
+        print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
         if response.status_code == requests.codes.ok:
             print("It was probably loaded at start-up", end='... ', flush=True)
         # TODO an else-clause to abort test would probably be nice here
@@ -224,12 +225,12 @@ def mount_device(node_id, sim):
 
 
 def unmount_device(node_id):
-    url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+    url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
     response = delete_request(url)
-    if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: "+node_id), 60):
-        print("Node "+node_id+" correctly deleted from tpce topology", end='... ', flush=True)
+    if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node_id), 60):
+        print("Node " + node_id + " correctly deleted from tpce topology", end='... ', flush=True)
     else:
-        print("Node "+node_id+" still not deleted from tpce topology", end='... ', flush=True)
+        print("Node " + node_id + " still not deleted from tpce topology", end='... ', flush=True)
     return response
 
 
@@ -426,7 +427,7 @@ def wait_until_log_contains(log_file, regexp, time_to_wait=20):
             filelogs = open(log_file, 'r')
             filelogs.seek(0, 2)
             filefound = True
-            print("Searching for pattern '"+regexp+"' in "+os.path.basename(log_file), end='... ', flush=True)
+            print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
             compiled_regexp = re.compile(regexp)
             while True:
                 line = filelogs.readline()
@@ -437,7 +438,7 @@ def wait_until_log_contains(log_file, regexp, time_to_wait=20):
                 if not line:
                     time.sleep(0.1)
     except TimeoutError:
-        print("Pattern not found after "+str(time_to_wait), end=" seconds! ", flush=True)
+        print("Pattern not found after " + str(time_to_wait), end=" seconds! ", flush=True)
     except PermissionError:
         print("Permission Error when trying to access the log file", end=" ... ", flush=True)
     finally:
@@ -463,3 +464,21 @@ class TimeOut:
     def __exit__(self, type, value, traceback):
         # pylint: disable=W0622
         signal.alarm(0)
+
+
+def check_freq_map(freq_map):
+    freq_map_array = [int(x) for x in freq_map]
+    return freq_map_array[0] == 255 and freq_map_array[1] == 255
+
+
+def set_used_index_for_freq_map(freq_map, index):
+    freq_map[index] = 0
+    return freq_map
+
+
+INDEX_1_USED_FREQ_MAP = base64.b64encode(set_used_index_for_freq_map(bytearray(b'\xFF' * 96), 0)).decode('UTF-8')
+
+INDEX_1_2_USED_FREQ_MAP = base64.b64encode(set_used_index_for_freq_map(
+    set_used_index_for_freq_map(bytearray(b'\xFF' * 96), 0), 1)).decode('utf-8')
+
+AVAILABLE_FREQ_MAP = base64.b64encode(bytearray(b'\xFF' * 96)).decode('UTF-8')