Added flow and group NSF.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
index 59c7e043de823a83a4f6b0318c2d25650bd4b384..7545203f06d4005f39086fe290fd8c28c44226f3 100644 (file)
@@ -1,31 +1,42 @@
 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.IContainer;
+import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.ISwitchManager;
+import org.opendaylight.controller.switchmanager.Switch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
@@ -34,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -44,76 +54,126 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FlowConsumerImpl {
-       protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
-       private FlowEventListener flowEventListener = new FlowEventListener();
+    protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
+    private FlowEventListener flowEventListener = new FlowEventListener();
     private Registration<NotificationListener> listener1Reg;
-       private SalFlowService flowService;
-       private FlowDataListener listener;
-       private FlowDataCommitHandler commitHandler;
-       private ConcurrentHashMap<FlowKey, Flow> originalSwView;
-       
-    public FlowConsumerImpl() {        
-               InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
-       flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
-               
-               if (null == flowService) {
-                       logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
-               System.out.println("Consumer SAL Service is down or NULL.");
-               return;
-               }
-               
-               listener = new FlowDataListener();
-               
-               if (null == FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path, listener)) {
-                       logger.error("Failed to listen on flow data modifcation events");
-               System.out.println("Consumer SAL Service is down or NULL.");
-               return;
-               }       
-                       
-               // For switch events
-               listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
-               
-               if (null == listener1Reg) {
-                       logger.error("Listener to listen on flow data modifcation events");
-               System.out.println("Consumer SAL Service is down or NULL.");
-               return;
-               }
-               //addFlowTest();
-               System.out.println("-------------------------------------------------------------------");
-               allocateCaches();
-               commitHandler = new FlowDataCommitHandler();
-               FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
+    private SalFlowService flowService;
+    // private FlowDataListener listener;
+    private FlowDataCommitHandler commitHandler;
+    private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
+    private static ConcurrentMap<FlowKey, Flow> installedSwView;
+    private IClusterContainerServices clusterContainerService = null;
+    private IContainer container;
+    private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
+    private static ConcurrentMap<Integer, Flow> staticFlows;
+    private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
+    /*
+     * Inactive flow list. This is for the global instance of FRM It will
+     * contain all the flow entries which were installed on the global container
+     * when the first container is created.
+     */
+    private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
+
+    /*
+     * /* Per node indexing
+     */
+    private static ConcurrentMap<Node, List<Flow>> nodeFlows;
+    private boolean inContainerMode; // being used by global instance only
+
+    public FlowConsumerImpl() {
+        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
+        flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
+
+        if (null == flowService) {
+            logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
+            System.out.println("Consumer SAL Service is down or NULL.");
+            return;
+        }
+
+        // listener = new FlowDataListener();
+
+        // if (null ==
+        // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
+        // listener)) {
+        // logger.error("Failed to listen on flow data modifcation events");
+        // System.out.println("Consumer SAL Service is down or NULL.");
+        // return;
+        // }
+
+        // For switch events
+        listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
+
+        if (null == listener1Reg) {
+            logger.error("Listener to listen on flow data modifcation events");
+            System.out.println("Consumer SAL Service is down or NULL.");
+            return;
+        }
+        // addFlowTest();
+        System.out.println("-------------------------------------------------------------------");
+        allocateCaches();
+        commitHandler = new FlowDataCommitHandler();
+        FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
+        clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
+                IClusterContainerServices.class, this);
+        container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
+        /*
+         * If we are not the first cluster node to come up, do not initialize
+         * the static flow entries ordinal
+         */
+        if (staticFlowsOrdinal.size() == 0) {
+            staticFlowsOrdinal.put(0, Integer.valueOf(0));
+        }
     }
-    
+
     private void allocateCaches() {
-       originalSwView = new ConcurrentHashMap<FlowKey, Flow>();
+
+        if (this.clusterContainerService == null) {
+            logger.warn("Un-initialized clusterContainerService, can't create cache");
+            return;
+        }
+
+        try {
+            clusterContainerService.createCache("frm.originalSwView",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache("frm.installedSwView",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService
+                    .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache("frm.staticFlowsOrdinal",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache("frm.inactiveFlows",
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+        } catch (CacheConfigException cce) {
+            logger.error("CacheConfigException");
+        } catch (CacheExistException cce) {
+            logger.error("CacheExistException");
+        }
     }
-    
-    private void addFlowTest()
-    {
-       try {
-                       NodeRef nodeOne = createNodeRef("foo:node:1");
-                       AddFlowInputBuilder input1 = new AddFlowInputBuilder();
-                       
-                       input1.setNode(nodeOne);
-                       AddFlowInput firstMsg = input1.build();
-                       
-                       if(null != flowService) {
-                               System.out.println(flowService.toString());
-                       }
-                       else
-                       {
-                               System.out.println("ConsumerFlowService is NULL");
-                       }
-                       @SuppressWarnings("unused")
-                       Future<RpcResult<java.lang.Void>> result1 = flowService.addFlow(firstMsg);
-                       
-                       
-               } catch (Exception e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
+
+    private void addFlowTest() {
+        try {
+            NodeRef nodeOne = createNodeRef("foo:node:1");
+            AddFlowInputBuilder input1 = new AddFlowInputBuilder();
+
+            input1.setNode(nodeOne);
+            AddFlowInput firstMsg = input1.build();
+
+            if (null != flowService) {
+                System.out.println(flowService.toString());
+            } else {
+                System.out.println("ConsumerFlowService is NULL");
+            }
+            @SuppressWarnings("unused")
+            Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
+
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
     }
+
     /**
      * Adds flow to the southbound plugin and our internal database
      *
@@ -129,44 +189,87 @@ public class FlowConsumerImpl {
         input.setMatch((dataObject).getMatch());
         input.setCookie((dataObject).getCookie());
         input.setInstructions((dataObject).getInstructions());
-        dataObject.getMatch().getLayer3Match()
-        for (int i=0;i<inst.size();i++) {
-            System.out.println("i = "+ i + inst.get(i).getInstruction().toString());
-            System.out.println("i = "+ i + inst.get(i).toString());
+        dataObject.getMatch().getLayer3Match();
+        for (int i = 0; i < inst.size(); i++) {
+            System.out.println("i = " + i + inst.get(i).getInstruction().toString());
+            System.out.println("i = " + i + inst.get(i).toString());
         }
-        
+
         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
 
+        // updating the staticflow cache
+        Integer ordinal = staticFlowsOrdinal.get(0);
+        staticFlowsOrdinal.put(0, ++ordinal);
+        staticFlows.put(ordinal, (Flow) dataObject);
+
         // We send flow to the sounthbound plugin
         flowService.addFlow(input.build());
+        updateLocalDatabase((NodeFlow) dataObject, true);
     }
-    
+
+    /**
+     * Removes flow to the southbound plugin and our internal database
+     *
+     * @param path
+     * @param dataObject
+     */
+    private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
+
+        RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
+        List<Instruction> inst = (dataObject).getInstructions().getInstruction();
+        input.setNode((dataObject).getNode());
+        input.setPriority((dataObject).getPriority());
+        input.setMatch((dataObject).getMatch());
+        input.setCookie((dataObject).getCookie());
+        input.setInstructions((dataObject).getInstructions());
+        dataObject.getMatch().getLayer3Match();
+        for (int i = 0; i < inst.size(); i++) {
+            System.out.println("i = " + i + inst.get(i).getInstruction().toString());
+            System.out.println("i = " + i + inst.get(i).toString());
+        }
+
+        System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
+
+        // updating the staticflow cache
+        Integer ordinal = staticFlowsOrdinal.get(0);
+        staticFlowsOrdinal.put(0, ++ordinal);
+        staticFlows.put(ordinal, dataObject);
+
+        // We send flow to the sounthbound plugin
+        flowService.removeFlow(input.build());
+        updateLocalDatabase((NodeFlow) dataObject, false);
+    }
+
+    @SuppressWarnings("unchecked")
     private void commitToPlugin(internalTransaction transaction) {
-        for(Entry<InstanceIdentifier<?>, Flow> entry :transaction.additions.entrySet()) {
+        for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
             System.out.println("Coming add cc in FlowDatacommitHandler");
-            addFlow(entry.getKey(),entry.getValue());
+            addFlow(entry.getKey(), entry.getValue());
         }
-        for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Flow> entry :transaction.updates.entrySet()) {
+        for (@SuppressWarnings("unused")
+        Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
             System.out.println("Coming update cc in FlowDatacommitHandler");
-           // updateFlow(entry.getKey(),entry.getValue());
+            // updateFlow(entry.getKey(),entry.getValue());
         }
-        
-        for(@SuppressWarnings("unused") InstanceIdentifier<?> removal : transaction.removals) {
-           // removeFlow(removal);
+
+        for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
+            System.out.println("Coming remove cc in FlowDatacommitHandler");
+            removeFlow(entry.getKey(), entry.getValue());
         }
+
     }
-    
+
     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
 
-        @SuppressWarnings("unchecked")
-               @Override
-         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
-             // We should verify transaction
-                System.out.println("Coming in FlowDatacommitHandler");
-             internalTransaction transaction = new internalTransaction(modification);
-             transaction.prepareUpdate();
-             return transaction;
-         }
+        @SuppressWarnings("unchecked")
+        @Override
+        public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
+            // We should verify transaction
+            System.out.println("Coming in FlowDatacommitHandler");
+            internalTransaction transaction = new internalTransaction(modification);
+            transaction.prepareUpdate();
+            return transaction;
+        }
     }
 
     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
@@ -184,17 +287,47 @@ public class FlowConsumerImpl {
 
         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
-        Set<InstanceIdentifier<?>> removals = new HashSet<>();
+        Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
 
         /**
          * We create a plan which flows will be added, which will be updated and
          * which will be removed based on our internal state.
-         * 
+         *
          */
         void prepareUpdate() {
 
             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
+
+                // validating the DataObject
+
+                Status status = validate(container, (NodeFlow) entry);
+                if (!status.isSuccess()) {
+                    logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
+                    String error = "Invalid Configuration (" + status.getDescription() + ")";
+                    logger.error(error);
+                    return;
+                }
+                // Presence check
+                if (flowEntryExists((NodeFlow) entry)) {
+                    String error = "Entry with this name on specified table already exists";
+                    logger.warn("Entry with this name on specified table already exists: {}", entry);
+                    logger.error(error);
+                    return;
+                }
+                if (originalSwView.containsKey((FlowKey) entry)) {
+                    logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
+                    logger.trace("Aborting to install {}", entry);
+                    continue;
+                }
+                if (!FRMUtil.validateMatch((NodeFlow) entry)) {
+                    logger.error("Not a valid Match");
+                    return;
+                }
+                if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
+                    logger.error("Not a valid Instruction");
+                    return;
+                }
                 if (entry.getValue() instanceof Flow) {
                     Flow flow = (Flow) entry.getValue();
                     preparePutEntry(entry.getKey(), flow);
@@ -202,7 +335,15 @@ public class FlowConsumerImpl {
 
             }
 
-            removals = modification.getRemovedConfigurationData();
+            // removals = modification.getRemovedConfigurationData();
+            Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
+            for (InstanceIdentifier<?> removal : removedData) {
+                DataObject value = modification.getOriginalConfigurationData().get(removal);
+                if (value instanceof Flow) {
+                    removals.put(removal, (Flow) value);
+                }
+            }
+
         }
 
         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
@@ -220,93 +361,200 @@ public class FlowConsumerImpl {
 
         /**
          * We are OK to go with execution of plan
-         * 
+         *
          */
         @Override
         public RpcResult<Void> finish() throws IllegalStateException {
-            
+
             commitToPlugin(this);
             // We return true if internal transaction is successful.
-          //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
-               return Rpcs.getRpcResult(true, null, null);
+            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+            return Rpcs.getRpcResult(true, null, null);
         }
 
         /**
-         * 
+         *
          * We should rollback our preparation
-         * 
+         *
          */
         @Override
         public RpcResult<Void> rollback() throws IllegalStateException {
             // NOOP - we did not modified any internal state during
             // requestCommit phase
-           // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
             return Rpcs.getRpcResult(true, null, null);
-               
+
         }
 
+        public Status validate(IContainer container, NodeFlow dataObject) {
+
+            // container validation
+            Switch sw = null;
+            Node node = null;
+            String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
+            ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
+                    containerName, this);
+            // flow Name validation
+            if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
+                    || !dataObject.getFlowName().matches(NAMEREGEX)) {
+                return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
+            }
+            // Node Validation
+            if (dataObject.getNode() == null) {
+                return new Status(StatusCode.BADREQUEST, "Node is null");
+            }
+
+            if (switchManager != null) {
+                for (Switch device : switchManager.getNetworkDevices()) {
+                    node = (Node) device.getNode();
+                    if (device.getNode().equals(dataObject.getNode())) {
+                        sw = device;
+                        break;
+                    }
+                }
+                if (sw == null) {
+                    return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
+                }
+            } else {
+                logger.debug("switchmanager is not set yet");
+            }
+
+            if (dataObject.getPriority() != null) {
+                if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
+                    return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
+                            dataObject.getPriority()));
+                }
+            }
+
+            return new Status(StatusCode.SUCCESS);
+        }
+
+        private boolean flowEntryExists(NodeFlow config) {
+            // Flow name has to be unique on per table id basis
+            for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
+                if (entry.getValue().getFlowName().equals(config.getFlowName())
+                        && entry.getValue().getTableId().equals(config.getTableId())) {
+                    return true;
+                }
+            }
+            return false;
+        }
     }
-    
-       final class FlowEventListener implements SalFlowListener {
-       
+
+    final class FlowEventListener implements SalFlowListener {
+
         List<FlowAdded> addedFlows = new ArrayList<>();
         List<FlowRemoved> removedFlows = new ArrayList<>();
         List<FlowUpdated> updatedFlows = new ArrayList<>();
-       
+
         @Override
         public void onFlowAdded(FlowAdded notification) {
-               System.out.println("added flow..........................");
-        addedFlows.add(notification);
-           }
-       
-           @Override
-           public void onFlowRemoved(FlowRemoved notification) {
-               removedFlows.add(notification);
-           };
-       
-           @Override
-           public void onFlowUpdated(FlowUpdated notification) {
-               updatedFlows.add(notification);
-           }
-       
-       }
-       
-       final class FlowDataListener implements DataChangeListener {
-               private SalFlowService flowService;
-               
-               public FlowDataListener() {
-                       
-               }
-               
-               @Override
-               public void onDataChanged(
-                               DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {                    
-                       System.out.println("Coming in onDataChange..............");
-                       @SuppressWarnings("unchecked")
-                       Collection<DataObject> additions = (Collection<DataObject>) change.getCreatedConfigurationData();
-                       // we can check for getCreated, getDeleted or getUpdated from DataChange Event class
-                       for (DataObject dataObject : additions) {
-                           if (dataObject instanceof NodeFlow) {
-                               NodeRef nodeOne = createNodeRef("foo:node:1");
-                                       // validating the dataObject here                               
-                                   AddFlowInputBuilder input = new AddFlowInputBuilder();
-                                   input.setNode(((NodeFlow) dataObject).getNode());
-                                   input.setNode(nodeOne);
-                                         //  input.setPriority(((NodeFlow) dataObject).getPriority());
-                                       //input.setMatch(((NodeFlow) dataObject).getMatch());
-                                       //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
-                                       //input.setCookie(((NodeFlow) dataObject).getCookie());
-                                       //input.setAction(((NodeFlow) dataObject).getAction());
-       
-                               @SuppressWarnings("unused")
-                                       Future<RpcResult<java.lang.Void>> result = flowService.addFlow(input.build());
-                   }
-                       }       
-               } 
-       }
-                               
-           
-           
+            System.out.println("added flow..........................");
+            addedFlows.add(notification);
+        }
+
+        @Override
+        public void onFlowRemoved(FlowRemoved notification) {
+            removedFlows.add(notification);
+        };
+
+        @Override
+        public void onFlowUpdated(FlowUpdated notification) {
+            updatedFlows.add(notification);
+        }
+
+    }
+
+    // Commented out DataChangeListene - to be used by Stats
+
+    // final class FlowDataListener implements DataChangeListener {
+    // private SalFlowService flowService;
+    //
+    // public FlowDataListener() {
+    //
+    // }
+    //
+    // @Override
+    // public void onDataChanged(
+    // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    // System.out.println("Coming in onDataChange..............");
+    // @SuppressWarnings("unchecked")
+    // Collection<DataObject> additions = (Collection<DataObject>)
+    // change.getCreatedConfigurationData();
+    // // we can check for getCreated, getDeleted or getUpdated from DataChange
+    // Event class
+    // for (DataObject dataObject : additions) {
+    // if (dataObject instanceof NodeFlow) {
+    // NodeRef nodeOne = createNodeRef("foo:node:1");
+    // // validating the dataObject here
+    // AddFlowInputBuilder input = new AddFlowInputBuilder();
+    // input.setNode(((NodeFlow) dataObject).getNode());
+    // input.setNode(nodeOne);
+    // // input.setPriority(((NodeFlow) dataObject).getPriority());
+    // //input.setMatch(((NodeFlow) dataObject).getMatch());
+    // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
+    // //input.setCookie(((NodeFlow) dataObject).getCookie());
+    // //input.setAction(((NodeFlow) dataObject).getAction());
+    //
+    // @SuppressWarnings("unused")
+    // Future<RpcResult<java.lang.Void>> result =
+    // flowService.addFlow(input.build());
+    // }
+    // }
+    // }
+    // }
+
+    private static void updateLocalDatabase(NodeFlow entry, boolean add) {
+
+        updateSwViewes(entry, add);
+
+        updateNodeFlowsDB(entry, add);
+
+    }
+
+    /*
+     * Update the node mapped flows database
+     */
+    private static void updateSwViewes(NodeFlow entry, boolean add) {
+        if (add) {
+            FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
+            installedSwView.put((FlowKey) entry, (Flow) entry);
+        } else {
+            originalSwView.remove((Flow) entry);
+            installedSwView.remove((FlowKey) entry);
+
+        }
+    }
+
+    /*
+     * Update the node mapped flows database
+     */
+    private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
+        Node node = (Node) entry.getNode();
+
+        List<Flow> nodeIndeces = nodeFlows.get(node);
+        if (nodeIndeces == null) {
+            if (!add) {
+                return;
+            } else {
+                nodeIndeces = new ArrayList<Flow>();
+            }
+        }
+
+        if (add) {
+            nodeIndeces.add((Flow) entry);
+        } else {
+            nodeIndeces.remove((Flow) entry);
+        }
+
+        // Update cache across cluster
+        if (nodeIndeces.isEmpty()) {
+            nodeFlows.remove(node);
+        } else {
+            nodeFlows.put(node, nodeIndeces);
+        }
+    }
+
     private static NodeRef createNodeRef(String string) {
         NodeKey key = new NodeKey(new NodeId(string));
         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
@@ -314,7 +562,4 @@ public class FlowConsumerImpl {
 
         return new NodeRef(path);
     }
-           
-       
-
 }