X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManagerImpl.java;h=2c5144f284b908f46d2a114da4acf2f2c3a14a58;hp=b82ab6f9a850b43d0c41b6313538d96268b86e6e;hb=a84e3c483f492477ace9f6da6908754cb257c225;hpb=c62c3615c0812460a8880f7ff0a1d3f6be548952 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java index b82ab6f9a8..2c5144f284 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java @@ -19,13 +19,14 @@ import java.util.Date; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; @@ -88,15 +89,13 @@ import org.slf4j.LoggerFactory; * the network. It also maintains the central repository of all the forwarding * rules installed on the network nodes. */ -public class ForwardingRulesManagerImpl implements IForwardingRulesManager, - PortGroupChangeListener, IContainerListener, ISwitchManagerAware, - IConfigurationContainerAware, IInventoryListener, IObjectReader, - ICacheUpdateAware, CommandProvider, - IFlowProgrammerListener { +public class ForwardingRulesManagerImpl implements IForwardingRulesManager, PortGroupChangeListener, + IContainerListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader, + ICacheUpdateAware, CommandProvider, IFlowProgrammerListener { private static final String SAVE = "Save"; private static final String NODEDOWN = "Node is Down"; - private static final Logger log = LoggerFactory - .getLogger(ForwardingRulesManagerImpl.class); + private static final String SUCCESS = StatusCode.SUCCESS.toString(); + private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManagerImpl.class); private Map flowsSaveEvent; private String frmFileName; private String portGroupFileName; @@ -105,17 +104,26 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, private ConcurrentMap portGroupConfigs; private ConcurrentMap> portGroupData; private ConcurrentMap TSPolicies; - private boolean inContainerMode; // being used by default instance only + private boolean inContainerMode; // being used by global instance only + private boolean stopping; + + /* + * Flow database. It's the software view of what was requested to install + * and what is installed on the switch. It is indexed by the entry itself. + * The entry's hashcode resumes the network node index, the flow's priority + * and the flow's match. The value element is a class which contains the + * flow entry pushed by the applications modules and the respective + * container flow merged version. In absence of container flows, the two + * flow entries are the same. + */ + private ConcurrentMap originalSwView; + private ConcurrentMap installedSwView; /* - * Flow database. It's the software view of what was installed on the - * switch. It is indexed by node. For convenience a version indexed by group - * name is also maintained. The core element is a class which contains the - * flow entry pushed by the functional modules and the respective container - * flow merged version. In absence of container flows, the two flow entries - * are the same. + * Per node and per group indexing */ - private ConcurrentMap> nodeFlows; - private ConcurrentMap> groupFlows; + private ConcurrentMap> nodeFlows; + private ConcurrentMap> groupFlows; + /* * Inactive flow list. This is for the global instance of FRM It will * contain all the flow entries which were installed on the global container @@ -130,17 +138,23 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, private IFlowProgrammerService programmer; private IClusterContainerServices clusterContainerService = null; private ISwitchManager switchManager; + private Thread frmEventHandler; + protected BlockingQueue pendingEvents; /** * Adds a flow entry onto the network node It runs various validity checks * and derive the final container flows merged entries that will be * attempted to be installed - * + * * @param flowEntry * the original flow entry application requested to add - * @return + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status addEntry(FlowEntry flowEntry) { + private Status addEntry(FlowEntry flowEntry, boolean async) { + // Sanity Check if (flowEntry == null || flowEntry.getNode() == null) { String msg = "Invalid FlowEntry"; @@ -153,8 +167,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * Derive the container flow merged entries to install In presence of N * container flows, we may end up with N different entries to install... */ - List toInstallList = deriveInstallEntries( - flowEntry.clone(), container.getContainerFlows()); + List toInstallList = deriveInstallEntries(flowEntry.clone(), container.getContainerFlows()); // Container Flow conflict Check if (toInstallList.isEmpty()) { @@ -169,9 +182,8 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, for (FlowEntryInstall entry : toInstallList) { // Conflict Check: Verify new entry would not overwrite existing // ones - if (findMatch(entry.getInstall(), false) != null) { - log.warn("Operation Rejected: A flow with same match " - + "and priority exists on the target node"); + if (this.installedSwView.containsKey(entry)) { + log.warn("Operation Rejected: A flow with same match and priority exists on the target node"); log.trace("Aborting to install {}", entry); continue; } @@ -181,8 +193,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, // Declare failure if all the container flow merged entries clash with // existing entries if (toInstallSafe.size() == 0) { - String msg = "A flow with same match and priority exists " - + "on the target node"; + String msg = "A flow with same match and priority exists on the target node"; String logMsg = msg + ": {}"; log.warn(logMsg, flowEntry); return new Status(StatusCode.CONFLICT, msg); @@ -190,22 +201,29 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, // Try to install an entry at the time Status error = new Status(null, null); + Status succeded = null; boolean oneSucceded = false; - for (FlowEntryInstall installEntry : toInstallList) { + for (FlowEntryInstall installEntry : toInstallSafe) { // Install and update database - Status ret = addEntriesInternal(installEntry); + Status ret = addEntriesInternal(installEntry, async); if (ret.isSuccess()) { oneSucceded = true; + /* + * The first successful status response will be returned For the + * asynchronous call, we can discard the container flow + * complication for now and assume we will always deal with one + * flow only per request + */ + succeded = ret; } else { error = ret; - log.warn("Failed to install the entry: {}. The failure is: {}", - installEntry, ret.getDescription()); + log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription()); } } - return (oneSucceded) ? new Status(StatusCode.SUCCESS, null) : error; + return (oneSucceded) ? succeded : error; } /** @@ -216,19 +234,16 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * flow entry is congruent with all the N container flows, then the output * install entry list will contain N entries. If the output list is empty, * it means the passed flow entry conflicts with all the container flows. - * + * * @param cFlowList * The list of container flows * @return the list of container flow merged entries good to be installed on * this container */ - private List deriveInstallEntries(FlowEntry request, - List cFlowList) { - List toInstallList = new ArrayList( - 1); + private List deriveInstallEntries(FlowEntry request, List cFlowList) { + List toInstallList = new ArrayList(1); - if (container.getContainerFlows() == null - || container.getContainerFlows().isEmpty()) { + if (container.getContainerFlows() == null || container.getContainerFlows().isEmpty()) { // No container flows => entry good to be installed unchanged toInstallList.add(new FlowEntryInstall(request.clone(), null)); } else { @@ -237,8 +252,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, // created for (ContainerFlow cFlow : container.getContainerFlows()) { if (cFlow.allowsFlow(request.getFlow())) { - toInstallList.add(new FlowEntryInstall(request.clone(), - cFlow)); + toInstallList.add(new FlowEntryInstall(request.clone(), cFlow)); } } } @@ -248,26 +262,27 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, /** * Modify a flow entry with a new one It runs various validity check and * derive the final container flows merged flow entries to work with - * + * * @param currentFlowEntry * @param newFlowEntry - * @return Success or error string + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status modifyEntry(FlowEntry currentFlowEntry, - FlowEntry newFlowEntry) { + private Status modifyEntry(FlowEntry currentFlowEntry, FlowEntry newFlowEntry, boolean async) { Status retExt; // Sanity checks - if (currentFlowEntry == null || currentFlowEntry.getNode() == null - || newFlowEntry == null || newFlowEntry.getNode() == null) { + if (currentFlowEntry == null || currentFlowEntry.getNode() == null || newFlowEntry == null + || newFlowEntry.getNode() == null) { String msg = "Modify: Invalid FlowEntry"; String logMsg = msg + ": {} or {}"; log.warn(logMsg, currentFlowEntry, newFlowEntry); return new Status(StatusCode.NOTACCEPTABLE, msg); } if (!currentFlowEntry.getNode().equals(newFlowEntry.getNode()) - || !currentFlowEntry.getFlowName().equals( - newFlowEntry.getFlowName())) { + || !currentFlowEntry.getFlowName().equals(newFlowEntry.getFlowName())) { String msg = "Modify: Incompatible Flow Entries"; String logMsg = msg + ": {} and {}"; log.warn(logMsg, currentFlowEntry, newFlowEntry); @@ -275,38 +290,34 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } // Equality Check - if (currentFlowEntry.equals(newFlowEntry)) { + if (currentFlowEntry.getFlow().equals(newFlowEntry.getFlow())) { String msg = "Modify skipped as flows are the same"; String logMsg = msg + ": {} and {}"; log.debug(logMsg, currentFlowEntry, newFlowEntry); return new Status(StatusCode.SUCCESS, msg); } - // Conflict Check: Verify the new entry would not conflict with another - // existing one - // This is a loose check on the previous original flow entry requests. - // No check - // on the container flow merged flow entries (if any) yet - FlowEntryInstall sameMatchOriginalEntry = findMatch(newFlowEntry, true); - if (sameMatchOriginalEntry != null - && !sameMatchOriginalEntry.getOriginal().equals( - currentFlowEntry)) { - String msg = "Operation Rejected: Another flow with same match " - + "and priority exists on the target node"; + /* + * Conflict Check: Verify the new entry would not conflict with an + * existing one. This is a loose check on the previous original flow + * entry requests. No check on the container flow merged flow entries + * (if any) yet + */ + FlowEntry sameMatchOriginalEntry = originalSwView.get(newFlowEntry); + if (sameMatchOriginalEntry != null && !sameMatchOriginalEntry.equals(currentFlowEntry)) { + String msg = "Operation Rejected: Another flow with same match and priority exists on the target node"; String logMsg = msg + ": {}"; log.warn(logMsg, currentFlowEntry); return new Status(StatusCode.CONFLICT, msg); } // Derive the installed and toInstall entries - List installedList = deriveInstallEntries( - currentFlowEntry.clone(), container.getContainerFlows()); - List toInstallList = deriveInstallEntries( - newFlowEntry.clone(), container.getContainerFlows()); + List installedList = deriveInstallEntries(currentFlowEntry.clone(), + container.getContainerFlows()); + List toInstallList = deriveInstallEntries(newFlowEntry.clone(), container.getContainerFlows()); if (toInstallList.isEmpty()) { - String msg = "Modify Operation Rejected: The new entry " - + "conflicts with all the container flows"; + String msg = "Modify Operation Rejected: The new entry conflicts with all the container flows"; String logMsg = msg + ": {}"; log.warn(logMsg, newFlowEntry); log.warn(msg); @@ -319,16 +330,17 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * This is only possible when the new entry and current entry have * different match. In this scenario the modification would ultimately * be handled as a remove and add operations in the protocol plugin. - * + * * Also, if any of the new flow entries would clash with an existing * one, we cannot proceed with the modify operation, because it would * fail for some entries and leave stale entries on the network node. * Modify path can be taken only if it can be performed completely, for * all entries. - * + * * So, for the above two cases, to simplify, let's decouple the modify * in: 1) remove current entries 2) install new entries */ + Status succeeded = null; boolean decouple = false; if (installedList.size() != toInstallList.size()) { log.info("Modify: New flow entry does not satisfy the same " @@ -337,14 +349,13 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } List toInstallSafe = new ArrayList(); for (FlowEntryInstall installEntry : toInstallList) { - // Conflict Check: Verify the new entry would not overwrite another - // existing one - FlowEntryInstall sameMatchEntry = findMatch( - installEntry.getInstall(), false); - if (sameMatchEntry != null - && !sameMatchEntry.getOriginal().equals(currentFlowEntry)) { - log.info("Modify: new container flow merged flow entry " - + "clashes with existing flow"); + /* + * Conflict Check: Verify the new entry would not overwrite another + * existing one + */ + FlowEntryInstall sameMatchEntry = installedSwView.get(installEntry); + if (sameMatchEntry != null && !sameMatchEntry.getOriginal().equals(currentFlowEntry)) { + log.info("Modify: new container flow merged flow entry clashes with existing flow"); decouple = true; } else { toInstallSafe.add(installEntry); @@ -354,11 +365,11 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, if (decouple) { // Remove current entries for (FlowEntryInstall currEntry : installedList) { - this.removeEntryInternal(currEntry); + this.removeEntryInternal(currEntry, async); } // Install new entries for (FlowEntryInstall newEntry : toInstallSafe) { - this.addEntriesInternal(newEntry); + succeeded = this.addEntriesInternal(newEntry, async); } } else { /* @@ -367,18 +378,17 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * (and might be wrong) that the same container flows that were * satisfied by the current entries are the same that are satisfied * by the new entries. Let's take the risk for now. - * + * * Note: modification has to be complete. If any entry modification * fails, we need to stop, restore the already modified entries, and * declare failure. */ - Status retModify; + Status retModify = null; int i = 0; int size = toInstallList.size(); while (i < size) { // Modify and update database - retModify = modifyEntryInternal(installedList.get(i), - toInstallList.get(i)); + retModify = modifyEntryInternal(installedList.get(i), toInstallList.get(i), async); if (retModify.isSuccess()) { i++; } else { @@ -387,14 +397,12 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } // Check if uncompleted modify if (i < size) { - log.warn("Unable to perform a complete modify for all " - + "the container flows merged entries"); + log.warn("Unable to perform a complete modify for all the container flows merged entries"); // Restore original entries int j = 0; while (j < i) { log.info("Attempting to restore initial entries"); - retExt = modifyEntryInternal(toInstallList.get(i), - installedList.get(i)); + retExt = modifyEntryInternal(toInstallList.get(i), installedList.get(i), async); if (retExt.isSuccess()) { j++; } else { @@ -408,37 +416,44 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return new Status(StatusCode.INTERNALERROR, msg); } } + succeeded = retModify; } - return new Status(StatusCode.SUCCESS, null); + /* + * The first successful status response will be returned. For the + * asynchronous call, we can discard the container flow complication for + * now and assume we will always deal with one flow only per request + */ + return succeeded; } /** * This is the function that modifies the final container flows merged * entries on the network node and update the database. It expects that all * the validity checks are passed - * + * * @param currentEntries * @param newEntries - * @return + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status modifyEntryInternal(FlowEntryInstall currentEntries, - FlowEntryInstall newEntries) { + private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) { // Modify the flow on the network node - Status status = programmer.modifyFlow(currentEntries.getNode(), - currentEntries.getInstall().getFlow(), newEntries.getInstall() - .getFlow()); + Status status = (async) ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall() + .getFlow(), newEntries.getInstall().getFlow()) : programmer.modifyFlow(currentEntries.getNode(), + currentEntries.getInstall().getFlow(), newEntries.getInstall().getFlow()); if (!status.isSuccess()) { - log.warn( - "SDN Plugin failed to program the flow: {}. The failure is: {}", - newEntries.getInstall(), status.getDescription()); + log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(), + status.getDescription()); return status; } - log.trace("Modified {} => {}", currentEntries.getInstall(), - newEntries.getInstall()); + log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall()); // Update DB + newEntries.setRequestId(status.getRequestId()); updateLocalDatabase(currentEntries, false); updateLocalDatabase(newEntries, true); @@ -448,11 +463,15 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, /** * Remove a flow entry. If the entry is not present in the software view * (entry or node not present), it return successfully - * + * * @param flowEntry - * @return + * the flow entry to remove + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status removeEntry(FlowEntry flowEntry) { + private Status removeEntry(FlowEntry flowEntry, boolean async) { Status error = new Status(null, null); // Sanity Check @@ -464,41 +483,34 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } // Derive the container flows merged installed entries - List installedList = deriveInstallEntries( - flowEntry.clone(), container.getContainerFlows()); + List installedList = deriveInstallEntries(flowEntry.clone(), container.getContainerFlows()); - Set flowsOnNode = nodeFlows.get(flowEntry.getNode()); + Status succeeded = null; boolean atLeastOneRemoved = false; for (FlowEntryInstall entry : installedList) { - if (flowsOnNode == null) { - String msg = "Removal skipped (Node down) for flow entry"; - String logMsg = msg + ": {}"; - log.debug(logMsg, flowEntry); - return new Status(StatusCode.SUCCESS, msg); - } - if (!flowsOnNode.contains(entry)) { + if (!installedSwView.containsKey(entry)) { String logMsg = "Removal skipped (not present in software view) for flow entry: {}"; log.debug(logMsg, flowEntry); if (installedList.size() == 1) { // If we had only one entry to remove, we are done - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } else { continue; } } // Remove and update DB - Status ret = removeEntryInternal(entry); + Status ret = removeEntryInternal(entry, async); if (!ret.isSuccess()) { error = ret; - log.warn("Failed to remove the entry: {}. The failure is: {}", - entry.getInstall(), ret.getDescription()); + log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription()); if (installedList.size() == 1) { // If we had only one entry to remove, this is fatal failure return error; } } else { + succeeded = ret; atLeastOneRemoved = true; } } @@ -508,34 +520,35 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * of removing the stale entries later, or adjusting the software * database if not in sync with hardware */ - return (atLeastOneRemoved) ? new Status(StatusCode.SUCCESS, null) - : error; + return (atLeastOneRemoved) ? succeeded : error; } /** * This is the function that removes the final container flows merged entry * from the network node and update the database. It expects that all the * validity checks are passed - * + * * @param entry - * the FlowEntryInstall - * @return "Success" or error string + * the flow entry to remove + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status removeEntryInternal(FlowEntryInstall entry) { + private Status removeEntryInternal(FlowEntryInstall entry, boolean async) { // Mark the entry to be deleted (for CC just in case we fail) entry.toBeDeleted(); // Remove from node - Status status = programmer.removeFlow(entry.getNode(), entry - .getInstall().getFlow()); + Status status = (async) ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall().getFlow()) + : programmer.removeFlow(entry.getNode(), entry.getInstall().getFlow()); if (!status.isSuccess()) { - log.warn( - "SDN Plugin failed to program the flow: {}. The failure is: {}", - entry.getInstall(), status.getDescription()); + log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), + status.getDescription()); return status; } - log.info("Removed {}", entry.getInstall()); + log.trace("Removed {}", entry.getInstall()); // Update DB updateLocalDatabase(entry, false); @@ -548,26 +561,29 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * on the network node and updates the database. It expects that all the * validity and conflict checks are passed. That means it does not check * whether this flow would conflict or overwrite an existing one. - * + * * @param entry - * the FlowEntryInstall - * @return "Success" or error string + * the flow entry to install + * @param async + * the flag indicating if this is a asynchronous request + * @return the status of this request. In case of asynchronous call, it will + * contain the unique id assigned to this request */ - private Status addEntriesInternal(FlowEntryInstall entry) { + private Status addEntriesInternal(FlowEntryInstall entry, boolean async) { // Install the flow on the network node - Status status = programmer.addFlow(entry.getNode(), entry.getInstall() - .getFlow()); + Status status = (async) ? programmer.addFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer + .addFlow(entry.getNode(), entry.getInstall().getFlow()); if (!status.isSuccess()) { - log.warn( - "SDN Plugin failed to program the flow: {}. The failure is: {}", - entry.getInstall(), status.getDescription()); + log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), + status.getDescription()); return status; } - log.info("Added {}", entry.getInstall()); + log.trace("Added {}", entry.getInstall()); // Update DB + entry.setRequestId(status.getRequestId()); updateLocalDatabase(entry, true); return status; @@ -578,7 +594,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * means that if the function returns true, the passed flow entry is * congruent with at least one container flow, hence it is good to be * installed on this container. - * + * * @param flowEntry * @return true if flow conflicts with all the container flows, false * otherwise @@ -602,8 +618,10 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return true; } - private synchronized void updateLocalDatabase(FlowEntryInstall entry, - boolean add) { + private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { + // Update the software view + updateSwViewes(entry, add); + // Update node indexed flow database updateNodeFlowsDB(entry, add); @@ -611,31 +629,45 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, updateGroupFlowsDB(entry, add); } + /* + * Update the node mapped flows database + */ + private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) { + if (add) { + originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal()); + installedSwView.put(flowEntries, flowEntries); + } else { + originalSwView.remove(flowEntries.getOriginal()); + installedSwView.remove(flowEntries); + } + } + /* * Update the node mapped flows database */ private void updateNodeFlowsDB(FlowEntryInstall flowEntries, boolean add) { Node node = flowEntries.getNode(); - Set flowEntrylist = this.nodeFlows.get(node); - if (flowEntrylist == null) { - if (add == false) { + List nodeIndeces = this.nodeFlows.get(node); + if (nodeIndeces == null) { + if (!add) { return; } else { - flowEntrylist = new HashSet(); + nodeIndeces = new ArrayList(); } } - if (add == true) { - flowEntrylist.add(flowEntries); + if (add) { + nodeIndeces.add(flowEntries); } else { - flowEntrylist.remove(flowEntries); + nodeIndeces.remove(flowEntries); } - if (flowEntrylist.isEmpty()) { + // Update cache across cluster + if (nodeIndeces.isEmpty()) { this.nodeFlows.remove(node); } else { - this.nodeFlows.put(node, flowEntrylist); + this.nodeFlows.put(node, nodeIndeces); } } @@ -643,53 +675,33 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * Update the group name mapped flows database */ private void updateGroupFlowsDB(FlowEntryInstall flowEntries, boolean add) { - Set flowList; - FlowEntryInstall exists = null; - String flowName = flowEntries.getFlowName(); String groupName = flowEntries.getGroupName(); - if (this.groupFlows == null) { - return; - } - // Flow may not be part of a group if (groupName == null) { return; } - if (this.groupFlows.containsKey(groupName)) { - flowList = this.groupFlows.get(groupName); - } else { - if (add == false) { + List indices = this.groupFlows.get(groupName); + if (indices == null) { + if (!add) { return; } else { - flowList = new HashSet(); - } - } - - for (FlowEntryInstall flow : flowList) { - if (flow.equalsByNodeAndName(flowEntries.getNode(), flowName)) { - exists = flow; - break; + indices = new ArrayList(); } } - if (exists == null && add == false) { - return; - } - - if (exists != null) { - flowList.remove(exists); - } - - if (add == true) { - flowList.add(flowEntries); + if (add) { + indices.add(flowEntries); + } else { + indices.remove(flowEntries); } - if (flowList.isEmpty()) { + // Update cache across cluster + if (indices.isEmpty()) { this.groupFlows.remove(groupName); } else { - this.groupFlows.put(groupName, flowList); + this.groupFlows.put(groupName, indices); } } @@ -702,7 +714,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, FlowEntryInstall target = null; // Find in database - for (FlowEntryInstall entry : this.nodeFlows.get(node)) { + for (FlowEntryInstall entry : installedSwView.values()) { if (entry.equalsByNodeAndName(node, flowName)) { target = entry; break; @@ -715,17 +727,15 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } // Remove from node - Status status = programmer.removeFlow(target.getNode(), target - .getInstall().getFlow()); + Status status = programmer.removeFlow(target.getNode(), target.getInstall().getFlow()); // Update DB if (status.isSuccess()) { updateLocalDatabase(target, false); } else { // log the error - log.warn( - "SDN Plugin failed to remove the flow: {}. The failure is: {}", - target.getInstall(), status.getDescription()); + log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), + status.getDescription()); } return status; @@ -740,7 +750,20 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, status = new Status(StatusCode.NOTACCEPTABLE, msg); log.warn(logMsg, flowEntry); } else { - status = addEntry(flowEntry); + status = addEntry(flowEntry, false); + } + return status; + } + + @Override + public Status installFlowEntryAsync(FlowEntry flowEntry) { + Status status; + if (inContainerMode) { + String msg = "Controller in container mode: Install Refused"; + status = new Status(StatusCode.NOTACCEPTABLE, msg); + log.warn(msg); + } else { + status = addEntry(flowEntry, true); } return status; } @@ -754,14 +777,26 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, status = new Status(StatusCode.NOTACCEPTABLE, msg); log.warn(logMsg, entry); } else { - status = removeEntry(entry); + status = removeEntry(entry, false); + } + return status; + } + + @Override + public Status uninstallFlowEntryAsync(FlowEntry flowEntry) { + Status status; + if (inContainerMode) { + String msg = "Controller in container mode: Uninstall Refused"; + status = new Status(StatusCode.NOTACCEPTABLE, msg); + log.warn(msg); + } else { + status = removeEntry(flowEntry, true); } return status; } @Override - public Status modifyFlowEntry(FlowEntry currentFlowEntry, - FlowEntry newFlowEntry) { + public Status modifyFlowEntry(FlowEntry currentFlowEntry, FlowEntry newFlowEntry) { Status status = null; if (inContainerMode) { String msg = "Controller in container mode: Modify Refused"; @@ -769,7 +804,20 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, status = new Status(StatusCode.NOTACCEPTABLE, msg); log.warn(logMsg, newFlowEntry); } else { - status = modifyEntry(currentFlowEntry, newFlowEntry); + status = modifyEntry(currentFlowEntry, newFlowEntry, false); + } + return status; + } + + @Override + public Status modifyFlowEntryAsync(FlowEntry current, FlowEntry newone) { + Status status = null; + if (inContainerMode) { + String msg = "Controller in container mode: Modify Refused"; + status = new Status(StatusCode.NOTACCEPTABLE, msg); + log.warn(msg); + } else { + status = modifyEntry(current, newone, true); } return status; } @@ -777,57 +825,98 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public Status modifyOrAddFlowEntry(FlowEntry newFlowEntry) { /* - * Run a loose check on the installed entries to decide whether to go - * with a add or modify method. A loose check means only check against - * the original flow entry requests and not against the installed flow + * Run a check on the original entries to decide whether to go with a + * add or modify method. A loose check means only check against the + * original flow entry requests and not against the installed flow * entries which are the result of the original entry merged with the * container flow(s) (if any). The modifyFlowEntry method in presence of * conflicts with the Container flows (if any) would revert back to a * delete + add pattern */ - FlowEntryInstall currentFlowEntries = findMatch(newFlowEntry, true); + FlowEntry currentFlowEntry = originalSwView.get(newFlowEntry); - if (currentFlowEntries != null) { - return modifyFlowEntry(currentFlowEntries.getOriginal(), - newFlowEntry); + if (currentFlowEntry != null) { + return modifyFlowEntry(currentFlowEntry, newFlowEntry); } else { return installFlowEntry(newFlowEntry); } } - /** - * Try to find in the database if a Flow with the same Match and priority of - * the passed one already exists for the specified network node. Flow, - * priority and network node are all specified in the FlowEntry If found, - * the respective FlowEntryInstall Object is returned - * - * @param flowEntry - * the FlowEntry to be tested against the ones installed - * @param looseCheck - * if true, the function will run the check against the original - * flow entry portion of the installed entries - * @return null if not found, otherwise the FlowEntryInstall which contains - * the existing flow entry - */ - private FlowEntryInstall findMatch(FlowEntry flowEntry, boolean looseCheck) { - Flow flow = flowEntry.getFlow(); - Match match = flow.getMatch(); - short priority = flow.getPriority(); - Set thisNodeList = nodeFlows.get(flowEntry.getNode()); - - if (thisNodeList != null) { - for (FlowEntryInstall flowEntries : thisNodeList) { - flow = (looseCheck == false) ? flowEntries.getInstall() - .getFlow() : flowEntries.getOriginal().getFlow(); - if (flow.getMatch().equals(match) - && flow.getPriority() == priority) { - return flowEntries; + @Override + public Status modifyOrAddFlowEntryAsync(FlowEntry newFlowEntry) { + /* + * Run a check on the original entries to decide whether to go with a + * add or modify method. A loose check means only check against the + * original flow entry requests and not against the installed flow + * entries which are the result of the original entry merged with the + * container flow(s) (if any). The modifyFlowEntry method in presence of + * conflicts with the Container flows (if any) would revert back to a + * delete + add pattern + */ + FlowEntry currentFlowEntry = originalSwView.get(newFlowEntry); + + if (currentFlowEntry != null) { + return modifyFlowEntryAsync(currentFlowEntry, newFlowEntry); + } else { + return installFlowEntryAsync(newFlowEntry); + } + } + + @Override + public Status uninstallFlowEntryGroup(String groupName) { + if (groupName == null || groupName.isEmpty()) { + return new Status(StatusCode.BADREQUEST, "Invalid group name"); + } + if (groupName.equals(FlowConfig.internalStaticFlowsGroup)) { + return new Status(StatusCode.BADREQUEST, "Static flows group cannot be deleted through this api"); + } + if (inContainerMode) { + String msg = "Controller in container mode: Group Uninstall Refused"; + String logMsg = msg + ": {}"; + log.warn(logMsg, groupName); + return new Status(StatusCode.NOTACCEPTABLE, msg); + } + int toBeRemoved = groupFlows.get(groupName).size(); + String error = ""; + if (groupFlows.containsKey(groupName)) { + List list = new ArrayList(groupFlows.get(groupName)); + for (FlowEntryInstall entry : list) { + Status status = this.removeEntry(entry.getOriginal(), false); + if (status.isSuccess()) { + toBeRemoved -= 1; + } else { + error = status.getDescription(); } } } - return null; + return (toBeRemoved == 0) ? new Status(StatusCode.SUCCESS) : new Status(StatusCode.INTERNALERROR, + "Not all the flows were removed: " + error); } + @Override + public Status uninstallFlowEntryGroupAsync(String groupName) { + if (groupName == null || groupName.isEmpty()) { + return new Status(StatusCode.BADREQUEST, "Invalid group name"); + } + if (groupName.equals(FlowConfig.internalStaticFlowsGroup)) { + return new Status(StatusCode.BADREQUEST, "Static flows group cannot be deleted through this api"); + } + if (inContainerMode) { + String msg = "Controller in container mode: Group Uninstall Refused"; + String logMsg = msg + ": {}"; + log.warn(logMsg, groupName); + return new Status(StatusCode.NOTACCEPTABLE, msg); + } + if (groupFlows.containsKey(groupName)) { + List list = new ArrayList(groupFlows.get(groupName)); + for (FlowEntryInstall entry : list) { + this.removeEntry(entry.getOriginal(), true); + } + } + return new Status(StatusCode.SUCCESS); + } + + @Override public boolean checkFlowEntryConflict(FlowEntry flowEntry) { return entryConflictsWithContainerFlows(flowEntry); } @@ -840,38 +929,25 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * on the network node */ private void updateFlowsContainerFlow() { - List oldCouples = new ArrayList(); - List toReinstall = new ArrayList(); - for (Entry> entry : this.nodeFlows - .entrySet()) { - oldCouples.clear(); - toReinstall.clear(); - if (entry.getValue() == null) { - continue; - } - // Create a set of old entries and one of original entries to be - // reinstalled - for (FlowEntryInstall oldCouple : entry.getValue()) { - oldCouples.add(oldCouple); - toReinstall.add(oldCouple.getOriginal()); - } + for (ConcurrentMap.Entry entry : installedSwView.entrySet()) { + FlowEntryInstall current = entry.getValue(); + FlowEntry reInstall = current.getOriginal(); // Remove the old couples. No validity checks to be run, use the // internal remove - for (FlowEntryInstall oldCouple : oldCouples) { - this.removeEntryInternal(oldCouple); - } + this.removeEntryInternal(current, false); + // Reinstall the original flow entries, via the regular path: new // cFlow merge + validations - for (FlowEntry flowEntry : toReinstall) { - this.installFlowEntry(flowEntry); - } + this.installFlowEntry(reInstall); } } public void nonClusterObjectCreate() { - nodeFlows = new ConcurrentHashMap>(); + originalSwView = new ConcurrentHashMap(); + installedSwView = new ConcurrentHashMap(); + nodeFlows = new ConcurrentHashMap>(); + groupFlows = new ConcurrentHashMap>(); TSPolicies = new ConcurrentHashMap(); - groupFlows = new ConcurrentHashMap>(); staticFlowsOrdinal = new ConcurrentHashMap(); portGroupConfigs = new ConcurrentHashMap(); portGroupData = new ConcurrentHashMap>(); @@ -881,10 +957,8 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } private void registerWithOSGIConsole() { - BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()) - .getBundleContext(); - bundleContext.registerService(CommandProvider.class.getName(), this, - null); + BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext(); + bundleContext.registerService(CommandProvider.class.getName(), this, null); } @Override @@ -927,54 +1001,44 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public List getFlowEntriesForGroup(String policyName) { - List list = null; - if (this.groupFlows != null && this.groupFlows.containsKey(policyName)) { - list = new ArrayList(); - for (FlowEntryInstall entries : groupFlows.get(policyName)) { - list.add(entries.getOriginal()); + List list = new ArrayList(); + if (policyName != null && !policyName.trim().isEmpty()) { + for (Map.Entry entry : this.originalSwView.entrySet()) { + if (policyName.equals(entry.getKey().getGroupName())) { + list.add(entry.getKey().clone()); + } } - return new ArrayList(); } return list; } @Override - public void addOutputPort(Node node, String flowName, - List portList) { - - Set flowEntryList = this.nodeFlows.get(node); + public void addOutputPort(Node node, String flowName, List portList) { - for (FlowEntryInstall flow : flowEntryList) { + for (FlowEntryInstall flow : this.nodeFlows.get(node)) { if (flow.getFlowName().equals(flowName)) { FlowEntry currentFlowEntry = flow.getOriginal(); FlowEntry newFlowEntry = currentFlowEntry.clone(); for (NodeConnector dstPort : portList) { newFlowEntry.getFlow().addAction(new Output(dstPort)); } - Status error = modifyEntry(currentFlowEntry, newFlowEntry); + Status error = modifyEntry(currentFlowEntry, newFlowEntry, false); if (error.isSuccess()) { - log.info("Ports {} added to FlowEntry {}", portList, - flowName); + log.info("Ports {} added to FlowEntry {}", portList, flowName); } else { - log.warn( - "Failed to add ports {} to Flow entry {}. The failure is: {}", - portList, currentFlowEntry.toString(), - error.getDescription()); + log.warn("Failed to add ports {} to Flow entry {}. The failure is: {}", portList, + currentFlowEntry.toString(), error.getDescription()); } return; } } - log.warn("Failed to add ports to Flow {} on Node {}: Entry Not Found", - flowName, node); + log.warn("Failed to add ports to Flow {} on Node {}: Entry Not Found", flowName, node); } @Override - public void removeOutputPort(Node node, String flowName, - List portList) { - - Set flowEntryList = this.nodeFlows.get(node); - - for (FlowEntryInstall flow : flowEntryList) { + public void removeOutputPort(Node node, String flowName, List portList) { + for (FlowEntryInstall index : this.nodeFlows.get(node)) { + FlowEntryInstall flow = this.installedSwView.get(index); if (flow.getFlowName().equals(flowName)) { FlowEntry currentFlowEntry = flow.getOriginal(); FlowEntry newFlowEntry = currentFlowEntry.clone(); @@ -982,45 +1046,37 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, Action action = new Output(dstPort); newFlowEntry.getFlow().removeAction(action); } - Status status = modifyEntry(currentFlowEntry, newFlowEntry); + Status status = modifyEntry(currentFlowEntry, newFlowEntry, false); if (status.isSuccess()) { - log.info("Ports {} removed from FlowEntry {}", portList, - flowName); + log.info("Ports {} removed from FlowEntry {}", portList, flowName); } else { - log.warn( - "Failed to remove ports {} from Flow entry {}. The failure is: {}", - portList, currentFlowEntry.toString(), - status.getDescription()); + log.warn("Failed to remove ports {} from Flow entry {}. The failure is: {}", portList, + currentFlowEntry.toString(), status.getDescription()); } return; } } - log.warn( - "Failed to remove ports from Flow {} on Node {}: Entry Not Found", - flowName, node); + log.warn("Failed to remove ports from Flow {} on Node {}: Entry Not Found", flowName, node); } /* * This function assumes the target flow has only one output port */ @Override - public void replaceOutputPort(Node node, String flowName, - NodeConnector outPort) { + public void replaceOutputPort(Node node, String flowName, NodeConnector outPort) { FlowEntry currentFlowEntry = null; FlowEntry newFlowEntry = null; - Set flowEntryList = this.nodeFlows.get(node); // Find the flow - for (FlowEntryInstall flow : flowEntryList) { + for (FlowEntryInstall index : this.nodeFlows.get(node)) { + FlowEntryInstall flow = this.installedSwView.get(index); if (flow.getFlowName().equals(flowName)) { currentFlowEntry = flow.getOriginal(); break; } } if (currentFlowEntry == null) { - log.warn( - "Failed to replace output port for flow {} on node {}: Entry Not Found", - flowName, node); + log.warn("Failed to replace output port for flow {} on node {}: Entry Not Found", flowName, node); return; } @@ -1037,24 +1093,21 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, newFlowEntry.getFlow().addAction(new Output(outPort)); // Modify on network node - Status status = modifyEntry(currentFlowEntry, newFlowEntry); + Status status = modifyEntry(currentFlowEntry, newFlowEntry, false); if (status.isSuccess()) { - log.info("Output port replaced with {} for flow {} on node {}", - outPort, flowName, node); + log.info("Output port replaced with {} for flow {} on node {}", outPort, flowName, node); } else { - log.warn( - "Failed to replace output port for flow {} on node {}. The failure is: {}", - flowName, node, status.getDescription()); + log.warn("Failed to replace output port for flow {} on node {}. The failure is: {}", flowName, node, + status.getDescription()); } return; } @Override public NodeConnector getOutputPort(Node node, String flowName) { - Set flowEntryList = this.nodeFlows.get(node); - - for (FlowEntryInstall flow : flowEntryList) { + for (FlowEntryInstall index : this.nodeFlows.get(node)) { + FlowEntryInstall flow = this.installedSwView.get(index); if (flow.getFlowName().equals(flowName)) { for (Action action : flow.getOriginal().getFlow().getActions()) { if (action.getType() == ActionType.OUTPUT) { @@ -1063,7 +1116,6 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } } } - return null; } @@ -1082,6 +1134,12 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, log.debug("FRM allocateCaches for Container {}", container); try { + clusterContainerService.createCache("frm.originalSwView", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + + clusterContainerService.createCache("frm.installedSwView", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); @@ -1107,9 +1165,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheConfigException cce) { - log.error("FRM CacheConfigException", cce); + log.error("FRM CacheConfigException"); } catch (CacheExistException cce) { - log.error("FRM CacheExistException", cce); + log.error("FRM CacheExistException"); } } @@ -1124,104 +1182,82 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, log.debug("FRM retrieveCaches for Container {}", container); + map = clusterContainerService.getCache("frm.originalSwView"); + if (map != null) { + originalSwView = (ConcurrentMap) map; + } else { + log.error("FRM Cache frm.originalSwView allocation failed for Container {}", container.getName()); + } + + map = clusterContainerService.getCache("frm.installedSwView"); + if (map != null) { + installedSwView = (ConcurrentMap) map; + } else { + log.error("FRM Cache frm.installedSwView allocation failed for Container {}", container.getName()); + } + map = clusterContainerService.getCache("frm.nodeFlows"); if (map != null) { - nodeFlows = (ConcurrentMap>) map; + nodeFlows = (ConcurrentMap>) map; } else { - log.error( - "FRM Cache frm.nodeFlows allocation failed for Container {}", - container); + log.error("FRM Cache frm.nodeFlows allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.groupFlows"); if (map != null) { - groupFlows = (ConcurrentMap>) map; + groupFlows = (ConcurrentMap>) map; } else { - log.error( - "FRM Cache frm.groupFlows allocation failed for Container {}", - container); + log.error("FRM Cache frm.groupFlows allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.staticFlows"); if (map != null) { staticFlows = (ConcurrentMap) map; } else { - log.error( - "FRM Cache frm.staticFlows allocation failed for Container {}", - container); + log.error("FRM Cache frm.staticFlows allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.flowsSaveEvent"); if (map != null) { flowsSaveEvent = (ConcurrentMap) map; } else { - log.error( - "FRM Cache frm.flowsSaveEvent allocation failed for Container {}", - container); + log.error("FRM Cache frm.flowsSaveEvent allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.staticFlowsOrdinal"); if (map != null) { staticFlowsOrdinal = (ConcurrentMap) map; } else { - log.error( - "FRM Cache frm.staticFlowsOrdinal allocation failed for Container {}", - container); + log.error("FRM Cache frm.staticFlowsOrdinal allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.portGroupConfigs"); if (map != null) { portGroupConfigs = (ConcurrentMap) map; } else { - log.error( - "FRM Cache frm.portGroupConfigs allocation failed for Container {}", - container); + log.error("FRM Cache frm.portGroupConfigs allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.portGroupData"); if (map != null) { portGroupData = (ConcurrentMap>) map; } else { - log.error( - "FRM Cache frm.portGroupData allocation failed for Container {}", - container); + log.error("FRM Cache frm.portGroupData allocation failed for Container {}", container.getName()); } map = clusterContainerService.getCache("frm.TSPolicies"); if (map != null) { TSPolicies = (ConcurrentMap) map; } else { - log.error( - "FRM Cache frm.TSPolicies allocation failed for Container {}", - container); - } - - } - - @SuppressWarnings("deprecation") - private void destroyCaches() { - if (this.clusterContainerService == null) { - log.warn("Un-initialized clusterContainerService, can't destroy cache"); - return; + log.error("FRM Cache frm.TSPolicies allocation failed for Container {}", container.getName()); } - log.debug("FRM destroyCaches for Container {}", container); - clusterContainerService.destroyCache("frm.nodeFlows"); - clusterContainerService.destroyCache("frm.TSPolicies"); - clusterContainerService.destroyCache("frm.groupFlows"); - clusterContainerService.destroyCache("frm.staticFlows"); - clusterContainerService.destroyCache("frm.flowsSaveEvent"); - clusterContainerService.destroyCache("frm.staticFlowsOrdinal"); - clusterContainerService.destroyCache("frm.portGroupData"); - clusterContainerService.destroyCache("frm.portGroupConfigs"); - nonClusterObjectCreate(); } private boolean flowConfigExists(FlowConfig config) { - // As per customer requirement, flow name has to be unique on per node - // id basis - for (FlowConfig fc : staticFlows.values()) { - if (fc.isByNameAndNodeIdEqual(config)) { + // Flow name has to be unique on per node id basis + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + if (entry.getValue().isByNameAndNodeIdEqual(config)) { return true; } } @@ -1230,31 +1266,27 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public Status addStaticFlow(FlowConfig config, boolean restore) { - StringBuffer resultStr = new StringBuffer(); boolean multipleFlowPush = false; String error; Status status; - config.setStatus(StatusCode.SUCCESS.toString()); + config.setStatus(SUCCESS); + + // Skip validation check if we are trying to restore a saved config + if (!restore && !(status = config.validate(container)).isSuccess()) { + log.warn("Invalid Configuration for flow {}. The failure is {}", config, status.getDescription()); + error = "Invalid Configuration (" + status.getDescription() + ")"; + config.setStatus(error); + return new Status(StatusCode.BADREQUEST, error); + } // Presence check if (flowConfigExists(config)) { error = "Entry with this name on specified switch already exists"; - log.warn( - "Entry with this name on specified switch already exists: {}", - config); + log.warn("Entry with this name on specified switch already exists: {}", config); config.setStatus(error); return new Status(StatusCode.CONFLICT, error); } - // Skip validation check if we are trying to restore a saved config - if (!restore && !config.isValid(container, resultStr)) { - log.warn("Invalid Configuration for flow {}. The failure is {}", - config, resultStr.toString()); - error = "Invalid Configuration (" + resultStr.toString() + ")"; - config.setStatus(error); - return new Status(StatusCode.BADREQUEST, error); - } - if ((config.getIngressPort() == null) && config.getPortGroup() != null) { for (String portGroupName : portGroupConfigs.keySet()) { if (portGroupName.equalsIgnoreCase(config.getPortGroup())) { @@ -1263,9 +1295,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } } if (!multipleFlowPush) { - log.warn( - "Invalid Configuration(Invalid PortGroup Name) for flow {}", - config); + log.warn("Invalid Configuration(Invalid PortGroup Name) for flow {}", config); error = "Invalid Configuration (Invalid PortGroup Name)"; config.setStatus(error); return new Status(StatusCode.BADREQUEST, error); @@ -1280,7 +1310,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, // Program hw if (config.installInHw()) { FlowEntry entry = config.getFlowEntry(); - status = this.addEntry(entry); + status = this.installFlowEntry(entry); if (!status.isSuccess()) { config.setStatus(status.getDescription()); if (!restore) { @@ -1297,9 +1327,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * multiple entry configuration (PortGroup) and hardware installation is * NOT done directly on this event. 3. The User prefers to retain the * configuration in Controller and skip hardware installation. - * + * * Hence it is safe to update the StaticFlow DB at this point. - * + * * Note : For the case of PortGrouping, it is essential to have this DB * populated before the PortGroupListeners can query for the DB * triggered using portGroupChanged event... @@ -1309,30 +1339,30 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, staticFlows.put(ordinal, config); if (multipleFlowPush) { - PortGroupConfig pgconfig = portGroupConfigs.get(config - .getPortGroup()); + PortGroupConfig pgconfig = portGroupConfigs.get(config.getPortGroup()); Map existingData = portGroupData.get(pgconfig); if (existingData != null) { portGroupChanged(pgconfig, existingData, true); } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } private void addStaticFlowsToSwitch(Node node) { - for (FlowConfig config : staticFlows.values()) { + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + FlowConfig config = entry.getValue(); if (config.isPortGroupEnabled()) { continue; } if (config.getNode().equals(node)) { - if (config.installInHw() - && !config.getStatus().equals( - StatusCode.SUCCESS.toString())) { - Status status = this.addEntry(config.getFlowEntry()); + if (config.installInHw() && !config.getStatus().equals(SUCCESS)) { + Status status = this.installFlowEntryAsync(config.getFlowEntry()); config.setStatus(status.getDescription()); } } } + // Update cluster cache + refreshClusterStaticFlowsStatus(node); } private void updateStaticFlowConfigsOnNodeDown(Node node) { @@ -1360,30 +1390,36 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, for (Integer index : toRemove) { staticFlows.remove(index); } + // Update cluster cache + refreshClusterStaticFlowsStatus(node); + } private void updateStaticFlowConfigsOnContainerModeChange(UpdateType update) { - log.trace("Updating Static Flow configs on container mode change: {}", - update); + log.trace("Updating Static Flow configs on container mode change: {}", update); - for (FlowConfig config : staticFlows.values()) { + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + FlowConfig config = entry.getValue(); if (config.isPortGroupEnabled()) { continue; } - if (config.installInHw()) { + if (config.installInHw() && !config.isInternalFlow()) { switch (update) { case ADDED: config.setStatus("Removed from node because in container mode"); break; case REMOVED: - config.setStatus(StatusCode.SUCCESS.toString()); + config.setStatus(SUCCESS); break; default: } } } + // Update cluster cache + refreshClusterStaticFlowsStatus(null); } + @Override public Status removeStaticFlow(FlowConfig config) { /* * No config.isInternal() check as NB does not take this path and GUI @@ -1392,80 +1428,98 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * reactive, so that we can remove the internal generated LLDP and ARP * punt flows */ - for (Map.Entry entry : staticFlows.entrySet()) { + + // Look for the target configuration entry + Integer key = 0; + FlowConfig target = null; + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { if (entry.getValue().isByNameAndNodeIdEqual(config)) { - // Program the network node - Status status = this.removeEntry(config.getFlowEntry()); - // Update configuration database if programming was successful - if (status.isSuccess()) { - staticFlows.remove(entry.getKey()); - return status; - } else { - entry.getValue().setStatus(status.getDescription()); - return status; - } + key = entry.getKey(); + target = entry.getValue(); + break; } } - return new Status(StatusCode.NOTFOUND, "Entry Not Present"); + if (target == null) { + return new Status(StatusCode.NOTFOUND, "Entry Not Present"); + } + + // Program the network node + Status status = this.uninstallFlowEntry(config.getFlowEntry()); + + // Update configuration database if programming was successful + if (status.isSuccess()) { + staticFlows.remove(key); + } + + return status; } @Override public Status removeStaticFlow(String name, Node node) { - for (Map.Entry mapEntry : staticFlows.entrySet()) { - FlowConfig entry = mapEntry.getValue(); - Status status = new Status(null, null); - if (entry.isByNameAndNodeIdEqual(name, node)) { - // Validity check for api3 entry point - if (entry.isInternalFlow()) { - String msg = "Invalid operation: Controller generated " - + "flow cannot be deleted"; - String logMsg = msg + ": {}"; - log.warn(logMsg, name); - return new Status(StatusCode.NOTACCEPTABLE, msg); - } - if (!entry.isPortGroupEnabled()) { - // Program the network node - status = this.removeEntry(entry.getFlowEntry()); - } - // Update configuration database if programming was successful - if (status.isSuccess()) { - staticFlows.remove(mapEntry.getKey()); - return status; - } else { - entry.setStatus(status.getDescription()); - return status; - } + // Look for the target configuration entry + Integer key = 0; + FlowConfig target = null; + for (ConcurrentMap.Entry mapEntry : staticFlows.entrySet()) { + if (mapEntry.getValue().isByNameAndNodeIdEqual(name, node)) { + key = mapEntry.getKey(); + target = mapEntry.getValue(); + break; } } - return new Status(StatusCode.NOTFOUND, "Entry Not Present"); + if (target == null) { + return new Status(StatusCode.NOTFOUND, "Entry Not Present"); + } + + // Validity check for api3 entry point + if (target.isInternalFlow()) { + String msg = "Invalid operation: Controller generated flow cannot be deleted"; + String logMsg = msg + ": {}"; + log.warn(logMsg, name); + return new Status(StatusCode.NOTACCEPTABLE, msg); + } + + if (target.isPortGroupEnabled()) { + String msg = "Invalid operation: Port Group flows cannot be deleted through this API"; + String logMsg = msg + ": {}"; + log.warn(logMsg, name); + return new Status(StatusCode.NOTACCEPTABLE, msg); + } + + // Program the network node + Status status = this.removeEntry(target.getFlowEntry(), false); + + // Update configuration database if programming was successful + if (status.isSuccess()) { + staticFlows.remove(key); + } + + return status; } + @Override public Status modifyStaticFlow(FlowConfig newFlowConfig) { // Validity check for api3 entry point if (newFlowConfig.isInternalFlow()) { - String msg = "Invalid operation: Controller generated flow " - + "cannot be modified"; + String msg = "Invalid operation: Controller generated flow cannot be modified"; String logMsg = msg + ": {}"; log.warn(logMsg, newFlowConfig); return new Status(StatusCode.NOTACCEPTABLE, msg); } // Validity Check - StringBuffer resultStr = new StringBuffer(); - if (!newFlowConfig.isValid(container, resultStr)) { - String msg = "Invalid Configuration (" + resultStr.toString() + ")"; + Status status = newFlowConfig.validate(container); + if (!status.isSuccess()) { + String msg = "Invalid Configuration (" + status.getDescription() + ")"; newFlowConfig.setStatus(msg); - log.warn("Invalid Configuration for flow {}. The failure is {}", - newFlowConfig, resultStr.toString()); + log.warn("Invalid Configuration for flow {}. The failure is {}", newFlowConfig, status.getDescription()); return new Status(StatusCode.BADREQUEST, msg); } FlowConfig oldFlowConfig = null; Integer index = null; - for (Map.Entry mapEntry : staticFlows.entrySet()) { + for (ConcurrentMap.Entry mapEntry : staticFlows.entrySet()) { FlowConfig entry = mapEntry.getValue(); - if (entry.isByNameAndNodeIdEqual(newFlowConfig.getName(), - newFlowConfig.getNode())) { + if (entry.isByNameAndNodeIdEqual(newFlowConfig.getName(), newFlowConfig.getNode())) { oldFlowConfig = entry; index = mapEntry.getKey(); break; @@ -1482,17 +1536,14 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, // Do not attempt to reinstall the flow, warn user if (newFlowConfig.equals(oldFlowConfig)) { String msg = "No modification detected"; - log.info( - "Static flow modification skipped. New flow and old flow are the same: {}", - newFlowConfig); + log.info("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig); return new Status(StatusCode.SUCCESS, msg); } // If flow is installed, program the network node - Status status = new Status(StatusCode.SUCCESS, "Saved in config"); + status = new Status(StatusCode.SUCCESS, "Saved in config"); if (oldFlowConfig.installInHw()) { - status = this.modifyEntry(oldFlowConfig.getFlowEntry(), - newFlowConfig.getFlowEntry()); + status = this.modifyFlowEntry(oldFlowConfig.getFlowEntry(), newFlowConfig.getFlowEntry()); } // Update configuration database if programming was successful @@ -1518,61 +1569,80 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } // Validity check for api3 entry point if (config.isInternalFlow()) { - String msg = "Invalid operation: Controller generated flow " - + "cannot be modified"; + String msg = "Invalid operation: Controller generated flow cannot be modified"; String logMsg = msg + ": {}"; log.warn(logMsg, config); return new Status(StatusCode.NOTACCEPTABLE, msg); } + // Find the config entry + Integer key = 0; + FlowConfig target = null; for (Map.Entry entry : staticFlows.entrySet()) { FlowConfig conf = entry.getValue(); if (conf.isByNameAndNodeIdEqual(config)) { - // Program the network node - Status status = new Status(StatusCode.SUCCESS, null); - if (conf.installInHw()) { - status = this.removeEntry(conf.getFlowEntry()); - } else { - status = this.addEntry(conf.getFlowEntry()); - } - if (!status.isSuccess()) { - conf.setStatus(status.getDescription()); - return status; - } - + key = entry.getKey(); + target = conf; + break; + } + } + if (target != null) { + // Program the network node + Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this + .installFlowEntry(target.getFlowEntry()); + if (status.isSuccess()) { // Update Configuration database - conf.setStatus(StatusCode.SUCCESS.toString()); - conf.toggleStatus(); - return status; + target.setStatus(SUCCESS); + target.toggleInstallation(); + staticFlows.put(key, target); + } + return status; + } + + return new Status(StatusCode.NOTFOUND, "Unable to locate the entry. Failed to toggle status"); + } + + /** + * Reinsert all static flows entries in the cache to force cache updates in + * the cluster. This is useful when only some parameters were changed in the + * entries, like the status. + * + * @param node + * The node for which the static flow configurations have to be + * refreshed. If null, all nodes static flows will be refreshed. + */ + private void refreshClusterStaticFlowsStatus(Node node) { + // Refresh cluster cache + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + if (node == null || entry.getValue().getNode().equals(node)) { + staticFlows.put(entry.getKey(), entry.getValue()); } } - return new Status(StatusCode.NOTFOUND, - "Unable to locate the entry. Failed to toggle status"); } /** - * Uninstall all the Flow Entries present in the software view A copy of - * each entry is stored in the inactive list so that it can be re-applied - * when needed This function is called on the default container instance of - * FRM only when the first container is created + * Uninstall all the non-internal Flow Entries present in the software view. + * A copy of each entry is stored in the inactive list so that it can be + * re-applied when needed. This function is called on the global instance of + * FRM only, when the first container is created */ private void uninstallAllFlowEntries() { - log.info("Uninstalling all flows"); + log.info("Uninstalling all non-internal flows"); // Store entries / create target list - for (ConcurrentMap.Entry> mapEntry : nodeFlows - .entrySet()) { - for (FlowEntryInstall flowEntries : mapEntry.getValue()) { + for (ConcurrentMap.Entry mapEntry : installedSwView.entrySet()) { + FlowEntryInstall flowEntries = mapEntry.getValue(); + // Skip internal generated static flows + if (!flowEntries.isInternal()) { inactiveFlows.add(flowEntries.getOriginal()); } } // Now remove the entries for (FlowEntry flowEntry : inactiveFlows) { - Status status = this.removeEntry(flowEntry); + Status status = this.removeEntry(flowEntry, false); if (!status.isSuccess()) { - log.warn("Failed to remove entry: {}. The failure is: {}" - + flowEntry, status.getDescription()); + log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription()); } } } @@ -1586,28 +1656,23 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, log.info("Reinstalling all inactive flows"); for (FlowEntry flowEntry : this.inactiveFlows) { - Status status = this.addEntry(flowEntry); - if (!status.isSuccess()) { - log.warn("Failed to install entry: {}. The failure is: {}" - + flowEntry, status.getDescription()); - } + this.addEntry(flowEntry, false); } // Empty inactive list in any case inactiveFlows.clear(); } + @Override public List getStaticFlows() { - return getStaticFlowsOrderedList(staticFlows, staticFlowsOrdinal.get(0) - .intValue()); + return getStaticFlowsOrderedList(staticFlows, staticFlowsOrdinal.get(0).intValue()); } - // TODO: need to come out with a better algorithm for mantaining the order + // TODO: need to come out with a better algorithm for maintaining the order // of the configuration entries // with actual one, index associated to deleted entries cannot be reused and // map grows... - private List getStaticFlowsOrderedList( - ConcurrentMap flowMap, int maxKey) { + private List getStaticFlowsOrderedList(ConcurrentMap flowMap, int maxKey) { List orderedList = new ArrayList(); for (int i = 0; i <= maxKey; i++) { FlowConfig entry = flowMap.get(i); @@ -1620,9 +1685,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public FlowConfig getStaticFlow(String name, Node node) { - for (FlowConfig config : staticFlows.values()) { - if (config.isByNameAndNodeIdEqual(name, node)) { - return config; + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + if (entry.getValue().isByNameAndNodeIdEqual(name, node)) { + return entry.getValue(); } } return null; @@ -1631,9 +1696,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public List getStaticFlows(Node node) { List list = new ArrayList(); - for (FlowConfig config : staticFlows.values()) { - if (config.onNode(node)) { - list.add(config); + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + if (entry.getValue().onNode(node)) { + list.add(entry.getValue()); } } return list; @@ -1642,9 +1707,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public List getStaticFlowNamesForNode(Node node) { List list = new ArrayList(); - for (FlowConfig config : staticFlows.values()) { - if (config.onNode(node)) { - list.add(config.getName()); + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + if (entry.getValue().onNode(node)) { + list.add(entry.getValue().getName()); } } return list; @@ -1653,8 +1718,8 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public List getListNodeWithConfiguredFlows() { Set set = new HashSet(); - for (FlowConfig config : staticFlows.values()) { - set.add(config.getNode()); + for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { + set.add(entry.getValue().getNode()); } return new ArrayList(set); } @@ -1662,16 +1727,15 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @SuppressWarnings("unchecked") private void loadFlowConfiguration() { ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = (ConcurrentMap) objReader - .read(this, frmFileName); + ConcurrentMap confList = (ConcurrentMap) objReader.read(this, + frmFileName); - ConcurrentMap pgConfig = (ConcurrentMap) objReader - .read(this, portGroupFileName); + ConcurrentMap pgConfig = (ConcurrentMap) objReader.read(this, + portGroupFileName); if (pgConfig != null) { - for (Map.Entry entry : pgConfig.entrySet()) { - addPortGroupConfig(entry.getKey(), entry.getValue() - .getMatchString(), true); + for (ConcurrentMap.Entry entry : pgConfig.entrySet()) { + addPortGroupConfig(entry.getKey(), entry.getValue().getMatchString(), true); } } @@ -1681,8 +1745,9 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, int maxKey = 0; for (Integer key : confList.keySet()) { - if (key.intValue() > maxKey) + if (key.intValue() > maxKey) { maxKey = key.intValue(); + } } for (FlowConfig conf : getStaticFlowsOrderedList(confList, maxKey)) { @@ -1691,11 +1756,11 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public Object readObject(ObjectInputStream ois) - throws FileNotFoundException, IOException, ClassNotFoundException { + public Object readObject(ObjectInputStream ois) throws FileNotFoundException, IOException, ClassNotFoundException { return ois.readObject(); } + @Override public Status saveConfig() { // Publish the save config event to the cluster nodes flowsSaveEvent.put(new Date().getTime(), SAVE); @@ -1704,7 +1769,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, private Status saveConfigInternal() { ObjectWriter objWriter = new ObjectWriter(); - ConcurrentHashMap nonDynamicFlows = new ConcurrentHashMap(); + ConcurrentMap nonDynamicFlows = new ConcurrentHashMap(); for (Integer ordinal : staticFlows.keySet()) { FlowConfig config = staticFlows.get(ordinal); // Do not save dynamic and controller generated static flows @@ -1714,8 +1779,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, nonDynamicFlows.put(ordinal, config); } objWriter.write(nonDynamicFlows, frmFileName); - objWriter.write(new ConcurrentHashMap( - portGroupConfigs), portGroupFileName); + objWriter.write(new ConcurrentHashMap(portGroupConfigs), portGroupFileName); return new Status(StatusCode.SUCCESS, null); } @@ -1724,8 +1788,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public void entryUpdated(Long key, String new_value, String cacheName, - boolean originLocal) { + public void entryUpdated(Long key, String new_value, String cacheName, boolean originLocal) { saveConfigInternal(); } @@ -1751,10 +1814,8 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, allowARP.setName("**Punt ARP Reply"); allowARP.setPriority("500"); allowARP.setNode(node); - allowARP.setEtherType("0x" - + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); - allowARP.setDstMac(HexEncode.bytesToHexString(switchManager - .getControllerMAC())); + allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); + allowARP.setDstMac(HexEncode.bytesToHexString(switchManager.getControllerMAC())); allowARP.setActions(puntAction); addStaticFlow(allowARP, false); } @@ -1771,8 +1832,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, allowARP.setName("**Punt ARP"); allowARP.setPriority("1"); allowARP.setNode(node); - allowARP.setEtherType("0x" - + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); + allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); allowARP.setActions(puntAction); defaultConfigs.add(allowARP); @@ -1781,10 +1841,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, allowLLDP.setName("**Punt LLDP"); allowLLDP.setPriority("1"); allowLLDP.setNode(node); - allowLLDP - .setEtherType("0x" - + Integer.toHexString(EtherTypes.LLDP.intValue()) - .toUpperCase()); + allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase()); allowLLDP.setActions(puntAction); defaultConfigs.add(allowLLDP); @@ -1807,73 +1864,36 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } } - log.info("Set Switch {} Mode to {}", node, proactive); + log.info("Set Switch {} Mode to {}", node, (proactive ? "proactive" : "reactive")); } /** * Remove from the databases all the flows installed on the node - * + * * @param node */ - private synchronized void cleanDatabaseForNode(Node node) { - log.info("Cleaning Flow database for Node {}", node.toString()); - - // Find out which groups the node's flows are part of - Set affectedGroups = new HashSet(); - Set flowEntryList = nodeFlows.get(node); - if (flowEntryList != null) { - for (FlowEntryInstall entry : flowEntryList) { - String groupName = entry.getGroupName(); - if (groupName != null) { - affectedGroups.add(groupName); - } - } - } + private void cleanDatabaseForNode(Node node) { + log.info("Cleaning Flow database for Node {}", node); + if (nodeFlows.containsKey(node)) { + List toRemove = new ArrayList(nodeFlows.get(node)); - // Remove the node's flows from the group indexed flow database - if (!affectedGroups.isEmpty()) { - for (String group : affectedGroups) { - Set flowList = groupFlows.get(group); - Set toRemove = new HashSet(); - for (FlowEntryInstall entry : flowList) { - if (node.equals(entry.getNode())) { - toRemove.add(entry); - } - } - flowList.removeAll(toRemove); - if (flowList.isEmpty()) { - groupFlows.remove(group); - } + for (FlowEntryInstall entry : toRemove) { + updateLocalDatabase(entry, false); } } - - // Remove the node's flows from the node indexed flow database - nodeFlows.remove(node); } @Override - public void notifyNode(Node node, UpdateType type, - Map propMap) { - switch (type) { - case ADDED: - addStaticFlowsToSwitch(node); - break; - case REMOVED: - cleanDatabaseForNode(node); - updateStaticFlowConfigsOnNodeDown(node); - break; - default: - break; - } + public void notifyNode(Node node, UpdateType type, Map propMap) { + this.pendingEvents.offer(new NodeUpdateEvent(type, node)); } @Override - public void notifyNodeConnector(NodeConnector nodeConnector, - UpdateType type, Map propMap) { + public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map propMap) { + } - private FlowConfig getDerivedFlowConfig(FlowConfig original, - String configName, Short port) { + private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) { FlowConfig derivedFlow = new FlowConfig(original); derivedFlow.setDynamic(true); derivedFlow.setPortGroup(null); @@ -1882,38 +1902,28 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return derivedFlow; } - private void addPortGroupFlows(PortGroupConfig config, Node node, - PortGroup data) { - for (Iterator it = staticFlows.values().iterator(); it - .hasNext();) { - FlowConfig staticFlow = it.next(); + private void addPortGroupFlows(PortGroupConfig config, Node node, PortGroup data) { + for (FlowConfig staticFlow : staticFlows.values()) { if (staticFlow.getPortGroup() == null) { continue; } - if ((staticFlow.getNode().equals(node)) - && (staticFlow.getPortGroup().equals(config.getName()))) { + if ((staticFlow.getNode().equals(node)) && (staticFlow.getPortGroup().equals(config.getName()))) { for (Short port : data.getPorts()) { - FlowConfig derivedFlow = getDerivedFlowConfig(staticFlow, - config.getName(), port); + FlowConfig derivedFlow = getDerivedFlowConfig(staticFlow, config.getName(), port); addStaticFlow(derivedFlow, false); } } } } - private void removePortGroupFlows(PortGroupConfig config, Node node, - PortGroup data) { - for (Iterator it = staticFlows.values().iterator(); it - .hasNext();) { - FlowConfig staticFlow = it.next(); + private void removePortGroupFlows(PortGroupConfig config, Node node, PortGroup data) { + for (FlowConfig staticFlow : staticFlows.values()) { if (staticFlow.getPortGroup() == null) { continue; } - if ((staticFlow.getNode().equals(node)) - && (staticFlow.getPortGroup().equals(config.getName()))) { + if (staticFlow.getNode().equals(node) && staticFlow.getPortGroup().equals(config.getName())) { for (Short port : data.getPorts()) { - FlowConfig derivedFlow = getDerivedFlowConfig(staticFlow, - config.getName(), port); + FlowConfig derivedFlow = getDerivedFlowConfig(staticFlow, config.getName(), port); removeStaticFlow(derivedFlow); } } @@ -1921,9 +1931,8 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public void portGroupChanged(PortGroupConfig config, - Map data, boolean add) { - log.info("PortGroup Changed for :" + config + " Data: " + portGroupData); + public void portGroupChanged(PortGroupConfig config, Map data, boolean add) { + log.info("PortGroup Changed for: {} Data: {}", config, portGroupData); Map existingData = portGroupData.get(config); if (existingData != null) { for (Map.Entry entry : data.entrySet()) { @@ -1931,20 +1940,15 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, if (existingPortGroup == null) { if (add) { existingData.put(entry.getKey(), entry.getValue()); - addPortGroupFlows(config, entry.getKey(), - entry.getValue()); + addPortGroupFlows(config, entry.getKey(), entry.getValue()); } } else { if (add) { - existingPortGroup.getPorts().addAll( - entry.getValue().getPorts()); - addPortGroupFlows(config, entry.getKey(), - entry.getValue()); + existingPortGroup.getPorts().addAll(entry.getValue().getPorts()); + addPortGroupFlows(config, entry.getKey(), entry.getValue()); } else { - existingPortGroup.getPorts().removeAll( - entry.getValue().getPorts()); - removePortGroupFlows(config, entry.getKey(), - entry.getValue()); + existingPortGroup.getPorts().removeAll(entry.getValue().getPorts()); + removePortGroupFlows(config, entry.getKey(), entry.getValue()); } } } @@ -1958,16 +1962,17 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } } + @Override public boolean addPortGroupConfig(String name, String regex, boolean restore) { PortGroupConfig config = portGroupConfigs.get(name); - if (config != null) + if (config != null) { return false; + } if ((portGroupProvider == null) && !restore) { return false; } - if ((portGroupProvider != null) - && (!portGroupProvider.isMatchCriteriaSupported(regex))) { + if ((portGroupProvider != null) && (!portGroupProvider.isMatchCriteriaSupported(regex))) { return false; } @@ -1979,6 +1984,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return true; } + @Override public boolean delPortGroupConfig(String name) { PortGroupConfig config = portGroupConfigs.get(name); if (config == null) { @@ -1998,8 +2004,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return; } if (portGroupProvider != null) { - Map data = portGroupProvider - .getPortGroupData(config); + Map data = portGroupProvider.getPortGroupData(config); portGroupData.put(config, data); } } @@ -2016,22 +2021,6 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, return true; } - // Fir PortGroupProvider to use regular Dependency Manager - /* @SuppressWarnings("rawtypes") */ - /* public void bind(Object arg0, Map arg1) throws Exception { */ - /* if (arg0 instanceof PortGroupProvider) { */ - /* setPortGroupProvider((PortGroupProvider)arg0); */ - /* } */ - /* } */ - - /* @SuppressWarnings("rawtypes") */ - /* @Override */ - /* public void unbind(Object arg0, Map arg1) throws Exception { */ - /* if (arg0 instanceof PortGroupProvider) { */ - /* portGroupProvider = null; */ - /* } */ - /* } */ - public void setIContainer(IContainer s) { this.container = s; } @@ -2042,6 +2031,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } } + @Override public PortGroupProvider getPortGroupProvider() { return portGroupProvider; } @@ -2098,15 +2088,12 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, /** * Function called by the dependency manager when all the required * dependencies are satisfied - * + * */ void init() { - frmAware = Collections - .synchronizedSet(new HashSet()); - frmFileName = GlobalConstants.STARTUPHOME.toString() - + "frm_staticflows_" + this.getContainerName() + ".conf"; - portGroupFileName = GlobalConstants.STARTUPHOME.toString() - + "portgroup_" + this.getContainerName() + ".conf"; + frmAware = Collections.synchronizedSet(new HashSet()); + frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf"; + portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf"; inContainerMode = false; @@ -2127,24 +2114,68 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, if (staticFlowsOrdinal.size() == 0) { staticFlowsOrdinal.put(0, Integer.valueOf(0)); } + + pendingEvents = new LinkedBlockingQueue(); + + // Initialize the event handler thread + frmEventHandler = new Thread(new Runnable() { + @Override + public void run() { + while (!stopping) { + try { + FRMEvent event = pendingEvents.take(); + if (event == null) { + log.warn("Dequeued null event"); + continue; + } + if (event instanceof NodeUpdateEvent) { + NodeUpdateEvent update = (NodeUpdateEvent) event; + Node node = update.getNode(); + switch (update.getUpdateType()) { + case ADDED: + addStaticFlowsToSwitch(node); + break; + case REMOVED: + cleanDatabaseForNode(node); + updateStaticFlowConfigsOnNodeDown(node); + break; + default: + } + } else if (event instanceof ErrorReportedEvent) { + ErrorReportedEvent errEvent = (ErrorReportedEvent) event; + processErrorEvent(errEvent); + } else { + log.warn("Dequeued unknown event {}", event.getClass().getSimpleName()); + } + } catch (InterruptedException e) { + log.warn("FRM EventHandler thread interrupted", e); + } + } + } + }, "FRM EventHandler Collector"); } /** * Function called by the dependency manager when at least one dependency * become unsatisfied or when the component is shutting down because for * example bundle is being stopped. - * + * */ void destroy() { - destroyCaches(); } /** * Function called by dependency manager after "init ()" is called and after * the services provided by the class are registered in the service registry - * + * */ void start() { + // Initialize graceful stop flag + stopping = false; + + // Start event handler thread + frmEventHandler.start(); + /* * Read startup and build database if we have not already gotten the * configurations synced from another node @@ -2158,9 +2189,10 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, * Function called by the dependency manager before the services exported by * the component are unregistered, this will be followed by a "destroy ()" * calls - * */ void stop() { + stopping = true; + uninstallAllFlowEntries(); } public void setFlowProgrammerService(IFlowProgrammerService service) { @@ -2184,14 +2216,13 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public void tagUpdated(String containerName, Node n, short oldTag, - short newTag, UpdateType t) { + public void tagUpdated(String containerName, Node n, short oldTag, short newTag, UpdateType t) { } @Override - public void containerFlowUpdated(String containerName, - ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) { + public void containerFlowUpdated(String containerName, ContainerFlow previousFlow, ContainerFlow currentFlow, + UpdateType t) { /* * Whether it is an addition or removal, we have to recompute the merged * flows entries taking into account all the current container flows @@ -2201,8 +2232,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public void nodeConnectorUpdated(String containerName, NodeConnector p, - UpdateType t) { + public void nodeConnectorUpdated(String containerName, NodeConnector p, UpdateType t) { // No action } @@ -2224,6 +2254,52 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, updateStaticFlowConfigsOnContainerModeChange(update); } + protected abstract class FRMEvent { + + } + + private class NodeUpdateEvent extends FRMEvent { + private final Node node; + private final UpdateType update; + + public NodeUpdateEvent(UpdateType update, Node node) { + this.update = update; + this.node = node; + } + + public UpdateType getUpdateType() { + return update; + } + + public Node getNode() { + return node; + } + } + + private class ErrorReportedEvent extends FRMEvent { + private final long rid; + private final Node node; + private final Object error; + + public ErrorReportedEvent(long rid, Node node, Object error) { + this.rid = rid; + this.node = node; + this.error = error; + } + + public long getRequestId() { + return rid; + } + + public Object getError() { + return error; + } + + public Node getNode() { + return node; + } + } + /* * OSGI COMMANDS */ @@ -2287,8 +2363,7 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, ci.println(this.programmer.addFlow(node, getSampleFlow(node))); } - public void _frmremoveflow(CommandInterpreter ci) - throws UnknownHostException { + public void _frmremoveflow(CommandInterpreter ci) throws UnknownHostException { Node node = null; String nodeId = ci.nextArgument(); if (nodeId == null) { @@ -2305,14 +2380,10 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } private Flow getSampleFlow(Node node) throws UnknownHostException { - NodeConnector port = NodeConnectorCreator.createOFNodeConnector( - (short) 24, node); - NodeConnector oport = NodeConnectorCreator.createOFNodeConnector( - (short) 30, node); - byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, - (byte) 0x9a, (byte) 0xbc }; - byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, - (byte) 0x5e, (byte) 0x6f }; + NodeConnector port = NodeConnectorCreator.createOFNodeConnector((short) 24, node); + NodeConnector oport = NodeConnectorCreator.createOFNodeConnector((short) 30, node); + byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, (byte) 0x9a, (byte) 0xbc }; + byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, (byte) 0x5e, (byte) 0x6f }; InetAddress srcIP = InetAddress.getByName("172.28.30.50"); InetAddress dstIP = InetAddress.getByName("171.71.9.52"); InetAddress ipMask = InetAddress.getByName("255.255.255.0"); @@ -2356,44 +2427,53 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } public void _frmNodeFlows(CommandInterpreter ci) { + String nodeId = ci.nextArgument(); + Node node = Node.fromString(nodeId); + if (node == null) { + ci.println("frmNodeFlows [verbose]"); + return; + } boolean verbose = false; String verboseCheck = ci.nextArgument(); if (verboseCheck != null) { verbose = verboseCheck.equals("true"); } + if (!nodeFlows.containsKey(node)) { + return; + } // Dump per node database - for (Entry> entry : this.nodeFlows - .entrySet()) { - Node node = entry.getKey(); - for (FlowEntryInstall flow : entry.getValue()) { - if (!verbose) { - ci.println(node + " " + flow.getFlowName()); - } else { - ci.println(node + " " + flow.toString()); - } + for (FlowEntryInstall entry : nodeFlows.get(node)) { + if (!verbose) { + ci.println(node + " " + installedSwView.get(entry).getFlowName()); + } else { + ci.println(node + " " + installedSwView.get(entry).toString()); } } } public void _frmGroupFlows(CommandInterpreter ci) { + String group = ci.nextArgument(); + if (group == null) { + ci.println("frmGroupFlows [verbose]"); + return; + } boolean verbose = false; String verboseCheck = ci.nextArgument(); if (verboseCheck != null) { verbose = verboseCheck.equalsIgnoreCase("true"); } + if (!groupFlows.containsKey(group)) { + return; + } // Dump per node database - for (Entry> entry : this.groupFlows - .entrySet()) { - String group = entry.getKey(); - ci.println("Group " + group + ":"); - for (FlowEntryInstall flow : entry.getValue()) { - if (!verbose) { - ci.println(flow.getNode() + " " + flow.getFlowName()); - } else { - ci.println(flow.getNode() + " " + flow.toString()); - } + ci.println("Group " + group + ":\n"); + for (FlowEntryInstall flowEntry : groupFlows.get(group)) { + if (!verbose) { + ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName()); + } else { + ci.println(flowEntry.getNode() + " " + flowEntry.toString()); } } } @@ -2401,25 +2481,93 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, @Override public void flowRemoved(Node node, Flow flow) { log.trace("Received flow removed notification on {} for {}", node, flow); - // For flow entry identification, only match and priority matter - FlowEntry toFind = new FlowEntry("any", "any", flow, node); - FlowEntryInstall installedEntry = this.findMatch(toFind, false); + + // For flow entry identification, only node, match and priority matter + FlowEntryInstall test = new FlowEntryInstall(new FlowEntry("","",flow, node), null); + FlowEntryInstall installedEntry = this.installedSwView.get(test); if (installedEntry == null) { - log.trace("Entry is not know to us"); + log.trace("Entry is not known to us"); return; } // Update Static flow status + Integer key = 0; + FlowConfig target = null; for (Map.Entry entry : staticFlows.entrySet()) { FlowConfig conf = entry.getValue(); if (conf.isByNameAndNodeIdEqual(installedEntry.getFlowName(), node)) { - // Update Configuration database - conf.toggleStatus(); + key = entry.getKey(); + target = conf; break; } } + if (target != null) { + // Update Configuration database + target.toggleInstallation(); + target.setStatus(SUCCESS); + staticFlows.put(key, target); + } + // Update software views this.updateLocalDatabase(installedEntry, false); } + @Override + public void flowErrorReported(Node node, long rid, Object err) { + log.trace("Got error {} for message rid {} from node {}", new Object[] { err, rid, node }); + pendingEvents.offer(new ErrorReportedEvent(rid, node, err)); + } + + private void processErrorEvent(ErrorReportedEvent event) { + Node node = event.getNode(); + long rid = event.getRequestId(); + Object error = event.getError(); + String errorString = (error == null) ? "Not provided" : error.toString(); + /* + * If this was for a flow install, remove the corresponding entry from + * the software view. If it was a Looking for the rid going through the + * software database. TODO: A more efficient rid <-> FlowEntryInstall + * mapping will have to be added in future + */ + FlowEntryInstall target = null; + for (FlowEntryInstall index : nodeFlows.get(node)) { + FlowEntryInstall entry = installedSwView.get(index); + if (entry.getRequestId() == rid) { + target = entry; + break; + } + } + if (target != null) { + // This was a flow install, update database + this.updateLocalDatabase(target, false); + } + + // Notify listeners + if (frmAware != null) { + synchronized (frmAware) { + for (IForwardingRulesManagerAware frma : frmAware) { + try { + frma.requestFailed(rid, errorString); + } catch (Exception e) { + log.warn("Failed to notify {}", frma); + } + } + } + } + } + + @Override + public Status solicitStatusResponse(Node node, boolean blocking) { + Status rv = new Status(StatusCode.INTERNALERROR); + + if (this.programmer != null) { + if (blocking) { + rv = programmer.syncSendBarrierMessage(node); + } else { + rv = programmer.asyncSendBarrierMessage(node); + } + } + + return rv; + } }