import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.UpdateType;
private final Set<IContainerAware> iContainerAware = Collections.synchronizedSet(new HashSet<IContainerAware>());
private final Set<IContainerListener> iContainerListener = Collections
.synchronizedSet(new HashSet<IContainerListener>());
+ private final Set<IContainerLocalListener> iContainerLocalListener = Collections
+ .synchronizedSet(new HashSet<IContainerLocalListener>());
void setIContainerListener(IContainerListener s) {
if (this.iContainerListener != null) {
}
}
+ void setIContainerLocalListener(IContainerLocalListener s) {
+ if (this.iContainerLocalListener != null) {
+ this.iContainerLocalListener.add(s);
+ }
+ }
+
+ void unsetIContainerLocalListener(IContainerLocalListener s) {
+ if (this.iContainerLocalListener != null) {
+ this.iContainerLocalListener.remove(s);
+ }
+ }
+
public void setIContainerAware(IContainerAware iContainerAware) {
if (!this.iContainerAware.contains(iContainerAware)) {
this.iContainerAware.add(iContainerAware);
public void unsetIContainerAware(IContainerAware iContainerAware) {
this.iContainerAware.remove(iContainerAware);
// There is no need to do cleanup of the component when
- // unregister because it will be taken care by the Containerd
+ // unregister because it will be taken care by the Container
// component itself
}
if (containerConfigs.size() > 0) {
for (Map.Entry<String, ContainerConfig> entry : containerConfigs.entrySet()) {
- notifyContainerChangeInternal(entry.getValue(), UpdateType.ADDED);
+ // Notify global and local listeners about the mode change
+ notifyContainerChangeInternal(entry.getValue(), UpdateType.ADDED, true);
}
}
}
@Override
public void entryUpdated(String key, Object value, String cacheName, boolean originLocal) {
+ /*
+ * This is were container manager replays a configuration event that was
+ * notified by its peer from a cluster node where the configuration
+ * happened. Only the global listeners, the cluster unaware classes,
+ * (mainly the shim classes in the sdn protocol plugins) need to receive
+ * these notifications on this cluster node. The cluster aware classes,
+ * like the functional modules which reacts on these events, must _not_
+ * be notified to avoid parallel computation in the cluster.
+ */
if (!originLocal) {
if (value instanceof NodeConnectorsChangeEvent) {
NodeConnectorsChangeEvent event = (NodeConnectorsChangeEvent) value;
List<NodeConnector> ncList = event.getNodeConnectors();
- notifyContainerEntryChangeInternal(key, ncList, event.getUpdateType());
+ notifyContainerEntryChangeInternal(key, ncList, event.getUpdateType(), false);
} else if (value instanceof ContainerFlowChangeEvent) {
ContainerFlowChangeEvent event = (ContainerFlowChangeEvent) value;
- notifyCFlowChangeInternal(key, event.getConfigList(), event.getUpdateType());
+ notifyCFlowChangeInternal(key, event.getConfigList(), event.getUpdateType(), false);
} else if (value instanceof ContainerChangeEvent) {
ContainerChangeEvent event = (ContainerChangeEvent) value;
- notifyContainerChangeInternal(event.getConfig(), event.getUpdateType());
+ notifyContainerChangeInternal(event.getConfig(), event.getUpdateType(), false);
}
}
}
// Clear local states
this.iContainerAware.clear();
this.iContainerListener.clear();
+ this.iContainerLocalListener.clear();
}
/**
*/
private void updateResourceGroups(String containerName, boolean delete) {
String containerProfile = System.getProperty("container.profile");
- if (containerProfile == null) containerProfile = "Container";
+ if (containerProfile == null) {
+ containerProfile = "Container";
+ }
// Container Roles and Container Resource Group
String groupName = containerProfile+"-" + containerName;
String containerAdminRole = containerProfile+"-" + containerName + "-Admin";
}
/**
- * Notify the ContainerListener listeners in case the container mode has changed
- * following a container configuration operation Note: this call must happen
- * after the configuration db has been updated
+ * Notify the ContainerListener listeners in case the container mode has
+ * changed following a container configuration operation Note: this call
+ * must happen after the configuration db has been updated
*
* @param lastActionDelete
- * true if the last container configuration operation was a container
- * delete operation
+ * true if the last container configuration operation was a
+ * container delete operation
+ * @param notifyLocal
+ * if true, the notification is also sent to the
+ * IContainerLocalListener classes besides the IContainerListener
+ * classes
*/
- private void notifyContainerModeChange(boolean lastActionDelete) {
+ private void notifyContainerModeChange(boolean lastActionDelete, boolean notifyLocal) {
if (lastActionDelete == false && containerConfigs.size() == 1) {
logger.info("First container Creation. Inform listeners");
synchronized (this.iContainerListener) {
i.containerModeUpdated(UpdateType.ADDED);
}
}
+ if (notifyLocal) {
+ synchronized (this.iContainerLocalListener) {
+ for (IContainerLocalListener i : this.iContainerLocalListener) {
+ i.containerModeUpdated(UpdateType.ADDED);
+ }
+ }
+ }
} else if (lastActionDelete == true && containerConfigs.isEmpty()) {
logger.info("Last container Deletion. Inform listeners");
synchronized (this.iContainerListener) {
i.containerModeUpdated(UpdateType.REMOVED);
}
}
+ if (notifyLocal) {
+ synchronized (this.iContainerLocalListener) {
+ for (IContainerLocalListener i : this.iContainerLocalListener) {
+ i.containerModeUpdated(UpdateType.REMOVED);
+ }
+ }
+ }
}
}
// Update cluster Configuration cache
containerConfigs.put(containerName, entryConf);
- // Notify
+ // Notify global and local listeners
UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
- notifyContainerEntryChangeInternal(containerName, nodeConnectors, update);
+ notifyContainerEntryChangeInternal(containerName, nodeConnectors, update, true);
// Trigger cluster notification
containerChangeEvents.put(containerName, new NodeConnectorsChangeEvent(nodeConnectors, update));
return status;
}
- private void notifyContainerChangeInternal(ContainerConfig conf, UpdateType update) {
+ private void notifyContainerChangeInternal(ContainerConfig conf, UpdateType update, boolean notifyLocal) {
String containerName = conf.getContainerName();
logger.trace("Notifying listeners on {} for container {}", update, containerName);
// Back-end World: container name forced to lower case
String container = containerName.toLowerCase(Locale.ENGLISH);
boolean delete = (update == UpdateType.REMOVED);
// Check if a container mode change notification is needed
- notifyContainerModeChange(delete);
+ notifyContainerModeChange(delete, notifyLocal);
// Notify listeners
notifyContainerAwareListeners(container, delete);
}
- private void notifyContainerEntryChangeInternal(String containerName, List<NodeConnector> ncList, UpdateType update) {
+ private void notifyContainerEntryChangeInternal(String containerName, List<NodeConnector> ncList, UpdateType update, boolean notifyLocal) {
logger.trace("Notifying listeners on {} for ports {} in container {}", update, ncList, containerName);
// Back-end World: container name forced to lower case
String container = containerName.toLowerCase(Locale.ENGLISH);
i.nodeConnectorUpdated(container, nodeConnector, update);
}
}
+ // Check if the Functional Modules need to be notified as well
+ if (notifyLocal) {
+ synchronized (this.iContainerLocalListener) {
+ for (IContainerLocalListener i : this.iContainerLocalListener) {
+ i.nodeConnectorUpdated(container, nodeConnector, update);
+ }
+ }
+ }
}
}
- private void notifyCFlowChangeInternal(String containerName, List<ContainerFlowConfig> confList, UpdateType update) {
+ private void notifyCFlowChangeInternal(String containerName, List<ContainerFlowConfig> confList, UpdateType update,
+ boolean notifyLocal) {
logger.trace("Notifying listeners on {} for flow specs {} in container {}", update, confList, containerName);
// Back-end World: container name forced to lower case
String container = containerName.toLowerCase(Locale.ENGLISH);
- synchronized (this.iContainerListener) {
- for (ContainerFlowConfig conf : confList) {
- for (Match match : conf.getMatches()) {
- ContainerFlow cFlow = new ContainerFlow(match);
+
+ for (ContainerFlowConfig conf : confList) {
+ for (Match match : conf.getMatches()) {
+ ContainerFlow cFlow = new ContainerFlow(match);
+ synchronized (this.iContainerListener) {
for (IContainerListener i : this.iContainerListener) {
i.containerFlowUpdated(container, cFlow, cFlow, update);
}
}
+ // Check if the Functional Modules need to be notified as well
+ if (notifyLocal) {
+ synchronized (this.iContainerLocalListener) {
+ for (IContainerLocalListener i : this.iContainerLocalListener) {
+ i.containerFlowUpdated(container, cFlow, cFlow, update);
+ }
+ }
+ }
}
}
}
// Update cluster cache
this.containerConfigs.put(containerName, containerConfig);
- // Notify listeners
+ // Notify global and local listeners
UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
- notifyCFlowChangeInternal(containerName, cFlowConfList, update);
+ notifyCFlowChangeInternal(containerName, cFlowConfList, update, true);
// Trigger cluster notification
containerChangeEvents.put(containerName, new ContainerFlowChangeEvent(cFlowConfList, update));
// Automatically create and populate user and resource groups
updateResourceGroups(containerName, delete);
- // Notify listeners
+ // Notify global and local listeners
UpdateType update = (delete) ? UpdateType.REMOVED : UpdateType.ADDED;
- notifyContainerChangeInternal(containerConf, update);
+ notifyContainerChangeInternal(containerConf, update, true);
// Trigger cluster notification
containerChangeEvents.put(containerName, new ContainerChangeEvent(containerConf, update));
if (update == UpdateType.ADDED) {
if (containerConf.hasFlowSpecs()) {
List<ContainerFlowConfig> specList = containerConf.getContainerFlowConfigs();
- // Notify flow spec addition
- notifyCFlowChangeInternal(containerName, specList, update);
+ // Notify global and local listeners about flow spec addition
+ notifyCFlowChangeInternal(containerName, specList, update, true);
// Trigger cluster notification
containerChangeEvents.put(containerName, new ContainerFlowChangeEvent(specList, update));
if (containerConf.hasNodeConnectors()) {
List<NodeConnector> ncList = containerConf.getPortList();
- // Notify port(s) addition
- notifyContainerEntryChangeInternal(containerName, ncList, update);
+ // Notify global and local listeners about port(s) addition
+ notifyContainerEntryChangeInternal(containerName, ncList, update, true);
// Trigger cluster notification
containerChangeEvents.put(containerName, new NodeConnectorsChangeEvent(ncList, update));
}
}
- if (delete) clusterServices.removeContainerCaches(containerName);
+ if (delete) {
+ clusterServices.removeContainerCaches(containerName);
+ }
return status;
}
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
public class ForwardingRulesManager implements
IForwardingRulesManager,
PortGroupChangeListener,
- IContainerListener,
+ IContainerLocalListener,
ISwitchManagerAware,
IConfigurationContainerAware,
IInventoryListener,
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
private boolean inContainerMode; // being used by global instance only
- private boolean stopping;
+ protected boolean stopping;
/*
* Flow database. It's the software view of what was requested to install
* not picked by anyone, which is always a case can happen especially on
* Node disconnect cases.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
/*
* Data structure responsible for retrieving the results of the workOrder
* TODO: The workStatus entries need to have a lifetime associated in case
* of requestor controller leaving the cluster.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
/*
* Local Map used to hold the Future which a caller can use to monitor for
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ protected void updateFlowsContainerFlow() {
Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
// First remove all installed entries
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlowsOrdinal",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
log.warn("Dequeued null event");
continue;
}
+ log.trace("Dequeued {} event", event.getClass().getSimpleName());
if (event instanceof NodeUpdateEvent) {
NodeUpdateEvent update = (NodeUpdateEvent) event;
Node node = update.getNode();
logsync.trace("Executing the workOrder {}", fe);
Status gotStatus = null;
FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
+ FlowEntryInstall feiNew = workOrder.get(fe);
switch (fe.getUpType()) {
case ADDED:
/*