adding a scale test for statistic collection and its it's testplans
[integration/test.git] / test / csit / libraries / ScaleClient.py
index b8e7e0d544ab6a0d6eb84618b184e9f2625cd11a..3c2ec35c88adab1b93207f8bd493572332c40a2f 100644 (file)
@@ -12,6 +12,7 @@ import netaddr
 import Queue
 import requests
 import json
+import copy
 
 
 class Counter(object):
@@ -30,45 +31,45 @@ class Counter(object):
 
 
 _spreads = ['gauss', 'linear', 'first']    # possible defined spreads at the moment
-_default_flow_template = '''{
-  "flow-node-inventory:flow": [
-    {
-      "flow-node-inventory:cookie": %d,
-      "flow-node-inventory:cookie_mask": 4294967295,
-      "flow-node-inventory:flow-name": "%s",
-      "flow-node-inventory:hard-timeout": %d,
-      "flow-node-inventory:id": "%s",
-      "flow-node-inventory:idle-timeout": %d,
-      "flow-node-inventory:installHw": false,
-      "flow-node-inventory:instructions": {
-        "flow-node-inventory:instruction": [
-          {
-            "flow-node-inventory:apply-actions": {
-              "flow-node-inventory:action": [
-                 {
-                   "flow-node-inventory:drop-action": {},
-                   "flow-node-inventory:order": 0
-                 }
-               ]
-             },
-             "flow-node-inventory:order": 0
-          }
-        ]
-      },
-      "flow-node-inventory:match": {
-        "flow-node-inventory:ipv4-destination": "%s/32",
-        "flow-node-inventory:ethernet-match": {
-          "flow-node-inventory:ethernet-type": {
-            "flow-node-inventory:type": 2048
-          }
+_default_flow_template_json = {
+    u'flow': [
+        {
+            u'hard-timeout': 65000,
+            u'idle-timeout': 65000,
+            u'cookie_mask': 4294967295,
+            u'flow-name': u'FLOW-NAME-TEMPLATE',
+            u'priority': 2,
+            u'strict': False,
+            u'cookie': 0,
+            u'table_id': 0,
+            u'installHw': False,
+            u'id': u'FLOW-ID-TEMPLATE',
+            u'match': {
+                u'ipv4-destination': u'0.0.0.0/32',
+                u'ethernet-match': {
+                    u'ethernet-type': {
+                        u'type': 2048
+                    }
+                }
+            },
+            u'instructions': {
+                u'instruction': [
+                    {
+                        u'order': 0,
+                        u'apply-actions': {
+                            u'action': [
+                                {
+                                    u'drop-action': {},
+                                    u'order': 0
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
         }
-      },
-      "flow-node-inventory:priority": 2,
-      "flow-node-inventory:strict": false,
-      "flow-node-inventory:table_id": %d
-    }
-  ]
-}'''
+    ]
+}
 
 
 def _get_notes(fldet=[]):
@@ -125,8 +126,40 @@ def _prepare_add(cntl, sw, tab, fl, ip, template=None):
     '''Creates a PUT http requests to configure a flow in configuration datastore'''
     url = 'http://'+cntl+':'+'8181'
     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
-    flow = template % (fl, 'TestFlow-%d' % fl, 65000, str(fl), 65000, str(netaddr.IPAddress(ip)), tab)
-    req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=flow, auth=('admin', 'admin'))
+    flow = copy.deepcopy(template['flow'][0])
+    flow['cookie'] = fl
+    flow['flow-name'] = 'TestFlow-%d' % fl
+    flow['id'] = str(fl)
+    flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
+    flow['table_id'] = tab
+    fmod = dict(template)
+    fmod['flow'] = flow
+    req_data = json.dumps(fmod)
+    req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
+                           auth=('admin', 'admin'))
+    return req
+
+
+def _prepare_table_add(cntl, flows, template=None):
+    '''Creates a POST http requests to configure a flow in configuration datastore'''
+    f1 = flows[0]
+    sw, tab, fl, ip = f1
+    url = 'http://'+cntl+':'+'8181'
+    url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)
+    fdets = []
+    for sw, tab, fl, ip in flows:
+        flow = copy.deepcopy(template['flow'][0])
+        flow['cookie'] = fl
+        flow['flow-name'] = 'TestFlow-%d' % fl
+        flow['id'] = str(fl)
+        flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
+        flow['table_id'] = tab
+        fdets.append(flow)
+    fmod = dict(template)
+    fmod['flow'] = fdets
+    req_data = json.dumps(fmod)
+    req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
+                           auth=('admin', 'admin'))
     return req
 
 
@@ -169,6 +202,37 @@ def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, cont
     outqueue.put(res)
 
 
+def _wt_bulk_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
+                            template=None, outqueue=None):
+    '''The funcion runs in a thread. It reads out flow details from the queue and configures
+    the flow on the controller'''
+    ses = requests.Session()
+    cntl = controllers[0]
+    counter = [0 for i in range(600)]
+    loop = True
+
+    while loop:
+        try:
+            flowlist = inqueue.get(timeout=1)
+        except Queue.Empty:
+            if exitevent.is_set() and inqueue.empty():
+                loop = False
+            continue
+        req = preparefnc(cntl, flowlist, template=template)
+        prep = ses.prepare_request(req)
+        try:
+            rsp = ses.send(prep, timeout=5)
+        except requests.exceptions.Timeout:
+            counter[99] += 1
+            continue
+        counter[rsp.status_code] += 1
+    res = {}
+    for i, v in enumerate(counter):
+        if v > 0:
+            res[i] = v
+    outqueue.put(res)
+
+
 def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181',
                           nrthreads=1):
     '''Function starts thread executors and put required information to the queue. Executors read the queue and send
@@ -178,7 +242,7 @@ def _config_task_executor(preparefnc, flow_details=[], flow_template=None, contr
     if flow_template is not None:
         template = flow_template
     else:
-        template = _default_flow_template
+        template = _default_flow_template_json
 
     # lets enlarge the tupple of flow details with IP, to be used with the template
     flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
@@ -231,37 +295,104 @@ def deconfigure_flows(*args, **kwargs):
     return _config_task_executor(_prepare_delete, *args, **kwargs)
 
 
+def _bulk_config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'],
+                               restport='8181', nrthreads=1, fpr=1):
+    '''Function starts thread executors and put required information to the queue. Executors read the queue and send
+    http requests. After the thread's join, it produces a summary result.'''
+    # TODO: multi controllers support
+    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
+    if flow_template is not None:
+        template = flow_template
+    else:
+        template = _default_flow_template_json
+
+    # lets enlarge the tupple of flow details with IP, to be used with the template
+    flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
+    # lest divide flows into switches and tables
+    fg = {}
+    for fl in flows:
+        s, t, f, ip = fl
+        fk = (s, t)
+        if (s, t) in fg:
+            fg[fk].append(fl)
+        else:
+            fg[fk] = [fl]
+
+    # lets fill the qurue
+    q = Queue.Queue()
+    for k, v in fg.iteritems():
+        while len(v) > 0:
+            q.put(v[:int(fpr)])
+            v = v[int(fpr):]
+
+    # result_gueue
+    rq = Queue.Queue()
+    # creaet exit event
+    ee = threading.Event()
+
+    # lets start threads whic will read flow details fro queues and send
+    threads = []
+    for i in range(int(nrthreads)):
+        t = threading.Thread(target=_wt_bulk_request_sender, args=(i, preparefnc),
+                             kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
+                                     "template": template, "outqueue": rq})
+        threads.append(t)
+        t.start()
+
+    ee.set()
+
+    result = {}
+    # waitng for them
+    for t in threads:
+        t.join()
+        res = rq.get()
+        for k, v in res.iteritems():
+            if k not in result:
+                result[k] = v
+            else:
+                result[k] += v
+    return result
+
+
+def configure_flows_bulk(*args, **kwargs):
+    '''Configure flows based on given flow details
+    Input parameters with default values: preparefnc, flow_details=[], flow_template=None,
+                               controllers=['127.0.0.1'], restport='8181', nrthreads=1'''
+    return _bulk_config_task_executor(_prepare_table_add, *args, **kwargs)
+
+
 def _get_operational_inventory_of_switches(controller):
     '''GET requests to get operational inventory node details'''
     url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
     rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
     if rsp.status_code != 200:
         return None
-    inv = json.loads(rsp.content)['nodes']['node']
+    inv = json.loads(rsp.content)
+    if 'nodes' not in inv:
+        return None
+    if 'node' not in inv['nodes']:
+        return []
+    inv = inv['nodes']['node']
     switches = [sw for sw in inv if 'openflow:' in sw['id']]
     return switches
 
 
-def flow_stats_collected(flow_details=[], controller=''):
+def flow_stats_collected(controller=''):
     '''Once flows are configured, thisfunction is used to check if flows are present in the operational datastore'''
     # print type(flow_details), flow_details
-    if type(flow_details) is not list:
-        raise Exception('List expected')
     active_flows = 0
     found_flows = 0
     switches = _get_operational_inventory_of_switches(controller)
     if switches is None:
-        return False
+        return 0, 0, 0
     for sw in switches:
         tabs = sw['flow-node-inventory:table']
         for t in tabs:
             active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
             if 'flow' in t:
                 found_flows += len(t['flow'])
-    print "ActiveFlows(reported)/FlowsFound/FlowsExpected", active_flows, found_flows, len(flow_details)
-    if found_flows == len(flow_details):
-        return True
-    return False
+    print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows
+    return len(switches), active_flows, found_flows
 
 
 def get_switches_count(controller=''):