Step 2: Move test folder to root
[integration/test.git] / tools / CSIT_Test / base / modules / forwarding_rule_manager.py
diff --git a/tools/CSIT_Test/base/modules/forwarding_rule_manager.py b/tools/CSIT_Test/base/modules/forwarding_rule_manager.py
new file mode 100644 (file)
index 0000000..2874db9
--- /dev/null
@@ -0,0 +1,67 @@
+"""
+CSIT test tools.
+Authors: Baohua Yang@IBM, Denghui Huang@IBM
+Updated: 2013-11-05
+"""
+
+import sys
+
+sys.path.append('..')
+from restlib import *  # noqa
+from testmodule import TestModule
+
+sys.path.remove('..')
+
+
+class ForwardingRuleManager(TestModule):
+    """
+    Test for the forwarding rule manager.
+    Start 2-layer tree topology network. e.g., in Mininet, run
+        'sudo mn --controller=remote,ip=127.0.0.1 --mac --topo tree,2'
+    """
+
+    def __init__(self, restSubContext='/controller/nb/v2/flowprogrammer', user=DEFAULT_USER, password=DEFAULT_PWD,
+                 container=DEFAULT_CONTAINER, contentType='json', prefix=DEFAULT_PREFIX):
+        super(self.__class__, self).__init__(restSubContext, user, password, container, contentType, prefix)
+
+    def get_flows(self):
+        """
+        The name is suggested to match the NB API.
+        Show the flows
+        """
+        return super(self.__class__, self).get_entries('')
+
+    def add_flow_to_node(self, node_type, node_id, name, body):
+        suffix = 'node/' + node_type + '/' + node_id + '/staticFlow'
+        super(self.__class__, self).add_entry(suffix, name, body)
+
+    def remove_flow_from_node(self, node_type, node_id, name):
+        suffix = 'node/' + node_type + '/' + node_id + '/staticFlow'
+        super(self.__class__, self).remove_entry(suffix, name)
+
+    def test_flow_operations(self, node_type, node_id, name, body):
+        """
+        Test the add,remove,show actions on flows.
+        >>> body = {'installInHw':'true','name':'flow1','node':{'id':'00:00:00:00:00:00:00:02','type':'OF'},
+                'priority':'1','etherType':'0x800','nwDst':'10.0.0.1/32','actions':['OUTPUT=1']}
+        >>> ForwardingRuleManager().test_flow_operations('OF','00:00:00:00:00:00:00:02','flow1',body)
+        True
+        >>> body = {'installInHw':'true','name':'flow2','node':{'id':'00:00:00:00:00:00:00:02','type':'OF'},
+                'priority':'1','etherType':'0x800','nwDst':'10.0.0.2/32','actions':['OUTPUT=2']}
+        >>> ForwardingRuleManager().test_flow_operations('OF','00:00:00:00:00:00:00:02','flow2',body)
+        True
+        """
+        result = []
+        # current flow table should be empty.
+        r = self.get_flows()
+        result.append(body not in r['flowConfig'])
+        # Add a flow
+        self.add_flow_to_node(node_type, node_id, name, body)
+        r = self.get_flows()
+        result.append(body in r['flowConfig'])
+        # Remove the flow and test if succeed
+        if result == [True, True]:
+            self.remove_flow_from_node(node_type, node_id, name)
+            r = self.get_flows()
+            result.append(body not in r['flowConfig'])
+        return result == [True, True, True]