Switch all func tests to lightynode
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index 3ff03545c1be7ecf8df6750162991c04a3c31019..5d2789c1b024bd901f9a61c68ccc773f725e72cf 100644 (file)
@@ -22,6 +22,12 @@ import time
 
 import psutil
 import requests
+import urllib.parse
+
+from dict2xml import dict2xml
+from netconf_client.connect import connect_ssh
+from netconf_client.ncclient import Manager
+
 
 # pylint: disable=import-error
 import simulators
@@ -29,7 +35,8 @@ import simulators
 SIMS = simulators.SIMS
 
 HONEYNODE_OK_START_MSG = 'Netconf SSH endpoint started successfully at 0.0.0.0'
-KARAF_OK_START_MSG = "Blueprint container for bundle org.opendaylight.netconf.restconf.* was successfully created"
+LIGHTYNODE_OK_START_MSG = 'Data tree change listeners registered'
+KARAF_OK_START_MSG = "Transportpce controller started"
 LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
 
 ODL_LOGIN = 'admin'
@@ -47,6 +54,9 @@ CODE_SHOULD_BE_201 = 'Http status code should be 201'
 T100GE = 'Transponder 100GE'
 T0_MULTILAYER_TOPO = 'T0 - Multi-layer topology'
 T0_FULL_MULTILAYER_TOPO = 'T0 - Full Multi-layer topology'
+T100GE_UUID = 'cf51c729-3699-308a-a7d0-594c6a62ebbb'
+T0_MULTILAYER_TOPO_UUID = '747c670e-7a07-3dab-b379-5b1cd17402a3'
+T0_FULL_MULTILAYER_TOPO_UUID = '393f09a4-0a0b-3d82-a4f6-1fbbc14ca1a7'
 
 SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log')
 
@@ -59,8 +69,6 @@ else:
 
 RESTCONF_PATH_PREFIX = {'rfc8040': '/rests',
                         'draft-bierman02': '/restconf'}
-if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
-    RESTCONF_PATH_PREFIX['rfc8040'] = '/restconf'
 
 if 'USE_ODL_RESTCONF_VERSION' in os.environ:
     RESTCONF_VERSION = os.environ['USE_ODL_RESTCONF_VERSION']
@@ -86,6 +94,19 @@ if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
 else:
     TPCE_LOG = KARAF_LOG
 
+if 'USE_SIMS' not in os.environ:
+    SIMS_TO_USE = 'lightynode'
+    SIMS_TYPE = 'lightynode'
+else:
+    SIMS_TO_USE = os.environ['USE_SIMS']
+    print("Forcing to use SIMS " + SIMS_TO_USE)
+    if SIMS_TO_USE != 'None' or 'SIMS_TYPE' not in os.environ:
+        SIMS_TYPE = SIMS_TO_USE
+    else:
+        SIMS_TYPE = os.environ['SIMS_TYPE']
+        print("Forcing to use SIMS type" + SIMS_TYPE)
+
+
 #
 # Basic HTTP operations
 #
@@ -135,12 +156,47 @@ def post_request(url, data):
 #
 
 
+def start_honeynode(log_file: str, sim):
+    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                              '..', '..', 'honeynode', sim[1], 'honeynode-simulator', 'honeycomb-tpce')
+    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
+    if os.path.isfile(executable):
+        with open(log_file, 'w', encoding='utf-8') as outfile:
+            return subprocess.Popen(
+                [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
+                stdout=outfile, stderr=outfile)
+    return None
+
+
+def start_lightynode(log_file: str, sim):
+    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                              '..', '..', 'lightynode', 'lightynode-openroadm-device', 'start-device.sh')
+    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
+    if os.path.isfile(executable):
+        with open(log_file, 'w', encoding='utf-8') as outfile:
+            return subprocess.Popen(
+                [executable, "-v" + sim[1], "-p" + SIMS[sim]['port'], "-f" + os.path.join(sample_directory,
+                                                                                          SIMS[sim]['configfile'])],
+                stdout=outfile, stderr=outfile)
+    return None
+
+
 def start_sims(sims_list):
+    if SIMS_TO_USE == 'None':
+        return None
+    if SIMS_TO_USE == 'honeynode':
+        start_msg = HONEYNODE_OK_START_MSG
+        start_method = start_honeynode
+    else:
+        start_msg = LIGHTYNODE_OK_START_MSG
+        start_method = start_lightynode
     for sim in sims_list:
         print('starting simulator ' + sim[0] + ' in OpenROADM device version ' + sim[1] + '...')
         log_file = os.path.join(SIM_LOG_DIRECTORY, SIMS[sim]['logfile'])
-        process = start_honeynode(log_file, sim)
-        if wait_until_log_contains(log_file, HONEYNODE_OK_START_MSG, 100):
+        process = start_method(log_file, sim)
+        if wait_until_log_contains(log_file, start_msg, 100):
             print('simulator for ' + sim[0] + ' started')
         else:
             print('simulator for ' + sim[0] + ' failed to start')
@@ -163,7 +219,7 @@ def start_tpce():
     else:
         process = start_karaf()
         start_msg = KARAF_OK_START_MSG
-    if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=300):
+    if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=100):
         print('OpenDaylight started !')
     else:
         print('OpenDaylight failed to start !')
@@ -217,51 +273,33 @@ def shutdown_process(process):
         process.send_signal(signal.SIGINT)
 
 
-def start_honeynode(log_file: str, sim):
-    executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
-                              '..', '..', 'honeynode', sim[1], 'honeynode-simulator', 'honeycomb-tpce')
-    sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
-                                    '..', '..', 'sample_configs', 'openroadm', sim[1])
-    if os.path.isfile(executable):
-        with open(log_file, 'w', encoding='utf-8') as outfile:
-            return subprocess.Popen(
-                [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
-                stdout=outfile, stderr=outfile)
-    return None
-
-
 def wait_until_log_contains(log_file, regexp, time_to_wait=60):
     # pylint: disable=lost-exception
     # pylint: disable=consider-using-with
     stringfound = False
-    filefound = False
     line = None
     try:
         with TimeOut(seconds=time_to_wait):
             while not os.path.exists(log_file):
                 time.sleep(0.2)
-            filelogs = open(log_file, 'r', encoding='utf-8')
-            filelogs.seek(0, 2)
-            filefound = True
-            print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
-            compiled_regexp = re.compile(regexp)
-            while True:
-                line = filelogs.readline()
-                if compiled_regexp.search(line):
-                    print('Pattern found!', end=' ')
-                    stringfound = True
-                    break
-                if not line:
-                    time.sleep(0.1)
+            with open(log_file, 'r', encoding='utf-8') as filelogs:
+                filelogs.seek(0, 2)
+                print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
+                compiled_regexp = re.compile(regexp)
+                while True:
+                    line = filelogs.readline()
+                    if compiled_regexp.search(line):
+                        print('Pattern found!', end=' ')
+                        stringfound = True
+                        break
+                    if not line:
+                        time.sleep(0.1)
+        return stringfound
     except TimeoutError:
         print('Pattern not found after ' + str(time_to_wait), end=' seconds! ', flush=True)
+        return stringfound
     except PermissionError:
         print('Permission Error when trying to access the log file', end=' ... ', flush=True)
-    finally:
-        if filefound:
-            filelogs.close()
-        else:
-            print('log file does not exist or is not accessible... ', flush=True)
         return stringfound
 
 
@@ -360,8 +398,12 @@ def check_node_attribute_request(node: str, attribute: str, attribute_value: str
                   'draft-bierman02': attribute}
     if return_key[RESTCONF_VERSION] in res.keys():
         response_attribute = res[return_key[RESTCONF_VERSION]]
-    else:
+    elif 'errors' in res.keys():
         response_attribute = res['errors']['error'][0]
+    else:
+        # status code 400 invalid request
+        response_attribute = res['message'] + ' ' + res['url']
+        print(response_attribute)
     return {'status_code': response.status_code,
             attribute: response_attribute}
 
@@ -646,3 +688,86 @@ def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
             return_output = res[return_key[RESTCONF_VERSION]]
     return {'status_code': response.status_code,
             'output': return_output}
+
+#
+# simulators datastore operations
+#
+
+
+def sims_update_cp_port(sim: tuple, circuitpack: str, port: str, payload: dict):
+    if SIMS_TYPE == 'lightynode':
+        return sims_update_cp_port_ntcf(sim, circuitpack, payload)
+    if SIMS_TYPE == 'honeynode':
+        return sims_update_cp_port_rest(sim, circuitpack, port, payload)
+    return False
+
+
+def sims_update_cp_port_rest(sim: tuple, circuitpack: str, port: str, payload: dict):
+    # pylint: disable=consider-using-f-string
+    url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/{}/ports/{}".format(
+        SIMS[sim]['restconf_baseurl'],
+        urllib.parse.quote(circuitpack, safe=''),
+        urllib.parse.quote(port, safe=''))
+    body = {"ports": [payload]}
+    response = requests.request("PUT",
+                                url,
+                                data=json.dumps(body),
+                                headers=TYPE_APPLICATION_JSON,
+                                auth=(ODL_LOGIN, ODL_PWD),
+                                timeout=REQUEST_TIMEOUT)
+    return response.status_code == requests.codes.ok
+
+
+def sims_update_cp_port_ntcf(sim: tuple, circuitpack: str, payload: dict):
+    body = {"circuit-packs": {"circuit-pack-name": circuitpack, "ports": payload}}
+    xml_body = '<config><org-openroadm-device xmlns="http://org/openroadm/device">'
+    xml_body += dict2xml(body, indent="  ")
+    xml_body += '</org-openroadm-device></config>'
+    with connect_ssh(host='127.0.0.1',
+                     port=int(SIMS[sim]['port']),
+                     username=NODES_LOGIN,
+                     password=NODES_PWD) as session:
+        mgr = Manager(session, timeout=120)
+        reply = mgr.edit_config(xml_body, target="candidate", default_operation="merge")
+    if "None" in str(reply):
+        return True
+    return False
+
+
+def sims_update_pm_interact(sim: tuple, payload: dict):
+    if SIMS_TYPE == 'lightynode':
+        return sims_update_pm_interact_ntcf(sim, payload)
+    if SIMS_TYPE == 'honeynode':
+        return sims_update_pm_interact_rest(sim, payload)
+    return False
+
+
+def sims_update_pm_interact_rest(sim: tuple, payload: dict):
+    # pylint: disable=consider-using-f-string
+    url = "{}/operations/pm-handling:pm-interact".format(SIMS[sim]['restconf_baseurl'])
+    body = {"input": payload}
+    response = requests.request("POST",
+                                url,
+                                data=json.dumps(body),
+                                headers=TYPE_APPLICATION_JSON,
+                                auth=(ODL_LOGIN, ODL_PWD),
+                                timeout=REQUEST_TIMEOUT)
+    return response.status_code == requests.codes.ok
+
+
+def sims_update_pm_interact_ntcf(sim: tuple, payload: dict):
+    # pylint: disable=line-too-long
+    xml_body = '<pm-interact xmlns="http://honeynode-simulator/pm-handling">'
+    xml_body += dict2xml(payload, indent="  ")
+    xml_body += '</pm-interact>'
+    new_xml = xml_body.replace("<pm-resource-instance>/org-openroadm-device:org-openroadm-device/org-openroadm-device:interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']</pm-resource-instance>",
+                               "<pm-resource-instance xmlns:a=\"http://org/openroadm/device\">/a:org-openroadm-device/a:interface[a:name='OTS-DEG2-TTP-TXRX']</pm-resource-instance>")
+    with connect_ssh(host='127.0.0.1',
+                     port=int(SIMS[sim]['port']),
+                     username=NODES_LOGIN,
+                     password=NODES_PWD) as session:
+        mgr = Manager(session, timeout=120)
+        reply = mgr.dispatch(new_xml)
+    if "netconf_client.ncclient.RPCReply" in str(reply):
+        return True
+    return False