fixing the lib to be compatible with mininet
[integration/test.git] / test / csit / libraries / ScaleClient.py
1 '''
2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
4
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
8 '''
9 import random
10 import threading
11 import netaddr
12 import Queue
13 import requests
14 import json
15 import copy
16
17
18 class Counter(object):
19     def __init__(self, start=0):
20         self.lock = threading.Lock()
21         self.value = start
22
23     def increment(self, value=1):
24         self.lock.acquire()
25         val = self.value
26         try:
27             self.value += value
28         finally:
29             self.lock.release()
30         return val
31
32
33 _spreads = ['gauss', 'linear', 'first']    # possible defined spreads at the moment
34 _default_flow_template_json = {
35     u'flow': [
36         {
37             u'hard-timeout': 65000,
38             u'idle-timeout': 65000,
39             u'cookie_mask': 4294967295,
40             u'flow-name': u'FLOW-NAME-TEMPLATE',
41             u'priority': 2,
42             u'strict': False,
43             u'cookie': 0,
44             u'table_id': 0,
45             u'installHw': False,
46             u'id': u'FLOW-ID-TEMPLATE',
47             u'match': {
48                 u'ipv4-destination': u'0.0.0.0/32',
49                 u'ethernet-match': {
50                     u'ethernet-type': {
51                         u'type': 2048
52                     }
53                 }
54             },
55             u'instructions': {
56                 u'instruction': [
57                     {
58                         u'order': 0,
59                         u'apply-actions': {
60                             u'action': [
61                                 {
62                                     u'drop-action': {},
63                                     u'order': 0
64                                 }
65                             ]
66                         }
67                     }
68                 ]
69             }
70         }
71     ]
72 }
73
74
75 def _get_notes(fldet=[]):
76     '''For given list of flow details it produces a dictionary with statistics
77     { swId1 : { tabId1 : flows_count1,
78                 tabId2 : flows_count2,
79                ...
80                 'total' : switch count }
81       swId2 ...
82     }
83     '''
84     notes = {}
85     for (sw, tab, flow) in fldet:
86         if sw not in notes:
87             notes[sw] = {'total': 0}
88         if tab not in notes[sw]:
89             notes[sw][tab] = 0
90         notes[sw][tab] += 1
91         notes[sw]['total'] += 1
92     return notes
93
94
95 def _randomize(spread, maxn):
96     '''Returns a randomized switch or table id'''
97     if spread not in _spreads:
98         raise Exception('Spread method {} not available'.format(spread))
99     while True:
100         if spread == 'gauss':
101             ga = abs(random.gauss(0, 1))
102             rv = int(ga*float(maxn)/3)
103             if rv < maxn:
104                 return rv
105         elif spread == 'linear':
106             rv = int(random.random() * float(maxn))
107             if rv < maxn:
108                 return rv
109             else:
110                 raise ValueError('rv >= maxn')
111         elif spread == 'first':
112             return 0
113
114
115 def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
116     """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
117     according to the spread rules between swithces and tables.
118     It also returns a dictionary with statsistics."""
119     swflows = [_randomize(swspread, switches) for f in range(int(flows))]
120     # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
121     fltables = [(s+1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)]
122     notes = _get_notes(fltables)
123     return fltables, notes
124
125
126 def _prepare_add(cntl, sw, tab, fl, ip, template=None):
127     '''Creates a PUT http requests to configure a flow in configuration datastore'''
128     url = 'http://'+cntl+':'+'8181'
129     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
130     flow = copy.deepcopy(template['flow'][0])
131     flow['cookie'] = fl
132     flow['flow-name'] = 'TestFlow-%d' % fl
133     flow['id'] = str(fl)
134     flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
135     flow['table_id'] = tab
136     fmod = dict(template)
137     fmod['flow'] = flow
138     req_data = json.dumps(fmod)
139     req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
140                            auth=('admin', 'admin'))
141     return req
142
143
144 def _prepare_table_add(cntl, flows, template=None):
145     '''Creates a POST http requests to configure a flow in configuration datastore'''
146     f1 = flows[0]
147     sw, tab, fl, ip = f1
148     url = 'http://'+cntl+':'+'8181'
149     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)
150     fdets = []
151     for sw, tab, fl, ip in flows:
152         flow = copy.deepcopy(template['flow'][0])
153         flow['cookie'] = fl
154         flow['flow-name'] = 'TestFlow-%d' % fl
155         flow['id'] = str(fl)
156         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
157         flow['table_id'] = tab
158         fdets.append(flow)
159     fmod = dict(template)
160     fmod['flow'] = fdets
161     req_data = json.dumps(fmod)
162     req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
163                            auth=('admin', 'admin'))
164     return req
165
166
167 def _prepare_delete(cntl, sw, tab, fl, ip, template=None):
168     '''Creates a DELETE http request to remove the flow from configuration datastore'''
169     url = 'http://'+cntl+':'+'8181'
170     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
171     req = requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
172     return req
173
174
175 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', template=None,
176                        outqueue=None):
177     '''The funcion runs in a thread. It reads out flow details from the queue and configures
178     the flow on the controller'''
179     ses = requests.Session()
180     cntl = controllers[0]
181     counter = [0 for i in range(600)]
182
183     while True:
184         try:
185             (sw, tab, fl, ip) = inqueue.get(timeout=1)
186             sw, tab, fl, ip = sw+1, tab, fl+1, ip
187         except Queue.Empty:
188             if exitevent.is_set() and inqueue.empty():
189                 break
190             continue
191         req = preparefnc(cntl, sw, tab, fl, ip, template=template)
192         # prep = ses.prepare_request(req)
193         prep = req.prepare()
194         try:
195             rsp = ses.send(prep, timeout=5)
196         except requests.exceptions.Timeout:
197             counter[99] += 1
198             continue
199         counter[rsp.status_code] += 1
200     res = {}
201     for i, v in enumerate(counter):
202         if v > 0:
203             res[i] = v
204     outqueue.put(res)
205
206
207 def _wt_bulk_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
208                             template=None, outqueue=None):
209     '''The funcion runs in a thread. It reads out flow details from the queue and configures
210     the flow on the controller'''
211     ses = requests.Session()
212     cntl = controllers[0]
213     counter = [0 for i in range(600)]
214     loop = True
215
216     while loop:
217         try:
218             flowlist = inqueue.get(timeout=1)
219         except Queue.Empty:
220             if exitevent.is_set() and inqueue.empty():
221                 loop = False
222             continue
223         req = preparefnc(cntl, flowlist, template=template)
224         # prep = ses.prepare_request(req)
225         prep = req.prepare()
226         try:
227             rsp = ses.send(prep, timeout=5)
228         except requests.exceptions.Timeout:
229             counter[99] += 1
230             continue
231         counter[rsp.status_code] += 1
232     res = {}
233     for i, v in enumerate(counter):
234         if v > 0:
235             res[i] = v
236     outqueue.put(res)
237
238
239 def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181',
240                           nrthreads=1):
241     '''Function starts thread executors and put required information to the queue. Executors read the queue and send
242     http requests. After the thread's join, it produces a summary result.'''
243     # TODO: multi controllers support
244     ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
245     if flow_template is not None:
246         template = flow_template
247     else:
248         template = _default_flow_template_json
249
250     # lets enlarge the tupple of flow details with IP, to be used with the template
251     flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
252
253     # lets fill the qurue
254     q = Queue.Queue()
255     for f in flows:
256         q.put(f)
257
258     # result_gueue
259     rq = Queue.Queue()
260     # creaet exit event
261     ee = threading.Event()
262
263     # lets start threads whic will read flow details fro queues and send
264     threads = []
265     for i in range(int(nrthreads)):
266         t = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
267                              kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
268                                      "template": template, "outqueue": rq})
269         threads.append(t)
270         t.start()
271
272     ee.set()
273
274     result = {}
275     # waitng for them
276     for t in threads:
277         t.join()
278         res = rq.get()
279         for k, v in res.iteritems():
280             if k not in result:
281                 result[k] = v
282             else:
283                 result[k] += v
284     return result
285
286
287 def configure_flows(*args, **kwargs):
288     '''Configure flows based on given flow details
289     Input parameters with default values: preparefnc, flow_details=[], flow_template=None,
290                                controllers=['127.0.0.1'], restport='8181', nrthreads=1'''
291     return _config_task_executor(_prepare_add, *args, **kwargs)
292
293
294 def deconfigure_flows(*args, **kwargs):
295     '''Deconfigure flows based on given flow details.
296     Input parameters with default values: preparefnc, flow_details=[], flow_template=None,
297                                controllers=['127.0.0.1'], restport='8181', nrthreads=1'''
298     return _config_task_executor(_prepare_delete, *args, **kwargs)
299
300
301 def _bulk_config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'],
302                                restport='8181', nrthreads=1, fpr=1):
303     '''Function starts thread executors and put required information to the queue. Executors read the queue and send
304     http requests. After the thread's join, it produces a summary result.'''
305     # TODO: multi controllers support
306     ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
307     if flow_template is not None:
308         template = flow_template
309     else:
310         template = _default_flow_template_json
311
312     # lets enlarge the tupple of flow details with IP, to be used with the template
313     flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
314     # lest divide flows into switches and tables
315     fg = {}
316     for fl in flows:
317         s, t, f, ip = fl
318         fk = (s, t)
319         if (s, t) in fg:
320             fg[fk].append(fl)
321         else:
322             fg[fk] = [fl]
323
324     # lets fill the qurue
325     q = Queue.Queue()
326     for k, v in fg.iteritems():
327         while len(v) > 0:
328             q.put(v[:int(fpr)])
329             v = v[int(fpr):]
330
331     # result_gueue
332     rq = Queue.Queue()
333     # creaet exit event
334     ee = threading.Event()
335
336     # lets start threads whic will read flow details fro queues and send
337     threads = []
338     for i in range(int(nrthreads)):
339         t = threading.Thread(target=_wt_bulk_request_sender, args=(i, preparefnc),
340                              kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
341                                      "template": template, "outqueue": rq})
342         threads.append(t)
343         t.start()
344
345     ee.set()
346
347     result = {}
348     # waitng for them
349     for t in threads:
350         t.join()
351         res = rq.get()
352         for k, v in res.iteritems():
353             if k not in result:
354                 result[k] = v
355             else:
356                 result[k] += v
357     return result
358
359
360 def configure_flows_bulk(*args, **kwargs):
361     '''Configure flows based on given flow details
362     Input parameters with default values: preparefnc, flow_details=[], flow_template=None,
363                                controllers=['127.0.0.1'], restport='8181', nrthreads=1'''
364     return _bulk_config_task_executor(_prepare_table_add, *args, **kwargs)
365
366
367 def _get_operational_inventory_of_switches(controller):
368     '''GET requests to get operational inventory node details'''
369     url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
370     rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
371     if rsp.status_code != 200:
372         return None
373     inv = json.loads(rsp.content)
374     if 'nodes' not in inv:
375         return None
376     if 'node' not in inv['nodes']:
377         return []
378     inv = inv['nodes']['node']
379     switches = [sw for sw in inv if 'openflow:' in sw['id']]
380     return switches
381
382
383 def flow_stats_collected(controller=''):
384     '''Once flows are configured, thisfunction is used to check if flows are present in the operational datastore'''
385     # print type(flow_details), flow_details
386     active_flows = 0
387     found_flows = 0
388     switches = _get_operational_inventory_of_switches(controller)
389     if switches is None:
390         return 0, 0, 0
391     for sw in switches:
392         tabs = sw['flow-node-inventory:table']
393         for t in tabs:
394             active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
395             if 'flow' in t:
396                 found_flows += len(t['flow'])
397     print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows
398     return len(switches), active_flows, found_flows
399
400
401 def get_switches_count(controller=''):
402     '''Count the switches presnt in the operational inventory nodes datastore'''
403     switches = _get_operational_inventory_of_switches(controller)
404     if switches is None:
405         return 0
406     return len(switches)