X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=b94103fb1c7c9233bb417e93cc74d42bbef498e7;hp=3c936de769d6a22f426b01bab868593eefb899e2;hb=26aab6293f113ec504806e20e382c4fbd1ed3a7c;hpb=8cca2dfdc9d55e956c175b0e07c19443526abf59 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index 3c936de769..b94103fb1c 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -11,8 +11,6 @@ package org.opendaylight.controller.forwardingrulesmanager.internal; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -28,19 +26,18 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import org.eclipse.osgi.framework.console.CommandInterpreter; -import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.configuration.ConfigurationObject; import org.opendaylight.controller.configuration.IConfigurationContainerAware; -import org.opendaylight.controller.sal.connection.ConnectionLocality; +import org.opendaylight.controller.configuration.IConfigurationContainerService; import org.opendaylight.controller.connectionmanager.IConnectionManager; +import org.opendaylight.controller.containermanager.IContainerManager; import org.opendaylight.controller.forwardingrulesmanager.FlowConfig; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall; @@ -53,14 +50,12 @@ import org.opendaylight.controller.forwardingrulesmanager.PortGroupProvider; import org.opendaylight.controller.forwardingrulesmanager.implementation.data.FlowEntryDistributionOrder; import org.opendaylight.controller.sal.action.Action; import org.opendaylight.controller.sal.action.ActionType; -import org.opendaylight.controller.sal.action.Controller; -import org.opendaylight.controller.sal.action.Flood; import org.opendaylight.controller.sal.action.Output; -import org.opendaylight.controller.sal.action.PopVlan; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.core.Config; import org.opendaylight.controller.sal.core.ContainerFlow; import org.opendaylight.controller.sal.core.IContainer; -import org.opendaylight.controller.sal.core.IContainerListener; +import org.opendaylight.controller.sal.core.IContainerLocalListener; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Property; @@ -72,21 +67,13 @@ import org.opendaylight.controller.sal.match.Match; import org.opendaylight.controller.sal.match.MatchType; import org.opendaylight.controller.sal.utils.EtherTypes; import org.opendaylight.controller.sal.utils.GlobalConstants; -import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.IObjectReader; -import org.opendaylight.controller.sal.utils.IPProtocols; -import org.opendaylight.controller.sal.utils.NodeConnectorCreator; -import org.opendaylight.controller.sal.utils.NodeCreator; -import org.opendaylight.controller.sal.utils.ObjectReader; -import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.switchmanager.ISwitchManagerAware; import org.opendaylight.controller.switchmanager.Subnet; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,28 +85,30 @@ import org.slf4j.LoggerFactory; public class ForwardingRulesManager implements IForwardingRulesManager, PortGroupChangeListener, - IContainerListener, + IContainerLocalListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader, ICacheUpdateAware, - CommandProvider, IFlowProgrammerListener { - private static final String NODEDOWN = "Node is Down"; - private static final String SUCCESS = StatusCode.SUCCESS.toString(); + private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class); - private static final String PORTREMOVED = "Port removed"; private static final Logger logsync = LoggerFactory.getLogger("FRMsync"); - private String frmFileName; - private String portGroupFileName; + private static final String PORT_REMOVED = "Port removed"; + private static final String NODE_DOWN = "Node is Down"; + private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry"; + private static final String STATIC_FLOWS_FILE_NAME = "frm_staticflows.conf"; + private static final String PORT_GROUP_FILE_NAME = "portgroup.conf"; private ConcurrentMap staticFlows; private ConcurrentMap staticFlowsOrdinal; private ConcurrentMap portGroupConfigs; private ConcurrentMap> portGroupData; private ConcurrentMap TSPolicies; + private IContainerManager containerManager; + private IConfigurationContainerService configurationService; private boolean inContainerMode; // being used by global instance only - private boolean stopping; + protected boolean stopping; /* * Flow database. It's the software view of what was requested to install @@ -163,8 +152,10 @@ public class ForwardingRulesManager implements * necessity non-transactional as long as need to be able to synchronize * states also while a transaction is in progress */ - static final String WORKORDERCACHE = "frm.workOrder"; - static final String WORKSTATUSCACHE = "frm.workStatus"; + static final String WORK_ORDER_CACHE = "frm.workOrder"; + static final String WORK_STATUS_CACHE = "frm.workStatus"; + static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView"; + static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView"; /* * Data structure responsible for distributing the FlowEntryInstall requests @@ -181,7 +172,7 @@ public class ForwardingRulesManager implements * not picked by anyone, which is always a case can happen especially on * Node disconnect cases. */ - private ConcurrentMap workOrder; + protected ConcurrentMap workOrder; /* * Data structure responsible for retrieving the results of the workOrder @@ -194,7 +185,7 @@ public class ForwardingRulesManager implements * TODO: The workStatus entries need to have a lifetime associated in case * of requestor controller leaving the cluster. */ - private ConcurrentMap workStatus; + protected ConcurrentMap workStatus; /* * Local Map used to hold the Future which a caller can use to monitor for @@ -203,6 +194,11 @@ public class ForwardingRulesManager implements private ConcurrentMap workMonitor = new ConcurrentHashMap(); + /* + * Max pool size for the executor + */ + private static final int maxPoolSize = 10; + /** * @param e * Entry being installed/updated/removed @@ -262,11 +258,34 @@ public class ForwardingRulesManager implements private Status addEntry(FlowEntry flowEntry, boolean async) { // Sanity Check - if (flowEntry == null || flowEntry.getNode() == null) { - String msg = "Invalid FlowEntry"; - String logMsg = msg + ": {}"; + if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) { + String logMsg = INVALID_FLOW_ENTRY + ": {}"; log.warn(logMsg, flowEntry); - return new Status(StatusCode.NOTACCEPTABLE, msg); + return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY); + } + + /* + * Redundant Check: Check if the request is a redundant one from the + * same application the flowEntry is equal to an existing one. Given we + * do not have an application signature in the requested FlowEntry yet, + * we are here detecting the above condition by comparing the flow + * names, if set. If they are equal to the installed flow, most likely + * this is a redundant installation request from the same application + * and we can silently return success + * + * TODO: in future a sort of application reference list mechanism will + * be added to the FlowEntry so that exact flow can be used by different + * applications. + */ + FlowEntry present = this.originalSwView.get(flowEntry); + if (present != null) { + boolean sameFlow = present.getFlow().equals(flowEntry.getFlow()); + boolean sameApp = present.getFlowName() != null && present.getFlowName().equals(flowEntry.getFlowName()); + if (sameFlow && sameApp) { + log.trace("Skipping redundant request for flow {} on node {}", flowEntry.getFlowName(), + flowEntry.getNode()); + return new Status(StatusCode.SUCCESS, "Entry is already present"); + } } /* @@ -312,7 +331,7 @@ public class ForwardingRulesManager implements for (FlowEntryInstall installEntry : toInstallSafe) { // Install and update database - Status ret = addEntriesInternal(installEntry, async); + Status ret = addEntryInternal(installEntry, async); if (ret.isSuccess()) { oneSucceded = true; @@ -325,7 +344,7 @@ public class ForwardingRulesManager implements succeded = ret; } else { error = ret; - log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription()); + log.trace("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription()); } } @@ -381,8 +400,8 @@ public class ForwardingRulesManager implements // Sanity checks if (currentFlowEntry == null || currentFlowEntry.getNode() == null || newFlowEntry == null - || newFlowEntry.getNode() == null) { - String msg = "Modify: Invalid FlowEntry"; + || newFlowEntry.getNode() == null || newFlowEntry.getFlow() == null) { + String msg = "Modify: " + INVALID_FLOW_ENTRY; String logMsg = msg + ": {} or {}"; log.warn(logMsg, currentFlowEntry, newFlowEntry); return new Status(StatusCode.NOTACCEPTABLE, msg); @@ -449,7 +468,7 @@ public class ForwardingRulesManager implements Status succeeded = null; boolean decouple = false; if (installedList.size() != toInstallList.size()) { - log.info("Modify: New flow entry does not satisfy the same " + log.trace("Modify: New flow entry does not satisfy the same " + "number of container flows as the original entry does"); decouple = true; } @@ -461,7 +480,7 @@ public class ForwardingRulesManager implements */ FlowEntryInstall sameMatchEntry = installedSwView.get(installEntry); if (sameMatchEntry != null && !sameMatchEntry.getOriginal().equals(currentFlowEntry)) { - log.info("Modify: new container flow merged flow entry clashes with existing flow"); + log.trace("Modify: new container flow merged flow entry clashes with existing flow"); decouple = true; } else { toInstallSafe.add(installEntry); @@ -475,7 +494,7 @@ public class ForwardingRulesManager implements } // Install new entries for (FlowEntryInstall newEntry : toInstallSafe) { - succeeded = this.addEntriesInternal(newEntry, async); + succeeded = this.addEntryInternal(newEntry, async); } } else { /* @@ -535,7 +554,9 @@ public class ForwardingRulesManager implements /** * This is the function that modifies the final container flows merged * entries on the network node and update the database. It expects that all - * the validity checks are passed + * the validity checks are passed. + * This function is supposed to be called only on the controller on which + * the IFRM call is executed. * * @param currentEntries * @param newEntries @@ -545,13 +566,13 @@ public class ForwardingRulesManager implements * contain the unique id assigned to this request */ private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) { + Status status = new Status(StatusCode.UNDEFINED); FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED); if (futureStatus != null) { - Status retStatus = new Status(StatusCode.UNDEFINED); try { - retStatus = futureStatus.get(); - if (retStatus.getCode() + status = futureStatus.get(); + if (status.getCode() .equals(StatusCode.TIMEOUT)) { // A timeout happened, lets cleanup the workMonitor workMonitor.remove(futureStatus.getOrder()); @@ -561,30 +582,31 @@ public class ForwardingRulesManager implements } catch (ExecutionException e) { log.error("", e); } - return retStatus; } else { // Modify the flow on the network node - Status status = async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall() - .getFlow(), newEntries.getInstall() - .getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries.getInstall() - .getFlow(), newEntries.getInstall() - .getFlow()); + status = modifyEntryInHw(currentEntries, newEntries, async); + } - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(), - status.getDescription()); - return status; - } + if (!status.isSuccess()) { + log.trace("{} SDN Plugin failed to program the flow: {}. The failure is: {}", + (futureStatus != null) ? "Remote" : "Local", newEntries.getInstall(), status.getDescription()); + return status; + } - log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall()); + log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall()); - // Update DB - newEntries.setRequestId(status.getRequestId()); - updateLocalDatabase(currentEntries, false); - updateLocalDatabase(newEntries, true); + // Update DB + newEntries.setRequestId(status.getRequestId()); + updateSwViews(currentEntries, false); + updateSwViews(newEntries, true); - return status; - } + return status; + } + + private Status modifyEntryInHw(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) { + return async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall().getFlow(), + newEntries.getInstall().getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries + .getInstall().getFlow(), newEntries.getInstall().getFlow()); } /** @@ -602,11 +624,10 @@ public class ForwardingRulesManager implements Status error = new Status(null, null); // Sanity Check - if (flowEntry == null || flowEntry.getNode() == null) { - String msg = "Invalid FlowEntry"; - String logMsg = msg + ": {}"; + if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) { + String logMsg = INVALID_FLOW_ENTRY + ": {}"; log.warn(logMsg, flowEntry); - return new Status(StatusCode.NOTACCEPTABLE, msg); + return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY); } // Derive the container flows merged installed entries @@ -631,7 +652,7 @@ public class ForwardingRulesManager implements if (!ret.isSuccess()) { error = ret; - log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription()); + log.trace("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription()); if (installedList.size() == 1) { // If we had only one entry to remove, this is fatal failure return error; @@ -654,6 +675,8 @@ public class ForwardingRulesManager implements * This is the function that removes the final container flows merged entry * from the network node and update the database. It expects that all the * validity checks are passed + * This function is supposed to be called only on the controller on which + * the IFRM call is executed. * * @param entry * the flow entry to remove @@ -663,13 +686,12 @@ public class ForwardingRulesManager implements * contain the unique id assigned to this request */ private Status removeEntryInternal(FlowEntryInstall entry, boolean async) { + Status status = new Status(StatusCode.UNDEFINED); FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED); if (futureStatus != null) { - Status retStatus = new Status(StatusCode.UNDEFINED); try { - retStatus = futureStatus.get(); - if (retStatus.getCode() - .equals(StatusCode.TIMEOUT)) { + status = futureStatus.get(); + if (status.getCode().equals(StatusCode.TIMEOUT)) { // A timeout happened, lets cleanup the workMonitor workMonitor.remove(futureStatus.getOrder()); } @@ -678,28 +700,31 @@ public class ForwardingRulesManager implements } catch (ExecutionException e) { log.error("", e); } - return retStatus; } else { // Mark the entry to be deleted (for CC just in case we fail) entry.toBeDeleted(); // Remove from node - Status status = async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall() - .getFlow()) : programmer.removeFlow(entry.getNode(), entry.getInstall() - .getFlow()); - - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), - status.getDescription()); - return status; - } - log.trace("Removed {}", entry.getInstall()); - - // Update DB - updateLocalDatabase(entry, false); + status = removeEntryInHw(entry, async); + } + if (!status.isSuccess()) { + log.trace("{} SDN Plugin failed to remove the flow: {}. The failure is: {}", + (futureStatus != null) ? "Remote" : "Local", entry.getInstall(), status.getDescription()); return status; } + + log.trace("Removed {}", entry.getInstall()); + + // Update DB + updateSwViews(entry, false); + + return status; + } + + private Status removeEntryInHw(FlowEntryInstall entry, boolean async) { + return async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer + .removeFlow(entry.getNode(), entry.getInstall().getFlow()); } /** @@ -707,6 +732,8 @@ public class ForwardingRulesManager implements * on the network node and updates the database. It expects that all the * validity and conflict checks are passed. That means it does not check * whether this flow would conflict or overwrite an existing one. + * This function is supposed to be called only on the controller on which + * the IFRM call is executed. * * @param entry * the flow entry to install @@ -715,14 +742,13 @@ public class ForwardingRulesManager implements * @return the status of this request. In case of asynchronous call, it will * contain the unique id assigned to this request */ - private Status addEntriesInternal(FlowEntryInstall entry, boolean async) { + private Status addEntryInternal(FlowEntryInstall entry, boolean async) { + Status status = new Status(StatusCode.UNDEFINED); FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED); if (futureStatus != null) { - Status retStatus = new Status(StatusCode.UNDEFINED); try { - retStatus = futureStatus.get(); - if (retStatus.getCode() - .equals(StatusCode.TIMEOUT)) { + status = futureStatus.get(); + if (status.getCode().equals(StatusCode.TIMEOUT)) { // A timeout happened, lets cleanup the workMonitor workMonitor.remove(futureStatus.getOrder()); } @@ -731,27 +757,29 @@ public class ForwardingRulesManager implements } catch (ExecutionException e) { log.error("", e); } - return retStatus; } else { - // Install the flow on the network node - Status status = async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall() - .getFlow()) : programmer.addFlow(entry.getNode(), entry.getInstall() - .getFlow()); + status = addEntryInHw(entry, async); + } - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), - status.getDescription()); - return status; - } + if (!status.isSuccess()) { + log.trace("{} SDN Plugin failed to program the flow: {}. The failure is: {}", + (futureStatus != null) ? "Remote" : "Local", entry.getInstall(), status.getDescription()); + return status; + } - log.trace("Added {}", entry.getInstall()); + log.trace("Added {}", entry.getInstall()); - // Update DB - entry.setRequestId(status.getRequestId()); - updateLocalDatabase(entry, true); + // Update DB + entry.setRequestId(status.getRequestId()); + updateSwViews(entry, true); - return status; - } + return status; + } + + private Status addEntryInHw(FlowEntryInstall entry, boolean async) { + // Install the flow on the network node + return async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer.addFlow( + entry.getNode(), entry.getInstall().getFlow()); } /** @@ -783,10 +811,17 @@ public class ForwardingRulesManager implements return true; } - private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { - // Update the software view - updateSwViewes(entry, add); + private ConcurrentMap.Entry getStaticFlowEntry(String name, Node node) { + for (ConcurrentMap.Entry flowEntry : staticFlows.entrySet()) { + FlowConfig flowConfig = flowEntry.getValue(); + if (flowConfig.isByNameAndNodeIdEqual(name, node)) { + return flowEntry; + } + } + return null; + } + private void updateIndexDatabase(FlowEntryInstall entry, boolean add) { // Update node indexed flow database updateNodeFlowsDB(entry, add); @@ -797,7 +832,7 @@ public class ForwardingRulesManager implements /* * Update the node mapped flows database */ - private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) { + private void updateSwViews(FlowEntryInstall flowEntries, boolean add) { if (add) { originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal()); installedSwView.put(flowEntries, flowEntries); @@ -823,6 +858,17 @@ public class ForwardingRulesManager implements } if (add) { + // there may be an already existing entry. + // remove it before adding the new one. + // This is necessary since we have observed that in some cases + // Infinispan does aggregation for operations (eg:- remove and then put a different value) + // related to the same key within the same transaction. + // Need this defensive code as the new FlowEntryInstall may be different + // than the old one even though the equals method returns true. This is because + // the equals method does not take into account the action list. + if(nodeIndeces.contains(flowEntries)) { + nodeIndeces.remove(flowEntries); + } nodeIndeces.add(flowEntries); } else { nodeIndeces.remove(flowEntries); @@ -857,6 +903,11 @@ public class ForwardingRulesManager implements } if (add) { + // same comments in the similar code section in + // updateNodeFlowsDB method apply here too + if(indices.contains(flowEntries)) { + indices.remove(flowEntries); + } indices.add(flowEntries); } else { indices.remove(flowEntries); @@ -896,10 +947,10 @@ public class ForwardingRulesManager implements // Update DB if (status.isSuccess()) { - updateLocalDatabase(target, false); + updateSwViews(target, false); } else { // log the error - log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), + log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), status.getDescription()); } @@ -1117,7 +1168,7 @@ public class ForwardingRulesManager implements * merged flow may conflict with an existing old container flows merged flow * on the network node */ - private void updateFlowsContainerFlow() { + protected void updateFlowsContainerFlow() { Set toReInstall = new HashSet(); // First remove all installed entries for (ConcurrentMap.Entry entry : installedSwView.entrySet()) { @@ -1139,8 +1190,6 @@ public class ForwardingRulesManager implements private void nonClusterObjectCreate() { originalSwView = new ConcurrentHashMap(); installedSwView = new ConcurrentHashMap(); - nodeFlows = new ConcurrentHashMap>(); - groupFlows = new ConcurrentHashMap>(); TSPolicies = new ConcurrentHashMap(); staticFlowsOrdinal = new ConcurrentHashMap(); portGroupConfigs = new ConcurrentHashMap(); @@ -1149,11 +1198,6 @@ public class ForwardingRulesManager implements inactiveFlows = new ConcurrentHashMap(); } - private void registerWithOSGIConsole() { - BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext(); - bundleContext.registerService(CommandProvider.class.getName(), this, null); - } - @Override public void setTSPolicyData(String policyname, Object o, boolean add) { @@ -1198,7 +1242,7 @@ public class ForwardingRulesManager implements if (policyName != null && !policyName.trim().isEmpty()) { for (Map.Entry entry : this.originalSwView.entrySet()) { if (policyName.equals(entry.getKey().getGroupName())) { - list.add(entry.getKey().clone()); + list.add(entry.getValue().clone()); } } } @@ -1211,7 +1255,7 @@ public class ForwardingRulesManager implements if (policyName != null && !policyName.trim().isEmpty()) { for (Map.Entry entry : this.installedSwView.entrySet()) { if (policyName.equals(entry.getKey().getGroupName())) { - list.add(entry.getKey().getInstall().clone()); + list.add(entry.getValue().getInstall().clone()); } } } @@ -1230,7 +1274,7 @@ public class ForwardingRulesManager implements } Status error = modifyEntry(currentFlowEntry, newFlowEntry, false); if (error.isSuccess()) { - log.info("Ports {} added to FlowEntry {}", portList, flowName); + log.trace("Ports {} added to FlowEntry {}", portList, flowName); } else { log.warn("Failed to add ports {} to Flow entry {}. The failure is: {}", portList, currentFlowEntry.toString(), error.getDescription()); @@ -1254,7 +1298,7 @@ public class ForwardingRulesManager implements } Status status = modifyEntry(currentFlowEntry, newFlowEntry, false); if (status.isSuccess()) { - log.info("Ports {} removed from FlowEntry {}", portList, flowName); + log.trace("Ports {} removed from FlowEntry {}", portList, flowName); } else { log.warn("Failed to remove ports {} from Flow entry {}. The failure is: {}", portList, currentFlowEntry.toString(), status.getDescription()); @@ -1302,7 +1346,7 @@ public class ForwardingRulesManager implements Status status = modifyEntry(currentFlowEntry, newFlowEntry, false); if (status.isSuccess()) { - log.info("Output port replaced with {} for flow {} on node {}", outPort, flowName, node); + log.trace("Output port replaced with {} for flow {} on node {}", outPort, flowName, node); } else { log.warn("Failed to replace output port for flow {} on node {}. The failure is: {}", flowName, node, status.getDescription()); @@ -1339,27 +1383,18 @@ public class ForwardingRulesManager implements log.debug("Allocating caches for Container {}", container.getName()); try { - clusterContainerService.createCache("frm.originalSwView", + clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.installedSwView", + clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.inactiveFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.nodeFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - - clusterContainerService.createCache("frm.groupFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.flowsSaveEvent", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.staticFlowsOrdinal", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -1372,11 +1407,11 @@ public class ForwardingRulesManager implements clusterContainerService.createCache("frm.TSPolicies", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache(WORKSTATUSCACHE, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache(WORK_STATUS_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); - clusterContainerService.createCache(WORKORDERCACHE, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache(WORK_ORDER_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); } catch (CacheConfigException cce) { log.error("CacheConfigException"); @@ -1397,14 +1432,14 @@ public class ForwardingRulesManager implements log.debug("Retrieving Caches for Container {}", container.getName()); - map = clusterContainerService.getCache("frm.originalSwView"); + map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE); if (map != null) { originalSwView = (ConcurrentMap) map; } else { log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.installedSwView"); + map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE); if (map != null) { installedSwView = (ConcurrentMap) map; } else { @@ -1418,20 +1453,6 @@ public class ForwardingRulesManager implements log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.nodeFlows"); - if (map != null) { - nodeFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of cache failed for Container {}", container.getName()); - } - - map = clusterContainerService.getCache("frm.groupFlows"); - if (map != null) { - groupFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName()); - } - map = clusterContainerService.getCache("frm.staticFlows"); if (map != null) { staticFlows = (ConcurrentMap) map; @@ -1467,18 +1488,18 @@ public class ForwardingRulesManager implements log.error("Retrieval of frm.TSPolicies cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache(WORKORDERCACHE); + map = clusterContainerService.getCache(WORK_ORDER_CACHE); if (map != null) { workOrder = (ConcurrentMap) map; } else { - log.error("Retrieval of " + WORKORDERCACHE + " cache failed for Container {}", container.getName()); + log.error("Retrieval of " + WORK_ORDER_CACHE + " cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache(WORKSTATUSCACHE); + map = clusterContainerService.getCache(WORK_STATUS_CACHE); if (map != null) { workStatus = (ConcurrentMap) map; } else { - log.error("Retrieval of " + WORKSTATUSCACHE + " cache failed for Container {}", container.getName()); + log.error("Retrieval of " + WORK_STATUS_CACHE + " cache failed for Container {}", container.getName()); } } @@ -1524,7 +1545,7 @@ public class ForwardingRulesManager implements boolean multipleFlowPush = false; String error; Status status; - config.setStatus(SUCCESS); + config.setStatus(StatusCode.SUCCESS.toString()); // Presence check if (flowConfigExists(config)) { @@ -1602,7 +1623,7 @@ public class ForwardingRulesManager implements continue; } if (config.getNode().equals(node)) { - if (config.installInHw() && !config.getStatus().equals(SUCCESS)) { + if (config.installInHw() && !config.getStatus().equals(StatusCode.SUCCESS.toString())) { Status status = this.installFlowEntryAsync(config.getFlowEntry()); config.setStatus(status.getDescription()); } @@ -1629,7 +1650,7 @@ public class ForwardingRulesManager implements // Take note of this controller generated static flow toRemove.add(entry.getKey()); } else { - config.setStatus(NODEDOWN); + config.setStatus(NODE_DOWN); } } } @@ -1656,7 +1677,7 @@ public class ForwardingRulesManager implements config.setStatus("Removed from node because in container mode"); break; case REMOVED: - config.setStatus(SUCCESS); + config.setStatus(StatusCode.SUCCESS.toString()); break; default: } @@ -1783,7 +1804,7 @@ public class ForwardingRulesManager implements // Do not attempt to reinstall the flow, warn user if (newFlowConfig.equals(oldFlowConfig)) { String msg = "No modification detected"; - log.info("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig); + log.trace("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig); return new Status(StatusCode.SUCCESS, msg); } @@ -1843,7 +1864,7 @@ public class ForwardingRulesManager implements .installFlowEntry(target.getFlowEntry()); if (status.isSuccess()) { // Update Configuration database - target.setStatus(SUCCESS); + target.setStatus(StatusCode.SUCCESS.toString()); target.toggleInstallation(); staticFlows.put(key, target); } @@ -1876,14 +1897,16 @@ public class ForwardingRulesManager implements * If requested, a copy of each original flow entry will be stored in the * inactive list so that it can be re-applied when needed (This is typically * the case when running in the default container and controller moved to - * container mode) + * container mode) NOTE WELL: The routine as long as does a bulk change will + * operate only on the entries for nodes locally attached so to avoid + * redundant operations initiated by multiple nodes * * @param preserveFlowEntries * if true, a copy of each original entry is stored in the * inactive list */ private void uninstallAllFlowEntries(boolean preserveFlowEntries) { - log.info("Uninstalling all non-internal flows"); + log.trace("Uninstalling all non-internal flows"); List toRemove = new ArrayList(); @@ -1902,9 +1925,15 @@ public class ForwardingRulesManager implements // Now remove the entries for (FlowEntryInstall flowEntryHw : toRemove) { - Status status = this.removeEntryInternal(flowEntryHw, false); - if (!status.isSuccess()) { - log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription()); + Node n = flowEntryHw.getNode(); + if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) { + Status status = this.removeEntryInternal(flowEntryHw, false); + if (!status.isSuccess()) { + log.trace("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription()); + } + } else { + log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job", + flowEntryHw); } } } @@ -1915,7 +1944,7 @@ public class ForwardingRulesManager implements * default container instance of FRM only when the last container is deleted */ private void reinstallAllFlowEntries() { - log.info("Reinstalling all inactive flows"); + log.trace("Reinstalling all inactive flows"); for (FlowEntry flowEntry : this.inactiveFlows.keySet()) { this.addEntry(flowEntry, false); @@ -1927,30 +1956,14 @@ public class ForwardingRulesManager implements @Override public List getStaticFlows() { - return getStaticFlowsOrderedList(staticFlows, staticFlowsOrdinal.get(0).intValue()); - } - - // TODO: need to come out with a better algorithm for maintaining the order - // of the configuration entries - // with actual one, index associated to deleted entries cannot be reused and - // map grows... - private List getStaticFlowsOrderedList(ConcurrentMap flowMap, int maxKey) { - List orderedList = new ArrayList(); - for (int i = 0; i <= maxKey; i++) { - FlowConfig entry = flowMap.get(i); - if (entry != null) { - orderedList.add(entry); - } - } - return orderedList; + return new ArrayList(staticFlows.values()); } @Override public FlowConfig getStaticFlow(String name, Node node) { - for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { - if (entry.getValue().isByNameAndNodeIdEqual(name, node)) { - return entry.getValue(); - } + ConcurrentMap.Entry entry = getStaticFlowEntry(name, node); + if(entry != null) { + return entry.getValue(); } return null; } @@ -1986,34 +1999,13 @@ public class ForwardingRulesManager implements return new ArrayList(set); } - @SuppressWarnings("unchecked") private void loadFlowConfiguration() { - ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = (ConcurrentMap) objReader.read(this, - frmFileName); - - ConcurrentMap pgConfig = (ConcurrentMap) objReader.read(this, - portGroupFileName); - - if (pgConfig != null) { - for (ConcurrentMap.Entry entry : pgConfig.entrySet()) { - addPortGroupConfig(entry.getKey(), entry.getValue().getMatchString(), true); - } + for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, PORT_GROUP_FILE_NAME)) { + addPortGroupConfig(((PortGroupConfig) conf).getName(), ((PortGroupConfig) conf).getMatchString(), true); } - if (confList == null) { - return; - } - - int maxKey = 0; - for (Integer key : confList.keySet()) { - if (key.intValue() > maxKey) { - maxKey = key.intValue(); - } - } - - for (FlowConfig conf : getStaticFlowsOrderedList(confList, maxKey)) { - addStaticFlowInternal(conf, true); + for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, STATIC_FLOWS_FILE_NAME)) { + addStaticFlowInternal((FlowConfig) conf, true); } } @@ -2028,43 +2020,42 @@ public class ForwardingRulesManager implements } private Status saveConfigInternal() { - ObjectWriter objWriter = new ObjectWriter(); - ConcurrentMap nonDynamicFlows = new ConcurrentHashMap(); + List nonDynamicFlows = new ArrayList(); + for (Integer ordinal : staticFlows.keySet()) { FlowConfig config = staticFlows.get(ordinal); // Do not save dynamic and controller generated static flows if (config.isDynamic() || config.isInternalFlow()) { continue; } - nonDynamicFlows.put(ordinal, config); + nonDynamicFlows.add(config); } - objWriter.write(nonDynamicFlows, frmFileName); - objWriter.write(new ConcurrentHashMap(portGroupConfigs), portGroupFileName); - return new Status(StatusCode.SUCCESS, null); + + configurationService.persistConfiguration(nonDynamicFlows, STATIC_FLOWS_FILE_NAME); + configurationService.persistConfiguration(new ArrayList(portGroupConfigs.values()), + PORT_GROUP_FILE_NAME); + + return new Status(StatusCode.SUCCESS); } @Override public void subnetNotify(Subnet sub, boolean add) { } - private void installImplicitARPReplyPunt(Node node) { - - if (node == null) { - return; + private boolean programInternalFlow(boolean proactive, FlowConfig fc) { + boolean retVal = true; // program flows unless determined otherwise + if(proactive) { + // if the flow already exists do not program + if(flowConfigExists(fc)) { + retVal = false; + } + } else { + // if the flow does not exist do not program + if(!flowConfigExists(fc)) { + retVal = false; + } } - - List puntAction = new ArrayList(); - puntAction.add(ActionType.CONTROLLER.toString()); - - FlowConfig allowARP = new FlowConfig(); - allowARP.setInstallInHw(true); - allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP Reply" + FlowConfig.INTERNALSTATICFLOWEND); - allowARP.setPriority("500"); - allowARP.setNode(node); - allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); - allowARP.setDstMac(HexEncode.bytesToHexString(switchManager.getControllerMAC())); - allowARP.setActions(puntAction); - addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name + return retVal; } /** @@ -2121,14 +2112,20 @@ public class ForwardingRulesManager implements dropAllConfig.setActions(dropAction); defaultConfigs.add(dropAllConfig); - log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); + log.trace("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); for (FlowConfig fc : defaultConfigs) { - Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc); - if (status.isSuccess()) { - log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); + // check if the frm really needs to act on the notification. + // this is to check against duplicate notifications + if(programInternalFlow(proactive, fc)) { + Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc); + if (status.isSuccess()) { + log.trace("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); + } else { + log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), + fc.getName()); + } } else { - log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), - fc.getName()); + log.debug("Got redundant install request for internal flow: {} on node: {}. Request not sent to FRM.", fc.getName(), node); } } return new Status(StatusCode.SUCCESS); @@ -2148,12 +2145,12 @@ public class ForwardingRulesManager implements * @param node */ private void cleanDatabaseForNode(Node node) { - log.info("Cleaning Flow database for Node {}", node); + log.trace("Cleaning Flow database for Node {}", node); if (nodeFlows.containsKey(node)) { List toRemove = new ArrayList(nodeFlows.get(node)); for (FlowEntryInstall entry : toRemove) { - updateLocalDatabase(entry, false); + updateSwViews(entry, false); } } } @@ -2239,12 +2236,12 @@ public class ForwardingRulesManager implements List flowConfigForNode = getStaticFlows(nodeConnector.getNode()); for (FlowConfig flowConfig : flowConfigForNode) { if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) { - if (flowConfig.installInHw() && !flowConfig.getStatus().equals(SUCCESS)) { + if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) { Status status = this.installFlowEntry(flowConfig.getFlowEntry()); if (!status.isSuccess()) { flowConfig.setStatus(status.getDescription()); } else { - flowConfig.setStatus(SUCCESS); + flowConfig.setStatus(StatusCode.SUCCESS.toString()); } updated = true; } @@ -2277,7 +2274,7 @@ public class ForwardingRulesManager implements if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); if (flowConfig != null) { - flowConfig.setStatus(PORTREMOVED); + flowConfig.setStatus(PORT_REMOVED); updated = true; } } @@ -2325,7 +2322,7 @@ public class ForwardingRulesManager implements @Override public void portGroupChanged(PortGroupConfig config, Map data, boolean add) { - log.info("PortGroup Changed for: {} Data: {}", config, portGroupData); + log.trace("PortGroup Changed for: {} Data: {}", config, portGroupData); Map existingData = portGroupData.get(config); if (existingData != null) { for (Map.Entry entry : data.entrySet()) { @@ -2391,17 +2388,6 @@ public class ForwardingRulesManager implements return true; } - private void usePortGroupConfig(String name) { - PortGroupConfig config = portGroupConfigs.get(name); - if (config == null) { - return; - } - if (portGroupProvider != null) { - Map data = portGroupProvider.getPortGroupData(config); - portGroupData.put(config, data); - } - } - @Override public Map getPortGroupConfigs() { return portGroupConfigs; @@ -2424,6 +2410,16 @@ public class ForwardingRulesManager implements } } + public void setConfigurationContainerService(IConfigurationContainerService service) { + log.trace("Got configuration service set request {}", service); + this.configurationService = service; + } + + public void unsetConfigurationContainerService(IConfigurationContainerService service) { + log.trace("Got configuration service UNset request"); + this.configurationService = null; + } + @Override public PortGroupProvider getPortGroupProvider() { return portGroupProvider; @@ -2474,8 +2470,6 @@ public class ForwardingRulesManager implements * */ void init() { - frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf"; - portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf"; inContainerMode = false; @@ -2483,9 +2477,10 @@ public class ForwardingRulesManager implements portGroupProvider.registerPortGroupChange(this); } - cacheStartup(); + nodeFlows = new ConcurrentHashMap>(); + groupFlows = new ConcurrentHashMap>(); - registerWithOSGIConsole(); + cacheStartup(); /* * If we are not the first cluster node to come up, do not initialize @@ -2503,11 +2498,12 @@ public class ForwardingRulesManager implements public void run() { while (!stopping) { try { - FRMEvent event = pendingEvents.take(); + final FRMEvent event = pendingEvents.take(); if (event == null) { log.warn("Dequeued null event"); continue; } + log.trace("Dequeued {} event", event.getClass().getSimpleName()); if (event instanceof NodeUpdateEvent) { NodeUpdateEvent update = (NodeUpdateEvent) event; Node node = update.getNode(); @@ -2528,36 +2524,40 @@ public class ForwardingRulesManager implements /* * Take care of handling the remote Work request */ - WorkOrderEvent work = (WorkOrderEvent) event; - FlowEntryDistributionOrder fe = work.getFe(); - if (fe != null) { - logsync.trace("Executing the workOrder {}", fe); - Status gotStatus = null; - FlowEntryInstall feiCurrent = fe.getEntry(); - FlowEntryInstall feiNew = workOrder.get(fe.getEntry()); - switch (fe.getUpType()) { - case ADDED: - /* - * TODO: Not still sure how to handle the - * sync entries - */ - gotStatus = addEntriesInternal(feiCurrent, true); - break; - case CHANGED: - gotStatus = modifyEntryInternal(feiCurrent, feiNew, true); - break; - case REMOVED: - gotStatus = removeEntryInternal(feiCurrent, true); - break; + Runnable r = new Runnable() { + @Override + public void run() { + WorkOrderEvent work = (WorkOrderEvent) event; + FlowEntryDistributionOrder fe = work.getFe(); + if (fe != null) { + logsync.trace("Executing the workOrder {}", fe); + Status gotStatus = null; + FlowEntryInstall feiCurrent = fe.getEntry(); + FlowEntryInstall feiNew = workOrder.get(fe); + switch (fe.getUpType()) { + case ADDED: + gotStatus = addEntryInHw(feiCurrent, false); + break; + case CHANGED: + gotStatus = modifyEntryInHw(feiCurrent, feiNew, false); + break; + case REMOVED: + gotStatus = removeEntryInHw(feiCurrent, false); + break; + } + // Remove the Order + workOrder.remove(fe); + logsync.trace( + "The workOrder has been executed and now the status is being returned {}", fe); + // Place the status + workStatus.put(fe, gotStatus); + } else { + log.warn("Not expected null WorkOrder", work); + } } - // Remove the Order - workOrder.remove(fe); - logsync.trace( - "The workOrder has been executed and now the status is being returned {}", fe); - // Place the status - workStatus.put(fe, gotStatus); - } else { - log.warn("Not expected null WorkOrder", work); + }; + if(executor != null) { + executor.execute(r); } } else if (event instanceof WorkStatusCleanup) { /* @@ -2579,9 +2579,11 @@ public class ForwardingRulesManager implements * flow merging is not an injective function */ updateFlowsContainerFlow(); + } else if (event instanceof UpdateIndexDBs) { + UpdateIndexDBs update = (UpdateIndexDBs)event; + updateIndexDatabase(update.getFei(), update.isAddition()); } else { - log.warn("Dequeued unknown event {}", event.getClass() - .getSimpleName()); + log.warn("Dequeued unknown event {}", event.getClass().getSimpleName()); } } catch (InterruptedException e) { // clear pending events @@ -2613,22 +2615,40 @@ public class ForwardingRulesManager implements * */ void start() { + /* + * If running in default container, need to know if controller is in + * container mode + */ + if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) { + inContainerMode = containerManager.inContainerMode(); + } + // Initialize graceful stop flag stopping = false; // Allocate the executor service - this.executor = Executors.newSingleThreadExecutor(); + this.executor = Executors.newFixedThreadPool(maxPoolSize); // Start event handler thread frmEventHandler.start(); + // replay the installedSwView data structure to populate + // node flows and group flows + for (FlowEntryInstall fei : installedSwView.values()) { + pendingEvents.offer(new UpdateIndexDBs(fei, true)); + } + /* - * Read startup and build database if we have not already gotten the - * configurations synced from another node + * Read startup and build database if we are the coordinator */ - if (staticFlows.isEmpty()) { - loadFlowConfiguration(); - } + loadFlowConfiguration(); + } + + /** + * Function called by the dependency manager before Container is Stopped and Destroyed. + */ + public void containerStop() { + uninstallAllFlowEntries(false); } /** @@ -2638,9 +2658,14 @@ public class ForwardingRulesManager implements */ void stop() { stopping = true; - uninstallAllFlowEntries(false); // Shutdown executor this.executor.shutdownNow(); + // Now walk all the workMonitor and wake up the one sleeping because + // destruction is happening + for (FlowEntryDistributionOrder fe : workMonitor.keySet()) { + FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe); + task.cancel(true); + } } public void setFlowProgrammerService(IFlowProgrammerService service) { @@ -2854,125 +2879,36 @@ public class ForwardingRulesManager implements } } - /* - * OSGI COMMANDS - */ - @Override - public String getHelp() { - StringBuffer help = new StringBuffer(); - help.append("---FRM Matrix Application---\n"); - help.append("\t printMatrixData - Prints the Matrix Configs\n"); - help.append("\t addMatrixConfig \n"); - help.append("\t delMatrixConfig \n"); - help.append("\t useMatrixConfig \n"); - return help.toString(); - } - - public void _printMatrixData(CommandInterpreter ci) { - ci.println("Configs : "); - ci.println("---------"); - ci.println(portGroupConfigs); - - ci.println("Data : "); - ci.println("------"); - ci.println(portGroupData); - } - - public void _addMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - String regex = ci.nextArgument(); - addPortGroupConfig(name, regex, false); - } - - public void _delMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - delPortGroupConfig(name); - } + private class UpdateIndexDBs extends FRMEvent { + private FlowEntryInstall fei; + private boolean add; - public void _useMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - usePortGroupConfig(name); - } - - public void _arpPunt(CommandInterpreter ci) { - String switchId = ci.nextArgument(); - long swid = HexEncode.stringToLong(switchId); - Node node = NodeCreator.createOFNode(swid); - installImplicitARPReplyPunt(node); - } - - public void _frmaddflow(CommandInterpreter ci) throws UnknownHostException { - Node node = null; - String nodeId = ci.nextArgument(); - if (nodeId == null) { - ci.print("Node id not specified"); - return; - } - try { - node = NodeCreator.createOFNode(Long.valueOf(nodeId)); - } catch (NumberFormatException e) { - ci.print("Node id not a number"); - return; + /** + * + * @param fei the flow entry which was installed/removed on the netwrok node + * @param update + */ + UpdateIndexDBs(FlowEntryInstall fei, boolean add) { + this.fei = fei; + this.add = add; } - ci.println(this.programmer.addFlow(node, getSampleFlow(node))); - } - public void _frmremoveflow(CommandInterpreter ci) throws UnknownHostException { - Node node = null; - String nodeId = ci.nextArgument(); - if (nodeId == null) { - ci.print("Node id not specified"); - return; - } - try { - node = NodeCreator.createOFNode(Long.valueOf(nodeId)); - } catch (NumberFormatException e) { - ci.print("Node id not a number"); - return; + + /** + * @return the flowEntryInstall object which was added/removed + * to/from the installed software view cache + */ + public FlowEntryInstall getFei() { + return fei; } - ci.println(this.programmer.removeFlow(node, getSampleFlow(node))); - } - - private Flow getSampleFlow(Node node) throws UnknownHostException { - NodeConnector port = NodeConnectorCreator.createOFNodeConnector((short) 24, node); - NodeConnector oport = NodeConnectorCreator.createOFNodeConnector((short) 30, node); - byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, (byte) 0x9a, (byte) 0xbc }; - byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, (byte) 0x5e, (byte) 0x6f }; - InetAddress srcIP = InetAddress.getByName("172.28.30.50"); - InetAddress dstIP = InetAddress.getByName("171.71.9.52"); - InetAddress ipMask = InetAddress.getByName("255.255.255.0"); - InetAddress ipMask2 = InetAddress.getByName("255.0.0.0"); - short ethertype = EtherTypes.IPv4.shortValue(); - short vlan = (short) 27; - byte vlanPr = 3; - Byte tos = 4; - byte proto = IPProtocols.TCP.byteValue(); - short src = (short) 55000; - short dst = 80; - /* - * Create a SAL Flow aFlow + /** + * + * @return whether this was an flow addition or removal */ - Match match = new Match(); - match.setField(MatchType.IN_PORT, port); - match.setField(MatchType.DL_SRC, srcMac); - match.setField(MatchType.DL_DST, dstMac); - match.setField(MatchType.DL_TYPE, ethertype); - match.setField(MatchType.DL_VLAN, vlan); - match.setField(MatchType.DL_VLAN_PR, vlanPr); - match.setField(MatchType.NW_SRC, srcIP, ipMask); - match.setField(MatchType.NW_DST, dstIP, ipMask2); - match.setField(MatchType.NW_TOS, tos); - match.setField(MatchType.NW_PROTO, proto); - match.setField(MatchType.TP_SRC, src); - match.setField(MatchType.TP_DST, dst); - - List actions = new ArrayList(); - actions.add(new Output(oport)); - actions.add(new PopVlan()); - actions.add(new Flood()); - actions.add(new Controller()); - return new Flow(match, actions); + public boolean isAddition() { + return add; + } } @Override @@ -2980,58 +2916,6 @@ public class ForwardingRulesManager implements return saveConfig(); } - public void _frmNodeFlows(CommandInterpreter ci) { - String nodeId = ci.nextArgument(); - Node node = Node.fromString(nodeId); - if (node == null) { - ci.println("frmNodeFlows [verbose]"); - return; - } - boolean verbose = false; - String verboseCheck = ci.nextArgument(); - if (verboseCheck != null) { - verbose = verboseCheck.equals("true"); - } - - if (!nodeFlows.containsKey(node)) { - return; - } - // Dump per node database - for (FlowEntryInstall entry : nodeFlows.get(node)) { - if (!verbose) { - ci.println(node + " " + installedSwView.get(entry).getFlowName()); - } else { - ci.println(node + " " + installedSwView.get(entry).toString()); - } - } - } - - public void _frmGroupFlows(CommandInterpreter ci) { - String group = ci.nextArgument(); - if (group == null) { - ci.println("frmGroupFlows [verbose]"); - return; - } - boolean verbose = false; - String verboseCheck = ci.nextArgument(); - if (verboseCheck != null) { - verbose = verboseCheck.equalsIgnoreCase("true"); - } - - if (!groupFlows.containsKey(group)) { - return; - } - // Dump per node database - ci.println("Group " + group + ":\n"); - for (FlowEntryInstall flowEntry : groupFlows.get(group)) { - if (!verbose) { - ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName()); - } else { - ci.println(flowEntry.getNode() + " " + flowEntry.toString()); - } - } - } - @Override public void flowRemoved(Node node, Flow flow) { log.trace("Received flow removed notification on {} for {}", node, flow); @@ -3057,13 +2941,21 @@ public class ForwardingRulesManager implements } if (target != null) { // Update Configuration database - target.toggleInstallation(); - target.setStatus(SUCCESS); + if (target.getHardTimeout() != null || target.getIdleTimeout() != null) { + /* + * No need for checking if actual values: these strings were + * validated at configuration creation. Also, after a switch + * down scenario, no use to reinstall a timed flow. Mark it as + * "do not install". User can manually toggle it. + */ + target.toggleInstallation(); + } + target.setStatus(StatusCode.GONE.toString()); staticFlows.put(key, target); } // Update software views - this.updateLocalDatabase(installedEntry, false); + this.updateSwViews(installedEntry, false); } @Override @@ -3100,7 +2992,20 @@ public class ForwardingRulesManager implements } if (target != null) { // This was a flow install, update database - this.updateLocalDatabase(target, false); + this.updateSwViews(target, false); + // also update the config + if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) { + ConcurrentMap.Entry staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode()); + // staticFlowEntry should never be null. + // the null check is just an extra defensive check. + if(staticFlowEntry != null) { + // Modify status and update cluster cache + log.debug("Updating static flow configuration on async error event"); + String status = String.format("Cannot be installed on node. reason: %s", errorString); + staticFlowEntry.getValue().setStatus(status); + refreshClusterStaticFlowsStatus(node); + } + } } // Notify listeners @@ -3142,6 +3047,16 @@ public class ForwardingRulesManager implements this.connectionManager = s; } + public void unsetIContainerManager(IContainerManager s) { + if (s == this.containerManager) { + this.containerManager = null; + } + } + + public void setIContainerManager(IContainerManager s) { + this.containerManager = s; + } + @Override public void entryCreated(Object key, String cacheName, boolean originLocal) { /* @@ -3151,13 +3066,20 @@ public class ForwardingRulesManager implements @Override public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + /* + * Streamline the updates for the per node and per group index databases + */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)new_value, true)); + } + if (originLocal) { /* * Local updates are of no interest */ return; } - if (cacheName.equals(WORKORDERCACHE)) { + if (cacheName.equals(WORK_ORDER_CACHE)) { logsync.trace("Got a WorkOrderCacheUpdate for {}", key); /* * This is the case of one workOrder becoming available, so we need @@ -3175,7 +3097,7 @@ public class ForwardingRulesManager implements // processing pendingEvents.offer(new WorkOrderEvent(fe, (FlowEntryInstall) new_value)); } - } else if (cacheName.equals(WORKSTATUSCACHE)) { + } else if (cacheName.equals(WORK_STATUS_CACHE)) { logsync.trace("Got a WorkStatusCacheUpdate for {}", key); /* * This is the case of one workOrder being completed and a status @@ -3202,7 +3124,43 @@ public class ForwardingRulesManager implements @Override public void entryDeleted(Object key, String cacheName, boolean originLocal) { /* - * Do nothing + * Streamline the updates for the per node and per group index databases */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false)); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List getFlowEntriesForNode(Node node) { + List list = new ArrayList(); + if (node != null) { + for (Map.Entry entry : this.originalSwView.entrySet()) { + if (node.equals(entry.getKey().getNode())) { + list.add(entry.getValue().clone()); + } + } + } + return list; + } + + /** + * {@inheritDoc} + */ + @Override + public List getInstalledFlowEntriesForNode(Node node) { + List list = new ArrayList(); + if (node != null) { + List flowEntryInstallList = this.nodeFlows.get(node); + if(flowEntryInstallList != null) { + for(FlowEntryInstall fi: flowEntryInstallList) { + list.add(fi.getInstall().clone()); + } + } + } + return list; } }