Bump pre-commit flake8 to 6.1.0
[integration/test.git] / csit / libraries / AuthStandalone.py
1 """Library hiding details of restconf authentication.
2
3 Suitable for performance tests against different
4 restconf authentication methods.
5 Type of authentication is currently determined by "scope" value.
6
7 This library does not rely on RobotFramework libraries,
8 so it is suitable for running from wide range of VMs.
9 Requirements: Basic Python installation,
10 which should include "json" and "requests" Python modules.
11
12 *_Using_Session keywords take the same kwargs as requests.Session.request,
13 but instead of method and URL, they take "session" created by
14 Init_Session keyword and URI (without "/restconf/").
15
16 Due to performance of TCP on some systems, two session strategies are available.
17 reuse=True reuses the same requests.Session, which possibly means
18 the library tries to re-use the same source TCP port.
19 This conserves resources (TCP ports available), but it may lead
20 to a significant performance hit (as in 10 times slower).
21 reuse=False closes every requests.Session object, presumably
22 causing the new session to take another TCP port.
23 This has good performance, but may perhaps lead to port starvation in some cases.
24
25 TODO: Put "RESTCONF" to more places,
26 as URIs not starting with /restconf/ are not supported yet.
27 """
28
29 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
30 #
31 # This program and the accompanying materials are made available under the
32 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
33 # and is available at http://www.eclipse.org/legal/epl-v10.html
34
35 import json
36 import requests
37
38
39 __author__ = "Vratko Polak"
40 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
41 __license__ = "Eclipse Public License v1.0"
42 __email__ = "vrpolak@cisco.com"
43
44
45 #
46 # Karaf Keyword definitions.
47 #
48 def Init_Session(ip, username, password, scope, reuse=True, port="8181"):
49     """Robot keyword, return opaque session object, which handles authentication automatically."""
50     if scope:
51         if reuse:
52             return _TokenReusingSession(ip, username, password, scope, port=port)
53         else:
54             return _TokenClosingSession(ip, username, password, scope, port=port)
55     else:
56         if reuse:
57             return _BasicReusingSession(ip, username, password, port=port)
58         else:
59             return _BasicClosingSession(ip, username, password, port=port)
60
61
62 def Get_Using_Session(session, uri, **kwargs):
63     """Robot keyword, perform GET operation using given opaque session object."""
64     return session.robust_method("GET", uri, **kwargs)
65
66
67 def Post_Using_Session(session, uri, **kwargs):
68     """Robot keyword, perform POST operation using given opaque session object."""
69     return session.robust_method("POST", uri, **kwargs)
70
71
72 def Put_Using_Session(session, uri, **kwargs):
73     """Robot keyword, perform PUT operation using given opaque session object."""
74     return session.robust_method("PUT", uri, **kwargs)
75
76
77 def Delete_Using_Session(session, uri, **kwargs):
78     """Robot keyword, perform DELETE operation using given opaque session object."""
79     return session.robust_method("DELETE", uri, **kwargs)
80
81
82 #
83 # Private classes.
84 #
85 class _BasicReusingSession(object):
86     """Handling of restconf requests using persistent session and basic http authentication."""
87
88     def __init__(self, ip, username="", password="", port="8181"):
89         """Initialize session using hardcoded text data, remember credentials."""
90         self.rest_prefix = "http://" + ip + ":" + port + "/rests/"
91         self.session = requests.Session()
92         if username:
93             self.session.auth = (username, password)  # May work with non-string values
94         else:
95             self.session.auth = (
96                 None  # Supports "no authentication mode" as in odl-restconf-noauth
97             )
98
99     def robust_method(self, method, uri, **kwargs):
100         """Try method once using session credentials. Return last response."""
101         return self.session.request(method, self.rest_prefix + uri, **kwargs)
102         # TODO: Do we want to fix URI at __init__ just to avoid string concat?
103
104
105 class _BasicClosingSession(object):
106     """Handling of restconf requests using one-time sessions and basic http authentication."""
107
108     def __init__(self, ip, username="", password="", port="8181"):
109         """Prepare session initialization data using hardcoded text, remember credentials."""
110         self.rest_prefix = "http://" + ip + ":" + port + "/rests/"
111         if username:
112             self.auth = (username, password)  # May work with non-string values
113         else:
114             self.auth = (
115                 None  # Supports "no authentication mode" as in odl-restconf-noauth
116             )
117         self.session = None
118
119     def robust_method(self, method, uri, **kwargs):
120         """Create new session, send method using remembered credentials. Return response"""
121         if self.session:
122             # The session object may still keep binding a TCP port here.
123             # As caller has finished processing previous response,
124             # we can explicitly close the old session to free resources
125             # before creating new session.
126             self.session.close()
127         self.session = requests.Session()
128         self.session.auth = self.auth
129         return self.session.request(method, self.rest_prefix + uri, **kwargs)
130
131
132 class _TokenReusingSession(object):
133     """Handling of restconf requests using token-based authentication, one session per token."""
134
135     def __init__(self, ip, username, password, scope, port="8181"):
136         """Initialize session using hardcoded text data."""
137         self.auth_url = "http://" + ip + ":" + port + "/oauth2/token"
138         self.rest_prefix = "http://" + ip + ":" + port + "/rests/"
139         self.auth_data = "grant_type=password&username=" + username
140         self.auth_data += "&password=" + password + "&scope=" + scope
141         self.auth_header = {"Content-Type": "application/x-www-form-urlencoded"}
142         self.session = None
143         self.token = None
144         self.refresh_token()
145
146     def refresh_token(self):
147         """Reset session, invoke call to get token, parse it and remember."""
148         if self.session:
149             self.session.close()
150         self.session = requests.Session()
151         resp = self.session.post(
152             self.auth_url, data=self.auth_data, headers=self.auth_header
153         )
154         resp_obj = json.loads(resp.text)
155         try:
156             token = resp_obj["access_token"]
157         except KeyError:
158             raise RuntimeError("Parse failed: " + resp.text)
159         self.token = token
160         # TODO: Use logging so that callers could see token refreshes.
161         # We keep self.session to use for the following restconf requests.
162
163     def oneshot_method(self, method, uri, **kwargs):
164         """Return response of request of given method to given uri (without restconf/)."""
165         # Token needs to be merged into headers.
166         authed_headers = kwargs.get("headers", {})
167         authed_headers["Authorization"] = "Bearer " + self.token
168         authed_kwargs = dict(kwargs)  # shallow copy
169         authed_kwargs["headers"] = authed_headers
170         return self.session.request(method, self.rest_prefix + uri, **authed_kwargs)
171
172     def robust_method(self, method, uri, **kwargs):
173         """Try method once; upon 401, refresh token and retry once. Return last response."""
174         resp = self.oneshot_method(method, uri, **kwargs)
175         if resp.status_code != 401:
176             return resp
177         self.refresh_token()
178         return self.oneshot_method(method, uri, **kwargs)
179
180
181 class _TokenClosingSession(object):
182     """Handling of restconf requests using token-based authentication, one session per request."""
183
184     def __init__(self, ip, username, password, scope, port="8181"):
185         """Prepare session initialization data using hardcoded text."""
186         self.auth_url = "http://" + ip + ":" + port + "/oauth2/token"
187         self.rest_prefix = "http://" + ip + ":" + port + "/rests/"
188         self.auth_data = "grant_type=password&username=" + username
189         self.auth_data += "&password=" + password + "&scope=" + scope
190         self.auth_header = {"Content-Type": "application/x-www-form-urlencoded"}
191         self.session = None
192         self.token = None
193         self.refresh_token()
194
195     def refresh_token(self):
196         """Reset session, invoke call to get token, parse it and remember."""
197         if self.session:
198             self.session.close()
199         self.session = requests.Session()
200         resp = self.session.post(
201             self.auth_url, data=self.auth_data, headers=self.auth_header
202         )
203         resp_obj = json.loads(resp.text)
204         try:
205             token = resp_obj["access_token"]
206         except KeyError:
207             raise RuntimeError("Parse failed: " + resp.text)
208         self.token = token
209         # TODO: Use logging so that callers could see token refreshes.
210         # We keep self.session to use for the following restconf requests.
211
212     def oneshot_method(self, method, uri, **kwargs):
213         """Reset session, return response of request of given method to given uri (without restconf/)."""
214         # This assumes self.session was already initialized.
215         self.session.close()
216         self.session = requests.Session()
217         # Token needs to be merged into headers.
218         authed_headers = kwargs.get("headers", {})
219         authed_headers["Authorization"] = "Bearer " + self.token
220         authed_kwargs = dict(kwargs)  # shallow copy
221         authed_kwargs["headers"] = authed_headers
222         return self.session.request(method, self.rest_prefix + uri, **authed_kwargs)
223
224     def robust_method(self, method, uri, **kwargs):
225         """Try method once; upon 401, refresh token and retry once. Return last response."""
226         resp = self.oneshot_method(method, uri, **kwargs)
227         if resp.status_code != 401:
228             return resp
229         self.refresh_token()
230         return self.oneshot_method(method, uri, **kwargs)