X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=test%2Fcsit%2Flibraries%2FScaleClient.py;h=3c2ec35c88adab1b93207f8bd493572332c40a2f;hb=b02f48862ebd74ad07d43891de60296adc55ff89;hp=b8e7e0d544ab6a0d6eb84618b184e9f2625cd11a;hpb=c2d9b555d8eb44074f45a18e3981e1e0499def51;p=integration%2Ftest.git diff --git a/test/csit/libraries/ScaleClient.py b/test/csit/libraries/ScaleClient.py index b8e7e0d544..3c2ec35c88 100644 --- a/test/csit/libraries/ScaleClient.py +++ b/test/csit/libraries/ScaleClient.py @@ -12,6 +12,7 @@ import netaddr import Queue import requests import json +import copy class Counter(object): @@ -30,45 +31,45 @@ class Counter(object): _spreads = ['gauss', 'linear', 'first'] # possible defined spreads at the moment -_default_flow_template = '''{ - "flow-node-inventory:flow": [ - { - "flow-node-inventory:cookie": %d, - "flow-node-inventory:cookie_mask": 4294967295, - "flow-node-inventory:flow-name": "%s", - "flow-node-inventory:hard-timeout": %d, - "flow-node-inventory:id": "%s", - "flow-node-inventory:idle-timeout": %d, - "flow-node-inventory:installHw": false, - "flow-node-inventory:instructions": { - "flow-node-inventory:instruction": [ - { - "flow-node-inventory:apply-actions": { - "flow-node-inventory:action": [ - { - "flow-node-inventory:drop-action": {}, - "flow-node-inventory:order": 0 - } - ] - }, - "flow-node-inventory:order": 0 - } - ] - }, - "flow-node-inventory:match": { - "flow-node-inventory:ipv4-destination": "%s/32", - "flow-node-inventory:ethernet-match": { - "flow-node-inventory:ethernet-type": { - "flow-node-inventory:type": 2048 - } +_default_flow_template_json = { + u'flow': [ + { + u'hard-timeout': 65000, + u'idle-timeout': 65000, + u'cookie_mask': 4294967295, + u'flow-name': u'FLOW-NAME-TEMPLATE', + u'priority': 2, + u'strict': False, + u'cookie': 0, + u'table_id': 0, + u'installHw': False, + u'id': u'FLOW-ID-TEMPLATE', + u'match': { + u'ipv4-destination': u'0.0.0.0/32', + u'ethernet-match': { + u'ethernet-type': { + u'type': 2048 + } + } + }, + u'instructions': { + u'instruction': [ + { + u'order': 0, + u'apply-actions': { + u'action': [ + { + u'drop-action': {}, + u'order': 0 + } + ] + } + } + ] + } } - }, - "flow-node-inventory:priority": 2, - "flow-node-inventory:strict": false, - "flow-node-inventory:table_id": %d - } - ] -}''' + ] +} def _get_notes(fldet=[]): @@ -125,8 +126,40 @@ def _prepare_add(cntl, sw, tab, fl, ip, template=None): '''Creates a PUT http requests to configure a flow in configuration datastore''' url = 'http://'+cntl+':'+'8181' url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl) - flow = template % (fl, 'TestFlow-%d' % fl, 65000, str(fl), 65000, str(netaddr.IPAddress(ip)), tab) - req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=flow, auth=('admin', 'admin')) + flow = copy.deepcopy(template['flow'][0]) + flow['cookie'] = fl + flow['flow-name'] = 'TestFlow-%d' % fl + flow['id'] = str(fl) + flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip)) + flow['table_id'] = tab + fmod = dict(template) + fmod['flow'] = flow + req_data = json.dumps(fmod) + req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data, + auth=('admin', 'admin')) + return req + + +def _prepare_table_add(cntl, flows, template=None): + '''Creates a POST http requests to configure a flow in configuration datastore''' + f1 = flows[0] + sw, tab, fl, ip = f1 + url = 'http://'+cntl+':'+'8181' + url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab) + fdets = [] + for sw, tab, fl, ip in flows: + flow = copy.deepcopy(template['flow'][0]) + flow['cookie'] = fl + flow['flow-name'] = 'TestFlow-%d' % fl + flow['id'] = str(fl) + flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip)) + flow['table_id'] = tab + fdets.append(flow) + fmod = dict(template) + fmod['flow'] = fdets + req_data = json.dumps(fmod) + req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data, + auth=('admin', 'admin')) return req @@ -169,6 +202,37 @@ def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, cont outqueue.put(res) +def _wt_bulk_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', + template=None, outqueue=None): + '''The funcion runs in a thread. It reads out flow details from the queue and configures + the flow on the controller''' + ses = requests.Session() + cntl = controllers[0] + counter = [0 for i in range(600)] + loop = True + + while loop: + try: + flowlist = inqueue.get(timeout=1) + except Queue.Empty: + if exitevent.is_set() and inqueue.empty(): + loop = False + continue + req = preparefnc(cntl, flowlist, template=template) + prep = ses.prepare_request(req) + try: + rsp = ses.send(prep, timeout=5) + except requests.exceptions.Timeout: + counter[99] += 1 + continue + counter[rsp.status_code] += 1 + res = {} + for i, v in enumerate(counter): + if v > 0: + res[i] = v + outqueue.put(res) + + def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181', nrthreads=1): '''Function starts thread executors and put required information to the queue. Executors read the queue and send @@ -178,7 +242,7 @@ def _config_task_executor(preparefnc, flow_details=[], flow_template=None, contr if flow_template is not None: template = flow_template else: - template = _default_flow_template + template = _default_flow_template_json # lets enlarge the tupple of flow details with IP, to be used with the template flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details] @@ -231,37 +295,104 @@ def deconfigure_flows(*args, **kwargs): return _config_task_executor(_prepare_delete, *args, **kwargs) +def _bulk_config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], + restport='8181', nrthreads=1, fpr=1): + '''Function starts thread executors and put required information to the queue. Executors read the queue and send + http requests. After the thread's join, it produces a summary result.''' + # TODO: multi controllers support + ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1'))) + if flow_template is not None: + template = flow_template + else: + template = _default_flow_template_json + + # lets enlarge the tupple of flow details with IP, to be used with the template + flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details] + # lest divide flows into switches and tables + fg = {} + for fl in flows: + s, t, f, ip = fl + fk = (s, t) + if (s, t) in fg: + fg[fk].append(fl) + else: + fg[fk] = [fl] + + # lets fill the qurue + q = Queue.Queue() + for k, v in fg.iteritems(): + while len(v) > 0: + q.put(v[:int(fpr)]) + v = v[int(fpr):] + + # result_gueue + rq = Queue.Queue() + # creaet exit event + ee = threading.Event() + + # lets start threads whic will read flow details fro queues and send + threads = [] + for i in range(int(nrthreads)): + t = threading.Thread(target=_wt_bulk_request_sender, args=(i, preparefnc), + kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport, + "template": template, "outqueue": rq}) + threads.append(t) + t.start() + + ee.set() + + result = {} + # waitng for them + for t in threads: + t.join() + res = rq.get() + for k, v in res.iteritems(): + if k not in result: + result[k] = v + else: + result[k] += v + return result + + +def configure_flows_bulk(*args, **kwargs): + '''Configure flows based on given flow details + Input parameters with default values: preparefnc, flow_details=[], flow_template=None, + controllers=['127.0.0.1'], restport='8181', nrthreads=1''' + return _bulk_config_task_executor(_prepare_table_add, *args, **kwargs) + + def _get_operational_inventory_of_switches(controller): '''GET requests to get operational inventory node details''' url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes' rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin')) if rsp.status_code != 200: return None - inv = json.loads(rsp.content)['nodes']['node'] + inv = json.loads(rsp.content) + if 'nodes' not in inv: + return None + if 'node' not in inv['nodes']: + return [] + inv = inv['nodes']['node'] switches = [sw for sw in inv if 'openflow:' in sw['id']] return switches -def flow_stats_collected(flow_details=[], controller=''): +def flow_stats_collected(controller=''): '''Once flows are configured, thisfunction is used to check if flows are present in the operational datastore''' # print type(flow_details), flow_details - if type(flow_details) is not list: - raise Exception('List expected') active_flows = 0 found_flows = 0 switches = _get_operational_inventory_of_switches(controller) if switches is None: - return False + return 0, 0, 0 for sw in switches: tabs = sw['flow-node-inventory:table'] for t in tabs: active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows'] if 'flow' in t: found_flows += len(t['flow']) - print "ActiveFlows(reported)/FlowsFound/FlowsExpected", active_flows, found_flows, len(flow_details) - if found_flows == len(flow_details): - return True - return False + print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows + return len(switches), active_flows, found_flows def get_switches_count(controller=''):