-import unittest
-import os
-import re
-import sys
+import argparse
import logging
+import sys
import time
-import argparse
-import threading
-import requests
-
-from multiprocessing import Process
-import xml.dom.minidom as md
-from xml.etree import ElementTree as ET
-
-from odl_tests_new import MininetTools, ParseTools
-
-FLOW_ID_TEMPLATE = 'FLOW_ID_TEMPLATE'
-COOKIE_TEMPLATE = 'COOKIE_TEMPLATE'
-HARD_TO_TEMPLATE = 'HARD_TO_TEMPLATE'
-FLOW_NAME_TEMPLATE = 'FLOW_NAME_TEMPLATE'
-IPV4DST_TEMPLATE = 'IPV4DST_TEMPLATE'
-PRIORITY_TEMPLATE = 'PRIORITY_TEMPLATE'
-
-xml_template = '<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>' \
-'<flow xmlns=\"urn:opendaylight:flow:inventory\">' \
- '<strict>false</strict>' \
- '<instructions><instruction><order>0</order><apply-actions><action><order>0</order><dec-nw-ttl/>' \
- '</action></apply-actions></instruction></instructions><table_id>2</table_id>' \
- '<id>'+FLOW_ID_TEMPLATE+'</id><cookie_mask>255</cookie_mask><installHw>false</installHw>' \
- '<match><ethernet-match><ethernet-type><type>2048</type></ethernet-type></ethernet-match>' \
- '<ipv4-destination>'+IPV4DST_TEMPLATE+'</ipv4-destination></match><hard-timeout>'+HARD_TO_TEMPLATE+'</hard-timeout>' \
- '<flags>FlowModFlags [_cHECKOVERLAP=false, _rESETCOUNTS=false, _nOPKTCOUNTS=false, _nOBYTCOUNTS=false, _sENDFLOWREM=false]</flags>' \
- '<cookie>'+COOKIE_TEMPLATE+'</cookie><idle-timeout>34</idle-timeout><flow-name>'+FLOW_NAME_TEMPLATE+'</flow-name><priority>'+PRIORITY_TEMPLATE+'</priority>' \
- '<barrier>false</barrier></flow>'
-
-
-class Tool():
-
- @staticmethod
- def get_flows_string(net=None):
- if net is None:
- return []
+import unittest
- switch = net.switches[0]
- output = switch.cmdPrint(
- 'ovs-ofctl -O OpenFlow13 dump-flows %s' % switch.name)
+from openvswitch.flow_tools import FlowAdderThread, FlowRemoverThread, loglevels, \
+ WAIT_TIME
+from openvswitch.mininet_tools import MininetTools
+from openvswitch.testclass_components import CheckOperFlowsComponent, \
+ CheckConfigFlowsComponent, CheckSwitchDump
- return output.splitlines()[1:]
+REMOVE_FLOWS_PER_THREAD = 250
class MultiTest(unittest.TestCase):
log = logging.getLogger('MultiTest')
total_errors = 0
total_flows = 0
+ stored_before_test_flows = 0
def setUp(self):
- MultiTest.log.info('setUp')
+ MultiTest.log.info('test setup...')
self.threads_count = 50
self.thread_pool = list()
+ self.active_map = dict()
+
self.__start_MN()
self.__setup_threads()
self.__run_threads()
+
def tearDown(self):
- MultiTest.log.info('tearDown')
- self.net.stop()
+ MultiTest.log.info('test cleanup...')
+ MultiTest.total_deleted = 0
+ self.__delete_flows()
- @staticmethod
- def inc_error(value=1):
+ def inc_error(self, value=1):
MultiTest.total_errors += value
- @staticmethod
- def inc_flow(value=1):
- MultiTest.total_flows += 1
+ def inc_flow(self, flow_id= None, cookie_id=None):
+ if flow_id is not None and cookie_id is not None:
+ self.active_map[cookie_id] = flow_id
- def __start_MN(self):
- wait_time = 15
+ def delete_flow_from_map(self, flow_id, cookie_id):
+ del self.active_map[cookie_id]
+ def __start_MN(self):
self.net = MininetTools.create_network(self.host, self.mn_port)
self.net.start()
MultiTest.log.info('mininet stared')
- MultiTest.log.info('waiting {} seconds...'.format(wait_time))
- time.sleep(wait_time)
-
+ MultiTest.log.info('waiting {0} seconds...'.format(WAIT_TIME))
+ time.sleep(WAIT_TIME)
+ self.stored_before_test_flows = len(MininetTools.get_flows_string(self.net))
def __setup_threads(self):
- if args.threads is not None:
- self.threads_count = int(args.threads)
+ if in_args.threads is not None:
+ self.threads_count = int(in_args.threads)
for i in range(0, self.threads_count):
- #thread will have predetermined flows id to avoid using shared resource
- t = FlowAdderThread(i, self.host, self.port, self.net, flows_ids_from=i*MultiTest.flows + 1, flows_ids_to=(i+1)*MultiTest.flows + 1)
+ t = FlowAdderThread(self, i, self.host, self.port, self.net,\
+ flows_ids_from=i*MultiTest.flows + 1, flows_ids_to=i*MultiTest.flows + MultiTest.flows)
self.thread_pool.append(t)
def __run_threads(self):
- # start threads
for t in self.thread_pool:
t.start()
- # wait for them to finish
for t in self.thread_pool:
t.join()
- # collect results
- #for t in self.thread_pool:
- # MultiTest.inc_flow(t.flows)
- # MultiTest.inc_error(t.errors)
-
def test(self):
-
switch_flows = 0
+ total_flows = len(self.active_map.keys())
- assert MultiTest.total_flows > 0, 'Stored flows should be greater than 0, actual is {}'.format(MultiTest.total_flows)
+ assert total_flows > 0, ('Stored flows should be greater than 0, actual is {0}'.format(total_flows))
MultiTest.log.info('\n\n---------- preparation finished, running test ----------')
+
# check config
- url = 'http://%s:%d/restconf/config/opendaylight-inventory:nodes' \
- '/node/openflow:1/table/2/' % (self.host, self.port)
- MultiTest.log.info('checking flows in controller - sending request to url: {}'.format(url))
- response = requests.get(url, auth=('admin', 'admin'),
- headers={'Accept': 'application/xml'})
- assert response.status_code == 200
-
- tree = ET.ElementTree(ET.fromstring(response.text))
- flows_on_controller = len(tree.getroot())
-
- # check operational
- url = 'http://%s:%d/restconf/operational/opendaylight-inventory:nodes' \
- '/node/openflow:1/table/2/' % (self.host, self.port)
- MultiTest.log.info('checking flows in controller - sending request to url: {}'.format(url))
- response = requests.get(url, auth=('admin', 'admin'),
- headers={'Accept': 'application/xml'})
- #assert response.status_code == 200
- MultiTest.log.info('got resposnse: {}'.format(response.status_code))
- MultiTest.log.info('operational dump:\n{}'.format(response.text))
-
- MultiTest.log.info('{} flows are stored by results from threads, {} errors'.format(MultiTest.total_flows, MultiTest.total_errors))
- MultiTest.log.info('{} flows are stored in controller config'.format(flows_on_controller))
-
- switch_flows_list = Tool.get_flows_string(self.net)
- switch_flows = len(switch_flows_list)
- MultiTest.log.info('{} flows are stored on switch'.format(switch_flows))
- MultiTest.log.debug('switch flow-dump:\n{}'.format(switch_flows_list))
-
-
- assert MultiTest.total_flows == switch_flows, 'Added amount of flows to switch should be equal to successfully added flows to controller {} <> {}'.format(switch_flows,MultiTest.total_flows)
-
-
-class FlowAdderThread(threading.Thread):
-
- def __init__(self, thread_id, host, port, net, flows_ids_from=0, flows_ids_to=1):
- threading.Thread.__init__(self)
- self.thread_id = thread_id
- self.flows = 0
- self.errors = 0
- self.flows_ids_from = flows_ids_from
- self.flows_ids_to = flows_ids_to
-
- self.net = net
- self.host = host
- self.port = port
-
- self.log = logging.getLogger('FlowAdderThread: ' + str(thread_id))
- self.log.propagate = False
- self.channel = logging.StreamHandler()
- self.log.addHandler(self.channel)
- formatter = logging.Formatter('THREAD {}: %(levelname)s: %(message)s'.format(self.thread_id))
- self.channel.setFormatter(formatter)
- #self.log.setLevel(logging.INFO)
-
- self.log.info('created new FlowAdderThread->id:{}, flows id: {} -> {}'.format(self.thread_id, self.flows_ids_from, self.flows_ids_to))
-
- def make_ipv4_address(self, number, octet_count=4, octet_size=255):
- mask = 24
- ip = ['10', '0', '0', '0']
-
- if number < (255**3):
- for o in range(1, octet_count):
- ip[octet_count - 1 - o] = str(number % octet_size)
- number = number / octet_size
- #mask -= 8
- if number == 0:
- break
-
- return '.'.join(ip) + '/{}'.format(mask)
-
- def __add_flows(self, act_flow_id):
- try:
- self.log.info('adding flow id: {}'.format(act_flow_id))
- self.log.debug('flow ip address from id: {}'.format(self.make_ipv4_address(act_flow_id)))
-
- xml_string = str(xml_template).replace(FLOW_ID_TEMPLATE, str(act_flow_id)).replace(COOKIE_TEMPLATE, str(act_flow_id))\
- .replace(HARD_TO_TEMPLATE, '12').replace(FLOW_NAME_TEMPLATE,'FooXf{}'.format(act_flow_id))\
- .replace(IPV4DST_TEMPLATE,self.make_ipv4_address(act_flow_id)).replace(PRIORITY_TEMPLATE,str(act_flow_id))
-
- #TestRestartMininet.log.info('loaded xml: {}'.format(''.join(xml_string.split())))
- tree = md.parseString(xml_string)
- ids = ParseTools.get_values(tree.documentElement, 'table_id', 'id')
- data = (self.host, self.port, ids['table_id'], ids['id'])
-
- url = 'http://%s:%d/restconf/config/opendaylight-inventory:nodes' \
- '/node/openflow:1/table/%s/flow/%s' % data
- # send request via RESTCONF
- headers = {
- 'Content-Type': 'application/xml',
- 'Accept': 'application/xml',
- }
- self.log.debug('sending request to url: {}'.format(url))
- rsp = requests.put(url, auth=('admin', 'admin'), data=xml_string,
- headers=headers)
- self.log.debug('received status code: {}'.format(rsp.status_code))
- self.log.debug('received content: {}'.format(rsp.text))
- assert rsp.status_code == 204 or rsp.status_code == 200, 'Status' \
- ' code returned %d' % rsp.status_code
-
- # check request content against restconf's datastore
- #response = requests.get(url, auth=('admin', 'admin'),
- # headers={'Accept': 'application/xml'})
- #assert response.status_code == 200, 'Conifg response should be 200, is {}'.format(response.status_code)
-
- #switch_flows = Tool.get_flows_string(self.net)
- #assert len(switch_flows) > 0, 'Flows stored on switch shoul be greater than 0'
-
- # we expect that controller doesn't fail to store flow on switch
- self.flows += 1
- MultiTest.inc_flow()
- # store last used table id which got flows for later checkup
- #self.log.debug('{} successfully stored flows - {} flows are on switch'.format(self.flows, len(switch_flows)))
- except AssertionError as e:
- self.errors += 1
- self.log.error('AssertionError storing flow id:{}, reason: {}'.format(act_flow_id, str(e)))
- MultiTest.inc_error()
- except Exception as e:
- self.errors += 1
- self.log.error('Error storing flow id:{}, reason: {}'.format(act_flow_id, str(e)))
- MultiTest.inc_error()
-
- def run(self):
- self.log.info('started... adding flows {} to {}'.format(self.flows_ids_from, self.flows_ids_to))
- for i in range(self.flows_ids_from, self.flows_ids_to):
- self.__add_flows(i)
-
- self.log.info('finished, successfully added {} flows, {} errors'.format(self.flows,self.errors))
+ flows_on_controller = CheckConfigFlowsComponent().check_config_flows(self.host, self.port, self.active_map)
+
+ #check operational
+ flows_on_controller_operational = CheckOperFlowsComponent().check_oper_flows_loop(self.host, self.port, self.active_map)
+
+ # check switch
+ switch_flows_list = MininetTools.get_flows_string(self.net)
+ MultiTest.log.info('flow dump has {0} entries (including informational)'.format(len(switch_flows_list)))
+ for item in switch_flows_list:
+ if CheckSwitchDump().get_id_by_entry(item, self.active_map) is not None:
+ MultiTest.log.debug('flow_id:{0} = {1}'.format(CheckSwitchDump().get_id_by_entry(item, self.active_map), item))
+ switch_flows += 1
+
+ # print info
+ MultiTest.log.info('{0} flows are stored by results from threads, {1} errors'.format(total_flows, MultiTest.total_errors))
+ MultiTest.log.info('{0} flows are stored in controller config'.format(flows_on_controller))
+ MultiTest.log.info('{0} flows are stored in controller operational'.format(flows_on_controller_operational))
+ MultiTest.log.info('{0} flows are stored on switch'.format(switch_flows))
+
+ assert total_flows == switch_flows, 'Added amount of flows to switch should be equal to successfully added flows to controller {0} <> {1}'.format(switch_flows,total_flows)
+
+ def __delete_flows(self):
+ MultiTest.log.info('deleting flows added during test')
+
+ # using threads to delete to speed up cleanup
+ items_to_delete = list(self.active_map.items())
+ self.thread_pool = []
+ thread_id = 0
+ slice_from = REMOVE_FLOWS_PER_THREAD * thread_id
+ slice_to = REMOVE_FLOWS_PER_THREAD * (thread_id + 1)
+
+ while(slice_from < len(items_to_delete)):
+ self.thread_pool.append(FlowRemoverThread(self, thread_id, self.host, self.port, self.net, items_to_delete[slice_from:slice_to]))
+ thread_id += 1
+ slice_from = REMOVE_FLOWS_PER_THREAD * thread_id
+ slice_to = REMOVE_FLOWS_PER_THREAD * (thread_id + 1)
+
+ for t in self.thread_pool:
+ t.start()
+
+ for t in self.thread_pool:
+ t.join()
if __name__ == '__main__':
- logging.basicConfig(level=logging.INFO)
+
+ requests_log = logging.getLogger("requests")
+ requests_log.setLevel(logging.WARNING)
# parse cmdline arguments
- parser = argparse.ArgumentParser(description='Test for flow addition to'
- ' switch after the switch has been restarted')
+ parser = argparse.ArgumentParser(description='End to end stress tests of flows '
+ 'addition from multiple connections')
parser.add_argument('--odlhost', default='127.0.0.1', help='host where '
- 'odl controller is running')
+ 'odl controller is running (default = 127.0.0.1) ')
parser.add_argument('--odlport', type=int, default=8080, help='port on '
- 'which odl\'s RESTCONF is listening')
- parser.add_argument('--mnport', type=int, default=6653, help='port on '
- 'which odl\'s controller is listening')
- parser.add_argument('--xmls', default=None, help='generete tests only '
- 'from some xmls (i.e. 1,3,34) ')
+ 'which odl\'s RESTCONF is listening (default = 8080) ')
+ parser.add_argument('--mnport', type=int, default=6633, help='port on '
+ 'which odl\'s controller is listening (default = 6633)')
parser.add_argument('--threads', default=50, help='how many threads '
- 'should be used')
+ 'should be used (default = 50)')
parser.add_argument('--flows', default=20, help='how many flows will add'
- ' one thread')
- args = parser.parse_args()
+ ' one thread (default = 20)')
+ parser.add_argument('--log', default='info', help='log level, permitted values are'
+ ' debug/info/warning/error (default = info)')
+# parser.add_argument('--xmls', default=None, help='generete tests only '
+# 'from some xmls (i.e. 1,3,34) (default = None)')
+ in_args = parser.parse_args()
+
+ #logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=loglevels.get(in_args.log, logging.INFO))
# set host and port of ODL controller for test cases
- MultiTest.port = args.odlport
- MultiTest.host = args.odlhost
- MultiTest.mn_port = args.mnport
- MultiTest.threads = int(args.threads)
- MultiTest.flows = int(args.flows)
+ MultiTest.port = in_args.odlport
+ MultiTest.host = in_args.odlhost
+ MultiTest.mn_port = in_args.mnport
+ MultiTest.threads = int(in_args.threads)
+ MultiTest.flows = int(in_args.flows)
del sys.argv[1:]
- unittest.main()
+
+ suite = unittest.TestSuite()
+ test = MultiTest('test')
+ suite.addTest(test)
+
+ try:
+ unittest.TextTestRunner(verbosity=2).run(suite)
+ finally:
+ test.net.stop()