Container Manager to expose inContainerMode()
[controller.git] / opendaylight / forwardingrulesmanager / implementation / src / main / java / org / opendaylight / controller / forwardingrulesmanager / internal / ForwardingRulesManager.java
index a77224af8a172ed396addb3dbeab3c2dd8939361..45973fe167356b470b97f6bc7d287f59af769d31 100644 (file)
@@ -38,8 +38,8 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
@@ -56,6 +56,7 @@ import org.opendaylight.controller.sal.action.Controller;
 import org.opendaylight.controller.sal.action.Flood;
 import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainer;
@@ -117,8 +118,9 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
     private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
     private ConcurrentMap<String, Object> TSPolicies;
+    private IContainerManager containerManager;
     private boolean inContainerMode; // being used by global instance only
-    private boolean stopping;
+    protected boolean stopping;
 
     /*
      * Flow database. It's the software view of what was requested to install
@@ -180,7 +182,7 @@ public class ForwardingRulesManager implements
      * not picked by anyone, which is always a case can happen especially on
      * Node disconnect cases.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+    protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
 
     /*
      * Data structure responsible for retrieving the results of the workOrder
@@ -193,7 +195,7 @@ public class ForwardingRulesManager implements
      * TODO: The workStatus entries need to have a lifetime associated in case
      * of requestor controller leaving the cluster.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+    protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
 
     /*
      * Local Map used to hold the Future which a caller can use to monitor for
@@ -202,6 +204,11 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
             new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
 
+    /*
+     * Max pool size for the executor
+     */
+    private static final int maxPoolSize = 10;
+
     /**
      * @param e
      *            Entry being installed/updated/removed
@@ -1126,7 +1133,7 @@ public class ForwardingRulesManager implements
      * merged flow may conflict with an existing old container flows merged flow
      * on the network node
      */
-    private void updateFlowsContainerFlow() {
+    protected void updateFlowsContainerFlow() {
         Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
         // First remove all installed entries
         for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
@@ -1379,10 +1386,10 @@ public class ForwardingRulesManager implements
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKSTATUSCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
             clusterContainerService.createCache(WORKORDERCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
         } catch (CacheConfigException cce) {
             log.error("CacheConfigException");
@@ -1882,7 +1889,9 @@ public class ForwardingRulesManager implements
      * If requested, a copy of each original flow entry will be stored in the
      * inactive list so that it can be re-applied when needed (This is typically
      * the case when running in the default container and controller moved to
-     * container mode)
+     * container mode) NOTE WELL: The routine as long as does a bulk change will
+     * operate only on the entries for nodes locally attached so to avoid
+     * redundant operations initiated by multiple nodes
      *
      * @param preserveFlowEntries
      *            if true, a copy of each original entry is stored in the
@@ -1908,9 +1917,15 @@ public class ForwardingRulesManager implements
 
         // Now remove the entries
         for (FlowEntryInstall flowEntryHw : toRemove) {
-            Status status = this.removeEntryInternal(flowEntryHw, false);
-            if (!status.isSuccess()) {
-                log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+            Node n = flowEntryHw.getNode();
+            if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
+                Status status = this.removeEntryInternal(flowEntryHw, false);
+                if (!status.isSuccess()) {
+                    log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+                }
+            } else {
+                log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job",
+                        flowEntryHw);
             }
         }
     }
@@ -2508,7 +2523,7 @@ public class ForwardingRulesManager implements
             public void run() {
                 while (!stopping) {
                     try {
-                        FRMEvent event = pendingEvents.take();
+                        final FRMEvent event = pendingEvents.take();
                         if (event == null) {
                             log.warn("Dequeued null event");
                             continue;
@@ -2534,36 +2549,40 @@ public class ForwardingRulesManager implements
                             /*
                              * Take care of handling the remote Work request
                              */
-                            WorkOrderEvent work = (WorkOrderEvent) event;
-                            FlowEntryDistributionOrder fe = work.getFe();
-                            if (fe != null) {
-                                logsync.trace("Executing the workOrder {}", fe);
-                                Status gotStatus = null;
-                                FlowEntryInstall feiCurrent = fe.getEntry();
-                                FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
-                                switch (fe.getUpType()) {
-                                case ADDED:
-                                    /*
-                                     * TODO: Not still sure how to handle the
-                                     * sync entries
-                                     */
-                                    gotStatus = addEntriesInternal(feiCurrent, true);
-                                    break;
-                                case CHANGED:
-                                    gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
-                                    break;
-                                case REMOVED:
-                                    gotStatus = removeEntryInternal(feiCurrent, true);
-                                    break;
+                            Runnable r = new Runnable() {
+                                @Override
+                                public void run() {
+                                    WorkOrderEvent work = (WorkOrderEvent) event;
+                                    FlowEntryDistributionOrder fe = work.getFe();
+                                    if (fe != null) {
+                                        logsync.trace("Executing the workOrder {}", fe);
+                                        Status gotStatus = null;
+                                        FlowEntryInstall feiCurrent = fe.getEntry();
+                                        FlowEntryInstall feiNew = workOrder.get(fe);
+                                        switch (fe.getUpType()) {
+                                        case ADDED:
+                                            gotStatus = addEntriesInternal(feiCurrent, false);
+                                            break;
+                                        case CHANGED:
+                                            gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+                                            break;
+                                        case REMOVED:
+                                            gotStatus = removeEntryInternal(feiCurrent, false);
+                                            break;
+                                        }
+                                        // Remove the Order
+                                        workOrder.remove(fe);
+                                        logsync.trace(
+                                                "The workOrder has been executed and now the status is being returned {}", fe);
+                                        // Place the status
+                                        workStatus.put(fe, gotStatus);
+                                    } else {
+                                        log.warn("Not expected null WorkOrder", work);
+                                    }
                                 }
-                                // Remove the Order
-                                workOrder.remove(fe);
-                                logsync.trace(
-                                        "The workOrder has been executed and now the status is being returned {}", fe);
-                                // Place the status
-                                workStatus.put(fe, gotStatus);
-                            } else {
-                                log.warn("Not expected null WorkOrder", work);
+                            };
+                            if(executor != null) {
+                                executor.execute(r);
                             }
                         } else if (event instanceof WorkStatusCleanup) {
                             /*
@@ -2619,11 +2638,19 @@ public class ForwardingRulesManager implements
      *
      */
     void start() {
+        /*
+         * If running in default container, need to know if controller is in
+         * container mode
+         */
+        if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+            inContainerMode = containerManager.inContainerMode();
+        }
+
         // Initialize graceful stop flag
         stopping = false;
 
         // Allocate the executor service
-        this.executor = Executors.newSingleThreadExecutor();
+        this.executor = Executors.newFixedThreadPool(maxPoolSize);
 
         // Start event handler thread
         frmEventHandler.start();
@@ -2647,6 +2674,12 @@ public class ForwardingRulesManager implements
         uninstallAllFlowEntries(false);
         // Shutdown executor
         this.executor.shutdownNow();
+        // Now walk all the workMonitor and wake up the one sleeping because
+        // destruction is happening
+        for (FlowEntryDistributionOrder fe : workMonitor.keySet()) {
+            FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe);
+            task.cancel(true);
+        }
     }
 
     public void setFlowProgrammerService(IFlowProgrammerService service) {
@@ -3187,6 +3220,16 @@ public class ForwardingRulesManager implements
         this.connectionManager = s;
     }
 
+    public void unsetIContainerManager(IContainerManager s) {
+        if (s == this.containerManager) {
+            this.containerManager = null;
+        }
+    }
+
+    public void setIContainerManager(IContainerManager s) {
+        this.containerManager = s;
+    }
+
     @Override
     public void entryCreated(Object key, String cacheName, boolean originLocal) {
         /*