Resolve PEP8 in IoTDM
[integration/test.git] / csit / libraries / IoTDM / client_libs / testing / test_onem2m_http.py
1 #
2 # Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3 #
4 # This program and the accompanying materials are made available under the
5 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 # and is available at http://www.eclipse.org/legal/epl-v10.html
7 #
8
9 import unittest
10
11 from onem2m_json_primitive import OneM2MJsonPrimitiveBuilder
12 import onem2m_http
13 from onem2m_http import OneM2MHttpJsonEncoderTx
14 from onem2m_http import OneM2MHttpJsonDecoderTx
15 from onem2m_http import OneM2MHttpJsonDecoderRx
16 from onem2m_http import OneM2MHttpJsonEncoderRx
17 from onem2m_http import OneM2MHttpTx
18 from onem2m_http import OneM2MHttpRx
19 from onem2m_http import http_result_code
20 from onem2m_primitive import OneM2M
21
22
class TestOneM2MHttp(unittest.TestCase):
    """Unit tests for OneM2M HTTP communication and the related classes."""

    # Request parameters shared by the encoding test.
    params = {OneM2M.short_to: "InCSE2/Postman", "op": 2, "fr": "AE1", "rqi": 12345}
    # Protocol specific (HTTP) parameters shared by the encoding test.
    proto_params = {onem2m_http.protocol_address: "localhost", onem2m_http.protocol_port: 8282,
                    "Content-Type": "application/json"}
    # Sample payload; not referenced by the tests in this class as written.
    content = {"content": 123}

    def test_primitive_encoding(self):
        """Build a primitive from the class-level parameters and encode it.

        The test passes when the Tx encoder returns anything other than None.
        """
        builder = OneM2MJsonPrimitiveBuilder()
        builder.set_parameters(self.params)
        builder.set_protocol_specific_parameters(self.proto_params)
        primitive = builder.build()
        encoder = OneM2MHttpJsonEncoderTx()
        http_req = encoder.encode(primitive)
        self.assertIsNotNone(http_req)

    def test_primitive_decoding(self):
        """Placeholder: decoding is not covered yet, so this always raises."""
        # TODO
        raise NotImplementedError()

    def test_primitive_encoded_decode_compare(self):
        """Placeholder: the encode/decode round trip is not covered yet."""
        # TODO encode a primitive, decode the encoded primitive and use the
        # TODO _compare() method of the primitive; both primitives should be equal
        raise NotImplementedError()

    def _rx_cb(self, request_primitive):
        """Receiver callback returning a fixed OK response.

        :param request_primitive: the received request; currently ignored
        :return: response primitive carrying OneM2M.result_code_ok and the
                 HTTP result code 200
        """
        # TODO just return the request_primitive as passed (as loopback)

        # now just set result code and return
        rsp_builder = OneM2MJsonPrimitiveBuilder()
        rsp_builder.set_param(OneM2M.short_response_status_code, OneM2M.result_code_ok)
        rsp_builder.set_proto_param(http_result_code, 200)
        return rsp_builder.build()

    def test_communicaton_send(self):
        """Send one request primitive from a local Tx to a local Rx.

        A sender and a receiver are started (the receiver on the port the
        request is addressed to), the primitive is sent and both endpoints
        are stopped again. Nothing is asserted about the response yet.
        """
        # Single definition of the port used by both the request and the receiver.
        port = 5000
        params = {OneM2M.short_to: "InCSE2/Postman", "op": 2, "fr": "AE1", "rqi": 12345, "rcn": 1, "ty": 4}
        proto_params = {onem2m_http.protocol_address: "localhost", onem2m_http.protocol_port: port,
                        "Content-Type": "application/json"}
        content = {"content": 123}

        # Build the request before anything is started so that a builder
        # failure cannot leave a running sender or receiver behind.
        builder = OneM2MJsonPrimitiveBuilder()
        builder.set_parameters(params)
        builder.set_protocol_specific_parameters(proto_params)
        builder.set_content(content)
        primitive = builder.build()

        tx = OneM2MHttpTx(OneM2MHttpJsonEncoderTx(), OneM2MHttpJsonDecoderTx())
        rx = OneM2MHttpRx(OneM2MHttpJsonDecoderRx(), OneM2MHttpJsonEncoderRx(), port)

        tx.start()
        try:
            rx.start(self._rx_cb)
        except Exception:
            # The receiver never came up: release the sender and re-raise.
            tx.stop()
            raise

        try:
            tx.send(primitive)
            # TODO use the _compare() method of the primitive to check that it
            # TODO was correctly encoded / decoded (presumably; confirm intent)
        finally:
            # Always stop both endpoints, even when send() fails; the
            # sender is stopped first, then the receiver.
            try:
                tx.stop()
            finally:
                rx.stop()