Migrate Get Requests invocations(libraries)
[integration/test.git] / tools / exabgp_files / exarpc.py
1 #!/usr/bin/env python
2
3 import argparse
4 import binascii
5 import json
6 import logging
7 import os
8 import re
9 import select
10 import sys
11 import threading
12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
14
15
16 class ExaStorage(dict):
17     """Thread safe dictionary
18
19     The object will serve as thread safe data storage.
20     It should be used with "with" statement.
21
22     The content of thet dict may be changed dynamically, but i'll use it as
23     {
24       "counters": { <msg type>: count, ...}
25       "messages": { <msg type>: [hex_string1, ]}
26     """
27
28     def __init__(self):
29         """Thread safe dictionary init"""
30         super(ExaStorage, self).__init__()
31         self._lock = threading.Lock()
32
33     def __enter__(self):
34         """Entry point of "with" statement"""
35         self._lock.acquire()
36         return self
37
38     def __exit__(self, type_, value, traceback):
39         """End point of "with" statement"""
40         self._lock.release()
41
42
43 class Rpcs(object):
44     """Handler for SimpleXMLRPCServer."""
45
46     def __init__(self, storage):
47         """Init method
48
49         Arguments:
50             :storage: thread safe dict
51         """
52         self.storage = storage
53
54     def _write(self, text):
55         """Pass commands from rpc server towards exabgp
56
57         Arguments:
58             :text: exabgp command
59         """
60         logging.debug("Command towards exabgp: {}".format(text))
61         sys.stdout.write(text)
62         sys.stdout.write("\n")
63         sys.stdout.flush()
64         logging.debug("Connand flushed: {}.".format(text))
65
66     def get_counter(self, msg_type):
67         """Gets counter value
68
69         Arguments:
70             :msg_type: message type which counter should be returned
71         Returns:
72             :cnt: counter value
73         """
74         logging.debug("get_counter rpc called, storage {}".format(self.storage))
75         with self.storage as s:
76             if "counters" not in s:
77                 return 0
78             cnt = 0 if msg_type not in s["counters"] else s["counters"][msg_type]
79         return cnt
80
81     def clean_counter(self, msg_type):
82         """Cleans counter
83
84         Arguments:
85             :msg_type: message type which counter should be cleaned
86         """
87
88         logging.debug("clean_counter rpc called, storage {}".format(self.storage))
89         with self.storage as s:
90             if "counters" not in s:
91                 return
92             if msg_type in s["counters"]:
93                 del s["counters"][msg_type]
94
95     def get_message(self, msg_type):
96         """Gets last received message
97
98         Arguments:
99             :msg_type: message type which counter should be returned
100         Returns:
101             :msg: message
102         """
103         logging.debug(
104             "get_message {} rpc called, storage {}".format(msg_type, self.storage)
105         )
106         with self.storage as s:
107             if "messages" not in s:
108                 return None
109             msg = None if msg_type not in s["messages"] else s["messages"][msg_type]
110         return msg
111
112     def clean_message(self, msg_type):
113         """Removes stored message
114
115         Arguments:
116             :msg_type: message type which message should be cleaned
117         """
118
119         logging.debug("clean_message rpc called, storage {}".format(self.storage))
120         with self.storage as s:
121             if "messages" not in s:
122                 return
123             if msg_type in s["messages"]:
124                 del s["messages"][msg_type]
125         return
126
127     def execute(self, exabgp_cmd):
128         """Execite given command on exabgp
129
130         Arguments:
131             :exabgp_cmd: command
132         """
133         logging.info("executing: {}.".format(exabgp_cmd))
134         self._write(exabgp_cmd)
135
136
137 def decode_message(header, body):
138     """Decodes message
139
140     Arguments:
141         :header: hexstring of the header
142         :body: hexstring of the body
143     Returns:
144         :msg_type: message type
145         :msg: None (in the future some decoded data)
146     """
147     headbin = binascii.unhexlify(header)
148
149     msg_type = ord(headbin[18])
150     msg = None
151
152     return msg_type, msg
153
154
155 def _increment_counter(storage, key):
156     """Increments the counter for a message
157
158     Arguments:
159         :key: message type
160     """
161     with storage as s:
162         if "counters" not in s:
163             s["counters"] = {}
164         if key not in s["counters"]:
165             s["counters"][key] = 1
166         else:
167             s["counters"][key] += 1
168
169
170 def _store_last_received_message(storage, key, msg):
171     """Stores message under key.
172
173     Arguments:
174         :key: message type
175     """
176     with storage as s:
177         if "messages" not in s:
178             s["messages"] = {}
179         s["messages"][key] = msg
180
181
182 def handle_open(storage, msg):
183     """Handles received bgp open message
184
185     - incements open counter
186
187     Arguments:
188         :msg: hex string of open body
189     """
190     logging.debug("Handling Open with storage {}".format(storage))
191     _increment_counter(storage, "open")
192
193
194 def handle_keepalive(storage, msg):
195     """Handles received bgp keepalive message
196
197     - incements keepalive counter
198
199     Arguments:
200         :msg: hex string of message body (in fact it is None)
201     """
202     logging.debug("Handling KeepAlive with storage {}".format(storage))
203     _increment_counter(storage, "keepalive")
204
205
206 def handle_update(storage, msg):
207     """Handles received bgp update message
208
209     - incements update counter
210
211     Arguments:
212         :msg: hex string of update body
213     """
214     logging.debug("Handling Update with storage {}".format(storage))
215     _increment_counter(storage, "update")
216
217
218 def handle_route_refresh(storage, msg):
219     """Handles received bgp route refresh message
220
221     - incements route refresh counter
222
223     Arguments:
224         :msg: hex string of route refresh body
225     """
226     logging.debug("Handling Route Refresh with storage {}".format(storage))
227     _increment_counter(storage, "route_refresh")
228
229
230 def handle_json_update(storage, jdata):
231     """Handles received json parsed bgp update message
232
233     - incements update counter
234
235     Arguments:
236         :jdata: json formated data of update message
237     """
238     logging.debug("Handling Json Update with storage {}".format(storage))
239     _increment_counter(storage, "update")
240     _store_last_received_message(storage, "update", jdata)
241
242
243 def handle_json_state(storage, jdata):
244     """Handles received json state message
245
246     This is for future use. This information is not used/required/needed
247     at the moment.
248
249     Arguments:
250         :jdata: json formated data about connection/peer state
251     """
252     logging.debug("Handling Json State with storage {}".format(storage))
253
254
255 def handle_json_refresh(storage, jdata):
256     """Handles received json route refresh message
257
258     This is for future use. This information is not used/required/needed
259     at the moment.
260
261     Arguments:
262         :jdata: json formated data about connection/peer state
263     """
264     logging.debug("Handling Json State with storage {}".format(storage))
265     _increment_counter(storage, "route_refresh")
266
267
268 def exa_msg_handler(storage, data, encoder):
269     """Handles incomming messages"""
270
271     if encoder == "text":
272         if not ("neighbor" in data and "header" in data and "body" in data):
273             logging.debug("Ignoring received notification from exabgp: {}".format(data))
274             return
275         restr = "neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
276  (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?"
277         pat = re.compile(restr)
278         match = re.search(pat, data)
279         if match is None:
280             logging.warn(
281                 "Unexpected data in this part, only bgp message expected. Received: {}.".format(
282                     data
283                 )
284             )
285             return
286         msg_type, msg = decode_message(
287             match.groupdict()["header"], match.groupdict()["body"]
288         )
289         if msg_type == Message.CODE.KEEPALIVE:
290             handle_keepalive(storage, msg)
291         elif msg_type == Message.CODE.OPEN:
292             handle_open(storage, msg)
293         elif msg_type == Message.CODE.UPDATE:
294             handle_update(storage, msg)
295         elif msg_type == Message.CODE.ROUTE_REFRESH:
296             handle_route_refresh(storage, msg)
297         else:
298             logging.warn("No handler function for msg_type: {}".format(msg_type))
299     elif encoder == "json":
300         try:
301             jdata = json.loads(data)
302         except Exception:
303             logging.error("Unable to parse, expected json, received: {}.".format(data))
304             return
305         if jdata["type"] == "state":
306             logging.debug("State info received: {}.".format(data))
307             handle_json_state(storage, jdata)
308         elif jdata["type"] == "update":
309             logging.debug("Update info received: {}.".format(data))
310             handle_json_update(storage, jdata)
311         elif jdata["type"] == "notification":
312             logging.debug("Notification info received: {}.".format(data))
313         elif jdata["type"] == "refresh":
314             logging.debug("Route refresh received: {}.".format(data))
315             handle_json_refresh(storage, jdata)
316         else:
317             logging.error("Unexpected type for data: {}".format(data))
318     else:
319         logging.error("Ignoring received data, unknown encoder: {}".format(encoder))
320
321
322 def main(*argv):
323     """This script is used as i/o api for communication with exabgp
324
325     Arguments:
326         :*argv: unparsed cli arguments
327
328     In a separate thread an rpc server is started. This server will be used as an api towards the user.
329     Stdin and stdout are used for communication with exabgp.
330     """
331
332     parser = argparse.ArgumentParser(description="ExaBgp rpc server script")
333     parser.add_argument(
334         "--host",
335         default="127.0.0.1",
336         help="Host where exabgp is running (default is 127.0.0.1)",
337     )
338     parser.add_argument("--loglevel", default=logging.DEBUG, help="Log level")
339     parser.add_argument(
340         "--logfile",
341         default="{}/exarpc.log".format(os.path.dirname(os.path.abspath(__file__))),
342         help="Log file name.",
343     )
344     parser.add_argument("--encoder", default="json", help="Exabgp encoder type")
345     in_args = parser.parse_args(*argv)
346     logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
347
348     storage = ExaStorage()
349     rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
350     rpcserver.register_instance(Rpcs(storage))
351     trpc = threading.Thread(target=rpcserver.serve_forever)
352     trpc.start()
353
354     epoll = select.epoll()
355
356     epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
357
358     try:
359         while True:
360             logging.debug("Epoll loop")
361             events = epoll.poll(10)
362             for fd, event_type in events:
363                 logging.debug("Epoll returned: {},{}".format(fd, event_type))
364                 if event_type != select.EPOLLIN:
365                     raise Exception("Unexpected epoll event")
366                 else:
367                     data = sys.stdin.readline()
368                     logging.debug("Data recevied from exabgp: {}.".format(data))
369                     exa_msg_handler(storage, data, in_args.encoder)
370     except Exception as e:
371         logging.warn("Exception occured: {}".format(e))
372     finally:
373         rpcserver.shutdown()
374         trpc.join()
375
376
377 if __name__ == "__main__":
378     main()