X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=41075b9554339d7bd503ba9e1dd952bcfa080ac0;hp=520825762a40e8bfec9b89c5c0541c41edeedfc1;hb=91d7c1ee52322acad08e9f81228ac36b3aa684f5;hpb=6fa9558f9afe51ea0221a164d01d6099eb020763 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index 520825762a..41075b9554 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.forwardingrulesmanager.internal; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -29,8 +28,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import org.eclipse.osgi.framework.console.CommandInterpreter; -import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; @@ -69,7 +66,6 @@ import org.opendaylight.controller.sal.match.MatchType; import org.opendaylight.controller.sal.utils.EtherTypes; import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.IObjectReader; -import org.opendaylight.controller.sal.utils.NodeCreator; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; @@ -78,8 +74,6 @@ import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.switchmanager.ISwitchManagerAware; import org.opendaylight.controller.switchmanager.Subnet; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,13 +91,12 @@ public class ForwardingRulesManager implements IInventoryListener, IObjectReader, ICacheUpdateAware, - CommandProvider, IFlowProgrammerListener { private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class); private static final Logger logsync = LoggerFactory.getLogger("FRMsync"); - private static final String PORTREMOVED = "Port removed"; - private static final String NODEDOWN = "Node is Down"; + private static final String PORT_REMOVED = "Port removed"; + private static final String NODE_DOWN = "Node is Down"; private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry"; private String frmFileName; private String portGroupFileName; @@ -160,6 +153,8 @@ public class ForwardingRulesManager implements */ static final String WORK_ORDER_CACHE = "frm.workOrder"; static final String WORK_STATUS_CACHE = "frm.workStatus"; + static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView"; + static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView"; /* * Data structure responsible for distributing the FlowEntryInstall requests @@ -603,8 +598,8 @@ public class ForwardingRulesManager implements // Update DB newEntries.setRequestId(status.getRequestId()); - updateLocalDatabase(currentEntries, false); - updateLocalDatabase(newEntries, true); + updateSwViews(currentEntries, false); + updateSwViews(newEntries, true); return status; } @@ -718,7 +713,7 @@ public class ForwardingRulesManager implements log.trace("Removed {}", entry.getInstall()); // Update DB - updateLocalDatabase(entry, false); + updateSwViews(entry, false); return status; } @@ -770,7 +765,7 @@ public class ForwardingRulesManager implements // Update DB entry.setRequestId(status.getRequestId()); - updateLocalDatabase(entry, true); + updateSwViews(entry, true); return status; } @@ -815,10 +810,7 @@ public class ForwardingRulesManager implements return null; } - private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { - // Update the software view - updateSwViewes(entry, add); - + private void updateIndexDatabase(FlowEntryInstall entry, boolean add) { // Update node indexed flow database updateNodeFlowsDB(entry, add); @@ -829,7 +821,7 @@ public class ForwardingRulesManager implements /* * Update the node mapped flows database */ - private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) { + private void updateSwViews(FlowEntryInstall flowEntries, boolean add) { if (add) { originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal()); installedSwView.put(flowEntries, flowEntries); @@ -855,6 +847,17 @@ public class ForwardingRulesManager implements } if (add) { + // there may be an already existing entry. + // remove it before adding the new one. + // This is necessary since we have observed that in some cases + // Infinispan does aggregation for operations (eg:- remove and then put a different value) + // related to the same key within the same transaction. + // Need this defensive code as the new FlowEntryInstall may be different + // than the old one even though the equals method returns true. This is because + // the equals method does not take into account the action list. + if(nodeIndeces.contains(flowEntries)) { + nodeIndeces.remove(flowEntries); + } nodeIndeces.add(flowEntries); } else { nodeIndeces.remove(flowEntries); @@ -889,6 +892,11 @@ public class ForwardingRulesManager implements } if (add) { + // same comments in the similar code section in + // updateNodeFlowsDB method apply here too + if(indices.contains(flowEntries)) { + indices.remove(flowEntries); + } indices.add(flowEntries); } else { indices.remove(flowEntries); @@ -928,7 +936,7 @@ public class ForwardingRulesManager implements // Update DB if (status.isSuccess()) { - updateLocalDatabase(target, false); + updateSwViews(target, false); } else { // log the error log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), @@ -1171,8 +1179,6 @@ public class ForwardingRulesManager implements private void nonClusterObjectCreate() { originalSwView = new ConcurrentHashMap(); installedSwView = new ConcurrentHashMap(); - nodeFlows = new ConcurrentHashMap>(); - groupFlows = new ConcurrentHashMap>(); TSPolicies = new ConcurrentHashMap(); staticFlowsOrdinal = new ConcurrentHashMap(); portGroupConfigs = new ConcurrentHashMap(); @@ -1181,11 +1187,6 @@ public class ForwardingRulesManager implements inactiveFlows = new ConcurrentHashMap(); } - private void registerWithOSGIConsole() { - BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext(); - bundleContext.registerService(CommandProvider.class.getName(), this, null); - } - @Override public void setTSPolicyData(String policyname, Object o, boolean add) { @@ -1230,7 +1231,7 @@ public class ForwardingRulesManager implements if (policyName != null && !policyName.trim().isEmpty()) { for (Map.Entry entry : this.originalSwView.entrySet()) { if (policyName.equals(entry.getKey().getGroupName())) { - list.add(entry.getKey().clone()); + list.add(entry.getValue().clone()); } } } @@ -1243,7 +1244,7 @@ public class ForwardingRulesManager implements if (policyName != null && !policyName.trim().isEmpty()) { for (Map.Entry entry : this.installedSwView.entrySet()) { if (policyName.equals(entry.getKey().getGroupName())) { - list.add(entry.getKey().getInstall().clone()); + list.add(entry.getValue().getInstall().clone()); } } } @@ -1371,21 +1372,15 @@ public class ForwardingRulesManager implements log.debug("Allocating caches for Container {}", container.getName()); try { - clusterContainerService.createCache("frm.originalSwView", + clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.installedSwView", + clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.inactiveFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.nodeFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - - clusterContainerService.createCache("frm.groupFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -1426,14 +1421,14 @@ public class ForwardingRulesManager implements log.debug("Retrieving Caches for Container {}", container.getName()); - map = clusterContainerService.getCache("frm.originalSwView"); + map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE); if (map != null) { originalSwView = (ConcurrentMap) map; } else { log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.installedSwView"); + map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE); if (map != null) { installedSwView = (ConcurrentMap) map; } else { @@ -1447,20 +1442,6 @@ public class ForwardingRulesManager implements log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.nodeFlows"); - if (map != null) { - nodeFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of cache failed for Container {}", container.getName()); - } - - map = clusterContainerService.getCache("frm.groupFlows"); - if (map != null) { - groupFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName()); - } - map = clusterContainerService.getCache("frm.staticFlows"); if (map != null) { staticFlows = (ConcurrentMap) map; @@ -1658,7 +1639,7 @@ public class ForwardingRulesManager implements // Take note of this controller generated static flow toRemove.add(entry.getKey()); } else { - config.setStatus(NODEDOWN); + config.setStatus(NODE_DOWN); } } } @@ -2169,7 +2150,7 @@ public class ForwardingRulesManager implements List toRemove = new ArrayList(nodeFlows.get(node)); for (FlowEntryInstall entry : toRemove) { - updateLocalDatabase(entry, false); + updateSwViews(entry, false); } } } @@ -2293,7 +2274,7 @@ public class ForwardingRulesManager implements if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); if (flowConfig != null) { - flowConfig.setStatus(PORTREMOVED); + flowConfig.setStatus(PORT_REMOVED); updated = true; } } @@ -2488,9 +2469,10 @@ public class ForwardingRulesManager implements portGroupProvider.registerPortGroupChange(this); } - cacheStartup(); + nodeFlows = new ConcurrentHashMap>(); + groupFlows = new ConcurrentHashMap>(); - registerWithOSGIConsole(); + cacheStartup(); /* * If we are not the first cluster node to come up, do not initialize @@ -2589,9 +2571,11 @@ public class ForwardingRulesManager implements * flow merging is not an injective function */ updateFlowsContainerFlow(); + } else if (event instanceof UpdateIndexDBs) { + UpdateIndexDBs update = (UpdateIndexDBs)event; + updateIndexDatabase(update.getFei(), update.isAddition()); } else { - log.warn("Dequeued unknown event {}", event.getClass() - .getSimpleName()); + log.warn("Dequeued unknown event {}", event.getClass().getSimpleName()); } } catch (InterruptedException e) { // clear pending events @@ -2640,6 +2624,12 @@ public class ForwardingRulesManager implements // Start event handler thread frmEventHandler.start(); + // replay the installedSwView data structure to populate + // node flows and group flows + for (FlowEntryInstall fei : installedSwView.values()) { + pendingEvents.offer(new UpdateIndexDBs(fei, true)); + } + /* * Read startup and build database if we have not already gotten the * configurations synced from another node @@ -2649,6 +2639,13 @@ public class ForwardingRulesManager implements } } + /** + * Function called by the dependency manager before Container is Stopped and Destroyed. + */ + public void containerStop() { + uninstallAllFlowEntries(false); + } + /** * Function called by the dependency manager before the services exported by * the component are unregistered, this will be followed by a "destroy ()" @@ -2656,7 +2653,6 @@ public class ForwardingRulesManager implements */ void stop() { stopping = true; - uninstallAllFlowEntries(false); // Shutdown executor this.executor.shutdownNow(); // Now walk all the workMonitor and wake up the one sleeping because @@ -2878,100 +2874,41 @@ public class ForwardingRulesManager implements } } - /* - * OSGI COMMANDS - */ - @Override - public String getHelp() { - StringBuffer help = new StringBuffer(); - return help.toString(); - } - - @Override - public Status saveConfiguration() { - return saveConfig(); - } + private class UpdateIndexDBs extends FRMEvent { + private FlowEntryInstall fei; + private boolean add; - public void _frmNodeFlows(CommandInterpreter ci) { - String nodeId = ci.nextArgument(); - Node node = Node.fromString(nodeId); - if (node == null) { - ci.println("frmNodeFlows [verbose]"); - return; - } - boolean verbose = false; - String verboseCheck = ci.nextArgument(); - if (verboseCheck != null) { - verbose = verboseCheck.equals("true"); + /** + * + * @param fei the flow entry which was installed/removed on the netwrok node + * @param update + */ + UpdateIndexDBs(FlowEntryInstall fei, boolean add) { + this.fei = fei; + this.add = add; } - if (!nodeFlows.containsKey(node)) { - return; - } - // Dump per node database - for (FlowEntryInstall entry : nodeFlows.get(node)) { - if (!verbose) { - ci.println(node + " " + installedSwView.get(entry).getFlowName()); - } else { - ci.println(node + " " + installedSwView.get(entry).toString()); - } - } - } - public void _frmGroupFlows(CommandInterpreter ci) { - String group = ci.nextArgument(); - if (group == null) { - ci.println("frmGroupFlows [verbose]"); - return; - } - boolean verbose = false; - String verboseCheck = ci.nextArgument(); - if (verboseCheck != null) { - verbose = verboseCheck.equalsIgnoreCase("true"); + /** + * @return the flowEntryInstall object which was added/removed + * to/from the installed software view cache + */ + public FlowEntryInstall getFei() { + return fei; } - if (!groupFlows.containsKey(group)) { - return; - } - // Dump per node database - ci.println("Group " + group + ":\n"); - for (FlowEntryInstall flowEntry : groupFlows.get(group)) { - if (!verbose) { - ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName()); - } else { - ci.println(flowEntry.getNode() + " " + flowEntry.toString()); - } + /** + * + * @return whether this was an flow addition or removal + */ + public boolean isAddition() { + return add; } } - public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException { - Node node = null; - long reqId = 0L; - String nodeId = ci.nextArgument(); - if (nodeId == null) { - ci.print("Node id not specified"); - return; - } - String requestId = ci.nextArgument(); - if (requestId == null) { - ci.print("Request id not specified"); - return; - } - try { - node = NodeCreator.createOFNode(Long.valueOf(nodeId)); - } catch (NumberFormatException e) { - ci.print("Node id not a number"); - return; - } - try { - reqId = Long.parseLong(requestId); - } catch (NumberFormatException e) { - ci.print("Request id not a number"); - return; - } - // null for error object is good enough for now - ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null); - this.processErrorEvent(event); + @Override + public Status saveConfiguration() { + return saveConfig(); } @Override @@ -2999,13 +2936,21 @@ public class ForwardingRulesManager implements } if (target != null) { // Update Configuration database - target.toggleInstallation(); - target.setStatus(StatusCode.SUCCESS.toString()); + if (target.getHardTimeout() != null || target.getIdleTimeout() != null) { + /* + * No need for checking if actual values: these strings were + * validated at configuration creation. Also, after a switch + * down scenario, no use to reinstall a timed flow. Mark it as + * "do not install". User can manually toggle it. + */ + target.toggleInstallation(); + } + target.setStatus(StatusCode.GONE.toString()); staticFlows.put(key, target); } // Update software views - this.updateLocalDatabase(installedEntry, false); + this.updateSwViews(installedEntry, false); } @Override @@ -3042,14 +2987,18 @@ public class ForwardingRulesManager implements } if (target != null) { // This was a flow install, update database - this.updateLocalDatabase(target, false); + this.updateSwViews(target, false); // also update the config if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) { ConcurrentMap.Entry staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode()); // staticFlowEntry should never be null. // the null check is just an extra defensive check. if(staticFlowEntry != null) { - staticFlows.remove(staticFlowEntry.getKey()); + // Modify status and update cluster cache + log.debug("Updating static flow configuration on async error event"); + String status = String.format("Cannot be installed on node. reason: %s", errorString); + staticFlowEntry.getValue().setStatus(status); + refreshClusterStaticFlowsStatus(node); } } } @@ -3112,6 +3061,13 @@ public class ForwardingRulesManager implements @Override public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + /* + * Streamline the updates for the per node and per group index databases + */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)new_value, true)); + } + if (originLocal) { /* * Local updates are of no interest @@ -3163,8 +3119,11 @@ public class ForwardingRulesManager implements @Override public void entryDeleted(Object key, String cacheName, boolean originLocal) { /* - * Do nothing + * Streamline the updates for the per node and per group index databases */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false)); + } } /** @@ -3176,7 +3135,7 @@ public class ForwardingRulesManager implements if (node != null) { for (Map.Entry entry : this.originalSwView.entrySet()) { if (node.equals(entry.getKey().getNode())) { - list.add(entry.getKey().clone()); + list.add(entry.getValue().clone()); } } }