import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
import org.opendaylight.controller.sal.core.IContainerListener;
IConfigurationContainerAware,
IInventoryListener,
IObjectReader,
- ICacheUpdateAware,
+ ICacheUpdateAware<Object,Object>,
CommandProvider,
IFlowProgrammerListener {
private static final String NODEDOWN = "Node is Down";
* @return a Future object for monitoring the progress of the result, or
* null in case the processing should take place locally
*/
- private Future<Status> distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u,
+ UpdateType t) {
// A null entry it's an unexpected condition, anyway it's safe to keep
// the handling local
if (e == null) {
}
Node n = e.getNode();
- if (!connectionManager.isLocal(n)) {
+ if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
// Create the work order and distribute it
FlowEntryDistributionOrder fe =
new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
return ret;
}
- logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
-
+ logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
return null;
}
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
+ FlowEntryDistributionOrderFutureTask futureStatus =
+ distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
* contain the unique id assigned to this request
*/
private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
retrieveCaches();
}
- @SuppressWarnings("deprecation")
private void allocateCaches() {
if (this.clusterContainerService == null) {
log.warn("Un-initialized clusterContainerService, can't create cache");
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
}
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
ConcurrentMap<?, ?> map;
}
}
if (target != null) {
- // Program the network node
- Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
- .installFlowEntry(target.getFlowEntry());
+ Status status = target.validate(container);
+ if (!status.isSuccess()) {
+ log.warn(status.getDescription());
+ return status;
+ }
+ status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
+ .installFlowEntry(target.getFlowEntry());
if (status.isSuccess()) {
// Update Configuration database
target.setStatus(SUCCESS);
/**
* Uninstall all the non-internal Flow Entries present in the software view.
- * A copy of each entry is stored in the inactive list so that it can be
- * re-applied when needed. This function is called on the global instance of
- * FRM only, when the first container is created
+ * If requested, a copy of each original flow entry will be stored in the
+ * inactive list so that it can be re-applied when needed (This is typically
+ * the case when running in the default container and controller moved to
+ * container mode)
+ *
+ * @param preserveFlowEntries
+ * if true, a copy of each original entry is stored in the
+ * inactive list
*/
- private void uninstallAllFlowEntries() {
+ private void uninstallAllFlowEntries(boolean preserveFlowEntries) {
log.info("Uninstalling all non-internal flows");
+ List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>();
+
// Store entries / create target list
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> mapEntry : installedSwView.entrySet()) {
FlowEntryInstall flowEntries = mapEntry.getValue();
// Skip internal generated static flows
if (!flowEntries.isInternal()) {
- inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ toRemove.add(flowEntries);
+ // Store the original entries if requested
+ if (preserveFlowEntries) {
+ inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ }
}
}
// Now remove the entries
- for (FlowEntry flowEntry : inactiveFlows.keySet()) {
- Status status = this.removeEntry(flowEntry, false);
+ for (FlowEntryInstall flowEntryHw : toRemove) {
+ Status status = this.removeEntryInternal(flowEntryHw, false);
if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription());
+ log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
}
}
}
addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name
}
+ /**
+ * (non-Javadoc)
+ *
+ * @see org.opendaylight.controller.switchmanager.ISwitchManagerAware#modeChangeNotify(org.opendaylight.controller.sal.core.Node,
+ * boolean)
+ *
+ * This method can be called from within the OSGi framework context,
+ * given the programming operation can take sometime, it not good
+ * pratice to have in it's context operations that can take time,
+ * hence moving off to a different thread for async processing.
+ */
+ private ExecutorService executor;
@Override
- public void modeChangeNotify(Node node, boolean proactive) {
- List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
-
- List<String> puntAction = new ArrayList<String>();
- puntAction.add(ActionType.CONTROLLER.toString());
-
- FlowConfig allowARP = new FlowConfig();
- allowARP.setInstallInHw(true);
- allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
- allowARP.setPriority("1");
- allowARP.setNode(node);
- allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase());
- allowARP.setActions(puntAction);
- defaultConfigs.add(allowARP);
-
- FlowConfig allowLLDP = new FlowConfig();
- allowLLDP.setInstallInHw(true);
- allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
- allowLLDP.setPriority("1");
- allowLLDP.setNode(node);
- allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase());
- allowLLDP.setActions(puntAction);
- defaultConfigs.add(allowLLDP);
-
- List<String> dropAction = new ArrayList<String>();
- dropAction.add(ActionType.DROP.toString());
-
- FlowConfig dropAllConfig = new FlowConfig();
- dropAllConfig.setInstallInHw(true);
- dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + FlowConfig.INTERNALSTATICFLOWEND);
- dropAllConfig.setPriority("0");
- dropAllConfig.setNode(node);
- dropAllConfig.setActions(dropAction);
- defaultConfigs.add(dropAllConfig);
-
- log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
- for (FlowConfig fc : defaultConfigs) {
- Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
- if (status.isSuccess()) {
- log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
- } else {
- log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), fc.getName());
+ public void modeChangeNotify(final Node node, final boolean proactive) {
+ Callable<Status> modeChangeCallable = new Callable<Status>() {
+ @Override
+ public Status call() throws Exception {
+ List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
+
+ List<String> puntAction = new ArrayList<String>();
+ puntAction.add(ActionType.CONTROLLER.toString());
+
+ FlowConfig allowARP = new FlowConfig();
+ allowARP.setInstallInHw(true);
+ allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
+ allowARP.setPriority("1");
+ allowARP.setNode(node);
+ allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue())
+ .toUpperCase());
+ allowARP.setActions(puntAction);
+ defaultConfigs.add(allowARP);
+
+ FlowConfig allowLLDP = new FlowConfig();
+ allowLLDP.setInstallInHw(true);
+ allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
+ allowLLDP.setPriority("1");
+ allowLLDP.setNode(node);
+ allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue())
+ .toUpperCase());
+ allowLLDP.setActions(puntAction);
+ defaultConfigs.add(allowLLDP);
+
+ List<String> dropAction = new ArrayList<String>();
+ dropAction.add(ActionType.DROP.toString());
+
+ FlowConfig dropAllConfig = new FlowConfig();
+ dropAllConfig.setInstallInHw(true);
+ dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop"
+ + FlowConfig.INTERNALSTATICFLOWEND);
+ dropAllConfig.setPriority("0");
+ dropAllConfig.setNode(node);
+ dropAllConfig.setActions(dropAction);
+ defaultConfigs.add(dropAllConfig);
+
+ log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
+ for (FlowConfig fc : defaultConfigs) {
+ Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
+ if (status.isSuccess()) {
+ log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+ } else {
+ log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
+ fc.getName());
+ }
+ }
+ return new Status(StatusCode.SUCCESS);
}
- }
+ };
+
+ /*
+ * Execute the work outside the caller context, this could be an
+ * expensive operation and we don't want to block the caller for it.
+ */
+ this.executor.submit(modeChangeCallable);
}
/**
@Override
public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map<String, Property> propMap) {
+ boolean updateStaticFlowCluster = false;
+ switch (type) {
+ case ADDED:
+ break;
+ case CHANGED:
+ Config config = (propMap == null) ? null : (Config) propMap.get(Config.ConfigPropName);
+ if (config != null) {
+ switch (config.getValue()) {
+ case Config.ADMIN_DOWN:
+ log.trace("Port {} is administratively down: uninstalling interested flows", nodeConnector);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+ break;
+ case Config.ADMIN_UP:
+ log.trace("Port {} is administratively up: installing interested flows", nodeConnector);
+ updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nodeConnector);
+ break;
+ case Config.ADMIN_UNDEF:
+ break;
+ default:
+ }
+ }
+ break;
+ case REMOVED:
+ // This is the case where a switch port is removed from the SDN agent space
+ log.trace("Port {} was removed from our control: uninstalling interested flows", nodeConnector);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+ break;
+ default:
+
+ }
+
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nodeConnector.getNode());
+ }
+ }
+
+ /*
+ * It goes through the static flows configuration, it identifies the ones
+ * which have the specified node connector as input or output port and
+ * install them on the network node if they are marked to be installed in
+ * hardware and their status shows they were not installed yet
+ */
+ private boolean installFlowsOnNodeConnectorUp(NodeConnector nodeConnector) {
+ boolean updated = false;
+ List<FlowConfig> flowConfigForNode = getStaticFlows(nodeConnector.getNode());
+ for (FlowConfig flowConfig : flowConfigForNode) {
+ if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) {
+ if (flowConfig.installInHw() && !flowConfig.getStatus().equals(SUCCESS)) {
+ Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+ if (!status.isSuccess()) {
+ flowConfig.setStatus(status.getDescription());
+ } else {
+ flowConfig.setStatus(SUCCESS);
+ }
+ updated = true;
+ }
+ }
+ }
+ return updated;
+ }
+
+ /*
+ * Remove from the network node all the flows which have the specified node
+ * connector as input or output port. If any of the flow entry is a static
+ * flow, it updates the correspondent configuration.
+ */
+ private boolean removeFlowsOnNodeConnectorDown(NodeConnector nodeConnector) {
+ boolean updated = false;
+ List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nodeConnector.getNode());
+ if (nodeFlowEntries == null) {
+ return updated;
+ }
+ for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
+ if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nodeConnector)) {
+ Status status = this.removeEntryInternal(fei, true);
+ if (!status.isSuccess()) {
+ continue;
+ }
+ /*
+ * If the flow entry is a static flow, then update its
+ * configuration
+ */
+ if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
+ FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
+ if (flowConfig != null) {
+ flowConfig.setStatus(PORTREMOVED);
+ updated = true;
+ }
+ }
+ }
+ }
+ return updated;
}
private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) {
} else {
log.warn("Not expected null WorkStatus", work);
}
+ } else if (event instanceof ContainerFlowChangeEvent) {
+ /*
+ * Whether it is an addition or removal, we have to
+ * recompute the merged flows entries taking into
+ * account all the current container flows because
+ * flow merging is not an injective function
+ */
+ updateFlowsContainerFlow();
} else {
log.warn("Dequeued unknown event {}", event.getClass()
.getSimpleName());
// Initialize graceful stop flag
stopping = false;
+ // Allocate the executor service
+ this.executor = Executors.newSingleThreadExecutor();
+
// Start event handler thread
frmEventHandler.start();
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries();
+ uninstallAllFlowEntries(false);
+ // Shutdown executor
+ this.executor.shutdownNow();
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
}
log.trace("Container {}: Updating installed flows because of container flow change: {} {}",
container.getName(), t, current);
- /*
- * Whether it is an addition or removal, we have to recompute the merged
- * flows entries taking into account all the current container flows
- * because flow merging is not an injective function
- */
- updateFlowsContainerFlow();
+ ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t);
+ pendingEvents.offer(ev);
}
@Override
switch (t) {
case REMOVED:
-
- List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nc.getNode());
- if (nodeFlowEntries == null) {
- return;
- }
- for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
- if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) {
- Status status = this.removeEntryInternal(fei, true);
- if (!status.isSuccess()) {
- continue;
- }
- /*
- * If the flow entry is a static flow, then update its
- * configuration
- */
- if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
- FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
- if (flowConfig != null) {
- flowConfig.setStatus(PORTREMOVED);
- updateStaticFlowCluster = true;
- }
- }
- }
- }
- if (updateStaticFlowCluster) {
- refreshClusterStaticFlowsStatus(nc.getNode());
- }
+ log.trace("Port {} was removed from container: uninstalling interested flows", nc);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nc);
break;
case ADDED:
- List<FlowConfig> flowConfigForNode = getStaticFlows(nc.getNode());
- for (FlowConfig flowConfig : flowConfigForNode) {
- if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) {
- if (flowConfig.installInHw()) {
- Status status = this.installFlowEntry(flowConfig.getFlowEntry());
- if (!status.isSuccess()) {
- flowConfig.setStatus(status.getDescription());
- } else {
- flowConfig.setStatus(SUCCESS);
- }
- updateStaticFlowCluster = true;
- }
- }
- }
- if (updateStaticFlowCluster) {
- refreshClusterStaticFlowsStatus(nc.getNode());
- }
+ log.trace("Port {} was added to container: reinstall interested flows", nc);
+ updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nc);
+
break;
case CHANGED:
break;
default:
}
+
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
}
@Override
}
switch (update) {
case ADDED:
+ /*
+ * Controller is moving to container mode. We are in the default
+ * container context, we need to remove all our non-internal flows
+ * to prevent any container isolation breakage. We also need to
+ * preserve our flow so that they can be re-installed if we move
+ * back to non container mode (no containers).
+ */
this.inContainerMode = true;
- this.uninstallAllFlowEntries();
+ this.uninstallAllFlowEntries(true);
break;
case REMOVED:
this.inContainerMode = false;
return newEntry;
}
}
+ private class ContainerFlowChangeEvent extends FRMEvent {
+ private final ContainerFlow previous;
+ private final ContainerFlow current;
+ private final UpdateType type;
+
+ public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) {
+ this.previous = previous;
+ this.current = current;
+ this.type = type;
+ }
+
+ public ContainerFlow getPrevious() {
+ return this.previous;
+ }
+
+ public ContainerFlow getCurrent() {
+ return this.current;
+ }
+
+ public UpdateType getType() {
+ return this.type;
+ }
+ }
+
private class WorkStatusCleanup extends FRMEvent {
private FlowEntryDistributionOrder fe;
* mapping will have to be added in future
*/
FlowEntryInstall target = null;
- for (FlowEntryInstall index : nodeFlows.get(node)) {
- FlowEntryInstall entry = installedSwView.get(index);
- if (entry.getRequestId() == rid) {
- target = entry;
- break;
+ List<FlowEntryInstall> flowEntryInstallList = nodeFlows.get(node);
+ // flowEntryInstallList could be null.
+ // so check for it.
+ if(flowEntryInstallList != null) {
+ for (FlowEntryInstall index : flowEntryInstallList) {
+ FlowEntryInstall entry = installedSwView.get(index);
+ if(entry != null) {
+ if (entry.getRequestId() == rid) {
+ target = entry;
+ break;
+ }
+ }
}
}
if (target != null) {
return;
}
Node n = fei.getNode();
- if (connectionManager.isLocal(n)) {
+ if (connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
logsync.trace("workOrder for fe {} processed locally", fe);
// I'm the controller in charge for the request, queue it for
// processing
*/
if (fe.getRequestorController()
.equals(clusterContainerService.getMyAddress())) {
- FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe);
+ FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe);
if (fet != null) {
logsync.trace("workStatus response is for us {}", fe);
// Signal we got the status