Add RESTCONF RFC8040 SSE test
[integration/test.git] / tools / wstools / ssereceiver.py
1 """Server_Sent event messages  receiver.
2
3 The tool receives and logs data from specified URI via SSE"""
4
5 import argparse
6 import logging
7 import asyncio
8 import aiohttp
9 from aiohttp_sse_client import client as sse_client
10
11
12 async def get_events():
13     conn = aiohttp.TCPConnector()
14     auth = aiohttp.BasicAuth(args.user, args.password)
15     client = aiohttp.ClientSession(connector=conn, auth=auth)
16     async with sse_client.EventSource(
17         "http://" + args.controller + ":8181" + args.uri, session=client
18     ) as event_source:
19         try:
20             async for event in event_source:
21                 logger.info(event)
22         except ConnectionError:
23             pass
24
25
26 def parse_arguments():
27     """Use argparse to get arguments,
28
29     Returns:
30         :return: args object
31     """
32     parser = argparse.ArgumentParser()
33     parser.add_argument("--controller", default="127.0.0.1", help="Controller IP")
34     parser.add_argument(
35         "--uri",
36         default="/rests/notif/data-change-event-subscription/opendaylight-inventory:nodes/datastore=CONFIGURATION/scope=BASE",
37         help="URI endpoint to connect",
38     )
39     parser.add_argument(
40         "--count", type=int, default=1, help="Number of messages to receive"
41     )
42     parser.add_argument("--user", default="admin", help="Controller User")
43     parser.add_argument("--password", default="admin", help="Controller Password")
44     parser.add_argument("--logfile", default="ssereceiver.log", help="Log file name")
45     parser.add_argument(
46         "--debug",
47         dest="loglevel",
48         action="store_const",
49         const=logging.DEBUG,
50         default=logging.INFO,
51         help="Log level",
52     )
53     args = parser.parse_args()
54     return args
55
56
57 def main():
58     loop = asyncio.get_event_loop()
59     loop.run_until_complete(get_events())
60     loop.run_until_complete(asyncio.sleep(0))
61     loop.close()
62
63
64 if __name__ == "__main__":
65     args = parse_arguments()
66     logger = logging.getLogger("logger")
67     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
68     console_handler = logging.StreamHandler()
69     file_handler = logging.FileHandler(args.logfile, mode="w")
70     console_handler.setFormatter(log_formatter)
71     file_handler.setFormatter(log_formatter)
72     logger.addHandler(console_handler)
73     logger.addHandler(file_handler)
74     logger.setLevel(args.loglevel)
75     logger.info("Starting to receive server-sent event messages")
76     logger.info(args)
77     main()