Auto-generated patch by python-black
[integration/test.git] / csit / libraries / IoTDM / client_libs / iotdm_it_test_com.py
1 """
2  Implementation of IoTDM communication class implementing generic
3  (protocol independent) send and receive functionality
4 """
5
6 #
7 # Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
8 #
9 # This program and the accompanying materials are made available under the
10 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
11 # and is available at http://www.eclipse.org/legal/epl-v10.html
12 #
13
14 from Queue import Queue
15 from Queue import Empty
16
17 import onem2m_http
18 from onem2m_http import OneM2MHttpJsonEncoderTx
19 from onem2m_http import OneM2MHttpJsonDecoderTx
20 from onem2m_http import OneM2MHttpJsonDecoderRx
21 from onem2m_http import OneM2MHttpJsonEncoderRx
22 from onem2m_http import OneM2MHttpTx
23 from onem2m_http import OneM2MHttpRx
24 from onem2m_http import OneM2MHttpJsonPrimitive
25 from onem2m_json_primitive import OneM2MJsonPrimitiveBuilder
26 from iot_communication_concepts import IotComm
27 from onem2m_primitive import OneM2M
28
29
class IoTDMItCommunication(IotComm):
    """
    Generic IoTDM communication implementation which can be used for
    all supported protocols.

    Requests are sent through the tx object. Requests received by the rx
    object are either answered automatically (when they match one of the
    registered auto handling descriptions) or handed over to the user of
    this class through receive() and answered by respond().
    """

    __blocking_call_timeout = 3  # seconds

    def __init__(
        self,
        tx,
        rx,
        entity_id,
        protocol,
        protocol_params,
        auto_handling_descriptions=None,
    ):
        """
        Stores the channels and default parameters of the communication.

        :param tx: object used for sending, may be None
        :param rx: object used for receiving, may be None
        :param entity_id: value used as the originator (OneM2M.short_from)
            of the default primitive parameters
        :param protocol: name of the communication protocol
        :param protocol_params: default protocol specific parameters
        :param auto_handling_descriptions: optional dictionary mapping auto
            handling descriptions to their statistics objects. The passed
            dictionary is used as is (not copied). A new empty dictionary is
            created for this instance when the argument is omitted or None.
        """
        super(IoTDMItCommunication, self).__init__()
        self.tx = tx
        self.rx = rx
        self.requestId = 0
        self.entity_id = entity_id
        self.protocol = protocol
        self.protocol_params = protocol_params

        # A dictionary literal used as default value is evaluated only once,
        # so all instances created without this argument would register
        # their descriptions into the very same dictionary.
        if auto_handling_descriptions is None:
            auto_handling_descriptions = {}
        self.auto_handling_descriptions = auto_handling_descriptions

        # The queues exist only while the rx channel is started,
        # see _start() and _stop()
        self.rx_request_queue = None
        self.rx_response_queue = None

    def create_auto_response(self, notification_request_primitive, onem2m_result_code):
        """
        Creates and returns response to provided notification request with
        provided result code.

        The response carries the request identifier of the request, the
        oneM2M result code and the HTTP result code looked up in
        onem2m_http.onem2m_to_http_result_codes.
        """
        builder = (
            IoTDMJsonPrimitiveBuilder()
            .set_communication_protocol(self.get_protocol())
            .set_param(
                OneM2M.short_request_identifier,
                notification_request_primitive.get_param(
                    OneM2M.short_request_identifier
                ),
            )
            .set_param(OneM2M.short_response_status_code, onem2m_result_code)
            .set_proto_param(
                onem2m_http.http_result_code,
                onem2m_http.onem2m_to_http_result_codes[onem2m_result_code],
            )
        )
        return builder.build()

    def add_auto_reply_description(self, auto_reply_description):
        """
        Adds description of automatic reply for requests matching described
        criteria. New statistics object is created for the description.

        Raises RuntimeError if the object is not an instance of
        RequestAutoHandlingDescription or if it has already been added.
        """
        if not isinstance(auto_reply_description, RequestAutoHandlingDescription):
            raise RuntimeError("Invalid automatic handling description object passed")

        if auto_reply_description in self.auto_handling_descriptions:
            raise RuntimeError(
                "Attempt to insert the same auto handling description multiple times"
            )

        self.auto_handling_descriptions[
            auto_reply_description
        ] = AutoHandlingStatistics()

    def remove_auto_reply_description(self, auto_reply_description):
        """
        Removes description of automatic reply together with its statistics.

        Raises RuntimeError if the object is not an instance of
        RequestAutoHandlingDescription or if it is not stored.
        """
        if not isinstance(auto_reply_description, RequestAutoHandlingDescription):
            raise RuntimeError("Invalid automatic handling description object passed")

        if auto_reply_description not in self.auto_handling_descriptions:
            raise RuntimeError("No such auto handling description")

        del self.auto_handling_descriptions[auto_reply_description]

    def get_auto_handling_statistics(self, auto_criteria):
        """
        Returns statistics of automatic handling stored for the passed auto
        reply description. KeyError is raised if the description is not
        stored.
        """
        return self.auto_handling_descriptions[auto_criteria]

    def _receive_cb(self, request_primitive):
        """
        Callback called by Rx channel when request primitive is received.
        Auto response descriptions are checked first and the received request
        is handled automatically if matches at least one of stored auto reply
        descriptions. Created automatic reply is returned immediately.
        If the received request is not handled automatically (because it
        doesn't match any auto reply description) it is stored in
        rx_request_queue and processing stays blocked on get() method of
        rx_response_queue where a response to the request is expected.
        When the response to the request is retrieved from rx_response_queue
        in specified timeout then the response is returned from this callback.
        None is returned if the timeout expires.
        RuntimeError is raised if any of the queues doesn't exist.
        """
        if self.rx_request_queue is None:
            raise RuntimeError("No rx request queue")
        if self.rx_response_queue is None:
            raise RuntimeError("No rx response queue")

        # Use auto handling if match criteria
        for auto_response_desc, statistics in self.auto_handling_descriptions.items():
            if auto_response_desc.match(request_primitive):
                response = self.create_auto_response(
                    request_primitive, auto_response_desc.get_result_code()
                )
                # this request was successfully handled automatically,
                # increment statistics and return the resulting response
                statistics.counter += 1
                return response

        # put the request to the queue to be processed by upper layer
        self.rx_request_queue.put_nowait(request_primitive)

        try:
            # get response from the queue and return as result
            return self.rx_response_queue.get(timeout=self.__blocking_call_timeout)
        except Empty:
            # timed out, no response has been provided by respond()
            return None

    def get_protocol_params(self):
        """Returns default protocol specific parameters"""
        return self.protocol_params

    def get_protocol(self):
        """Returns protocol used for this communication instance"""
        return self.protocol

    def get_primitive_params(self):
        """
        Returns default primitive parameters: the originator and a newly
        generated request identifier (as string)
        """
        params = {
            OneM2M.short_from: self.entity_id,
            OneM2M.short_request_identifier: str(self.get_next_request_id()),
        }
        return params

    def _start(self):
        """
        Starts the tx and rx channels which are set. New request and
        response queues are created when the rx channel is set.
        Raises RuntimeError if neither tx nor rx is set.
        """
        if not self.rx and not self.tx:
            raise RuntimeError("Nothing to start!")
        if None is not self.tx:
            self.tx.start()
        if None is not self.rx:
            self.rx_request_queue = Queue()
            self.rx_response_queue = Queue()
            self.rx.start(self._receive_cb)

    def _stop(self):
        """
        Stops the tx and rx channels which are set and drops the queues.
        Raises RuntimeError if any request or response was left in the
        queues; the channels are stopped and the queues are dropped even
        in such case.
        """
        if None is not self.tx:
            self.tx.stop()
        if None is not self.rx:
            self.rx.stop()
            # sizes must be read before the queues are dropped
            req_size = self.rx_request_queue.qsize()
            rsp_size = self.rx_response_queue.qsize()
            self.rx_request_queue = None
            self.rx_response_queue = None
            if req_size or rsp_size:
                raise RuntimeError(
                    "No all requests: {} or responses: {} were processed".format(
                        req_size, rsp_size
                    )
                )

    def get_next_request_id(self):
        """Returns unique request ID (increasing integer starting with 1)"""
        # TODO how to make this thread safe ?
        self.requestId += 1
        return self.requestId

    def send(self, primitive):
        """
        Sends the primitive through the tx channel and returns the result
        of the tx channel's send() method.
        Raises RuntimeError if the communication is not started.
        """
        if not self.is_started():
            raise RuntimeError("Communication not started yet!")
        return self.tx.send(primitive)

    def receive(self):
        """
        Blocking receive requests waits till request is received and returns the
        request or returns None when timed out.
        Receiving thread stays blocked when request was returned from this method
        and it waits for related response to be inserted into the queue by
        respond() method.
        The request queue exists only after _start() with the rx channel set.
        """
        try:
            req = self.rx_request_queue.get(timeout=self.__blocking_call_timeout)
            return req
        except Empty:
            # call timed out and nothing received
            return None

    def respond(self, response_primitive):
        """
        This method expects response to the last request returned by
        receive() method. This response is put to the rx_response_queue.
        The response queue exists only after _start() with the rx channel set.
        """
        self.rx_response_queue.put_nowait(response_primitive)
229
230
class IoTDMItCommunicationFactory(object):
    """
    Factory class which should be used for instantiation of objects of
    the IoTDMItCommunication class
    """

    def create_http_json_primitive_communication(
        self, entity_id, protocol, protocol_params, rx_port, rx_interface=""
    ):
        """
        Builds the Tx channel (and the Rx channel as well if rx_port is
        set) together with their encoders and decoders and returns new
        IoTDMItCommunication object using them.
        The protocol name is compared case insensitively and the lower case
        form is passed to the created object.
        Raises RuntimeError for any protocol other than HTTP.
        """
        proto_name = protocol.lower()
        if proto_name != "http":
            raise RuntimeError(
                "Unsupported communication protocol specified: {}".format(proto_name)
            )

        sender = OneM2MHttpTx(OneM2MHttpJsonEncoderTx(), OneM2MHttpJsonDecoderTx())

        # Rx channel is optional, it is created only if the port is set
        receiver = None
        if rx_port:
            rx_encoder = OneM2MHttpJsonEncoderRx()
            rx_decoder = OneM2MHttpJsonDecoderRx()
            receiver = OneM2MHttpRx(
                rx_decoder, rx_encoder, port=rx_port, interface=rx_interface
            )

        return IoTDMItCommunication(
            sender, receiver, entity_id, proto_name, protocol_params
        )
267
268
class IoTDMJsonPrimitiveBuilder(OneM2MJsonPrimitiveBuilder):
    """
    Single builder usable for primitives of all supported protocols. The
    class of the built primitive is chosen according to the protocol set.
    """

    # Protocol name -> class of the primitive built for the protocol
    IoTDMProtoPrimitiveClasses = {"http": OneM2MHttpJsonPrimitive}

    def build(self):
        """
        Returns instance of the primitive class registered for the protocol
        set. Build of the parent class is used if the protocol is not set
        or there is no class registered for it.
        """
        if self.protocol and self.protocol in self.IoTDMProtoPrimitiveClasses:
            proto_primitive = self.IoTDMProtoPrimitiveClasses[self.protocol]
            return proto_primitive(
                self.parameters, self.content, self.protocol, self.proto_params
            )

        return super(IoTDMJsonPrimitiveBuilder, self).build()
285
286
class RequestAutoHandlingDescription(object):
    """
    Matching criteria of request primitives which should be handled
    automatically and the result code of the automatic response
    """

    def __init__(
        self,
        parameters_match_dict,
        content_match_dict,
        proto_param_match_dict,
        onem2m_result_code,
        matching_cb=None,
    ):
        """
        Stores the criteria. Every empty (or None) criteria dictionary is
        replaced by a new empty dictionary, non-empty dictionaries are
        stored as passed (not copied).
        """
        self.onem2m_result_code = onem2m_result_code
        self.parameters_match_dict = parameters_match_dict or {}
        self.content_match_dict = content_match_dict or {}
        self.proto_param_match_dict = proto_param_match_dict or {}
        self.matching_cb = matching_cb

    def match(self, request_primitive):
        """
        Checks the request primitive against all stored criteria.
        False is returned if any expected parameter, content attribute or
        protocol parameter is missing or has different value.
        If all of them match then the result of matching callback is
        returned (when the callback is set), True otherwise.
        """
        # (criteria, name of presence check method, name of getter method)
        # The methods are looked up lazily so that the primitive is touched
        # only for the non-empty criteria, in the listed order.
        criteria_groups = (
            (self.parameters_match_dict, "has_param", "get_param"),
            (self.content_match_dict, "has_attr", "get_attr"),
            (self.proto_param_match_dict, "has_proto_param", "get_proto_param"),
        )

        for criteria, has_method, get_method in criteria_groups:
            for key, expected in criteria.items():
                if not getattr(request_primitive, has_method)(key):
                    return False

                actual = getattr(request_primitive, get_method)(key)
                if actual != expected:
                    return False

        if self.matching_cb is None:
            return True

        return self.matching_cb(request_primitive)

    def get_result_code(self):
        """
        Returns the result code to be set in the response primitive
        created by automatic handling
        """
        return self.onem2m_result_code
354
355
class AutoHandlingStatistics(object):
    """Holder of statistics collected by automatic handling of requests"""

    def __init__(self):
        """Creates the statistics with zeroed counter"""
        # Number of requests which have been handled automatically
        # TODO might store requests for further processing / verification
        self.counter = 0
362
363
class RequestAutoHandlingDescriptionBuilder(object):
    """
    Builder class for auto handling description objects.
    All setter methods return the builder itself so the calls can be
    chained.
    """

    def __init__(self):
        self.onem2m_result_code = None
        self.parameter_match_dict = {}
        self.content_match_dict = {}
        self.proto_param_match_dict = {}

    def _add_critieria(self, json_pointer, value, match_dict):
        """
        Stores the expected value under the JSON pointer in the match_dict.
        Raises RuntimeError if the pointer has already been added.
        """
        # The value is stored under the pointer object itself, so the
        # pointer object must be looked up as well. Looking up only its
        # string form would never detect duplicates of non-string pointers.
        # The string form is still checked to keep rejecting everything
        # that has been rejected before.
        if json_pointer in match_dict or str(json_pointer) in match_dict:
            raise RuntimeError(
                "JSON pointer: {} already added".format(str(json_pointer))
            )
        match_dict[json_pointer] = value

    def add_param_criteria(self, json_pointer, value):
        """Adds expected value of primitive parameter"""
        self._add_critieria(json_pointer, value, self.parameter_match_dict)
        return self

    def add_content_criteria(self, json_pointer, value):
        """Adds expected value of content attribute"""
        self._add_critieria(json_pointer, value, self.content_match_dict)
        return self

    def add_proto_param_criteria(self, json_pointer, value):
        """Adds expected value of protocol specific parameter"""
        self._add_critieria(json_pointer, value, self.proto_param_match_dict)
        return self

    def set_onem2m_result_code(self, result_code):
        """Sets result code of the automatic response"""
        self.onem2m_result_code = result_code
        return self

    def build(self):
        """
        Returns new auto handling description with the criteria added so far.
        Raises RuntimeError if the result code is not set.
        """
        if None is self.onem2m_result_code:
            raise RuntimeError("Result code not set")

        # Copies of the dictionaries are passed, so criteria added to this
        # builder later do not change descriptions which are already built
        return RequestAutoHandlingDescription(
            dict(self.parameter_match_dict),
            dict(self.content_match_dict),
            dict(self.proto_param_match_dict),
            self.onem2m_result_code,
        )