Auto-generated patch by python-black
[integration/test.git] / csit / libraries / IoTDM / iotdm_comm.py
index 500f23ef34db6dae99b269fb9b17d1d759f9a9cc..8241dd424bc2651246e9fb2290b7a2bd4b951acd 100644 (file)
@@ -35,45 +35,70 @@ def __get_session(allias, session):
     return None
 
 
-def prepare_primitive_builder_raw(protocol, primitive_params, content=None, proto_specific_params=None):
+def prepare_primitive_builder_raw(
+    protocol, primitive_params, content=None, proto_specific_params=None
+):
     """Creates primitive builder without any default data"""
-    builder = IoTDMJsonPrimitiveBuilder()\
-        .set_communication_protocol(protocol)\
-        .set_content(content)\
-        .set_parameters(primitive_params)\
+    builder = (
+        IoTDMJsonPrimitiveBuilder()
+        .set_communication_protocol(protocol)
+        .set_content(content)
+        .set_parameters(primitive_params)
         .set_protocol_specific_parameters(proto_specific_params)
+    )
     return builder
 
 
-def new_primitive_raw(protocol, primitive_params, content=None, proto_specific_params=None):
+def new_primitive_raw(
+    protocol, primitive_params, content=None, proto_specific_params=None
+):
     """Creates primitive object without any default data"""
-    return prepare_primitive_builder_raw(protocol, primitive_params, content, proto_specific_params).build()
-
-
-def prepare_primitive_builder(primitive_params, content=None, proto_specific_params=None,
-                              allias="default", communication=None):
+    return prepare_primitive_builder_raw(
+        protocol, primitive_params, content, proto_specific_params
+    ).build()
+
+
+def prepare_primitive_builder(
+    primitive_params,
+    content=None,
+    proto_specific_params=None,
+    allias="default",
+    communication=None,
+):
     """Creates primitive builder with default data set according communication object used"""
     communication = __get_session(allias, communication)
 
-    builder = IoTDMJsonPrimitiveBuilder()\
-        .set_communication_protocol(communication.get_protocol())\
-        .set_parameters(communication.get_primitive_params())\
-        .set_protocol_specific_parameters(communication.get_protocol_params())\
+    builder = (
+        IoTDMJsonPrimitiveBuilder()
+        .set_communication_protocol(communication.get_protocol())
+        .set_parameters(communication.get_primitive_params())
+        .set_protocol_specific_parameters(communication.get_protocol_params())
         .set_content(content)
+    )
 
     if communication.get_protocol() == onem2m_http.HTTPPROTOCOLNAME and content:
-        builder.set_proto_param(onem2m_http.http_header_content_length, str(len(content)))
+        builder.set_proto_param(
+            onem2m_http.http_header_content_length, str(len(content))
+        )
 
-    builder.append_parameters(primitive_params)\
-           .append_protocol_specific_parameters(proto_specific_params)
+    builder.append_parameters(primitive_params).append_protocol_specific_parameters(
+        proto_specific_params
+    )
 
     return builder
 
 
-def new_primitive(primitive_params, content=None, proto_specific_params=None,
-                  allias="default", communication=None):
+def new_primitive(
+    primitive_params,
+    content=None,
+    proto_specific_params=None,
+    allias="default",
+    communication=None,
+):
     """Creates new primitive object with default data set according communication object used"""
-    return prepare_primitive_builder(primitive_params, content, proto_specific_params, allias, communication).build()
+    return prepare_primitive_builder(
+        primitive_params, content, proto_specific_params, allias, communication
+    ).build()
 
 
 def _add_param(params, name, value):
@@ -82,8 +107,15 @@ def _add_param(params, name, value):
     params[name] = value
 
 
-def prepare_request_primitive_builder(target_resource, content=None, operation=None, resource_type=None,
-                                      result_content=None, allias="default", communication=None):
+def prepare_request_primitive_builder(
+    target_resource,
+    content=None,
+    operation=None,
+    resource_type=None,
+    result_content=None,
+    allias="default",
+    communication=None,
+):
     """
     Creates builder for request primitive with default data set according
     communication object used
@@ -100,38 +132,74 @@ def prepare_request_primitive_builder(target_resource, content=None, operation=N
 
     primitive_params = json.dumps(primitive_params)
 
-    builder = prepare_primitive_builder(primitive_params, content, communication=communication)
+    builder = prepare_primitive_builder(
+        primitive_params, content, communication=communication
+    )
     return builder
 
 
-def new_create_request_primitive(target_resource, content, resource_type, result_content=None,
-                                 allias="default", communication=None):
+def new_create_request_primitive(
+    target_resource,
+    content,
+    resource_type,
+    result_content=None,
+    allias="default",
+    communication=None,
+):
     """Creates request primitive for Create operation"""
-    return prepare_request_primitive_builder(target_resource, content, operation=OneM2M.operation_create,
-                                             resource_type=resource_type, result_content=result_content,
-                                             allias=allias, communication=communication).build()
-
-
-def new_update_request_primitive(target_resource, content, result_content=None, allias="default", communication=None):
+    return prepare_request_primitive_builder(
+        target_resource,
+        content,
+        operation=OneM2M.operation_create,
+        resource_type=resource_type,
+        result_content=result_content,
+        allias=allias,
+        communication=communication,
+    ).build()
+
+
+def new_update_request_primitive(
+    target_resource, content, result_content=None, allias="default", communication=None
+):
     """Creates request primitive for Update operation"""
-    return prepare_request_primitive_builder(target_resource, content, operation=OneM2M.operation_update,
-                                             resource_type=None, result_content=result_content,
-                                             allias=allias, communication=communication).build()
-
-
-def new_retrieve_request_primitive(target_resource, result_content=None, allias="default", communication=None):
+    return prepare_request_primitive_builder(
+        target_resource,
+        content,
+        operation=OneM2M.operation_update,
+        resource_type=None,
+        result_content=result_content,
+        allias=allias,
+        communication=communication,
+    ).build()
+
+
+def new_retrieve_request_primitive(
+    target_resource, result_content=None, allias="default", communication=None
+):
     """Creates request primitive for Retrieve operation"""
-    return prepare_request_primitive_builder(target_resource, content=None,
-                                             operation=OneM2M.operation_retrieve,
-                                             resource_type=None, result_content=result_content,
-                                             allias=allias, communication=communication).build()
-
-
-def new_delete_request_primitive(target_resource, result_content=None, allias="default", communication=None):
+    return prepare_request_primitive_builder(
+        target_resource,
+        content=None,
+        operation=OneM2M.operation_retrieve,
+        resource_type=None,
+        result_content=result_content,
+        allias=allias,
+        communication=communication,
+    ).build()
+
+
+def new_delete_request_primitive(
+    target_resource, result_content=None, allias="default", communication=None
+):
     """Creates request primitive for Delete operation"""
-    return prepare_request_primitive_builder(target_resource, content=None,
-                                             operation=OneM2M.operation_delete, result_content=result_content,
-                                             allias=allias, communication=communication).build()
+    return prepare_request_primitive_builder(
+        target_resource,
+        content=None,
+        operation=OneM2M.operation_delete,
+        result_content=result_content,
+        allias=allias,
+        communication=communication,
+    ).build()
 
 
 def send_primitive(primitive, allias="default", communication=None):
@@ -146,9 +214,13 @@ def verify_exchange(request_primitive, response_primitive, status_code=None):
     request_primitive.check_exchange(response_primitive, rsc=status_code)
 
 
-def verify_exchange_negative(request_primitive, response_primitive, status_code, error_message=None):
+def verify_exchange_negative(
+    request_primitive, response_primitive, status_code, error_message=None
+):
     """Verifies request and error response primitive parameters"""
-    request_primitive.check_exchange_negative(response_primitive, status_code, error_message)
+    request_primitive.check_exchange_negative(
+        response_primitive, status_code, error_message
+    )
 
 
 def verify_request(request_primitive):
@@ -161,7 +233,9 @@ def verify_response(response_primitive, rqi=None, rsc=None, request_operation=No
     response_primitive.check_response(rqi, rsc, request_operation)
 
 
-def verify_response_negative(response_primitive, rqi=None, rsc=None, error_message=None):
+def verify_response_negative(
+    response_primitive, rqi=None, rsc=None, error_message=None
+):
     """Verifies error response primitive only"""
     response_primitive.check_response_negative(rqi, rsc, error_message)
 
@@ -178,7 +252,9 @@ def receive_request_primitive(allias="default", communication=None):
     return req
 
 
-def respond_response_primitive(response_primitive, allias="default", communication=None):
+def respond_response_primitive(
+    response_primitive, allias="default", communication=None
+):
     """
     Sends response primitive related to the last request primitive received by
     receive_request_primitive() method
@@ -187,24 +263,39 @@ def respond_response_primitive(response_primitive, allias="default", communicati
     communication.respond(response_primitive)
 
 
-def create_notification_response(notification_request_primitive, allias="default", communication=None):
+def create_notification_response(
+    notification_request_primitive, allias="default", communication=None
+):
     """Creates response primitive for provided notification request primitive"""
     communication = __get_session(allias, communication)
-    return communication.create_auto_response(notification_request_primitive,
-                                              OneM2M.result_code_ok)
-
-
-def create_notification_response_negative(notification_request_primitive, result_code, error_message,
-                                          allias="default", communication=None):
+    return communication.create_auto_response(
+        notification_request_primitive, OneM2M.result_code_ok
+    )
+
+
+def create_notification_response_negative(
+    notification_request_primitive,
+    result_code,
+    error_message,
+    allias="default",
+    communication=None,
+):
     """Creates negative response primitive for provided notification request primitive"""
     communication = __get_session(allias, communication)
-    builder = IoTDMJsonPrimitiveBuilder() \
-        .set_communication_protocol(communication.get_protocol()) \
-        .set_param(OneM2M.short_request_identifier,
-                   notification_request_primitive.get_param(OneM2M.short_request_identifier)) \
-        .set_param(OneM2M.short_response_status_code, result_code) \
-        .set_proto_param(onem2m_http.http_result_code, onem2m_http.onem2m_to_http_result_codes[result_code])\
+    builder = (
+        IoTDMJsonPrimitiveBuilder()
+        .set_communication_protocol(communication.get_protocol())
+        .set_param(
+            OneM2M.short_request_identifier,
+            notification_request_primitive.get_param(OneM2M.short_request_identifier),
+        )
+        .set_param(OneM2M.short_response_status_code, result_code)
+        .set_proto_param(
+            onem2m_http.http_result_code,
+            onem2m_http.onem2m_to_http_result_codes[result_code],
+        )
         .set_content('{"error": "' + error_message + '"}')
+    )
     return builder.build()
 
 
@@ -237,48 +328,68 @@ def _on_subscription_create_notificaton_matching_cb(request_primitive):
 
 # Description of such notification request primitive which is received
 # as result of new subscription resource
-ON_SUBSCRIPTION_CREATE_DESCRIPTION =\
-    RequestAutoHandlingDescription(None, None, None,
-                                   onem2m_result_code=OneM2M.result_code_ok,
-                                   matching_cb=_on_subscription_create_notificaton_matching_cb)
+ON_SUBSCRIPTION_CREATE_DESCRIPTION = RequestAutoHandlingDescription(
+    None,
+    None,
+    None,
+    onem2m_result_code=OneM2M.result_code_ok,
+    matching_cb=_on_subscription_create_notificaton_matching_cb,
+)
 
 
 def _prepare_notification_auto_reply_builder():
-    return RequestAutoHandlingDescriptionBuilder()\
-        .add_param_criteria(OneM2M.short_operation, OneM2M.operation_notify)\
+    return (
+        RequestAutoHandlingDescriptionBuilder()
+        .add_param_criteria(OneM2M.short_operation, OneM2M.operation_notify)
         .set_onem2m_result_code(OneM2M.result_code_ok)
+    )
 
 
-def add_notification_auto_reply_on_subscription_create(allias="default", communication=None):
+def add_notification_auto_reply_on_subscription_create(
+    allias="default", communication=None
+):
     """Sets auto reply for notification requests received due to subscription resource creation"""
     communication = __get_session(allias, communication)
     communication.add_auto_reply_description(ON_SUBSCRIPTION_CREATE_DESCRIPTION)
 
 
-def remove_notification_auto_reply_on_subscription_create(allias="default", communication=None):
+def remove_notification_auto_reply_on_subscription_create(
+    allias="default", communication=None
+):
     """Removes auto reply for notification requests received due to subscription resource creation"""
     communication = __get_session(allias, communication)
     communication.remove_auto_reply_description(ON_SUBSCRIPTION_CREATE_DESCRIPTION)
 
 
-def get_number_of_auto_replies_on_subscription_create(allias="default", communication=None):
+def get_number_of_auto_replies_on_subscription_create(
+    allias="default", communication=None
+):
     """Returns number of auto replies on notification requests received when new subscription created"""
     communication = __get_session(allias, communication)
-    return communication.get_auto_handling_statistics(ON_SUBSCRIPTION_CREATE_DESCRIPTION).counter
+    return communication.get_auto_handling_statistics(
+        ON_SUBSCRIPTION_CREATE_DESCRIPTION
+    ).counter
 
 
-def verify_number_of_auto_replies_on_subscription_create(replies, allias="default", communication=None):
+def verify_number_of_auto_replies_on_subscription_create(
+    replies, allias="default", communication=None
+):
     """Compares number of auto replies on notifications received when new subscription created"""
     count = get_number_of_auto_replies_on_subscription_create(allias, communication)
     if replies != count:
-        raise AssertionError("Unexpected number of auto replies on subscription create: {}, expected: {}".format(
-                             count, replies))
+        raise AssertionError(
+            "Unexpected number of auto replies on subscription create: {}, expected: {}".format(
+                count, replies
+            )
+        )
 
 
 __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING = {}
 
 
-def add_auto_reply_to_notification_from_subscription(subscription_resource_id, allias="default", communication=None):
+def add_auto_reply_to_notification_from_subscription(
+    subscription_resource_id, allias="default", communication=None
+):
     """
     Sets auto reply for notifications from specific subscription resource
     identified by its CSE-relative resource ID
@@ -286,47 +397,76 @@ def add_auto_reply_to_notification_from_subscription(subscription_resource_id, a
     communication = __get_session(allias, communication)
     builder = _prepare_notification_auto_reply_builder()
     if subscription_resource_id in __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING:
-        raise RuntimeError("Auto reply for subscription resource {} already set".format(subscription_resource_id))
-
-    builder.add_content_criteria(JSON_POINTER_NOTIFICATION_SUR, subscription_resource_id)
+        raise RuntimeError(
+            "Auto reply for subscription resource {} already set".format(
+                subscription_resource_id
+            )
+        )
+
+    builder.add_content_criteria(
+        JSON_POINTER_NOTIFICATION_SUR, subscription_resource_id
+    )
     new_description = builder.build()
-    __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id] = new_description
+    __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+        subscription_resource_id
+    ] = new_description
     communication.add_auto_reply_description(new_description)
 
 
-def remove_auto_reply_to_notification_from_subscription(subscription_resource_id, allias="default", communication=None):
+def remove_auto_reply_to_notification_from_subscription(
+    subscription_resource_id, allias="default", communication=None
+):
     """Removes auto reply for specific subscription identified by its CSE-relative resource ID"""
     communication = __get_session(allias, communication)
-    description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id]
+    description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+        subscription_resource_id
+    ]
     if not description:
-        raise RuntimeError("No auto reply set for specific subscription resource: {}".format(subscription_resource_id))
+        raise RuntimeError(
+            "No auto reply set for specific subscription resource: {}".format(
+                subscription_resource_id
+            )
+        )
     communication.remove_auto_reply_description(description)
 
 
-def get_number_of_auto_replies_to_notifications_from_subscription(subscription_resource_id,
-                                                                  allias="default", communication=None):
+def get_number_of_auto_replies_to_notifications_from_subscription(
+    subscription_resource_id, allias="default", communication=None
+):
     """
     Returns number of automatic replies for specific subscription resource
     identified by its CSE-relative resource ID
     """
     communication = __get_session(allias, communication)
-    description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[subscription_resource_id]
+    description = __SUBSCRIPTION_RESOURCE_ID_DESCRIPTION_MAPPING[
+        subscription_resource_id
+    ]
     if not description:
-        raise RuntimeError("No auto reply set for specific subscription resource: {}".format(subscription_resource_id))
+        raise RuntimeError(
+            "No auto reply set for specific subscription resource: {}".format(
+                subscription_resource_id
+            )
+        )
     return communication.get_auto_handling_statistics(description).counter
 
 
-def verify_number_of_auto_replies_to_notification_from_subscription(subscription_resource_id, replies,
-                                                                    allias="default", communication=None):
+def verify_number_of_auto_replies_to_notification_from_subscription(
+    subscription_resource_id, replies, allias="default", communication=None
+):
     """
     Compares number of automatic replies for specific subscription resource
     identified by its CSE-relative resource ID
     """
-    count = get_number_of_auto_replies_to_notifications_from_subscription(subscription_resource_id,
-                                                                          allias, communication)
+    count = get_number_of_auto_replies_to_notifications_from_subscription(
+        subscription_resource_id, allias, communication
+    )
     if replies != count:
-        raise AssertionError(("Unexpected number of auto replies to notification from subscription {}, " +
-                              "auto replies: {}, expected: {}").format(subscription_resource_id, count, replies))
+        raise AssertionError(
+            (
+                "Unexpected number of auto replies to notification from subscription {}, "
+                + "auto replies: {}, expected: {}"
+            ).format(subscription_resource_id, count, replies)
+        )
 
 
 # Primitive getters uses JSON pointer object or string
@@ -364,7 +504,9 @@ def close_iotdm_communication(allias="default", communication=None):
         del __sessions[allias]
 
 
-def create_iotdm_communication(entity_id, protocol, protocol_params=None, rx_port=None, allias="default"):
+def create_iotdm_communication(
+    entity_id, protocol, protocol_params=None, rx_port=None, allias="default"
+):
     """
     Creates communication object and starts the communication.
     :param entity_id: ID which will be used in From parameter of request primitives
@@ -375,8 +517,9 @@ def create_iotdm_communication(entity_id, protocol, protocol_params=None, rx_por
     :return: The new communication object
     """
     if protocol == onem2m_http.HTTPPROTOCOLNAME:
-        conn = IoTDMItCommunicationFactory().create_http_json_primitive_communication(entity_id, protocol,
-                                                                                      protocol_params, rx_port)
+        conn = IoTDMItCommunicationFactory().create_http_json_primitive_communication(
+            entity_id, protocol, protocol_params, rx_port
+        )
     else:
         raise RuntimeError("Unsupported protocol: {}".format(protocol))
 
@@ -409,7 +552,7 @@ def get_local_ip_from_list(iotdm_ip, local_ip_list_str):
         # TODO this is not real longest prefix match
         # TODO fix if needed
         for ip in ip_list:
-            if ip.startswith(iotdm_ip[0: i]):
+            if ip.startswith(iotdm_ip[0:i]):
                 return ip
 
     # no match, just choose the first one
@@ -420,12 +563,21 @@ def get_local_ip_from_list(iotdm_ip, local_ip_list_str):
 def create_http_default_communication_parameters(address, port, content_type):
     """Returns JSON string including default HTTP specific parameters"""
     return '{{"{}": "{}", "{}": {}, "Content-Type": "{}"}}'.format(
-        onem2m_http.protocol_address, address,
-        onem2m_http.protocol_port, port,
-        content_type)
+        onem2m_http.protocol_address,
+        address,
+        onem2m_http.protocol_port,
+        port,
+        content_type,
+    )
 
 
-def create_iotdm_http_connection(entity_id, address, port, content_type, rx_port=None, allias="default"):
+def create_iotdm_http_connection(
+    entity_id, address, port, content_type, rx_port=None, allias="default"
+):
     """Creates HTTP communication"""
-    default_params = create_http_default_communication_parameters(address, port, content_type)
-    return create_iotdm_communication(entity_id, "http", default_params, rx_port, allias)
+    default_params = create_http_default_communication_parameters(
+        address, port, content_type
+    )
+    return create_iotdm_communication(
+        entity_id, "http", default_params, rx_port, allias
+    )