Decouple IContainerListener to avoid parallel computation in cluster 68/1468/1
authorAlessandro Boch <aboch@cisco.com>
Fri, 27 Sep 2013 15:39:49 +0000 (08:39 -0700)
committerAlessandro Boch <aboch@cisco.com>
Fri, 27 Sep 2013 15:53:56 +0000 (08:53 -0700)
ISSUE: On Container configuration change, or when they register with ContainerManager,
IContainerListener classes are notified locally and on all cluster nodes.
While cluster unaware classes like the Shim classes in the protocol plugin need this
notification on all the nodes to keep their local container related info up to date,
Functional Modules like FRM which are cluster aware do not need to be notified as they
react on these notifications performing flow operations. Currently all the FRMs in the
cluster reacts in parallel on Container Flow changes...

CHANGE: From Container config notification prespective, decouple the cluster aware classes
from the cluster unaware ones having the former register to a new IContainerLocal interface
instead of IContainerListener. On Container config changes, IContainerLocalListener will be
notified only on the controller node on which the configuration was applied (from GUI or NB).
On the remote controllers, where ContainerManager replays the configuration events, only
cluster unaware listeners will be notified.
Also, on IContainerLocalListener registration callback, in contrast with IContainerListener
registration callback, no container configurations bulk update will be pushed.
Simply because the FMs do not need to keep a local container information copy, they do not
care about the past events, they only need to be notified about a live config change event
which they use as a trigger to perform whatever task on their DB. If the task requires the
knowledge of the container config, they directly request it to Container Manager on the fly.

Change-Id: Icbaca58e04d60463b3aa54a61a4a1a467222d18e
Signed-off-by: Alessandro Boch <aboch@cisco.com>
opendaylight/containermanager/implementation/src/main/java/org/opendaylight/controller/containermanager/internal/Activator.java
opendaylight/containermanager/implementation/src/main/java/org/opendaylight/controller/containermanager/internal/ContainerManager.java
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/core/IContainerLocalListener.java [new file with mode: 0644]

index a1fe0d1ee2a1936af798b88ff887b54b693b35cf..162cbf041a8757a9d6ed269783fc9932a15442d2 100644 (file)
@@ -24,9 +24,11 @@ import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.IContainerAware;
 import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class Activator extends ComponentActivatorAbstractBase {
     protected static final Logger logger = LoggerFactory.getLogger(Activator.class);
 
@@ -140,6 +142,12 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IContainerListener.class)
                     .setCallbacks("setIContainerListener", "unsetIContainerListener")
                     .setRequired(false));
+
+            // Interface expected to be exported by the Functional Modules
+            c.add(createServiceDependency()
+                    .setService(IContainerLocalListener.class)
+                    .setCallbacks("setIContainerLocalListener", "unsetIContainerLocalListener")
+                    .setRequired(false));
         }
     }
 }
index c9190af03a587a76cd581591c59a33cc7036baee..d420bdd0a97a3033a923b02be374384de6b0ceb7 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.controller.sal.authorization.UserLevel;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainerAware;
 import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.UpdateType;
@@ -93,6 +94,8 @@ public class ContainerManager extends Authorization<String> implements IContaine
     private final Set<IContainerAware> iContainerAware = Collections.synchronizedSet(new HashSet<IContainerAware>());
     private final Set<IContainerListener> iContainerListener = Collections
             .synchronizedSet(new HashSet<IContainerListener>());
+    private final Set<IContainerLocalListener> iContainerLocalListener = Collections
+            .synchronizedSet(new HashSet<IContainerLocalListener>());
 
     void setIContainerListener(IContainerListener s) {
         if (this.iContainerListener != null) {
@@ -128,6 +131,18 @@ public class ContainerManager extends Authorization<String> implements IContaine
         }
     }
 
+    void setIContainerLocalListener(IContainerLocalListener s) {
+        if (this.iContainerLocalListener != null) {
+            this.iContainerLocalListener.add(s);
+        }
+    }
+
+    void unsetIContainerLocalListener(IContainerLocalListener s) {
+        if (this.iContainerLocalListener != null) {
+            this.iContainerLocalListener.remove(s);
+        }
+    }
+
     public void setIContainerAware(IContainerAware iContainerAware) {
         if (!this.iContainerAware.contains(iContainerAware)) {
             this.iContainerAware.add(iContainerAware);
@@ -141,7 +156,7 @@ public class ContainerManager extends Authorization<String> implements IContaine
     public void unsetIContainerAware(IContainerAware iContainerAware) {
         this.iContainerAware.remove(iContainerAware);
         // There is no need to do cleanup of the component when
-        // unregister because it will be taken care by the Containerd
+        // unregister because it will be taken care by the Container
         // component itself
     }
 
@@ -219,7 +234,8 @@ public class ContainerManager extends Authorization<String> implements IContaine
 
         if (containerConfigs.size() > 0) {
             for (Map.Entry<String, ContainerConfig> entry : containerConfigs.entrySet()) {
-                notifyContainerChangeInternal(entry.getValue(), UpdateType.ADDED);
+                // Notify global and local listeners about the mode change
+                notifyContainerChangeInternal(entry.getValue(), UpdateType.ADDED, true);
             }
         }
     }
@@ -231,17 +247,26 @@ public class ContainerManager extends Authorization<String> implements IContaine
 
     @Override
     public void entryUpdated(String key, Object value, String cacheName, boolean originLocal) {
+        /*
+         * This is were container manager replays a configuration event that was
+         * notified by its peer from a cluster node where the configuration
+         * happened. Only the global listeners, the cluster unaware classes,
+         * (mainly the shim classes in the sdn protocol plugins) need to receive
+         * these notifications on this cluster node. The cluster aware classes,
+         * like the functional modules which reacts on these events, must _not_
+         * be notified to avoid parallel computation in the cluster.
+         */
         if (!originLocal) {
             if (value instanceof NodeConnectorsChangeEvent) {
                 NodeConnectorsChangeEvent event = (NodeConnectorsChangeEvent) value;
                 List<NodeConnector> ncList = event.getNodeConnectors();
-                notifyContainerEntryChangeInternal(key, ncList, event.getUpdateType());
+                notifyContainerEntryChangeInternal(key, ncList, event.getUpdateType(), false);
             } else if (value instanceof ContainerFlowChangeEvent) {
                 ContainerFlowChangeEvent event = (ContainerFlowChangeEvent) value;
-                notifyCFlowChangeInternal(key, event.getConfigList(), event.getUpdateType());
+                notifyCFlowChangeInternal(key, event.getConfigList(), event.getUpdateType(), false);
             } else if (value instanceof ContainerChangeEvent) {
                 ContainerChangeEvent event = (ContainerChangeEvent) value;
-                notifyContainerChangeInternal(event.getConfig(), event.getUpdateType());
+                notifyContainerChangeInternal(event.getConfig(), event.getUpdateType(), false);
             }
         }
     }
@@ -273,6 +298,7 @@ public class ContainerManager extends Authorization<String> implements IContaine
         // Clear local states
         this.iContainerAware.clear();
         this.iContainerListener.clear();
+        this.iContainerLocalListener.clear();
     }
 
     /**
@@ -802,7 +828,9 @@ public class ContainerManager extends Authorization<String> implements IContaine
      */
     private void updateResourceGroups(String containerName, boolean delete) {
         String containerProfile = System.getProperty("container.profile");
-        if (containerProfile == null) containerProfile = "Container";
+        if (containerProfile == null) {
+            containerProfile = "Container";
+        }
         // Container Roles and Container Resource Group
         String groupName = containerProfile+"-" + containerName;
         String containerAdminRole = containerProfile+"-" + containerName + "-Admin";
@@ -858,15 +886,19 @@ public class ContainerManager extends Authorization<String> implements IContaine
     }
 
     /**
-     * Notify the ContainerListener listeners in case the container mode has changed
-     * following a container configuration operation Note: this call must happen
-     * after the configuration db has been updated
+     * Notify the ContainerListener listeners in case the container mode has
+     * changed following a container configuration operation Note: this call
+     * must happen after the configuration db has been updated
      *
      * @param lastActionDelete
-     *            true if the last container configuration operation was a container
-     *            delete operation
+     *            true if the last container configuration operation was a
+     *            container delete operation
+     * @param notifyLocal
+     *            if true, the notification is also sent to the
+     *            IContainerLocalListener classes besides the IContainerListener
+     *            classes
      */
-    private void notifyContainerModeChange(boolean lastActionDelete) {
+    private void notifyContainerModeChange(boolean lastActionDelete, boolean notifyLocal) {
         if (lastActionDelete == false && containerConfigs.size() == 1) {
             logger.info("First container Creation. Inform listeners");
             synchronized (this.iContainerListener) {
@@ -874,6 +906,13 @@ public class ContainerManager extends Authorization<String> implements IContaine
                     i.containerModeUpdated(UpdateType.ADDED);
                 }
             }
+            if (notifyLocal) {
+                synchronized (this.iContainerLocalListener) {
+                    for (IContainerLocalListener i : this.iContainerLocalListener) {
+                        i.containerModeUpdated(UpdateType.ADDED);
+                    }
+                }
+            }
         } else if (lastActionDelete == true && containerConfigs.isEmpty()) {
             logger.info("Last container Deletion. Inform listeners");
             synchronized (this.iContainerListener) {
@@ -881,6 +920,13 @@ public class ContainerManager extends Authorization<String> implements IContaine
                     i.containerModeUpdated(UpdateType.REMOVED);
                 }
             }
+            if (notifyLocal) {
+                synchronized (this.iContainerLocalListener) {
+                    for (IContainerLocalListener i : this.iContainerLocalListener) {
+                        i.containerModeUpdated(UpdateType.REMOVED);
+                    }
+                }
+            }
         }
     }
 
@@ -954,28 +1000,28 @@ public class ContainerManager extends Authorization<String> implements IContaine
         // Update cluster Configuration cache
         containerConfigs.put(containerName, entryConf);
 
-        // Notify
+        // Notify global and local listeners
         UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
-        notifyContainerEntryChangeInternal(containerName, nodeConnectors, update);
+        notifyContainerEntryChangeInternal(containerName, nodeConnectors, update, true);
         // Trigger cluster notification
         containerChangeEvents.put(containerName, new NodeConnectorsChangeEvent(nodeConnectors, update));
 
         return status;
     }
 
-    private void notifyContainerChangeInternal(ContainerConfig conf, UpdateType update) {
+    private void notifyContainerChangeInternal(ContainerConfig conf, UpdateType update, boolean notifyLocal) {
         String containerName = conf.getContainerName();
         logger.trace("Notifying listeners on {} for container {}", update, containerName);
         // Back-end World: container name forced to lower case
         String container = containerName.toLowerCase(Locale.ENGLISH);
         boolean delete = (update == UpdateType.REMOVED);
         // Check if a container mode change notification is needed
-        notifyContainerModeChange(delete);
+        notifyContainerModeChange(delete, notifyLocal);
         // Notify listeners
         notifyContainerAwareListeners(container, delete);
     }
 
-    private void notifyContainerEntryChangeInternal(String containerName, List<NodeConnector> ncList, UpdateType update) {
+    private void notifyContainerEntryChangeInternal(String containerName, List<NodeConnector> ncList, UpdateType update, boolean notifyLocal) {
         logger.trace("Notifying listeners on {} for ports {} in container {}", update, ncList, containerName);
         // Back-end World: container name forced to lower case
         String container = containerName.toLowerCase(Locale.ENGLISH);
@@ -986,21 +1032,39 @@ public class ContainerManager extends Authorization<String> implements IContaine
                     i.nodeConnectorUpdated(container, nodeConnector, update);
                 }
             }
+            // Check if the Functional Modules need to be notified as well
+            if (notifyLocal) {
+                synchronized (this.iContainerLocalListener) {
+                    for (IContainerLocalListener i : this.iContainerLocalListener) {
+                        i.nodeConnectorUpdated(container, nodeConnector, update);
+                    }
+                }
+            }
         }
     }
 
-    private void notifyCFlowChangeInternal(String containerName, List<ContainerFlowConfig> confList, UpdateType update) {
+    private void notifyCFlowChangeInternal(String containerName, List<ContainerFlowConfig> confList, UpdateType update,
+            boolean notifyLocal) {
         logger.trace("Notifying listeners on {} for flow specs {} in container {}", update, confList, containerName);
         // Back-end World: container name forced to lower case
         String container = containerName.toLowerCase(Locale.ENGLISH);
-        synchronized (this.iContainerListener) {
-            for (ContainerFlowConfig conf : confList) {
-                for (Match match : conf.getMatches()) {
-                    ContainerFlow cFlow = new ContainerFlow(match);
+
+        for (ContainerFlowConfig conf : confList) {
+            for (Match match : conf.getMatches()) {
+                ContainerFlow cFlow = new ContainerFlow(match);
+                synchronized (this.iContainerListener) {
                     for (IContainerListener i : this.iContainerListener) {
                         i.containerFlowUpdated(container, cFlow, cFlow, update);
                     }
                 }
+                // Check if the Functional Modules need to be notified as well
+                if (notifyLocal) {
+                    synchronized (this.iContainerLocalListener) {
+                        for (IContainerLocalListener i : this.iContainerLocalListener) {
+                            i.containerFlowUpdated(container, cFlow, cFlow, update);
+                        }
+                    }
+                }
             }
         }
     }
@@ -1067,9 +1131,9 @@ public class ContainerManager extends Authorization<String> implements IContaine
         // Update cluster cache
         this.containerConfigs.put(containerName, containerConfig);
 
-        // Notify listeners
+        // Notify global and local listeners
         UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
-        notifyCFlowChangeInternal(containerName, cFlowConfList, update);
+        notifyCFlowChangeInternal(containerName, cFlowConfList, update, true);
         // Trigger cluster notification
         containerChangeEvents.put(containerName, new ContainerFlowChangeEvent(cFlowConfList, update));
 
@@ -1156,9 +1220,9 @@ public class ContainerManager extends Authorization<String> implements IContaine
         // Automatically create and populate user and resource groups
         updateResourceGroups(containerName, delete);
 
-        // Notify listeners
+        // Notify global and local listeners
         UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
-        notifyContainerChangeInternal(containerConf, update);
+        notifyContainerChangeInternal(containerConf, update, true);
 
         // Trigger cluster notification
         containerChangeEvents.put(containerName, new ContainerChangeEvent(containerConf, update));
@@ -1166,8 +1230,8 @@ public class ContainerManager extends Authorization<String> implements IContaine
         if (update == UpdateType.ADDED) {
             if (containerConf.hasFlowSpecs()) {
                 List<ContainerFlowConfig> specList = containerConf.getContainerFlowConfigs();
-                // Notify flow spec addition
-                notifyCFlowChangeInternal(containerName, specList, update);
+                // Notify global and local listeners about flow spec addition
+                notifyCFlowChangeInternal(containerName, specList, update, true);
 
                 // Trigger cluster notification
                 containerChangeEvents.put(containerName, new ContainerFlowChangeEvent(specList, update));
@@ -1175,14 +1239,16 @@ public class ContainerManager extends Authorization<String> implements IContaine
 
             if (containerConf.hasNodeConnectors()) {
                 List<NodeConnector> ncList = containerConf.getPortList();
-                // Notify port(s) addition
-                notifyContainerEntryChangeInternal(containerName, ncList, update);
+                // Notify global and local listeners about port(s) addition
+                notifyContainerEntryChangeInternal(containerName, ncList, update, true);
                 // Trigger cluster notification
                 containerChangeEvents.put(containerName, new NodeConnectorsChangeEvent(ncList, update));
             }
         }
 
-        if (delete) clusterServices.removeContainerCaches(containerName);
+        if (delete) {
+            clusterServices.removeContainerCaches(containerName);
+        }
         return status;
     }
 
index 225db117c07ed88bc297e7c248afef30513f62be..7b37b4187ce91285e44f14de745d5363d79d0a7c 100644 (file)
@@ -8,19 +8,21 @@
 
 package org.opendaylight.controller.forwardingrulesmanager.internal;
 
-import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Set;
 
 import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.IForwardingRulesManager;
 import org.opendaylight.controller.forwardingrulesmanager.IForwardingRulesManagerAware;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerListener;
 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
 import org.opendaylight.controller.switchmanager.IInventoryListener;
@@ -29,9 +31,6 @@ import org.opendaylight.controller.switchmanager.ISwitchManagerAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.opendaylight.controller.clustering.services.IClusterContainerServices;
-import org.opendaylight.controller.connectionmanager.IConnectionManager;
-
 public class Activator extends ComponentActivatorAbstractBase {
     protected static final Logger logger = LoggerFactory.getLogger(Activator.class);
 
@@ -77,7 +76,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             props.put("cachenames", propSet);
 
             // export the service
-            interfaces = new String[] { IContainerListener.class.getName(), ISwitchManagerAware.class.getName(),
+            interfaces = new String[] { IContainerLocalListener.class.getName(), ISwitchManagerAware.class.getName(),
                     IForwardingRulesManager.class.getName(), IInventoryListener.class.getName(),
                     IConfigurationContainerAware.class.getName(), ICacheUpdateAware.class.getName(),
                     IFlowProgrammerListener.class.getName() };
index c2d1f46737683b4a0eb8a7339c7677ee7f0ca961..a77224af8a172ed396addb3dbeab3c2dd8939361 100644 (file)
@@ -59,7 +59,7 @@ import org.opendaylight.controller.sal.action.PopVlan;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -97,7 +97,7 @@ import org.slf4j.LoggerFactory;
 public class ForwardingRulesManager implements
         IForwardingRulesManager,
         PortGroupChangeListener,
-        IContainerListener,
+        IContainerLocalListener,
         ISwitchManagerAware,
         IConfigurationContainerAware,
         IInventoryListener,
@@ -1366,9 +1366,6 @@ public class ForwardingRulesManager implements
             clusterContainerService.createCache("frm.staticFlows",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-            clusterContainerService.createCache("frm.flowsSaveEvent",
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
             clusterContainerService.createCache("frm.staticFlowsOrdinal",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
@@ -2516,6 +2513,7 @@ public class ForwardingRulesManager implements
                             log.warn("Dequeued null event");
                             continue;
                         }
+                        log.trace("Dequeued {} event", event.getClass().getSimpleName());
                         if (event instanceof NodeUpdateEvent) {
                             NodeUpdateEvent update = (NodeUpdateEvent) event;
                             Node node = update.getNode();
diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/core/IContainerLocalListener.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/core/IContainerLocalListener.java
new file mode 100644 (file)
index 0000000..1c861b9
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+/**
+ * @file IContainerLocalListener.java
+ *
+ * @brief Set of methods needed to listen to changes in the Container
+ *        configuration for listeners on the local cluster node only
+ */
+package org.opendaylight.controller.sal.core;
+
+import org.opendaylight.controller.sal.core.IContainerListener;
+
+/**
+ * The interface describes methods used to publish the changes to a given
+ * Container configuration to listeners on the local cluster node only.
+ */
+public interface IContainerLocalListener extends IContainerListener {
+
+}