# pylint: disable=no-member
+import base64
import json
import os
import sys
else:
process = start_karaf()
start_msg = KARAF_OK_START_MSG
- if wait_until_log_contains(TPCE_LOG, start_msg , time_to_wait=60):
+ if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=60):
print("OpenDaylight started !")
else:
print("OpenDaylight failed to start !")
def mount_device(node_id, sim):
- url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+ url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
body = {"node": [{
"node-id": node_id,
"netconf-node-topology:username": NODES_LOGIN,
"netconf-node-topology:tcp-only": "false",
"netconf-node-topology:pass-through": {}}]}
response = put_request(url, body)
- if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node "+node_id), 60):
- print("Node "+node_id+" correctly added to tpce topology", end='... ', flush=True)
+ if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node " + node_id), 60):
+ print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
else:
- print("Node "+node_id+" still not added to tpce topology", end='... ', flush=True)
+ print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
if response.status_code == requests.codes.ok:
print("It was probably loaded at start-up", end='... ', flush=True)
# TODO an else-clause to abort test would probably be nice here
def unmount_device(node_id):
- url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+ url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
response = delete_request(url)
- if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: "+node_id), 60):
- print("Node "+node_id+" correctly deleted from tpce topology", end='... ', flush=True)
+ if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node_id), 60):
+ print("Node " + node_id + " correctly deleted from tpce topology", end='... ', flush=True)
else:
- print("Node "+node_id+" still not deleted from tpce topology", end='... ', flush=True)
+ print("Node " + node_id + " still not deleted from tpce topology", end='... ', flush=True)
return response
filelogs = open(log_file, 'r')
filelogs.seek(0, 2)
filefound = True
- print("Searching for pattern '"+regexp+"' in "+os.path.basename(log_file), end='... ', flush=True)
+ print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
compiled_regexp = re.compile(regexp)
while True:
line = filelogs.readline()
if not line:
time.sleep(0.1)
except TimeoutError:
- print("Pattern not found after "+str(time_to_wait), end=" seconds! ", flush=True)
+ print("Pattern not found after " + str(time_to_wait), end=" seconds! ", flush=True)
except PermissionError:
print("Permission Error when trying to access the log file", end=" ... ", flush=True)
finally:
def __exit__(self, type, value, traceback):
# pylint: disable=W0622
signal.alarm(0)
+
+
+def check_freq_map(freq_map):
+ freq_map_array = [int(x) for x in freq_map]
+ return freq_map_array[0] == 255 and freq_map_array[1] == 255
+
+
+def set_used_index_for_freq_map(freq_map, index):
+ freq_map[index] = 0
+ return freq_map
+
+
+INDEX_1_USED_FREQ_MAP = base64.b64encode(set_used_index_for_freq_map(bytearray(b'\xFF' * 96), 0)).decode('UTF-8')
+
+INDEX_1_2_USED_FREQ_MAP = base64.b64encode(set_used_index_for_freq_map(
+ set_used_index_for_freq_map(bytearray(b'\xFF' * 96), 0), 1)).decode('utf-8')
+
+AVAILABLE_FREQ_MAP = base64.b64encode(bytearray(b'\xFF' * 96)).decode('UTF-8')