Outsource sims DS direct op on CP to test_utils
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
index f156ad67ac4838968ff05b1d8e72e6c4029b9a8e..cc0604abb6f6c15135eddcd4501b5b35de4b8e47 100644 (file)
@@ -386,8 +386,12 @@ def check_node_attribute_request(node: str, attribute: str, attribute_value: str
                   'draft-bierman02': attribute}
     if return_key[RESTCONF_VERSION] in res.keys():
         response_attribute = res[return_key[RESTCONF_VERSION]]
-    else:
+    elif 'errors' in res.keys():
         response_attribute = res['errors']['error'][0]
+    else:
+        # status code 400 invalid request
+        response_attribute = res['message'] + ' ' + res['url']
+        print(response_attribute)
     return {'status_code': response.status_code,
             attribute: response_attribute}
 
@@ -672,3 +676,23 @@ def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
             return_output = res[return_key[RESTCONF_VERSION]]
     return {'status_code': response.status_code,
             'output': return_output}
+
+#
+# simulators datastore operations
+#
+
+
+def sims_update_cp_port(sim: tuple, circuitpack: str, port: str, payload: dict):
+    # pylint: disable=consider-using-f-string
+    url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/{}/ports/{}".format(
+        SIMS[sim]['restconf_baseurl'], circuitpack, port)
+    body = {"ports": [payload]}
+    print(sim)
+    print(url)
+    response = requests.request("PUT",
+                                url,
+                                data=json.dumps(body),
+                                headers=TYPE_APPLICATION_JSON,
+                                auth=(ODL_LOGIN, ODL_PWD),
+                                timeout=REQUEST_TIMEOUT)
+    return response.status_code == requests.codes.ok