return None
-def prepare_primitive_builder_raw(protocol, primitive_params, content=None, proto_specific_params=None):
+def prepare_primitive_builder_raw(
+ protocol, primitive_params, content=None, proto_specific_params=None
+):
"""Creates primitive builder without any default data"""
- builder = IoTDMJsonPrimitiveBuilder()\
- .set_communication_protocol(protocol)\
- .set_content(content)\
- .set_parameters(primitive_params)\
+ builder = (
+ IoTDMJsonPrimitiveBuilder()
+ .set_communication_protocol(protocol)
+ .set_content(content)
+ .set_parameters(primitive_params)
.set_protocol_specific_parameters(proto_specific_params)
+ )
return builder
-def new_primitive_raw(protocol, primitive_params, content=None, proto_specific_params=None):
+def new_primitive_raw(
+ protocol, primitive_params, content=None, proto_specific_params=None
+):
"""Creates primitive object without any default data"""
- return prepare_primitive_builder_raw(protocol, primitive_params, content, proto_specific_params).build()
-
-
-def prepare_primitive_builder(primitive_params, content=None, proto_specific_params=None,
- allias="default", communication=None):
+ return prepare_primitive_builder_raw(
+ protocol, primitive_params, content, proto_specific_params
+ ).build()
+
+
+def prepare_primitive_builder(
+ primitive_params,
+ content=None,
+ proto_specific_params=None,
+ allias="default",
+ communication=None,
+):
"""Creates primitive builder with default data set according communication object used"""
communication = __get_session(allias, communication)
- builder = IoTDMJsonPrimitiveBuilder()\
- .set_communication_protocol(communication.get_protocol())\
- .set_parameters(communication.get_primitive_params())\
- .set_protocol_specific_parameters(communication.get_protocol_params())\
+ builder = (
+ IoTDMJsonPrimitiveBuilder()
+ .set_communication_protocol(communication.get_protocol())
+ .set_parameters(communication.get_primitive_params())
+ .set_protocol_specific_parameters(communication.get_protocol_params())
.set_content(content)
+ )
if communication.get_protocol() == onem2m_http.HTTPPROTOCOLNAME and content:
- builder.set_proto_param(onem2m_http.http_header_content_length, str(len(content)))
+ builder.set_proto_param(
+ onem2m_http.http_header_content_length, str(len(content))
+ )
- builder.append_parameters(primitive_params)\
- .append_protocol_specific_parameters(proto_specific_params)
+ builder.append_parameters(primitive_params).append_protocol_specific_parameters(
+ proto_specific_params
+ )
return builder
-def new_primitive(primitive_params, content=None, proto_specific_params=None,
- allias="default", communication=None):
+def new_primitive(
+ primitive_params,
+ content=None,
+ proto_specific_params=None,
+ allias="default",
+ communication=None,
+):
"""Creates new primitive object with default data set according communication object used"""
- return prepare_primitive_builder(primitive_params, content, proto_specific_params, allias, communication).build()
+ return prepare_primitive_builder(
+ primitive_params, content, proto_specific_params, allias, communication
+ ).build()
def _add_param(params, name, value):
params[name] = value
-def prepare_request_primitive_builder(target_resource, content=None, operation=None, resource_type=None,
- result_content=None, allias="default", communication=None):
+def prepare_request_primitive_builder(
+ target_resource,
+ content=None,
+ operation=None,
+ resource_type=None,
+ result_content=None,
+ allias="default",
+ communication=None,
+):
"""
Creates builder for request primitive with default data set according
communication object used
primitive_params = json.dumps(primitive_params)
- builder = prepare_primitive_builder(primitive_params, content, communication=communication)
+ builder = prepare_primitive_builder(
+ primitive_params, content, communication=communication
+ )
return builder
-def new_create_request_primitive(target_resource, content, resource_type, result_content=None,
- allias="default", communication=None):
+def new_create_request_primitive(
+ target_resource,
+ content,
+ resource_type,
+ result_content=None,
+ allias="default",
+ communication=None,
+):
"""Creates request primitive for Create operation"""
- return prepare_request_primitive_builder(target_resource, content, operation=OneM2M.operation_create,
- resource_type=resource_type, result_content=result_content,
- allias=allias, communication=communication).build()
-
-
-def new_update_request_primitive(target_resource, content, result_content=None, allias="default", communication=None):
+ return prepare_request_primitive_builder(
+ target_resource,
+ content,
+ operation=OneM2M.operation_create,
+ resource_type=resource_type,
+ result_content=result_content,
+ allias=allias,
+ communication=communication,
+ ).build()
+
+
+def new_update_request_primitive(
+ target_resource, content, result_content=None, allias="default", communication=None
+):
"""Creates request primitive for Update operation"""
- return prepare_request_primitive_builder(target_resource, content, operation=OneM2M.operation_update,
- resource_type=None, result_content=result_content,
- allias=allias, communication=communication).build()
-
-
-def new_retrieve_request_primitive(target_resource, result_content=None, allias="default", communication=None):
+ return prepare_request_primitive_builder(
+ target_resource,
+ content,
+ operation=OneM2M.operation_update,
+ resource_type=None,
+ result_content=result_content,
+ allias=allias,
+ communication=communication,
+ ).build()
+
+
+def new_retrieve_request_primitive(
+ target_resource, result_content=None, allias="default", communication=None
+):
"""Creates request primitive for Retrieve operation"""
- return prepare_request_primitive_builder(target_resource, content=None,
- operation=OneM2M.operation_retrieve,
- resource_type=None, result_content=result_content,
- allias=allias, communication=communication).build()
-
-
-def new_delete_request_primitive(target_resource, result_content=None, allias="default", communication=None):
+ return prepare_request_primitive_builder(
+ target_resource,
+ content=None,
+ operation=OneM2M.operation_retrieve,
+ resource_type=None,
+ result_content=result_content,
+ allias=allias,
+ communication=communication,
+ ).build()
+
+
+def new_delete_request_primitive(
+ target_resource, result_content=None, allias="default", communication=None
+):
"""Creates request primitive for Delete operation"""
- return prepare_request_primitive_builder(target_resource, content=None,
- operation=OneM2M.operation_delete, result_content=result_content,
- allias=allias, communication=communication).build()
+ return prepare_request_primitive_builder(
+ target_resource,
+ content=None,
+ operation=OneM2M.operation_delete,
+ result_content=result_content,
+ allias=allias,
+ communication=communication,
+ ).build()
def send_primitive(primitive, allias="default", communication=None):
request_primitive.check_exchange(response_primitive, rsc=status_code)
-def verify_exchange_negative(request_primitive, response_primitive, status_code, error_message=None):
+def verify_exchange_negative(
+ request_primitive, response_primitive, status_code, error_message=None
+):
"""Verifies request and error response primitive parameters"""
- request_primitive.check_exchange_negative(response_primitive, status_code, error_message)
+ request_primitive.check_exchange_negative(
+ response_primitive, status_code, error_message
+ )
def verify_request(request_primitive):
response_primitive.check_response(rqi, rsc, request_operation)
-def verify_response_negative(response_primitive, rqi=None, rsc=None, error_message=None):
+def verify_response_negative(
+ response_primitive, rqi=None, rsc=None, error_message=None
+):
"""Verifies error response primitive only"""
response_primitive.check_response_negative(rqi, rsc, error_message)
return req
-def respond_response_primitive(response_primitive, allias="default", communication=None):
+def respond_response_primitive(
+ response_primitive, allias="default", communication=None
+):
"""
Sends response primitive related to the last request primitive received by
receive_request_primitive() method
communication.respond(response_primitive)
-def create_notification_response(notification_request_primitive, allias="default", communication=None):
+def create_notification_response(
+ notification_request_primitive, allias="default", communication=None
+):
"""Creates response primitive for provided notification request primitive"""
communication = __get_session(allias, communication)
- return communication.create_auto_response(notification_request_primitive,
- OneM2M.result_code_ok)
-
-
-def create_notification_response_negative(notification_request_primitive, result_code, error_message,
- allias="default", communication=None):
+ return communication.create_auto_response(
+ notification_request_primitive, OneM2M.result_code_ok
+ )
+
+
+def create_notification_response_negative(
+ notification_request_primitive,
+ result_code,
+ error_message,
+ allias="default",
+ communication=None,
+):
"""Creates negative response primitive for provided notification request primitive"""
communication = __get_session(allias, communication)
- builder = IoTDMJsonPrimitiveBuilder() \
- .set_communication_protocol(communication.get_protocol()) \
- .set_param(OneM2M.short_request_identifier,
- notification_request_primitive.get_param(OneM2M.short_request_identifier)) \
- .set_param(OneM2M.short_response_status_code, result_code) \
- .set_proto_param(onem2m_http.http_result_code, onem2m_http.onem2m_to_http_result_codes[result_code])\
+ builder = (
+ IoTDMJsonPrimitiveBuilder()
+ .set_communication_protocol(communication.get_protocol())
+ .set_param(
+ OneM2M.short_request_identifier,
+ notification_request_primitive.get_param(OneM2M.short_request_identifier),
+ )
+ .set_param(OneM2M.short_response_status_code, result_code)
+ .set_proto_param(
+ onem2m_http.http_result_code,
+ onem2m_http.onem2m_to_http_result_codes[result_code],
+ )
.set_content('{"error": "' + error_message + '"}')
+ )
return builder.build()
# Description of such notification request primitive which is received
# as result of new subscription resource
-ON_SUBSCRIPTION_CREATE_DESCRIPTION =\
- RequestAutoHandlingDescription(None, None, None,
- onem2m_result_code=OneM2M.result_code_ok,
- matching_cb=_on_subscription_create_notificaton_matching_cb)
+ON_SUBSCRIPTION_CREATE_DESCRIPTION = RequestAutoHandlingDescription(
+ None,
+ None,
+ None,
+ onem2m_result_code=OneM2M.result_code_ok,
+ matching_cb=_on_subscription_create_notificaton_matching_cb,
+)
def _prepare_notification_auto_reply_builder():
- return RequestAutoHandlingDescriptionBuilder()\
- .add_param_criteria(OneM2M.short_operation, OneM2M.operation_notify)\
+ return (
+ RequestAutoHandlingDescriptionBuilder()
+ .add_param_criteria(OneM2M.short_operation, OneM2M.operation_notify)
.set_onem2m_result_code(OneM2M.result_code_ok)
+ )
-def add_notification_auto_reply_on_subscription_create(allias="default", communication=None):
+def add_notification_auto_reply_on_subscription_create(
+ allias="default", communication=None
+):
"""Sets auto reply for notification requests received due to subscription resource creation"""
communication = __get_session(allias, communication)
communication.add_auto_reply_description(ON_SUBSCRIPTION_CREATE_DESCRIPTION)
-def remove_notification_auto_reply_on_subscription_create(allias="default", communication=None):
+def remove_notification_auto_reply_on_subscription_create(
+ allias="default", communication=None
+):
"""Removes auto reply for notification requests received due to subscription resource creation"""
communication = __get_session(allias, communication)
communication.remove_auto_reply_description(ON_SUBSCRIPTION_CREATE_DESCRIPTION)
-def get_number_of_auto_replies_on_subscription_create(allias="default", communication=None):
+def get_number_of_auto_replies_on_subscription_create(
+ allias="default", communication=None
+):
"""Returns number of auto replies on notification requests received when new subscription created"""
communication = __get_session(allias, communication)
- return communication.get_auto_handling_statistics(ON_SUBSCRIPTION_CREATE_DESCRIPTION).counter
+ return communication.get_auto_handling_statistics(
+ ON_SUBSCRIPTION_CREATE_DESCRIPTION
+ ).counter
-def verify_number_of_auto_replies_on_subscription_create(replies, allias="default", communication=None):
+def verify_number_of_auto_replies_on_subscription_create(
+ replies, allias="default", communication=None
+):
"""Compares number of auto replies on notifications received when new subscription created"""
count = get_number_of_auto_replies_on_subscription_create(allias, communication)
if replies != count:
- raise AssertionError("Unexpected number of auto replies on subscription create: {}, expected: {}".format(
- count, replies))
+ raise AssertionError(
+ "Unexpected number of auto replies on subscription create: {}, expected: {}".format(
+ count, replies
+ )
+ )
__SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING = {}
-def add_auto_reply_to_notification_from_subscription(subscription_resource_id, allias="default", communication=None):
+def add_auto_reply_to_notification_from_subscription(
+ subscription_resource_id, allias="default", communication=None
+):
"""
Sets auto reply for notifications from specific subscription resource
identified by its CSE-relative resource ID
communication = __get_session(allias, communication)
builder = _prepare_notification_auto_reply_builder()
if subscription_resource_id in __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING:
- raise RuntimeError("Auto reply for subscription resource {} already set".format(subscription_resource_id))
-
- builder.add_content_criteria(JSON_POINTER_NOTIFICATION_SUR, subscription_resource_id)
+ raise RuntimeError(
+ "Auto reply for subscription resource {} already set".format(
+ subscription_resource_id
+ )
+ )
+
+ builder.add_content_criteria(
+ JSON_POINTER_NOTIFICATION_SUR, subscription_resource_id
+ )
new_description = builder.build()
- __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id] = new_description
+ __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+ subscription_resource_id
+ ] = new_description
communication.add_auto_reply_description(new_description)
-def remove_auto_reply_to_notification_from_subscription(subscription_resource_id, allias="default", communication=None):
+def remove_auto_reply_to_notification_from_subscription(
+ subscription_resource_id, allias="default", communication=None
+):
"""Removes auto reply for specific subscription identified by its CSE-relative resource ID"""
communication = __get_session(allias, communication)
- description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id]
+ description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+ subscription_resource_id
+ ]
if not description:
- raise RuntimeError("No auto reply set for specific subscription resource: {}".format(subscription_resource_id))
+ raise RuntimeError(
+ "No auto reply set for specific subscription resource: {}".format(
+ subscription_resource_id
+ )
+ )
communication.remove_auto_reply_description(description)
-def get_number_of_auto_replies_to_notifications_from_subscription(subscription_resource_id,
- allias="default", communication=None):
+def get_number_of_auto_replies_to_notifications_from_subscription(
+ subscription_resource_id, allias="default", communication=None
+):
"""
Returns number of automatic replies for specific subscription resource
identified by its CSE-relative resource ID
"""
communication = __get_session(allias, communication)
- description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id]
+ description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+ subscription_resource_id
+ ]
if not description:
- raise RuntimeError("No auto reply set for specific subscription resource: {}".format(subscription_resource_id))
+ raise RuntimeError(
+ "No auto reply set for specific subscription resource: {}".format(
+ subscription_resource_id
+ )
+ )
return communication.get_auto_handling_statistics(description).counter
-def verify_number_of_auto_replies_to_notification_from_subscription(subscription_resource_id, replies,
- allias="default", communication=None):
+def verify_number_of_auto_replies_to_notification_from_subscription(
+ subscription_resource_id, replies, allias="default", communication=None
+):
"""
Compares number of automatic replies for specific subscription resource
identified by its CSE-relative resource ID
"""
- count = get_number_of_auto_replies_to_notifications_from_subscription(subscription_resource_id,
- allias, communication)
+ count = get_number_of_auto_replies_to_notifications_from_subscription(
+ subscription_resource_id, allias, communication
+ )
if replies != count:
- raise AssertionError(("Unexpected number of auto replies to notification from subscription {}, " +
- "auto replies: {}, expected: {}").format(subscription_resource_id, count, replies))
+ raise AssertionError(
+ (
+ "Unexpected number of auto replies to notification from subscription {}, "
+ + "auto replies: {}, expected: {}"
+ ).format(subscription_resource_id, count, replies)
+ )
# Primitive getters uses JSON pointer object or string
del __sessions[allias]
-def create_iotdm_communication(entity_id, protocol, protocol_params=None, rx_port=None, allias="default"):
+def create_iotdm_communication(
+ entity_id, protocol, protocol_params=None, rx_port=None, allias="default"
+):
"""
Creates communication object and starts the communication.
:param entity_id: ID which will be used in From parameter of request primitives
:return: The new communication object
"""
if protocol == onem2m_http.HTTPPROTOCOLNAME:
- conn = IoTDMItCommunicationFactory().create_http_json_primitive_communication(entity_id, protocol,
- protocol_params, rx_port)
+ conn = IoTDMItCommunicationFactory().create_http_json_primitive_communication(
+ entity_id, protocol, protocol_params, rx_port
+ )
else:
raise RuntimeError("Unsupported protocol: {}".format(protocol))
# TODO this is not real longest prefix match
# TODO fix if needed
for ip in ip_list:
- if ip.startswith(iotdm_ip[0: i]):
+ if ip.startswith(iotdm_ip[0:i]):
return ip
# no match, just choose the first one
def create_http_default_communication_parameters(address, port, content_type):
"""Returns JSON string including default HTTP specific parameters"""
return '{{"{}": "{}", "{}": {}, "Content-Type": "{}"}}'.format(
- onem2m_http.protocol_address, address,
- onem2m_http.protocol_port, port,
- content_type)
+ onem2m_http.protocol_address,
+ address,
+ onem2m_http.protocol_port,
+ port,
+ content_type,
+ )
-def create_iotdm_http_connection(entity_id, address, port, content_type, rx_port=None, allias="default"):
+def create_iotdm_http_connection(
+ entity_id, address, port, content_type, rx_port=None, allias="default"
+):
"""Creates HTTP communication"""
- default_params = create_http_default_communication_parameters(address, port, content_type)
- return create_iotdm_communication(entity_id, "http", default_params, rx_port, allias)
+ default_params = create_http_default_communication_parameters(
+ address, port, content_type
+ )
+ return create_iotdm_communication(
+ entity_id, "http", default_params, rx_port, allias
+ )