Container Manager to expose inContainerMode()
[controller.git] / opendaylight / forwardingrulesmanager / implementation / src / main / java / org / opendaylight / controller / forwardingrulesmanager / internal / ForwardingRulesManager.java
index defe9b4a82f6a9f8a2c9e48c94d54ec0d108568b..45973fe167356b470b97f6bc7d287f59af769d31 100644 (file)
@@ -38,8 +38,8 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
@@ -56,10 +56,11 @@ import org.opendaylight.controller.sal.action.Controller;
 import org.opendaylight.controller.sal.action.Flood;
 import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -97,7 +98,7 @@ import org.slf4j.LoggerFactory;
 public class ForwardingRulesManager implements
         IForwardingRulesManager,
         PortGroupChangeListener,
-        IContainerListener,
+        IContainerLocalListener,
         ISwitchManagerAware,
         IConfigurationContainerAware,
         IInventoryListener,
@@ -117,8 +118,9 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
     private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
     private ConcurrentMap<String, Object> TSPolicies;
+    private IContainerManager containerManager;
     private boolean inContainerMode; // being used by global instance only
-    private boolean stopping;
+    protected boolean stopping;
 
     /*
      * Flow database. It's the software view of what was requested to install
@@ -180,7 +182,7 @@ public class ForwardingRulesManager implements
      * not picked by anyone, which is always a case can happen especially on
      * Node disconnect cases.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+    protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
 
     /*
      * Data structure responsible for retrieving the results of the workOrder
@@ -193,7 +195,7 @@ public class ForwardingRulesManager implements
      * TODO: The workStatus entries need to have a lifetime associated in case
      * of requestor controller leaving the cluster.
      */
-    private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+    protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
 
     /*
      * Local Map used to hold the Future which a caller can use to monitor for
@@ -202,6 +204,11 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
             new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
 
+    /*
+     * Max pool size for the executor
+     */
+    private static final int maxPoolSize = 10;
+
     /**
      * @param e
      *            Entry being installed/updated/removed
@@ -1126,7 +1133,7 @@ public class ForwardingRulesManager implements
      * merged flow may conflict with an existing old container flows merged flow
      * on the network node
      */
-    private void updateFlowsContainerFlow() {
+    protected void updateFlowsContainerFlow() {
         Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
         // First remove all installed entries
         for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
@@ -1366,9 +1373,6 @@ public class ForwardingRulesManager implements
             clusterContainerService.createCache("frm.staticFlows",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-            clusterContainerService.createCache("frm.flowsSaveEvent",
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
             clusterContainerService.createCache("frm.staticFlowsOrdinal",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
@@ -1382,10 +1386,10 @@ public class ForwardingRulesManager implements
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKSTATUSCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
             clusterContainerService.createCache(WORKORDERCACHE,
EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
                   EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
 
         } catch (CacheConfigException cce) {
             log.error("CacheConfigException");
@@ -2519,11 +2523,12 @@ public class ForwardingRulesManager implements
             public void run() {
                 while (!stopping) {
                     try {
-                        FRMEvent event = pendingEvents.take();
+                        final FRMEvent event = pendingEvents.take();
                         if (event == null) {
                             log.warn("Dequeued null event");
                             continue;
                         }
+                        log.trace("Dequeued {} event", event.getClass().getSimpleName());
                         if (event instanceof NodeUpdateEvent) {
                             NodeUpdateEvent update = (NodeUpdateEvent) event;
                             Node node = update.getNode();
@@ -2544,36 +2549,40 @@ public class ForwardingRulesManager implements
                             /*
                              * Take care of handling the remote Work request
                              */
-                            WorkOrderEvent work = (WorkOrderEvent) event;
-                            FlowEntryDistributionOrder fe = work.getFe();
-                            if (fe != null) {
-                                logsync.trace("Executing the workOrder {}", fe);
-                                Status gotStatus = null;
-                                FlowEntryInstall feiCurrent = fe.getEntry();
-                                FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
-                                switch (fe.getUpType()) {
-                                case ADDED:
-                                    /*
-                                     * TODO: Not still sure how to handle the
-                                     * sync entries
-                                     */
-                                    gotStatus = addEntriesInternal(feiCurrent, true);
-                                    break;
-                                case CHANGED:
-                                    gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
-                                    break;
-                                case REMOVED:
-                                    gotStatus = removeEntryInternal(feiCurrent, true);
-                                    break;
+                            Runnable r = new Runnable() {
+                                @Override
+                                public void run() {
+                                    WorkOrderEvent work = (WorkOrderEvent) event;
+                                    FlowEntryDistributionOrder fe = work.getFe();
+                                    if (fe != null) {
+                                        logsync.trace("Executing the workOrder {}", fe);
+                                        Status gotStatus = null;
+                                        FlowEntryInstall feiCurrent = fe.getEntry();
+                                        FlowEntryInstall feiNew = workOrder.get(fe);
+                                        switch (fe.getUpType()) {
+                                        case ADDED:
+                                            gotStatus = addEntriesInternal(feiCurrent, false);
+                                            break;
+                                        case CHANGED:
+                                            gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+                                            break;
+                                        case REMOVED:
+                                            gotStatus = removeEntryInternal(feiCurrent, false);
+                                            break;
+                                        }
+                                        // Remove the Order
+                                        workOrder.remove(fe);
+                                        logsync.trace(
+                                                "The workOrder has been executed and now the status is being returned {}", fe);
+                                        // Place the status
+                                        workStatus.put(fe, gotStatus);
+                                    } else {
+                                        log.warn("Not expected null WorkOrder", work);
+                                    }
                                 }
-                                // Remove the Order
-                                workOrder.remove(fe);
-                                logsync.trace(
-                                        "The workOrder has been executed and now the status is being returned {}", fe);
-                                // Place the status
-                                workStatus.put(fe, gotStatus);
-                            } else {
-                                log.warn("Not expected null WorkOrder", work);
+                            };
+                            if(executor != null) {
+                                executor.execute(r);
                             }
                         } else if (event instanceof WorkStatusCleanup) {
                             /*
@@ -2629,11 +2638,19 @@ public class ForwardingRulesManager implements
      *
      */
     void start() {
+        /*
+         * If running in default container, need to know if controller is in
+         * container mode
+         */
+        if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+            inContainerMode = containerManager.inContainerMode();
+        }
+
         // Initialize graceful stop flag
         stopping = false;
 
         // Allocate the executor service
-        this.executor = Executors.newSingleThreadExecutor();
+        this.executor = Executors.newFixedThreadPool(maxPoolSize);
 
         // Start event handler thread
         frmEventHandler.start();
@@ -3203,6 +3220,16 @@ public class ForwardingRulesManager implements
         this.connectionManager = s;
     }
 
+    public void unsetIContainerManager(IContainerManager s) {
+        if (s == this.containerManager) {
+            this.containerManager = null;
+        }
+    }
+
+    public void setIContainerManager(IContainerManager s) {
+        this.containerManager = s;
+    }
+
     @Override
     public void entryCreated(Object key, String cacheName, boolean originLocal) {
         /*