Migrate Get Requests invocations(libraries)
[integration/test.git] / csit / libraries / IoTDM / client_libs / testing / test_onem2m_http.py
1 #
2 # Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3 #
4 # This program and the accompanying materials are made available under the
5 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 # and is available at http://www.eclipse.org/legal/epl-v10.html
7 #
8
9 import unittest
10
11 from onem2m_json_primitive import OneM2MJsonPrimitiveBuilder
12 import onem2m_http
13 from onem2m_http import OneM2MHttpJsonEncoderTx
14 from onem2m_http import OneM2MHttpJsonDecoderTx
15 from onem2m_http import OneM2MHttpJsonDecoderRx
16 from onem2m_http import OneM2MHttpJsonEncoderRx
17 from onem2m_http import OneM2MHttpTx
18 from onem2m_http import OneM2MHttpRx
19 from onem2m_http import http_result_code
20 from onem2m_primitive import OneM2M
21
22
23 class TestOneM2MHttp(unittest.TestCase):
24     """Class of unittests testing OneM2M HTTP communication and related classes"""
25
26     params = {OneM2M.short_to: "InCSE2/Postman", "op": 2, "fr": "AE1", "rqi": 12345}
27     proto_params = {
28         onem2m_http.protocol_address: "localhost",
29         onem2m_http.protocol_port: 8282,
30         "Content-Type": "application/json",
31     }
32     content = {"content": 123}
33
34     def test_primitive_encoding(self):
35         builder = OneM2MJsonPrimitiveBuilder()
36         builder.set_parameters(self.params)
37         builder.set_protocol_specific_parameters(self.proto_params)
38         primitive = builder.build()
39         encoder = OneM2MHttpJsonEncoderTx()
40         http_req = encoder.encode(primitive)
41         self.assertIsNotNone(http_req)
42
43     def test_primitive_decoding(self):
44         # TODO
45         raise NotImplementedError()
46
47     def test_primitive_encoded_decode_compare(self):
48         # TODO encode primitive, decode encoded primitive and use _compare()
49         # TODO method of primitive and the primitives should be equal
50         raise NotImplementedError()
51
52     def _rx_cb(self, request_primitive):
53         # TODO just return the request_primitive as passed (as loopback)
54
55         # now just set result code and return
56         rsp_builder = OneM2MJsonPrimitiveBuilder()
57         rsp_builder.set_param(OneM2M.short_response_status_code, OneM2M.result_code_ok)
58         rsp_builder.set_proto_param(http_result_code, 200)
59         return rsp_builder.build()
60
61     def test_communicaton_send(self):
62         params = {
63             OneM2M.short_to: "InCSE2/Postman",
64             "op": 2,
65             "fr": "AE1",
66             "rqi": 12345,
67             "rcn": 1,
68             "ty": 4,
69         }
70         proto_params = {
71             onem2m_http.protocol_address: "localhost",
72             onem2m_http.protocol_port: 5000,
73             "Content-Type": "application/json",
74         }
75         content = {"content": 123}
76
77         encoder = OneM2MHttpJsonEncoderTx()
78         decoder = OneM2MHttpJsonDecoderTx()
79         tx = OneM2MHttpTx(encoder, decoder)
80         tx.start()
81
82         decoder = OneM2MHttpJsonDecoderRx()
83         encoder = OneM2MHttpJsonEncoderRx()
84         rx = OneM2MHttpRx(decoder, encoder, 5000)
85         rx.start(self._rx_cb)
86
87         builder = OneM2MJsonPrimitiveBuilder()
88         builder.set_parameters(params)
89         builder.set_protocol_specific_parameters(proto_params)
90         builder.set_content(content)
91         primitive = builder.build()
92
93         tx.send(primitive)
94         # TODO method of primitive if was correctly encoded / decoded
95
96         tx.stop()
97         rx.stop()