HONEYNODE_OK_START_MSG = "Netconf SSH endpoint started successfully at 0.0.0.0"
KARAF_OK_START_MSG = re.escape(
"Blueprint container for bundle org.opendaylight.netconf.restconf")+".* was successfully created"
-
+LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
RESTCONF_BASE_URL = "http://localhost:8181/restconf"
ODL_LOGIN = "admin"
URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network/"
URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
+URL_GET_NBINOTIFICATIONS_SERV = "{}/operations/nbi-notifications:get-notifications-service/"
URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete"
URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path"
URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path"
URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms"
URL_PATH_COMPUTATION_REQUEST = "{}/operations/transportpce-pce:path-computation-request"
+URL_FULL_PORTMAPPING = "{}/config/transportpce-portmapping:network"
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
print("starting OpenDaylight...")
if "USE_LIGHTY" in os.environ and os.environ['USE_LIGHTY'] == 'True':
process = start_lighty()
- # TODO: add some sort of health check similar to Karaf below
+ start_msg = LIGHTY_OK_START_MSG
else:
process = start_karaf()
- if wait_until_log_contains(KARAF_LOG, KARAF_OK_START_MSG, time_to_wait=60):
- print("OpenDaylight started !")
- else:
- print("OpenDaylight failed to start !")
- shutdown_process(process)
- for pid in process_list:
- shutdown_process(pid)
- sys.exit(1)
+ start_msg = KARAF_OK_START_MSG
+ if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=60):
+ print("OpenDaylight started !")
+ else:
+ print("OpenDaylight failed to start !")
+ shutdown_process(process)
+ for pid in process_list:
+ shutdown_process(pid)
+ sys.exit(1)
process_list.append(process)
return process_list
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", "karaf", "target", "assembly", "bin", "client")
return subprocess.run([executable],
- input='feature:install ' + feature_name + '\n feature:list | grep tapi \n logout \n',
+ input='feature:install ' + feature_name + '\n feature:list | grep '
+ + feature_name + ' \n logout \n',
universal_newlines=True, check=False)
def post_request(url, data):
if data:
+ print(json.dumps(data))
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
auth=(ODL_LOGIN, ODL_PWD))
+def rawpost_request(url, data):
+ return requests.request(
+ "POST", url.format(RESTCONF_BASE_URL),
+ data=data,
+ headers=TYPE_APPLICATION_JSON,
+ auth=(ODL_LOGIN, ODL_PWD))
+
def delete_request(url):
return requests.request(
def mount_device(node_id, sim):
- url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+ url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
body = {"node": [{
"node-id": node_id,
"netconf-node-topology:username": NODES_LOGIN,
"netconf-node-topology:tcp-only": "false",
"netconf-node-topology:pass-through": {}}]}
response = put_request(url, body)
- if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node "+node_id), 60):
- print("Node "+node_id+" correctly added to tpce topology", end='... ', flush=True)
+ if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node " + node_id), 60):
+ print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
else:
- print("Node "+node_id+" still not added to tpce topology", end='... ', flush=True)
+ print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
if response.status_code == requests.codes.ok:
print("It was probably loaded at start-up", end='... ', flush=True)
# TODO an else-clause to abort test would probably be nice here
def unmount_device(node_id):
- url = URL_CONFIG_NETCONF_TOPO+"node/"+node_id
+ url = URL_CONFIG_NETCONF_TOPO + "node/" + node_id
response = delete_request(url)
- if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: "+node_id), 60):
- print("Node "+node_id+" correctly deleted from tpce topology", end='... ', flush=True)
+ if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node_id), 60):
+ print("Node " + node_id + " correctly deleted from tpce topology", end='... ', flush=True)
else:
- print("Node "+node_id+" still not deleted from tpce topology", end='... ', flush=True)
+ print("Node " + node_id + " still not deleted from tpce topology", end='... ', flush=True)
return response
return get_request(url)
+def get_notifications_service_request(attr):
+ return post_request(URL_GET_NBINOTIFICATIONS_SERV, attr)
+
+
def get_service_list_request(suffix: str):
url = URL_OPER_SERV_LIST + suffix
return get_request(url)
return post_request(URL_SERV_DELETE, attr)
-def service_path_request(operation: str, servicename: str, wavenumber: str, nodes):
+def service_path_request(operation: str, servicename: str, wavenumber: str, nodes, centerfreq: str,
+ slotwidth: int, minfreq: float, maxfreq: float, lowerslotnumber: int,
+ higherslotnumber: int):
attr = {"renderer:input": {
"renderer:service-name": servicename,
"renderer:wave-number": wavenumber,
- "renderer:modulation-format": "qpsk",
+ "renderer:modulation-format": "dp-qpsk",
"renderer:operation": operation,
- "renderer:nodes": nodes}}
+ "renderer:nodes": nodes,
+ "renderer:center-freq": centerfreq,
+ "renderer:width": slotwidth,
+ "renderer:min-freq": minfreq,
+ "renderer:max-freq": maxfreq,
+ "renderer:lower-spectral-slot-number": lowerslotnumber,
+ "renderer:higher-spectral-slot-number": higherslotnumber}}
return post_request(URL_SERVICE_PATH, attr)
filelogs = open(log_file, 'r')
filelogs.seek(0, 2)
filefound = True
- print("Searching for pattern '"+regexp+"' in "+os.path.basename(log_file), end='... ', flush=True)
+ print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
compiled_regexp = re.compile(regexp)
while True:
line = filelogs.readline()
if not line:
time.sleep(0.1)
except TimeoutError:
- print("Pattern not found after "+str(time_to_wait), end=" seconds! ", flush=True)
+ print("Pattern not found after " + str(time_to_wait), end=" seconds! ", flush=True)
except PermissionError:
print("Permission Error when trying to access the log file", end=" ... ", flush=True)
finally:
def __exit__(self, type, value, traceback):
# pylint: disable=W0622
signal.alarm(0)
+