Do not use AutoCloseable
diff --git a/test-scripts/stress_test.py b/test-scripts/stress_test.py
index 7eac51a135e759dc92a457a13272bcf7dc9a549a..d7287ab9aa0a4a5fcd28f2ee1254dd2b705a7ae5 100644
--- a/test-scripts/stress_test.py
+++ b/test-scripts/stress_test.py
-import unittest
-import os
-import re
-import sys
+import argparse
 import logging
+import sys
 import time
-import argparse
-import threading
-import requests
-
-from multiprocessing import Process
-import xml.dom.minidom as md
-from xml.etree import ElementTree as ET
-
-from odl_tests_new import MininetTools, ParseTools
-
-FLOW_ID_TEMPLATE = 'FLOW_ID_TEMPLATE'
-COOKIE_TEMPLATE = 'COOKIE_TEMPLATE'
-HARD_TO_TEMPLATE = 'HARD_TO_TEMPLATE'
-FLOW_NAME_TEMPLATE = 'FLOW_NAME_TEMPLATE'
-IPV4DST_TEMPLATE = 'IPV4DST_TEMPLATE'
-PRIORITY_TEMPLATE = 'PRIORITY_TEMPLATE'
-
-xml_template = '<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>' \
-'<flow xmlns=\"urn:opendaylight:flow:inventory\">' \
-    '<strict>false</strict>' \
-    '<instructions><instruction><order>0</order><apply-actions><action><order>0</order><dec-nw-ttl/>' \
-    '</action></apply-actions></instruction></instructions><table_id>2</table_id>' \
-    '<id>'+FLOW_ID_TEMPLATE+'</id><cookie_mask>255</cookie_mask><installHw>false</installHw>' \
-    '<match><ethernet-match><ethernet-type><type>2048</type></ethernet-type></ethernet-match>' \
-    '<ipv4-destination>'+IPV4DST_TEMPLATE+'</ipv4-destination></match><hard-timeout>'+HARD_TO_TEMPLATE+'</hard-timeout>' \
-    '<flags>FlowModFlags [_cHECKOVERLAP=false, _rESETCOUNTS=false, _nOPKTCOUNTS=false, _nOBYTCOUNTS=false, _sENDFLOWREM=false]</flags>' \
-    '<cookie>'+COOKIE_TEMPLATE+'</cookie><idle-timeout>34</idle-timeout><flow-name>'+FLOW_NAME_TEMPLATE+'</flow-name><priority>'+PRIORITY_TEMPLATE+'</priority>' \
-    '<barrier>false</barrier></flow>'
-
-
-class Tool():
-
-    @staticmethod
-    def get_flows_string(net=None):
-        if net is None:
-            return []
+import unittest
 
-        switch = net.switches[0]
-        output = switch.cmdPrint(
-        'ovs-ofctl -O OpenFlow13 dump-flows %s' % switch.name)
+from openvswitch.flow_tools import FlowAdderThread, FlowRemoverThread, loglevels, \
+    WAIT_TIME
+from openvswitch.mininet_tools import MininetTools
+from openvswitch.testclass_components import CheckOperFlowsComponent, \
+    CheckConfigFlowsComponent, CheckSwitchDump
 
-        return output.splitlines()[1:]
 
+REMOVE_FLOWS_PER_THREAD = 250
 
 class MultiTest(unittest.TestCase):
 
     log = logging.getLogger('MultiTest')
     total_errors = 0
     total_flows = 0
+    stored_before_test_flows = 0
 
     def setUp(self):
-        MultiTest.log.info('setUp')
+        MultiTest.log.info('test setup...')
         self.threads_count = 50
         self.thread_pool = list()
 
+        self.active_map = dict()
+
         self.__start_MN()
         self.__setup_threads()
         self.__run_threads()
 
+
     def tearDown(self):
-        MultiTest.log.info('tearDown')
-        self.net.stop()
+        MultiTest.log.info('test cleanup...')
+        MultiTest.total_deleted = 0
+        self.__delete_flows()
 
-    @staticmethod
-    def inc_error(value=1):
+    def inc_error(self, value=1):
         MultiTest.total_errors += value
 
-    @staticmethod
-    def inc_flow(value=1):
-        MultiTest.total_flows += 1
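+    # register a successfully added flow: map its cookie to its flow id in active_map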
+    def inc_flow(self, flow_id=None, cookie_id=None):
+        if flow_id is not None and cookie_id is not None:
+            self.active_map[cookie_id] = flow_id
 
-    def __start_MN(self):
-        wait_time = 15
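+    # forget a flow that has been removed; active_map is keyed by the flow's cookie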
+    def delete_flow_from_map(self, flow_id, cookie_id):
+        del self.active_map[cookie_id]
 
+    def __start_MN(self):
         self.net = MininetTools.create_network(self.host, self.mn_port)
         self.net.start()
         MultiTest.log.info('mininet started')
-        MultiTest.log.info('waiting {} seconds...'.format(wait_time))
-        time.sleep(wait_time)
-
+        MultiTest.log.info('waiting {0} seconds...'.format(WAIT_TIME))
+        time.sleep(WAIT_TIME)
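+        # remember how many entries the switch flow dump already contains before the test adds flows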
+        self.stored_before_test_flows = len(MininetTools.get_flows_string(self.net))
 
     def __setup_threads(self):
-        if args.threads is not None:
-            self.threads_count = int(args.threads)
+        if in_args.threads is not None:
+            self.threads_count = int(in_args.threads)
 
         for i in range(0, self.threads_count):
-            #thread will have predetermined flows id to avoid using shared resource
-            t = FlowAdderThread(i, self.host, self.port, self.net, flows_ids_from=i*MultiTest.flows + 1, flows_ids_to=(i+1)*MultiTest.flows + 1)
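+            # each thread gets its own disjoint range of flow ids, so no shared counter is needed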
+            t = FlowAdderThread(self, i, self.host, self.port, self.net,\
+                                flows_ids_from=i*MultiTest.flows + 1, flows_ids_to=i*MultiTest.flows + MultiTest.flows)
 
             self.thread_pool.append(t)
 
     def __run_threads(self):
-        # start threads
         for t in self.thread_pool:
             t.start()
 
-        # wait for them to finish
         for t in self.thread_pool:
             t.join()
 
-        # collect results
-        #for t in self.thread_pool:
-        #    MultiTest.inc_flow(t.flows)
-        #    MultiTest.inc_error(t.errors)
-
     def test(self):
-
         switch_flows = 0
+        total_flows = len(self.active_map)
 
-        assert MultiTest.total_flows > 0, 'Stored flows should be greater than 0, actual is {}'.format(MultiTest.total_flows)
+        assert total_flows > 0, ('Stored flows should be greater than 0, actual is {0}'.format(total_flows))
 
         MultiTest.log.info('\n\n---------- preparation finished, running test ----------')
+
         # check config
-        url = 'http://%s:%d/restconf/config/opendaylight-inventory:nodes' \
-            '/node/openflow:1/table/2/' % (self.host, self.port)
-        MultiTest.log.info('checking flows in controller - sending request to url: {}'.format(url))
-        response = requests.get(url, auth=('admin', 'admin'),
-                                headers={'Accept': 'application/xml'})
-        assert response.status_code == 200
-
-        tree = ET.ElementTree(ET.fromstring(response.text))
-        flows_on_controller = len(tree.getroot())
-
-        # check operational
-        url = 'http://%s:%d/restconf/operational/opendaylight-inventory:nodes' \
-            '/node/openflow:1/table/2/' % (self.host, self.port)
-        MultiTest.log.info('checking flows in controller - sending request to url: {}'.format(url))
-        response = requests.get(url, auth=('admin', 'admin'),
-                                headers={'Accept': 'application/xml'})
-        #assert response.status_code == 200
-        MultiTest.log.info('got resposnse: {}'.format(response.status_code))
-        MultiTest.log.info('operational dump:\n{}'.format(response.text))
-
-        MultiTest.log.info('{} flows are stored by results from threads, {} errors'.format(MultiTest.total_flows, MultiTest.total_errors))
-        MultiTest.log.info('{} flows are stored in controller config'.format(flows_on_controller))
-
-        switch_flows_list = Tool.get_flows_string(self.net)
-        switch_flows = len(switch_flows_list)
-        MultiTest.log.info('{} flows are stored on switch'.format(switch_flows))
-        MultiTest.log.debug('switch flow-dump:\n{}'.format(switch_flows_list))
-
-
-        assert MultiTest.total_flows == switch_flows, 'Added amount of flows to switch should be equal to successfully added flows to controller {} <> {}'.format(switch_flows,MultiTest.total_flows)
-
-
-class FlowAdderThread(threading.Thread):
-
-    def __init__(self, thread_id, host, port, net, flows_ids_from=0, flows_ids_to=1):
-        threading.Thread.__init__(self)
-        self.thread_id = thread_id
-        self.flows = 0
-        self.errors = 0
-        self.flows_ids_from = flows_ids_from
-        self.flows_ids_to = flows_ids_to
-
-        self.net = net
-        self.host = host
-        self.port = port
-
-        self.log = logging.getLogger('FlowAdderThread: ' + str(thread_id))
-        self.log.propagate = False
-        self.channel = logging.StreamHandler()
-        self.log.addHandler(self.channel)
-        formatter = logging.Formatter('THREAD {}: %(levelname)s: %(message)s'.format(self.thread_id))
-        self.channel.setFormatter(formatter)
-        #self.log.setLevel(logging.INFO)
-
-        self.log.info('created new FlowAdderThread->id:{}, flows id: {} -> {}'.format(self.thread_id, self.flows_ids_from, self.flows_ids_to))
-
-    def make_ipv4_address(self, number, octet_count=4, octet_size=255):
-        mask = 24
-        ip = ['10', '0', '0', '0']
-
-        if number < (255**3):
-            for o in range(1, octet_count):
-                ip[octet_count - 1 - o] = str(number % octet_size)
-                number = number / octet_size
-                #mask -= 8
-                if number == 0:
-                    break
-
-        return '.'.join(ip) + '/{}'.format(mask)
-
-    def __add_flows(self, act_flow_id):
-        try:
-            self.log.info('adding flow id: {}'.format(act_flow_id))
-            self.log.debug('flow ip address from id: {}'.format(self.make_ipv4_address(act_flow_id)))
-
-            xml_string = str(xml_template).replace(FLOW_ID_TEMPLATE, str(act_flow_id)).replace(COOKIE_TEMPLATE, str(act_flow_id))\
-            .replace(HARD_TO_TEMPLATE, '12').replace(FLOW_NAME_TEMPLATE,'FooXf{}'.format(act_flow_id))\
-            .replace(IPV4DST_TEMPLATE,self.make_ipv4_address(act_flow_id)).replace(PRIORITY_TEMPLATE,str(act_flow_id))
-
-            #TestRestartMininet.log.info('loaded xml: {}'.format(''.join(xml_string.split())))
-            tree = md.parseString(xml_string)
-            ids = ParseTools.get_values(tree.documentElement, 'table_id', 'id')
-            data = (self.host, self.port, ids['table_id'], ids['id'])
-
-            url = 'http://%s:%d/restconf/config/opendaylight-inventory:nodes' \
-                  '/node/openflow:1/table/%s/flow/%s' % data
-            # send request via RESTCONF
-            headers = {
-                'Content-Type': 'application/xml',
-                'Accept': 'application/xml',
-            }
-            self.log.debug('sending request to url: {}'.format(url))
-            rsp = requests.put(url, auth=('admin', 'admin'), data=xml_string,
-                               headers=headers)
-            self.log.debug('received status code: {}'.format(rsp.status_code))
-            self.log.debug('received content: {}'.format(rsp.text))
-            assert rsp.status_code == 204 or rsp.status_code == 200, 'Status' \
-                            ' code returned %d' % rsp.status_code
-
-            # check request content against restconf's datastore
-            #response = requests.get(url, auth=('admin', 'admin'),
-            #                        headers={'Accept': 'application/xml'})
-            #assert response.status_code == 200, 'Conifg response should be 200, is {}'.format(response.status_code)
-
-            #switch_flows = Tool.get_flows_string(self.net)
-            #assert len(switch_flows) > 0, 'Flows stored on switch shoul be greater than 0'
-
-            # we expect that controller doesn't fail to store flow on switch
-            self.flows += 1
-            MultiTest.inc_flow()
-            # store last used table id which got flows for later checkup
-            #self.log.debug('{} successfully stored flows - {} flows are on switch'.format(self.flows, len(switch_flows)))
-        except AssertionError as e:
-            self.errors += 1
-            self.log.error('AssertionError storing flow id:{}, reason: {}'.format(act_flow_id, str(e)))
-            MultiTest.inc_error()
-        except Exception as e:
-            self.errors += 1
-            self.log.error('Error storing flow id:{}, reason: {}'.format(act_flow_id, str(e)))
-            MultiTest.inc_error()
-
-    def run(self):
-        self.log.info('started... adding flows {} to {}'.format(self.flows_ids_from, self.flows_ids_to))
-        for i in range(self.flows_ids_from, self.flows_ids_to):
-            self.__add_flows(i)
-
-        self.log.info('finished, successfully added {} flows, {} errors'.format(self.flows,self.errors))
+        flows_on_controller = CheckConfigFlowsComponent().check_config_flows(self.host, self.port, self.active_map)
+
+        # check operational
+        flows_on_controller_operational = CheckOperFlowsComponent().check_oper_flows_loop(self.host, self.port, self.active_map)
+
+        # check switch
+        switch_flows_list = MininetTools.get_flows_string(self.net)
+        MultiTest.log.info('flow dump has {0} entries (including informational)'.format(len(switch_flows_list)))
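+        # count only dump entries that map back to a flow added by this test (matched via active_map)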
+        dump_checker = CheckSwitchDump()
+        for item in switch_flows_list:
+            flow_id = dump_checker.get_id_by_entry(item, self.active_map)
+            if flow_id is not None:
+                MultiTest.log.debug('flow_id:{0} = {1}'.format(flow_id, item))
+                switch_flows += 1
+
+        # print info
+        MultiTest.log.info('{0} flows are stored by results from threads, {1} errors'.format(total_flows, MultiTest.total_errors))
+        MultiTest.log.info('{0} flows are stored in controller config'.format(flows_on_controller))
+        MultiTest.log.info('{0} flows are stored in controller operational'.format(flows_on_controller_operational))
+        MultiTest.log.info('{0} flows are stored on switch'.format(switch_flows))
+
+        assert total_flows == switch_flows, 'Number of flows added to the switch should equal the number of flows successfully added to the controller: {0} <> {1}'.format(switch_flows, total_flows)
+
+    def __delete_flows(self):
+        MultiTest.log.info('deleting flows added during test')
+
+        # using threads to delete to speed up cleanup
+        items_to_delete = list(self.active_map.items())
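+        # each FlowRemoverThread removes a slice of at most REMOVE_FLOWS_PER_THREAD (cookie, flow id) pairs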
+        self.thread_pool = []
+        thread_id = 0
+        slice_from = REMOVE_FLOWS_PER_THREAD * thread_id
+        slice_to = REMOVE_FLOWS_PER_THREAD * (thread_id + 1)
+
+        while slice_from < len(items_to_delete):
+            self.thread_pool.append(FlowRemoverThread(self, thread_id, self.host, self.port, self.net, items_to_delete[slice_from:slice_to]))
+            thread_id += 1
+            slice_from = REMOVE_FLOWS_PER_THREAD * thread_id
+            slice_to = REMOVE_FLOWS_PER_THREAD * (thread_id + 1)
+
+        for t in self.thread_pool:
+            t.start()
+
+        for t in self.thread_pool:
+            t.join()
 
 
 if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO)
+
+    requests_log = logging.getLogger("requests")
+    requests_log.setLevel(logging.WARNING)
 
     # parse cmdline arguments
-    parser = argparse.ArgumentParser(description='Test for flow addition to'
-                        ' switch after the switch has been restarted')
+    parser = argparse.ArgumentParser(description='End-to-end stress test of flow '
+                        'addition from multiple connections')
     parser.add_argument('--odlhost', default='127.0.0.1', help='host where '
-                        'odl controller is running')
+                        'odl controller is running (default = 127.0.0.1)')
     parser.add_argument('--odlport', type=int, default=8080, help='port on '
-                        'which odl\'s RESTCONF is listening')
-    parser.add_argument('--mnport', type=int, default=6653, help='port on '
-                        'which odl\'s controller is listening')
-    parser.add_argument('--xmls', default=None, help='generete tests only '
-                        'from some xmls (i.e. 1,3,34) ')
+                        'which odl\'s RESTCONF is listening (default = 8080)')
+    parser.add_argument('--mnport', type=int, default=6633, help='port on '
+                        'which odl\'s controller is listening (default = 6633)')
     parser.add_argument('--threads', default=50, help='how many threads '
-                        'should be used')
+                        'should be used (default = 50)')
     parser.add_argument('--flows', default=20, help='how many flows will add'
-                        ' one thread')
-    args = parser.parse_args()
+                        ' one thread (default = 20)')
+    parser.add_argument('--log', default='info', help='log level, permitted values are'
+                        ' debug/info/warning/error (default = info)')
+#     parser.add_argument('--xmls', default=None, help='generete tests only '
+#                         'from some xmls (i.e. 1,3,34)  (default = None)')
+    in_args = parser.parse_args()
+
+    #logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=loglevels.get(in_args.log, logging.INFO))
 
     # set host and port of ODL controller for test cases
-    MultiTest.port = args.odlport
-    MultiTest.host = args.odlhost
-    MultiTest.mn_port = args.mnport
-    MultiTest.threads = int(args.threads)
-    MultiTest.flows = int(args.flows)
+    MultiTest.port = in_args.odlport
+    MultiTest.host = in_args.odlhost
+    MultiTest.mn_port = in_args.mnport
+    MultiTest.threads = int(in_args.threads)
+    MultiTest.flows = int(in_args.flows)
 
     del sys.argv[1:]
-    unittest.main()
+
+    suite = unittest.TestSuite()
+    test = MultiTest('test')
+    suite.addTest(test)
+
+    try:
+        unittest.TextTestRunner(verbosity=2).run(suite)
+    finally:
+        test.net.stop()