Fix bug in FRM
[controller.git] / opendaylight / forwardingrulesmanager / implementation / src / main / java / org / opendaylight / controller / forwardingrulesmanager / internal / ForwardingRulesManager.java
index 9c2afe42be0860df310516ff2f5b85ec1dd0261f..c19820bcf94d05c6a82d54f8e546007cc720123a 100644 (file)
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
@@ -39,7 +38,6 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.connectionmanager.ConnectionLocality;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
@@ -57,10 +55,11 @@ import org.opendaylight.controller.sal.action.Controller;
 import org.opendaylight.controller.sal.action.Flood;
 import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -98,7 +97,7 @@ import org.slf4j.LoggerFactory;
 public class ForwardingRulesManager implements
         IForwardingRulesManager,
         PortGroupChangeListener,
-        IContainerListener,
+        IContainerLocalListener,
         ISwitchManagerAware,
         IConfigurationContainerAware,
         IInventoryListener,
@@ -119,7 +118,7 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
     private ConcurrentMap<String, Object> TSPolicies;
     private boolean inContainerMode; // being used by global instance only
-    private boolean stopping;
+    protected boolean stopping;
 
     /*
      * Flow database. It's the software view of what was requested to install
@@ -181,7 +180,7 @@ public class ForwardingRulesManager implements
      * not picked by anyone, which is always a case can happen especially on
      * Node disconnect cases.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+    protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
 
     /*
      * Data structure responsible for retrieving the results of the workOrder
@@ -194,7 +193,7 @@ public class ForwardingRulesManager implements
      * TODO: The workStatus entries need to have a lifetime associated in case
      * of requestor controller leaving the cluster.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+    protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
 
     /*
      * Local Map used to hold the Future which a caller can use to monitor for
@@ -783,6 +782,16 @@ public class ForwardingRulesManager implements
         return true;
     }
 
+    private ConcurrentMap.Entry<Integer, FlowConfig> getStaticFlowEntry(String name, Node node) {
+        for (ConcurrentMap.Entry<Integer, FlowConfig> flowEntry : staticFlows.entrySet()) {
+            FlowConfig flowConfig = flowEntry.getValue();
+            if (flowConfig.isByNameAndNodeIdEqual(name, node)) {
+                return flowEntry;
+            }
+        }
+        return null;
+    }
+
     private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
         // Update the software view
         updateSwViewes(entry, add);
@@ -1117,7 +1126,7 @@ public class ForwardingRulesManager implements
      * merged flow may conflict with an existing old container flows merged flow
      * on the network node
      */
-    private void updateFlowsContainerFlow() {
+    protected void updateFlowsContainerFlow() {
         Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
         // First remove all installed entries
         for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
@@ -1357,9 +1366,6 @@ public class ForwardingRulesManager implements
             clusterContainerService.createCache("frm.staticFlows",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-            clusterContainerService.createCache("frm.flowsSaveEvent",
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
             clusterContainerService.createCache("frm.staticFlowsOrdinal",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
@@ -1373,10 +1379,10 @@ public class ForwardingRulesManager implements
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKSTATUSCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
             clusterContainerService.createCache(WORKORDERCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
         } catch (CacheConfigException cce) {
             log.error("CacheConfigException");
@@ -1876,7 +1882,9 @@ public class ForwardingRulesManager implements
      * If requested, a copy of each original flow entry will be stored in the
      * inactive list so that it can be re-applied when needed (This is typically
      * the case when running in the default container and controller moved to
-     * container mode)
+     * container mode) NOTE WELL: The routine as long as does a bulk change will
+     * operate only on the entries for nodes locally attached so to avoid
+     * redundant operations initiated by multiple nodes
      *
      * @param preserveFlowEntries
      *            if true, a copy of each original entry is stored in the
@@ -1902,9 +1910,15 @@ public class ForwardingRulesManager implements
 
         // Now remove the entries
         for (FlowEntryInstall flowEntryHw : toRemove) {
-            Status status = this.removeEntryInternal(flowEntryHw, false);
-            if (!status.isSuccess()) {
-                log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+            Node n = flowEntryHw.getNode();
+            if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
+                Status status = this.removeEntryInternal(flowEntryHw, false);
+                if (!status.isSuccess()) {
+                    log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+                }
+            } else {
+                log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job",
+                        flowEntryHw);
             }
         }
     }
@@ -1947,10 +1961,9 @@ public class ForwardingRulesManager implements
 
     @Override
     public FlowConfig getStaticFlow(String name, Node node) {
-        for (ConcurrentMap.Entry<Integer, FlowConfig> entry : staticFlows.entrySet()) {
-            if (entry.getValue().isByNameAndNodeIdEqual(name, node)) {
-                return entry.getValue();
-            }
+        ConcurrentMap.Entry<Integer, FlowConfig> entry = getStaticFlowEntry(name, node);
+        if(entry != null) {
+            return entry.getValue();
         }
         return null;
     }
@@ -2508,6 +2521,7 @@ public class ForwardingRulesManager implements
                             log.warn("Dequeued null event");
                             continue;
                         }
+                        log.trace("Dequeued {} event", event.getClass().getSimpleName());
                         if (event instanceof NodeUpdateEvent) {
                             NodeUpdateEvent update = (NodeUpdateEvent) event;
                             Node node = update.getNode();
@@ -2534,7 +2548,7 @@ public class ForwardingRulesManager implements
                                 logsync.trace("Executing the workOrder {}", fe);
                                 Status gotStatus = null;
                                 FlowEntryInstall feiCurrent = fe.getEntry();
-                                FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
+                                FlowEntryInstall feiNew = workOrder.get(fe);
                                 switch (fe.getUpType()) {
                                 case ADDED:
                                     /*
@@ -2641,6 +2655,12 @@ public class ForwardingRulesManager implements
         uninstallAllFlowEntries(false);
         // Shutdown executor
         this.executor.shutdownNow();
+        // Now walk all the workMonitor and wake up the one sleeping because
+        // destruction is happening
+        for (FlowEntryDistributionOrder fe : workMonitor.keySet()) {
+            FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe);
+            task.cancel(true);
+        }
     }
 
     public void setFlowProgrammerService(IFlowProgrammerService service) {
@@ -3032,6 +3052,36 @@ public class ForwardingRulesManager implements
         }
     }
 
+    public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException {
+        Node node = null;
+        long reqId = 0L;
+        String nodeId = ci.nextArgument();
+        if (nodeId == null) {
+            ci.print("Node id not specified");
+            return;
+        }
+        String requestId = ci.nextArgument();
+        if (requestId == null) {
+            ci.print("Request id not specified");
+            return;
+        }
+        try {
+            node = NodeCreator.createOFNode(Long.valueOf(nodeId));
+        } catch (NumberFormatException e) {
+            ci.print("Node id not a number");
+            return;
+        }
+        try {
+            reqId = Long.parseLong(requestId);
+        } catch (NumberFormatException e) {
+            ci.print("Request id not a number");
+            return;
+        }
+        // null for error object is good enough for now
+        ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null);
+        this.processErrorEvent(event);
+    }
+
     @Override
     public void flowRemoved(Node node, Flow flow) {
         log.trace("Received flow removed notification on {} for {}", node, flow);
@@ -3101,6 +3151,15 @@ public class ForwardingRulesManager implements
         if (target != null) {
             // This was a flow install, update database
             this.updateLocalDatabase(target, false);
+            // also update the config
+            if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
+                ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
+                // staticFlowEntry should never be null.
+                // the null check is just an extra defensive check.
+                if(staticFlowEntry != null) {
+                    staticFlows.remove(staticFlowEntry.getKey());
+                }
+            }
         }
 
         // Notify listeners