b8e7e0d544ab6a0d6eb84618b184e9f2625cd11a
[integration/test.git] / test / csit / libraries / ScaleClient.py
1 '''
2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
4
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
8 '''
9 import random
10 import threading
11 import netaddr
12 import Queue
13 import requests
14 import json
15
16
class Counter(object):
    '''Lock protected counter.

    increment() adds to the stored value and hands back the value which was
    stored before the addition.
    '''

    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()

    def increment(self, value=1):
        '''Add value to the counter and return the previous counter value.'''
        # the lock is released by the with statement even if the addition raises
        with self.lock:
            previous = self.value
            self.value += value
        return previous
30
31
# Names of the spread methods defined at the moment; they select how switch
# and table ids are distributed among the generated flows.
_spreads = ['gauss', 'linear', 'first']
# JSON body of a single flow, filled in with the % operator.
# Placeholders in the order they appear:
#   cookie (%d), flow-name (%s), hard-timeout (%d), id (%s),
#   idle-timeout (%d), ipv4-destination (%s, "/32" is appended), table_id (%d)
# The flow matches IPv4 traffic (ethernet-type 2048) and its only action is drop.
_default_flow_template = '''{
  "flow-node-inventory:flow": [
    {
      "flow-node-inventory:cookie": %d,
      "flow-node-inventory:cookie_mask": 4294967295,
      "flow-node-inventory:flow-name": "%s",
      "flow-node-inventory:hard-timeout": %d,
      "flow-node-inventory:id": "%s",
      "flow-node-inventory:idle-timeout": %d,
      "flow-node-inventory:installHw": false,
      "flow-node-inventory:instructions": {
        "flow-node-inventory:instruction": [
          {
            "flow-node-inventory:apply-actions": {
              "flow-node-inventory:action": [
                 {
                   "flow-node-inventory:drop-action": {},
                   "flow-node-inventory:order": 0
                 }
               ]
             },
             "flow-node-inventory:order": 0
          }
        ]
      },
      "flow-node-inventory:match": {
        "flow-node-inventory:ipv4-destination": "%s/32",
        "flow-node-inventory:ethernet-match": {
          "flow-node-inventory:ethernet-type": {
            "flow-node-inventory:type": 2048
          }
        }
      },
      "flow-node-inventory:priority": 2,
      "flow-node-inventory:strict": false,
      "flow-node-inventory:table_id": %d
    }
  ]
}'''
72
73
74 def _get_notes(fldet=[]):
75     '''For given list of flow details it produces a dictionary with statistics
76     { swId1 : { tabId1 : flows_count1,
77                 tabId2 : flows_count2,
78                ...
79                 'total' : switch count }
80       swId2 ...
81     }
82     '''
83     notes = {}
84     for (sw, tab, flow) in fldet:
85         if sw not in notes:
86             notes[sw] = {'total': 0}
87         if tab not in notes[sw]:
88             notes[sw][tab] = 0
89         notes[sw][tab] += 1
90         notes[sw]['total'] += 1
91     return notes
92
93
94 def _randomize(spread, maxn):
95     '''Returns a randomized switch or table id'''
96     if spread not in _spreads:
97         raise Exception('Spread method {} not available'.format(spread))
98     while True:
99         if spread == 'gauss':
100             ga = abs(random.gauss(0, 1))
101             rv = int(ga*float(maxn)/3)
102             if rv < maxn:
103                 return rv
104         elif spread == 'linear':
105             rv = int(random.random() * float(maxn))
106             if rv < maxn:
107                 return rv
108             else:
109                 raise ValueError('rv >= maxn')
110         elif spread == 'first':
111             return 0
112
113
def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
    """Generate a list of tuples (switch_id, table_id, flow_id), one per flow.

    Switch ids are spread over the switches according to swspread and table ids
    over the tables according to tabspread; flow_id is the position of the flow
    in the list. The second returned value is the dictionary with statistics
    built from that list."""
    flow_count = int(flows)

    # all switch ids are drawn first and the table ids afterwards
    switch_ids = []
    for _ in range(flow_count):
        switch_ids.append(_randomize(swspread, switches))

    details = []
    for flow_id, switch_id in enumerate(switch_ids):
        table_id = _randomize(tabspread, tables)
        details.append((switch_id, table_id, flow_id))

    return details, _get_notes(details)
122
123
def _prepare_add(cntl, sw, tab, fl, ip, template=None):
    '''Builds (does not send) the PUT request which stores flow fl into table tab
    of switch openflow:sw in the configuration datastore of controller cntl.

    The request body is the template filled with the flow id, the table id and
    ip rendered as the destination address.'''
    url = ''.join(['http://', cntl, ':', '8181',
                   '/restconf/config/opendaylight-inventory:nodes/node/openflow:', str(sw),
                   '/table/', str(tab),
                   '/flow/', str(fl)])
    flow_name = 'TestFlow-%d' % fl
    flow_id = str(fl)
    destination = str(netaddr.IPAddress(ip))
    # 65000 is used for both the hard and the idle timeout
    body = template % (fl, flow_name, 65000, flow_id, 65000, destination, tab)
    return requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=body,
                            auth=('admin', 'admin'))
131
132
def _prepare_delete(cntl, sw, tab, fl, ip, template=None):
    '''Builds (does not send) the DELETE request which removes flow fl from table
    tab of switch openflow:sw in the configuration datastore of controller cntl.

    ip and template are not used; they are accepted so that the function can be
    called the same way as _prepare_add.'''
    url = ''.join(['http://', cntl, ':', '8181',
                   '/restconf/config/opendaylight-inventory:nodes/node/openflow:', str(sw),
                   '/table/', str(tab),
                   '/flow/', str(fl)])
    return requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
139
140
def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=(), restport='', template=None,
                       outqueue=None):
    '''The function runs in a thread. It reads out flow details from the queue and
    sends one http request per flow to the first controller.

    preparefnc(cntl, sw, tab, fl, ip, template=...) builds the request.
    The loop ends once exitevent is set and inqueue is empty.
    A dictionary {http status code: number of responses} is put to outqueue;
    requests which timed out are counted under the key 99.
    thread_id and restport are accepted but not used.

    The result is put to outqueue even when the thread is ended by an unexpected
    exception, so a reader waiting for one result per thread is never blocked.
    '''
    ses = requests.Session()
    counter = {}
    try:
        cntl = controllers[0]
        while True:
            try:
                (sw, tab, fl, ip) = inqueue.get(timeout=1)
            except Queue.Empty:
                if exitevent.is_set() and inqueue.empty():
                    break
                continue
            # switch and flow ids in the queue are zero based, the requests use ids starting from 1
            sw, fl = sw + 1, fl + 1
            req = preparefnc(cntl, sw, tab, fl, ip, template=template)
            prep = ses.prepare_request(req)
            try:
                rsp = ses.send(prep, timeout=5)
            except requests.exceptions.Timeout:
                counter[99] = counter.get(99, 0) + 1
                continue
            counter[rsp.status_code] = counter.get(rsp.status_code, 0) + 1
    finally:
        # release pooled connections and always report what was done so far
        ses.close()
        outqueue.put(counter)
170
171
def _config_task_executor(preparefnc, flow_details=None, flow_template=None, controllers=None, restport='8181',
                          nrthreads=1):
    '''Function starts thread executors and puts required information to the queue. Executors read the queue and send
    http requests. After the threads are joined, it produces a summary result.

    preparefnc    - function building one request, see _prepare_add and _prepare_delete
    flow_details  - iterable of (switch_id, table_id, flow_id) tuples, default no flows
    flow_template - flow body template, default _default_flow_template
    controllers   - list of controller addresses, default ['127.0.0.1']
    restport      - handed over to the executors
    nrthreads     - number of executor threads

    Returns a dictionary {result code: number of requests} summed over all executors.
    '''
    # TODO: multi controllers support
    if flow_details is None:
        flow_details = []
    if controllers is None:
        controllers = ['127.0.0.1']
    template = _default_flow_template if flow_template is None else flow_template

    # lets enlarge the tuple of flow details with an IP, to be used with the template,
    # and fill the queue; addresses are handed out consecutively from 10.0.0.1
    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
    q = Queue.Queue()
    for s, t, f in flow_details:
        q.put((s, t, f, ip_addr.increment()))

    # result queue
    rq = Queue.Queue()
    # exit event
    ee = threading.Event()

    # lets start threads which will read flow details from the queue and send requests
    threads = []
    for i in range(int(nrthreads)):
        worker = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
                                  kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers,
                                          "restport": restport, "template": template, "outqueue": rq})
        threads.append(worker)
        worker.start()

    # everything is queued already, executors may finish as soon as the queue is empty
    ee.set()

    # waiting for them
    for worker in threads:
        worker.join()

    # Collect whatever the executors reported. The queue is drained without blocking,
    # because an executor which died unexpectedly may not have put its result.
    result = {}
    while True:
        try:
            res = rq.get_nowait()
        except Queue.Empty:
            break
        for k, v in res.items():
            result[k] = result.get(k, 0) + v
    return result
218
219
def configure_flows(*args, **kwargs):
    '''Configure flows based on given flow details.

    All arguments are handed over to _config_task_executor after the function
    which prepares the PUT requests; with default values they are:
    flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181', nrthreads=1
    The summary returned by _config_task_executor is returned.'''
    summary = _config_task_executor(_prepare_add, *args, **kwargs)
    return summary
225
226
def deconfigure_flows(*args, **kwargs):
    '''Deconfigure flows based on given flow details.

    All arguments are handed over to _config_task_executor after the function
    which prepares the DELETE requests; with default values they are:
    flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181', nrthreads=1
    The summary returned by _config_task_executor is returned.'''
    summary = _config_task_executor(_prepare_delete, *args, **kwargs)
    return summary
232
233
def _get_operational_inventory_of_switches(controller):
    '''GET request to get operational inventory node details.

    Returns the list of inventory nodes whose id contains 'openflow:'.
    Returns None when the response status is not 200 or the reply carries no
    'nodes' container, and an empty list when the container holds no node.
    '''
    url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
    rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
    if rsp.status_code != 200:
        return None
    inv = json.loads(rsp.content)
    if 'nodes' not in inv:
        return None
    # the 'node' list is looked up safely so that an inventory without it means no switches
    nodes = inv['nodes'].get('node', [])
    return [sw for sw in nodes if 'openflow:' in sw['id']]
243
244
def flow_stats_collected(flow_details=None, controller=''):
    '''Once flows are configured, this function is used to check if flows are present in the operational datastore.

    flow_details - list of the configured flows, only its length is used
    controller   - address of the controller to ask

    Returns True when the number of flows found in the operational inventory
    equals the number of flow details, False otherwise or when the inventory
    could not be read. Raises Exception when flow_details is not a list.
    '''
    if flow_details is None:
        flow_details = []
    if not isinstance(flow_details, list):
        raise Exception('List expected')
    active_flows = 0
    found_flows = 0
    switches = _get_operational_inventory_of_switches(controller)
    if switches is None:
        return False
    for sw in switches:
        # a switch may be listed before its tables or their statistics are available
        for t in sw.get('flow-node-inventory:table', []):
            stats = t.get('opendaylight-flow-table-statistics:flow-table-statistics', {})
            active_flows += stats.get('active-flows', 0)
            found_flows += len(t.get('flow', []))
    # single formatted argument keeps the output the same with the print statement and the print function
    print("ActiveFlows(reported)/FlowsFound/FlowsExpected %s %s %s" % (active_flows, found_flows, len(flow_details)))
    return found_flows == len(flow_details)
265
266
def get_switches_count(controller=''):
    '''Return the number of switches present in the operational inventory nodes
    datastore of the controller, 0 when the inventory could not be obtained.'''
    found = _get_operational_inventory_of_switches(controller)
    return 0 if found is None else len(found)