__blocking_call_timeout = 3 # seconds
- def __init__(self, tx, rx, entity_id, protocol, protocol_params, auto_handling_descriptions={}):
+ def __init__(
+ self,
+ tx,
+ rx,
+ entity_id,
+ protocol,
+ protocol_params,
+ auto_handling_descriptions={},
+ ):
super(IoTDMItCommunication, self).__init__()
self.tx = tx
self.rx = rx
Creates and returns response to provided notification request with
provided result code
"""
- builder = IoTDMJsonPrimitiveBuilder() \
- .set_communication_protocol(self.get_protocol()) \
- .set_param(OneM2M.short_request_identifier,
- notification_request_primitive.get_param(OneM2M.short_request_identifier)) \
- .set_param(OneM2M.short_response_status_code, onem2m_result_code) \
- .set_proto_param(onem2m_http.http_result_code, onem2m_http.onem2m_to_http_result_codes[onem2m_result_code])
+ builder = (
+ IoTDMJsonPrimitiveBuilder()
+ .set_communication_protocol(self.get_protocol())
+ .set_param(
+ OneM2M.short_request_identifier,
+ notification_request_primitive.get_param(
+ OneM2M.short_request_identifier
+ ),
+ )
+ .set_param(OneM2M.short_response_status_code, onem2m_result_code)
+ .set_proto_param(
+ onem2m_http.http_result_code,
+ onem2m_http.onem2m_to_http_result_codes[onem2m_result_code],
+ )
+ )
return builder.build()
def add_auto_reply_description(self, auto_reply_description):
raise RuntimeError("Invalid automatic handling description object passed")
if auto_reply_description in self.auto_handling_descriptions:
- raise RuntimeError("Attempt to insert the same auto handling description multiple times")
+ raise RuntimeError(
+ "Attempt to insert the same auto handling description multiple times"
+ )
- self.auto_handling_descriptions[auto_reply_description] = AutoHandlingStatistics()
+ self.auto_handling_descriptions[
+ auto_reply_description
+ ] = AutoHandlingStatistics()
def remove_auto_reply_description(self, auto_reply_description):
"""Removes description of automatic reply"""
if auto_reply_description not in self.auto_handling_descriptions:
raise RuntimeError("No such auto handling description")
- del(self.auto_handling_descriptions[auto_reply_description])
+ del self.auto_handling_descriptions[auto_reply_description]
def get_auto_handling_statistics(self, auto_criteria):
"""
# Use auto handling if match criteria
for auto_response_desc, statistics in self.auto_handling_descriptions.items():
if auto_response_desc.match(request_primitive):
- response = self.create_auto_response(request_primitive, auto_response_desc.get_result_code())
+ response = self.create_auto_response(
+ request_primitive, auto_response_desc.get_result_code()
+ )
# this request was successfully handled automatically,
# increment statistics and return the resulting response
statistics.counter += 1
"""Returns default primitive parameters"""
params = {
OneM2M.short_from: self.entity_id,
- OneM2M.short_request_identifier: str(self.get_next_request_id())
+ OneM2M.short_request_identifier: str(self.get_next_request_id()),
}
return params
self.rx_request_queue = None
self.rx_response_queue = None
if req_size or rsp_size:
- raise RuntimeError("No all requests: {} or responses: {} were processed".format(
- req_size, rsp_size))
+ raise RuntimeError(
+ "No all requests: {} or responses: {} were processed".format(
+ req_size, rsp_size
+ )
+ )
def get_next_request_id(self):
"""Returns unique request ID"""
IoTDMItCommunication class
"""
- def create_http_json_primitive_communication(self, entity_id, protocol, protocol_params, rx_port, rx_interface=""):
+ def create_http_json_primitive_communication(
+ self, entity_id, protocol, protocol_params, rx_port, rx_interface=""
+ ):
"""
Instantiates encoder/decoder and rx/tx objects required by
IoTDMItCommunication and returns new instance of the
encoder_rx = OneM2MHttpJsonEncoderRx()
decoder_rx = OneM2MHttpJsonDecoderRx()
- rx = OneM2MHttpRx(decoder_rx, encoder_rx, port=rx_port, interface=rx_interface)
+ rx = OneM2MHttpRx(
+ decoder_rx, encoder_rx, port=rx_port, interface=rx_interface
+ )
return IoTDMItCommunication(tx, rx, entity_id, protocol, protocol_params)
- raise RuntimeError("Unsupported communication protocol specified: {}".format(protocol))
+ raise RuntimeError(
+ "Unsupported communication protocol specified: {}".format(protocol)
+ )
class IoTDMJsonPrimitiveBuilder(OneM2MJsonPrimitiveBuilder):
builder classes of all supported protocols
"""
- IoTDMProtoPrimitiveClasses = {
- "http": OneM2MHttpJsonPrimitive
- }
+ IoTDMProtoPrimitiveClasses = {"http": OneM2MHttpJsonPrimitive}
def build(self):
if not self.protocol or self.protocol not in self.IoTDMProtoPrimitiveClasses:
return super(IoTDMJsonPrimitiveBuilder, self).build()
primitive_class = self.IoTDMProtoPrimitiveClasses[self.protocol]
- return primitive_class(self.parameters, self.content, self.protocol,
- self.proto_params)
+ return primitive_class(
+ self.parameters, self.content, self.protocol, self.proto_params
+ )
class RequestAutoHandlingDescription(object):
"""Class stores auto handling matching criteria for request primitives"""
- def __init__(self, parameters_match_dict, content_match_dict, proto_param_match_dict,
- onem2m_result_code, matching_cb=None):
+ def __init__(
+ self,
+ parameters_match_dict,
+ content_match_dict,
+ proto_param_match_dict,
+ onem2m_result_code,
+ matching_cb=None,
+ ):
self.onem2m_result_code = onem2m_result_code
self.parameters_match_dict = parameters_match_dict
def _add_critieria(self, json_pointer, value, match_dict):
if str(json_pointer) in match_dict:
- raise RuntimeError("JSON pointer: {} already added".format(str(json_pointer)))
+ raise RuntimeError(
+ "JSON pointer: {} already added".format(str(json_pointer))
+ )
match_dict[json_pointer] = value
def add_param_criteria(self, json_pointer, value):
if None is self.onem2m_result_code:
raise RuntimeError("Result code not set")
- return RequestAutoHandlingDescription(self.parameter_match_dict,
- self.content_match_dict,
- self.proto_param_match_dict,
- self.onem2m_result_code)
+ return RequestAutoHandlingDescription(
+ self.parameter_match_dict,
+ self.content_match_dict,
+ self.proto_param_match_dict,
+ self.onem2m_result_code,
+ )