"""
Implementation of the IoTDM communication class which provides generic
(protocol independent) send and receive functionality.
"""
7 # Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
9 # This program and the accompanying materials are made available under the
10 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
11 # and is available at http://www.eclipse.org/legal/epl-v10.html
from Queue import Queue
from Queue import Empty

import onem2m_http
from onem2m_http import OneM2MHttpJsonEncoderTx
from onem2m_http import OneM2MHttpJsonDecoderTx
from onem2m_http import OneM2MHttpJsonDecoderRx
from onem2m_http import OneM2MHttpJsonEncoderRx
from onem2m_http import OneM2MHttpTx
from onem2m_http import OneM2MHttpRx
from onem2m_http import OneM2MHttpJsonPrimitive
from onem2m_json_primitive import OneM2MJsonPrimitiveBuilder
from iot_communication_concepts import IotComm
from onem2m_primitive import OneM2M
class IoTDMItCommunication(IotComm):
    """
    Generic IoTDM communication implementation which can be used for
    all supported protocols.

    Outgoing primitives are passed to the tx object. Requests delivered by
    the rx object are either answered automatically (when they match one of
    the registered auto handling descriptions) or handed over to the caller
    through receive() / respond().

    NOTE(review): the reviewed text of this class was missing its short
    lines (some ``def`` lines, ``try:``/``except`` lines, plain returns and
    closing brackets). They were restored by inference; the spots where the
    restored text could not be fully confirmed carry their own NOTE(review).
    """

    __blocking_call_timeout = 3  # seconds

    def __init__(
        self,
        tx,
        rx,
        entity_id,
        protocol,
        protocol_params,
        auto_handling_descriptions=None,
    ):
        """
        Stores the tx/rx objects and the default entity ID, protocol and
        protocol parameters.

        auto_handling_descriptions is an optional dictionary mapping
        RequestAutoHandlingDescription objects to AutoHandlingStatistics
        objects. When omitted, every instance gets its own empty dictionary
        (the former ``{}`` default was one dictionary shared by all
        instances and was mutated by add_auto_reply_description()).
        """
        super(IoTDMItCommunication, self).__init__()
        # NOTE(review): the three assignments below were missing from the
        # reviewed text; the initial request ID value 0 is presumed.
        self.requestId = 0
        self.tx = tx
        self.rx = rx
        self.entity_id = entity_id
        self.protocol = protocol
        self.protocol_params = protocol_params
        if auto_handling_descriptions is None:
            auto_handling_descriptions = {}
        self.auto_handling_descriptions = auto_handling_descriptions

        # The queues exist only while the rx channel is running
        self.rx_request_queue = None
        self.rx_response_queue = None

    def create_auto_response(self, notification_request_primitive, onem2m_result_code):
        """
        Creates and returns response to provided notification request with
        the given result code. The response carries the request identifier
        of the request and the HTTP result code mapped from the OneM2M
        result code.
        """
        # NOTE(review): the name of the last setter in the chain was missing
        # from the reviewed text; set_proto_param is presumed - confirm
        # against OneM2MJsonPrimitiveBuilder.
        builder = (
            IoTDMJsonPrimitiveBuilder()
            .set_communication_protocol(self.get_protocol())
            .set_param(
                OneM2M.short_request_identifier,
                notification_request_primitive.get_param(
                    OneM2M.short_request_identifier
                ),
            )
            .set_param(OneM2M.short_response_status_code, onem2m_result_code)
            .set_proto_param(
                onem2m_http.http_result_code,
                onem2m_http.onem2m_to_http_result_codes[onem2m_result_code],
            )
        )
        return builder.build()

    def add_auto_reply_description(self, auto_reply_description):
        """
        Adds description of automatic reply for requests matching described
        criteria. A fresh AutoHandlingStatistics object is registered for the
        description. RuntimeError is raised for an object of a wrong type
        and for a description which is already registered.
        """
        if not isinstance(auto_reply_description, RequestAutoHandlingDescription):
            raise RuntimeError("Invalid automatic handling description object passed")

        if auto_reply_description in self.auto_handling_descriptions:
            raise RuntimeError(
                "Attempt to insert the same auto handling description multiple times"
            )

        self.auto_handling_descriptions[
            auto_reply_description
        ] = AutoHandlingStatistics()

    def remove_auto_reply_description(self, auto_reply_description):
        """
        Removes description of automatic reply. RuntimeError is raised for
        an object of a wrong type and for a description which is not
        registered.
        """
        if not isinstance(auto_reply_description, RequestAutoHandlingDescription):
            raise RuntimeError("Invalid automatic handling description object passed")

        if auto_reply_description not in self.auto_handling_descriptions:
            raise RuntimeError("No such auto handling description")

        del self.auto_handling_descriptions[auto_reply_description]

    def get_auto_handling_statistics(self, auto_criteria):
        """
        Returns statistics of automatic handling according to auto reply
        criteria. KeyError is raised if the criteria object is not
        registered.
        """
        return self.auto_handling_descriptions[auto_criteria]

    def _receive_cb(self, request_primitive):
        """
        Callback called by Rx channel when request primitive is received.
        Auto response descriptions are checked first and the received request
        is handled automatically if matches at least one of stored auto reply
        descriptions. Created automatic reply is returned immediately.
        If the received request is not handled automatically (because it
        doesn't match any auto reply description) it is stored in
        rx_request_queue and processing stays blocked on get() method of
        rx_response_queue where a response to the request is expected.
        When the response to the request is retrieved from rx_response_queue
        in specified timeout then the response is returned from this callback.
        None is returned if the timeout expires.
        """
        if self.rx_request_queue is None:
            raise RuntimeError("No rx request queue")
        if self.rx_response_queue is None:
            raise RuntimeError("No rx response queue")

        # Use auto handling if match criteria
        for auto_response_desc, statistics in self.auto_handling_descriptions.items():
            if auto_response_desc.match(request_primitive):
                response = self.create_auto_response(
                    request_primitive, auto_response_desc.get_result_code()
                )
                # this request was successfully handled automatically,
                # increment statistics and return the resulting response
                statistics.counter += 1
                return response

        # put the request to the queue to be processed by upper layer
        self.rx_request_queue.put_nowait(request_primitive)

        try:
            # get response from the queue and return as result
            return self.rx_response_queue.get(timeout=self.__blocking_call_timeout)
        except Empty:
            # no response was provided in time
            return None

    def get_protocol_params(self):
        """Returns default protocol specific parameters"""
        return self.protocol_params

    def get_protocol(self):
        """Returns protocol used for this communication instance"""
        return self.protocol

    def get_primitive_params(self):
        """
        Returns default primitive parameters: the entity ID as originator
        and a newly generated request identifier (as string).
        """
        return {
            OneM2M.short_from: self.entity_id,
            OneM2M.short_request_identifier: str(self.get_next_request_id()),
        }

    # NOTE(review): the ``def`` lines of the two methods below were missing
    # from the reviewed text. The names _start/_stop are presumed because
    # send() relies on is_started(), which is not defined in this class and
    # whose state is not maintained by these bodies - confirm against IotComm.
    def _start(self):
        """
        Starts the tx object (if any) and the rx object (if any). Fresh
        request and response queues are created before the rx object is
        started with _receive_cb as its callback. RuntimeError is raised
        when there is neither tx nor rx.
        """
        if not self.rx and not self.tx:
            raise RuntimeError("Nothing to start!")
        if None is not self.tx:
            self.tx.start()
        if None is not self.rx:
            self.rx_request_queue = Queue()
            self.rx_response_queue = Queue()
            self.rx.start(self._receive_cb)

    def _stop(self):
        """
        Stops the tx object (if any) and the rx object (if any) and drops
        the rx queues. RuntimeError is raised when some requests or
        responses were left unprocessed in the queues.
        """
        if None is not self.tx:
            self.tx.stop()
        if None is not self.rx:
            self.rx.stop()
            req_size = self.rx_request_queue.qsize()
            rsp_size = self.rx_response_queue.qsize()
            self.rx_request_queue = None
            self.rx_response_queue = None
            if req_size or rsp_size:
                raise RuntimeError(
                    "No all requests: {} or responses: {} were processed".format(
                        req_size, rsp_size
                    )
                )

    def get_next_request_id(self):
        """Returns unique request ID"""
        # TODO how to make this thread safe ?
        # NOTE(review): the increment was missing from the reviewed text and
        # is presumed from the "unique" contract above.
        self.requestId += 1
        return self.requestId

    def send(self, primitive):
        """
        Sends the primitive through the tx object and returns its result.
        RuntimeError is raised if the communication is not started.
        """
        if not self.is_started():
            raise RuntimeError("Communication not started yet!")
        return self.tx.send(primitive)

    def receive(self):
        """
        Blocking receive requests waits till request is received and returns the
        request or returns None when timeouted.
        Receiving thread stays blocked when request was returned from this method
        and it waits for related response to be inserted into the queue by
        respond() method.
        """
        try:
            req = self.rx_request_queue.get(timeout=self.__blocking_call_timeout)
            return req
        except Empty:
            # call timeouted and nothing received
            return None

    def respond(self, response_primitive):
        """
        This method expects response to the last request returned by
        receive() method. This response is put to the rx_response_queue.
        """
        self.rx_response_queue.put_nowait(response_primitive)
class IoTDMItCommunicationFactory(object):
    """
    Factory class which should be used for instantiation objects of
    IoTDMItCommunication class
    """

    def create_http_json_primitive_communication(
        self, entity_id, protocol, protocol_params, rx_port, rx_interface=""
    ):
        """
        Instantiates encoder/decoder and rx/tx objects required by
        IoTDMItCommunication and returns new instance of the
        IoTDMItCommunication class.

        The protocol name is compared case-insensitively; RuntimeError is
        raised for any protocol other than "http".
        """
        protocol = protocol.lower()
        if protocol == "http":
            encoder = OneM2MHttpJsonEncoderTx()
            decoder = OneM2MHttpJsonDecoderTx()

            tx = OneM2MHttpTx(encoder, decoder)

            # NOTE(review): the lines deciding whether an rx object is
            # created were missing from the reviewed text; an rx object is
            # presumed to be created only when rx_port is set - confirm.
            rx = None
            if rx_port:
                encoder_rx = OneM2MHttpJsonEncoderRx()
                decoder_rx = OneM2MHttpJsonDecoderRx()

                rx = OneM2MHttpRx(
                    decoder_rx, encoder_rx, port=rx_port, interface=rx_interface
                )

            return IoTDMItCommunication(tx, rx, entity_id, protocol, protocol_params)

        raise RuntimeError(
            "Unsupported communication protocol specified: {}".format(protocol)
        )
class IoTDMJsonPrimitiveBuilder(OneM2MJsonPrimitiveBuilder):
    """
    Helper class providing single point of access for multiple primitive
    builder classes of all supported protocols
    """

    # Maps protocol name to the primitive class instantiated by build()
    IoTDMProtoPrimitiveClasses = {"http": OneM2MHttpJsonPrimitive}

    def build(self):
        """
        Returns the primitive built from the collected data. The protocol
        specific primitive class is used when the protocol is set and
        known; otherwise the build() of the parent builder is used.
        """
        if not self.protocol or self.protocol not in self.IoTDMProtoPrimitiveClasses:
            return super(IoTDMJsonPrimitiveBuilder, self).build()

        primitive_class = self.IoTDMProtoPrimitiveClasses[self.protocol]
        return primitive_class(
            self.parameters, self.content, self.protocol, self.proto_params
        )
class RequestAutoHandlingDescription(object):
    """Class stores auto handling matching criteria for request primitives"""

    def __init__(
        self,
        parameters_match_dict,
        content_match_dict,
        proto_param_match_dict,
        onem2m_result_code,
        matching_cb=None,
    ):
        """
        Stores the result code, the three criteria dictionaries (name to
        expected value) and the optional matching callback. A missing or
        empty criteria dictionary is replaced by a new empty dictionary.
        """
        self.onem2m_result_code = onem2m_result_code

        self.parameters_match_dict = parameters_match_dict or {}
        self.content_match_dict = content_match_dict or {}
        self.proto_param_match_dict = proto_param_match_dict or {}

        self.matching_cb = matching_cb

    @staticmethod
    def _criteria_match(match_dict, request_primitive, has_name, get_name):
        """
        Returns True if every name of match_dict is present in the primitive
        (checked by its has_name method) with the expected value (read by
        its get_name method). The methods are looked up only when there is
        something to check.
        """
        for name, expected in match_dict.items():
            if not getattr(request_primitive, has_name)(name):
                return False
            if getattr(request_primitive, get_name)(name) != expected:
                return False
        return True

    def match(self, request_primitive):
        """
        Returns True if the request_primitive object matches stored criteria,
        False otherwise. Primitive parameters are checked first, then content
        attributes, then protocol specific parameters. When all of them match
        and a matching callback is set, the result of the callback called
        with the primitive is returned instead.
        """
        matched = (
            self._criteria_match(
                self.parameters_match_dict, request_primitive, "has_param", "get_param"
            )
            and self._criteria_match(
                self.content_match_dict, request_primitive, "has_attr", "get_attr"
            )
            and self._criteria_match(
                self.proto_param_match_dict,
                request_primitive,
                "has_proto_param",
                "get_proto_param",
            )
        )
        if not matched:
            return False

        if None is not self.matching_cb:
            return self.matching_cb(request_primitive)

        return True

    def get_result_code(self):
        """
        Returns result code which should be used for resulting response
        primitive as result of automatic handling
        """
        return self.onem2m_result_code
class AutoHandlingStatistics(object):
    """Statistics gathered by auto handling"""

    def __init__(self):
        """Starts with no automatically handled request counted."""
        # TODO might store requests for further processing / verification
        self.counter = 0  # number of automatically handled requests
class RequestAutoHandlingDescriptionBuilder(object):
    """Builder class for auto handling description objects"""

    def __init__(self):
        """Starts with no result code and no matching criteria."""
        self.onem2m_result_code = None
        self.parameter_match_dict = {}
        self.content_match_dict = {}
        self.proto_param_match_dict = {}

    def _add_critieria(self, json_pointer, value, match_dict):
        """
        Stores value under json_pointer in match_dict. RuntimeError is
        raised if a pointer with the same string form is already stored.
        """
        pointer_str = str(json_pointer)
        # The key is stored exactly as passed, so the stored keys must be
        # compared by their string form as well; checking only
        # ``str(json_pointer) in match_dict`` never detected a repeated
        # non-string pointer.
        if any(str(existing) == pointer_str for existing in match_dict):
            raise RuntimeError("JSON pointer: {} already added".format(pointer_str))
        match_dict[json_pointer] = value

    # NOTE(review): the last line of each of the four setters below was
    # missing from the reviewed text; ``return self`` (call chaining) is
    # presumed.
    def add_param_criteria(self, json_pointer, value):
        """Adds expected value of a primitive parameter."""
        self._add_critieria(json_pointer, value, self.parameter_match_dict)
        return self

    def add_content_criteria(self, json_pointer, value):
        """Adds expected value of a content attribute."""
        self._add_critieria(json_pointer, value, self.content_match_dict)
        return self

    def add_proto_param_criteria(self, json_pointer, value):
        """Adds expected value of a protocol specific parameter."""
        self._add_critieria(json_pointer, value, self.proto_param_match_dict)
        return self

    def set_onem2m_result_code(self, result_code):
        """Sets result code to be used for automatic responses."""
        self.onem2m_result_code = result_code
        return self

    def build(self):
        """
        Returns new RequestAutoHandlingDescription object created from the
        collected criteria and result code. RuntimeError is raised if the
        result code was not set.
        """
        if None is self.onem2m_result_code:
            raise RuntimeError("Result code not set")

        return RequestAutoHandlingDescription(
            self.parameter_match_dict,
            self.content_match_dict,
            self.proto_param_match_dict,
            self.onem2m_result_code,
        )