X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=520825762a40e8bfec9b89c5c0541c41edeedfc1;hp=2eaafb698917d74225866068b385ec8a3f74aeba;hb=6fa9558f9afe51ea0221a164d01d6099eb020763;hpb=dd018bda91b9c3cd222aaa1bd2d8f734152571e8 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index 2eaafb6989..520825762a 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -11,21 +11,22 @@ package org.opendaylight.controller.forwardingrulesmanager.internal; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import org.eclipse.osgi.framework.console.CommandInterpreter; @@ -36,6 +37,8 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; +import org.opendaylight.controller.connectionmanager.IConnectionManager; +import org.opendaylight.controller.containermanager.IContainerManager; import org.opendaylight.controller.forwardingrulesmanager.FlowConfig; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall; @@ -45,15 +48,15 @@ import org.opendaylight.controller.forwardingrulesmanager.PortGroup; import org.opendaylight.controller.forwardingrulesmanager.PortGroupChangeListener; import org.opendaylight.controller.forwardingrulesmanager.PortGroupConfig; import org.opendaylight.controller.forwardingrulesmanager.PortGroupProvider; +import org.opendaylight.controller.forwardingrulesmanager.implementation.data.FlowEntryDistributionOrder; import org.opendaylight.controller.sal.action.Action; import org.opendaylight.controller.sal.action.ActionType; -import org.opendaylight.controller.sal.action.Controller; -import org.opendaylight.controller.sal.action.Flood; import org.opendaylight.controller.sal.action.Output; -import org.opendaylight.controller.sal.action.PopVlan; +import org.opendaylight.controller.sal.connection.ConnectionLocality; +import org.opendaylight.controller.sal.core.Config; import org.opendaylight.controller.sal.core.ContainerFlow; import org.opendaylight.controller.sal.core.IContainer; -import org.opendaylight.controller.sal.core.IContainerListener; +import org.opendaylight.controller.sal.core.IContainerLocalListener; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Property; @@ -65,10 +68,7 @@ import org.opendaylight.controller.sal.match.Match; import org.opendaylight.controller.sal.match.MatchType; import org.opendaylight.controller.sal.utils.EtherTypes; import org.opendaylight.controller.sal.utils.GlobalConstants; -import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.IObjectReader; -import org.opendaylight.controller.sal.utils.IPProtocols; -import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.NodeCreator; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; @@ -88,14 +88,23 @@ import org.slf4j.LoggerFactory; * the network. It also maintains the central repository of all the forwarding * rules installed on the network nodes. */ -public class ForwardingRulesManager implements IForwardingRulesManager, PortGroupChangeListener, - IContainerListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader, - ICacheUpdateAware, CommandProvider, IFlowProgrammerListener { - private static final String SAVE = "Save"; - private static final String NODEDOWN = "Node is Down"; - private static final String SUCCESS = StatusCode.SUCCESS.toString(); +public class ForwardingRulesManager implements + IForwardingRulesManager, + PortGroupChangeListener, + IContainerLocalListener, + ISwitchManagerAware, + IConfigurationContainerAware, + IInventoryListener, + IObjectReader, + ICacheUpdateAware, + CommandProvider, + IFlowProgrammerListener { + private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class); - private Map flowsSaveEvent; + private static final Logger logsync = LoggerFactory.getLogger("FRMsync"); + private static final String PORTREMOVED = "Port removed"; + private static final String NODEDOWN = "Node is Down"; + private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry"; private String frmFileName; private String portGroupFileName; private ConcurrentMap staticFlows; @@ -103,8 +112,9 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou private ConcurrentMap portGroupConfigs; private ConcurrentMap> portGroupData; private ConcurrentMap TSPolicies; + private IContainerManager containerManager; private boolean inContainerMode; // being used by global instance only - private boolean stopping; + protected boolean stopping; /* * Flow database. It's the software view of what was requested to install @@ -131,7 +141,8 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou private ConcurrentMap inactiveFlows; private IContainer container; - private Set frmAware; + private Set frmAware = + Collections.synchronizedSet(new HashSet()); private PortGroupProvider portGroupProvider; private IFlowProgrammerService programmer; private IClusterContainerServices clusterContainerService = null; @@ -139,6 +150,103 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou private Thread frmEventHandler; protected BlockingQueue pendingEvents; + // Distributes FRM programming in the cluster + private IConnectionManager connectionManager; + + /* + * Name clustered caches used to support FRM entry distribution these are by + * necessity non-transactional as long as need to be able to synchronize + * states also while a transaction is in progress + */ + static final String WORK_ORDER_CACHE = "frm.workOrder"; + static final String WORK_STATUS_CACHE = "frm.workStatus"; + + /* + * Data structure responsible for distributing the FlowEntryInstall requests + * in the cluster. The key value is entry that is being either Installed or + * Updated or Delete. The value field is the same of the key value in case + * of Installation or Deletion, it's the new entry in case of Modification, + * this because the clustering caches don't allow null values. + * + * The logic behind this data structure is that the controller that initiate + * the request will place the order here, someone will pick it and then will + * remove from this data structure because is being served. + * + * TODO: We need to have a way to cleanup this data structure if entries are + * not picked by anyone, which is always a case can happen especially on + * Node disconnect cases. + */ + protected ConcurrentMap workOrder; + + /* + * Data structure responsible for retrieving the results of the workOrder + * submitted to the cluster. + * + * The logic behind this data structure is that the controller that has + * executed the order will then place the result in workStatus signaling + * that there was a success or a failure. + * + * TODO: The workStatus entries need to have a lifetime associated in case + * of requestor controller leaving the cluster. + */ + protected ConcurrentMap workStatus; + + /* + * Local Map used to hold the Future which a caller can use to monitor for + * completion + */ + private ConcurrentMap workMonitor = + new ConcurrentHashMap(); + + /* + * Max pool size for the executor + */ + private static final int maxPoolSize = 10; + + /** + * @param e + * Entry being installed/updated/removed + * @param u + * New entry will be placed after the update operation. Valid + * only for UpdateType.CHANGED, null for all the other cases + * @param t + * Type of update + * @return a Future object for monitoring the progress of the result, or + * null in case the processing should take place locally + */ + private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, + UpdateType t) { + // A null entry it's an unexpected condition, anyway it's safe to keep + // the handling local + if (e == null) { + return null; + } + + Node n = e.getNode(); + if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) { + // Create the work order and distribute it + FlowEntryDistributionOrder fe = + new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress()); + // First create the monitor job + FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe); + logsync.trace("Node {} not local so sending fe {}", n, fe); + workMonitor.put(fe, ret); + if (t.equals(UpdateType.CHANGED)) { + // Then distribute the work + workOrder.put(fe, u); + } else { + // Then distribute the work + workOrder.put(fe, e); + } + logsync.trace("WorkOrder requested"); + // Now create an Handle to monitor the execution of the operation + return ret; + } + + logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t); + return null; + } + /** * Adds a flow entry onto the network node It runs various validity checks * and derive the final container flows merged entries that will be @@ -154,11 +262,34 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou private Status addEntry(FlowEntry flowEntry, boolean async) { // Sanity Check - if (flowEntry == null || flowEntry.getNode() == null) { - String msg = "Invalid FlowEntry"; - String logMsg = msg + ": {}"; + if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) { + String logMsg = INVALID_FLOW_ENTRY + ": {}"; log.warn(logMsg, flowEntry); - return new Status(StatusCode.NOTACCEPTABLE, msg); + return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY); + } + + /* + * Redundant Check: Check if the request is a redundant one from the + * same application the flowEntry is equal to an existing one. Given we + * do not have an application signature in the requested FlowEntry yet, + * we are here detecting the above condition by comparing the flow + * names, if set. If they are equal to the installed flow, most likely + * this is a redundant installation request from the same application + * and we can silently return success + * + * TODO: in future a sort of application reference list mechanism will + * be added to the FlowEntry so that exact flow can be used by different + * applications. + */ + FlowEntry present = this.originalSwView.get(flowEntry); + if (present != null) { + boolean sameFlow = present.getFlow().equals(flowEntry.getFlow()); + boolean sameApp = present.getFlowName() != null && present.getFlowName().equals(flowEntry.getFlowName()); + if (sameFlow && sameApp) { + log.trace("Skipping redundant request for flow {} on node {}", flowEntry.getFlowName(), + flowEntry.getNode()); + return new Status(StatusCode.SUCCESS, "Entry is already present"); + } } /* @@ -217,7 +348,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou succeded = ret; } else { error = ret; - log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription()); + log.trace("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription()); } } @@ -273,8 +404,8 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou // Sanity checks if (currentFlowEntry == null || currentFlowEntry.getNode() == null || newFlowEntry == null - || newFlowEntry.getNode() == null) { - String msg = "Modify: Invalid FlowEntry"; + || newFlowEntry.getNode() == null || newFlowEntry.getFlow() == null) { + String msg = "Modify: " + INVALID_FLOW_ENTRY; String logMsg = msg + ": {} or {}"; log.warn(logMsg, currentFlowEntry, newFlowEntry); return new Status(StatusCode.NOTACCEPTABLE, msg); @@ -437,25 +568,46 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * contain the unique id assigned to this request */ private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) { - // Modify the flow on the network node - Status status = (async) ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall() - .getFlow(), newEntries.getInstall().getFlow()) : programmer.modifyFlow(currentEntries.getNode(), - currentEntries.getInstall().getFlow(), newEntries.getInstall().getFlow()); + FlowEntryDistributionOrderFutureTask futureStatus = + distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED); + if (futureStatus != null) { + Status retStatus = new Status(StatusCode.UNDEFINED); + try { + retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } + } catch (InterruptedException e) { + log.error("", e); + } catch (ExecutionException e) { + log.error("", e); + } + return retStatus; + } else { + // Modify the flow on the network node + Status status = async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall() + .getFlow(), newEntries.getInstall() + .getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries.getInstall() + .getFlow(), newEntries.getInstall() + .getFlow()); - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(), - status.getDescription()); - return status; - } + if (!status.isSuccess()) { + log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(), + status.getDescription()); + return status; + } - log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall()); + log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall()); - // Update DB - newEntries.setRequestId(status.getRequestId()); - updateLocalDatabase(currentEntries, false); - updateLocalDatabase(newEntries, true); + // Update DB + newEntries.setRequestId(status.getRequestId()); + updateLocalDatabase(currentEntries, false); + updateLocalDatabase(newEntries, true); - return status; + return status; + } } /** @@ -473,11 +625,10 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou Status error = new Status(null, null); // Sanity Check - if (flowEntry == null || flowEntry.getNode() == null) { - String msg = "Invalid FlowEntry"; - String logMsg = msg + ": {}"; + if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) { + String logMsg = INVALID_FLOW_ENTRY + ": {}"; log.warn(logMsg, flowEntry); - return new Status(StatusCode.NOTACCEPTABLE, msg); + return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY); } // Derive the container flows merged installed entries @@ -502,7 +653,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou if (!ret.isSuccess()) { error = ret; - log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription()); + log.trace("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription()); if (installedList.size() == 1) { // If we had only one entry to remove, this is fatal failure return error; @@ -534,24 +685,43 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * contain the unique id assigned to this request */ private Status removeEntryInternal(FlowEntryInstall entry, boolean async) { - // Mark the entry to be deleted (for CC just in case we fail) - entry.toBeDeleted(); + FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED); + if (futureStatus != null) { + Status retStatus = new Status(StatusCode.UNDEFINED); + try { + retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } + } catch (InterruptedException e) { + log.error("", e); + } catch (ExecutionException e) { + log.error("", e); + } + return retStatus; + } else { + // Mark the entry to be deleted (for CC just in case we fail) + entry.toBeDeleted(); - // Remove from node - Status status = (async) ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall().getFlow()) - : programmer.removeFlow(entry.getNode(), entry.getInstall().getFlow()); + // Remove from node + Status status = async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall() + .getFlow()) : programmer.removeFlow(entry.getNode(), entry.getInstall() + .getFlow()); - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), - status.getDescription()); - return status; - } - log.trace("Removed {}", entry.getInstall()); + if (!status.isSuccess()) { + log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", entry.getInstall(), + status.getDescription()); + return status; + } + log.trace("Removed {}", entry.getInstall()); - // Update DB - updateLocalDatabase(entry, false); + // Update DB + updateLocalDatabase(entry, false); - return status; + return status; + } } /** @@ -568,23 +738,42 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * contain the unique id assigned to this request */ private Status addEntriesInternal(FlowEntryInstall entry, boolean async) { - // Install the flow on the network node - Status status = (async) ? programmer.addFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer - .addFlow(entry.getNode(), entry.getInstall().getFlow()); + FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED); + if (futureStatus != null) { + Status retStatus = new Status(StatusCode.UNDEFINED); + try { + retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } + } catch (InterruptedException e) { + log.error("", e); + } catch (ExecutionException e) { + log.error("", e); + } + return retStatus; + } else { + // Install the flow on the network node + Status status = async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall() + .getFlow()) : programmer.addFlow(entry.getNode(), entry.getInstall() + .getFlow()); - if (!status.isSuccess()) { - log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), - status.getDescription()); - return status; - } + if (!status.isSuccess()) { + log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(), + status.getDescription()); + return status; + } - log.trace("Added {}", entry.getInstall()); + log.trace("Added {}", entry.getInstall()); - // Update DB - entry.setRequestId(status.getRequestId()); - updateLocalDatabase(entry, true); + // Update DB + entry.setRequestId(status.getRequestId()); + updateLocalDatabase(entry, true); - return status; + return status; + } } /** @@ -616,6 +805,16 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou return true; } + private ConcurrentMap.Entry getStaticFlowEntry(String name, Node node) { + for (ConcurrentMap.Entry flowEntry : staticFlows.entrySet()) { + FlowConfig flowConfig = flowEntry.getValue(); + if (flowConfig.isByNameAndNodeIdEqual(name, node)) { + return flowEntry; + } + } + return null; + } + private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { // Update the software view updateSwViewes(entry, add); @@ -732,7 +931,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou updateLocalDatabase(target, false); } else { // log the error - log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), + log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), status.getDescription()); } @@ -950,7 +1149,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * merged flow may conflict with an existing old container flows merged flow * on the network node */ - private void updateFlowsContainerFlow() { + protected void updateFlowsContainerFlow() { Set toReInstall = new HashSet(); // First remove all installed entries for (ConcurrentMap.Entry entry : installedSwView.entrySet()) { @@ -979,7 +1178,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou portGroupConfigs = new ConcurrentHashMap(); portGroupData = new ConcurrentHashMap>(); staticFlows = new ConcurrentHashMap(); - flowsSaveEvent = new HashMap(); inactiveFlows = new ConcurrentHashMap(); } @@ -1039,6 +1237,19 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou return list; } + @Override + public List getInstalledFlowEntriesForGroup(String policyName) { + List list = new ArrayList(); + if (policyName != null && !policyName.trim().isEmpty()) { + for (Map.Entry entry : this.installedSwView.entrySet()) { + if (policyName.equals(entry.getKey().getGroupName())) { + list.add(entry.getKey().getInstall().clone()); + } + } + } + return list; + } + @Override public void addOutputPort(Node node, String flowName, List portList) { @@ -1151,7 +1362,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou retrieveCaches(); } - @SuppressWarnings("deprecation") private void allocateCaches() { if (this.clusterContainerService == null) { log.warn("Un-initialized clusterContainerService, can't create cache"); @@ -1162,37 +1372,40 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou try { clusterContainerService.createCache("frm.originalSwView", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.installedSwView", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.inactiveFlows", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.nodeFlows", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.groupFlows", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.staticFlows", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); - - clusterContainerService.createCache("frm.flowsSaveEvent", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.staticFlowsOrdinal", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.portGroupConfigs", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.portGroupData", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.TSPolicies", - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + + clusterContainerService.createCache(WORK_STATUS_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); + + clusterContainerService.createCache(WORK_ORDER_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); } catch (CacheConfigException cce) { log.error("CacheConfigException"); @@ -1201,7 +1414,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void retrieveCaches() { ConcurrentMap map; @@ -1255,13 +1468,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou log.error("Retrieval of frm.staticFlows cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.flowsSaveEvent"); - if (map != null) { - flowsSaveEvent = (ConcurrentMap) map; - } else { - log.error("Retrieval of frm.flowsSaveEvent cache failed for Container {}", container.getName()); - } - map = clusterContainerService.getCache("frm.staticFlowsOrdinal"); if (map != null) { staticFlowsOrdinal = (ConcurrentMap) map; @@ -1290,6 +1496,19 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou log.error("Retrieval of frm.TSPolicies cache failed for Container {}", container.getName()); } + map = clusterContainerService.getCache(WORK_ORDER_CACHE); + if (map != null) { + workOrder = (ConcurrentMap) map; + } else { + log.error("Retrieval of " + WORK_ORDER_CACHE + " cache failed for Container {}", container.getName()); + } + + map = clusterContainerService.getCache(WORK_STATUS_CACHE); + if (map != null) { + workStatus = (ConcurrentMap) map; + } else { + log.error("Retrieval of " + WORK_STATUS_CACHE + " cache failed for Container {}", container.getName()); + } } private boolean flowConfigExists(FlowConfig config) { @@ -1334,7 +1553,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou boolean multipleFlowPush = false; String error; Status status; - config.setStatus(SUCCESS); + config.setStatus(StatusCode.SUCCESS.toString()); // Presence check if (flowConfigExists(config)) { @@ -1412,7 +1631,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou continue; } if (config.getNode().equals(node)) { - if (config.installInHw() && !config.getStatus().equals(SUCCESS)) { + if (config.installInHw() && !config.getStatus().equals(StatusCode.SUCCESS.toString())) { Status status = this.installFlowEntryAsync(config.getFlowEntry()); config.setStatus(status.getDescription()); } @@ -1466,7 +1685,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou config.setStatus("Removed from node because in container mode"); break; case REMOVED: - config.setStatus(SUCCESS); + config.setStatus(StatusCode.SUCCESS.toString()); break; default: } @@ -1644,12 +1863,16 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } } if (target != null) { - // Program the network node - Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this - .installFlowEntry(target.getFlowEntry()); + Status status = target.validate(container); + if (!status.isSuccess()) { + log.warn(status.getDescription()); + return status; + } + status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this + .installFlowEntry(target.getFlowEntry()); if (status.isSuccess()) { // Update Configuration database - target.setStatus(SUCCESS); + target.setStatus(StatusCode.SUCCESS.toString()); target.toggleInstallation(); staticFlows.put(key, target); } @@ -1679,27 +1902,46 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou /** * Uninstall all the non-internal Flow Entries present in the software view. - * A copy of each entry is stored in the inactive list so that it can be - * re-applied when needed. This function is called on the global instance of - * FRM only, when the first container is created + * If requested, a copy of each original flow entry will be stored in the + * inactive list so that it can be re-applied when needed (This is typically + * the case when running in the default container and controller moved to + * container mode) NOTE WELL: The routine as long as does a bulk change will + * operate only on the entries for nodes locally attached so to avoid + * redundant operations initiated by multiple nodes + * + * @param preserveFlowEntries + * if true, a copy of each original entry is stored in the + * inactive list */ - private void uninstallAllFlowEntries() { + private void uninstallAllFlowEntries(boolean preserveFlowEntries) { log.info("Uninstalling all non-internal flows"); + List toRemove = new ArrayList(); + // Store entries / create target list for (ConcurrentMap.Entry mapEntry : installedSwView.entrySet()) { FlowEntryInstall flowEntries = mapEntry.getValue(); // Skip internal generated static flows if (!flowEntries.isInternal()) { - inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal()); + toRemove.add(flowEntries); + // Store the original entries if requested + if (preserveFlowEntries) { + inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal()); + } } } // Now remove the entries - for (FlowEntry flowEntry : inactiveFlows.keySet()) { - Status status = this.removeEntry(flowEntry, false); - if (!status.isSuccess()) { - log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription()); + for (FlowEntryInstall flowEntryHw : toRemove) { + Node n = flowEntryHw.getNode(); + if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) { + Status status = this.removeEntryInternal(flowEntryHw, false); + if (!status.isSuccess()) { + log.trace("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription()); + } + } else { + log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job", + flowEntryHw); } } } @@ -1742,10 +1984,9 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou @Override public FlowConfig getStaticFlow(String name, Node node) { - for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { - if (entry.getValue().isByNameAndNodeIdEqual(name, node)) { - return entry.getValue(); - } + ConcurrentMap.Entry entry = getStaticFlowEntry(name, node); + if(entry != null) { + return entry.getValue(); } return null; } @@ -1819,8 +2060,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou @Override public Status saveConfig() { - // Publish the save config event to the cluster nodes - flowsSaveEvent.put(new Date().getTime(), SAVE); return saveConfigInternal(); } @@ -1840,88 +2079,83 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou return new Status(StatusCode.SUCCESS, null); } - @Override - public void entryCreated(Long key, String cacheName, boolean local) { - } - - @Override - public void entryUpdated(Long key, String new_value, String cacheName, boolean originLocal) { - saveConfigInternal(); - } - - @Override - public void entryDeleted(Long key, String cacheName, boolean originLocal) { - } - @Override public void subnetNotify(Subnet sub, boolean add) { } - private void installImplicitARPReplyPunt(Node node) { - - if (node == null) { - return; - } - - List puntAction = new ArrayList(); - puntAction.add(ActionType.CONTROLLER.toString()); - - FlowConfig allowARP = new FlowConfig(); - allowARP.setInstallInHw(true); - allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP Reply" + FlowConfig.INTERNALSTATICFLOWEND); - allowARP.setPriority("500"); - allowARP.setNode(node); - allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); - allowARP.setDstMac(HexEncode.bytesToHexString(switchManager.getControllerMAC())); - allowARP.setActions(puntAction); - addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name - } - + /** + * (non-Javadoc) + * + * @see org.opendaylight.controller.switchmanager.ISwitchManagerAware#modeChangeNotify(org.opendaylight.controller.sal.core.Node, + * boolean) + * + * This method can be called from within the OSGi framework context, + * given the programming operation can take sometime, it not good + * pratice to have in it's context operations that can take time, + * hence moving off to a different thread for async processing. + */ + private ExecutorService executor; @Override - public void modeChangeNotify(Node node, boolean proactive) { - List defaultConfigs = new ArrayList(); - - List puntAction = new ArrayList(); - puntAction.add(ActionType.CONTROLLER.toString()); - - FlowConfig allowARP = new FlowConfig(); - allowARP.setInstallInHw(true); - allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND); - allowARP.setPriority("1"); - allowARP.setNode(node); - allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); - allowARP.setActions(puntAction); - defaultConfigs.add(allowARP); - - FlowConfig allowLLDP = new FlowConfig(); - allowLLDP.setInstallInHw(true); - allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND); - allowLLDP.setPriority("1"); - allowLLDP.setNode(node); - allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase()); - allowLLDP.setActions(puntAction); - defaultConfigs.add(allowLLDP); - - List dropAction = new ArrayList(); - dropAction.add(ActionType.DROP.toString()); - - FlowConfig dropAllConfig = new FlowConfig(); - dropAllConfig.setInstallInHw(true); - dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + FlowConfig.INTERNALSTATICFLOWEND); - dropAllConfig.setPriority("0"); - dropAllConfig.setNode(node); - dropAllConfig.setActions(dropAction); - defaultConfigs.add(dropAllConfig); - - log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); - for (FlowConfig fc : defaultConfigs) { - Status status = (proactive) ? addStaticFlowInternal(fc, true) : removeStaticFlow(fc); - if (status.isSuccess()) { - log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); - } else { - log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), fc.getName()); + public void modeChangeNotify(final Node node, final boolean proactive) { + Callable modeChangeCallable = new Callable() { + @Override + public Status call() throws Exception { + List defaultConfigs = new ArrayList(); + + List puntAction = new ArrayList(); + puntAction.add(ActionType.CONTROLLER.toString()); + + FlowConfig allowARP = new FlowConfig(); + allowARP.setInstallInHw(true); + allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND); + allowARP.setPriority("1"); + allowARP.setNode(node); + allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()) + .toUpperCase()); + allowARP.setActions(puntAction); + defaultConfigs.add(allowARP); + + FlowConfig allowLLDP = new FlowConfig(); + allowLLDP.setInstallInHw(true); + allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND); + allowLLDP.setPriority("1"); + allowLLDP.setNode(node); + allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()) + .toUpperCase()); + allowLLDP.setActions(puntAction); + defaultConfigs.add(allowLLDP); + + List dropAction = new ArrayList(); + dropAction.add(ActionType.DROP.toString()); + + FlowConfig dropAllConfig = new FlowConfig(); + dropAllConfig.setInstallInHw(true); + dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + + FlowConfig.INTERNALSTATICFLOWEND); + dropAllConfig.setPriority("0"); + dropAllConfig.setNode(node); + dropAllConfig.setActions(dropAction); + defaultConfigs.add(dropAllConfig); + + log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); + for (FlowConfig fc : defaultConfigs) { + Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc); + if (status.isSuccess()) { + log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); + } else { + log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), + fc.getName()); + } + } + return new Status(StatusCode.SUCCESS); } - } + }; + + /* + * Execute the work outside the caller context, this could be an + * expensive operation and we don't want to block the caller for it. + */ + this.executor.submit(modeChangeCallable); } /** @@ -1940,6 +2174,32 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } } + private boolean doesFlowContainNodeConnector(Flow flow, NodeConnector nc) { + if (nc == null) { + return false; + } + + Match match = flow.getMatch(); + if (match.isPresent(MatchType.IN_PORT)) { + NodeConnector matchPort = (NodeConnector) match.getField(MatchType.IN_PORT).getValue(); + if (matchPort.equals(nc)) { + return true; + } + } + List actionsList = flow.getActions(); + if (actionsList != null) { + for (Action action : actionsList) { + if (action instanceof Output) { + NodeConnector actionPort = ((Output) action).getPort(); + if (actionPort.equals(nc)) { + return true; + } + } + } + } + return false; + } + @Override public void notifyNode(Node node, UpdateType type, Map propMap) { this.pendingEvents.offer(new NodeUpdateEvent(type, node)); @@ -1947,7 +2207,99 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou @Override public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map propMap) { + boolean updateStaticFlowCluster = false; + + switch (type) { + case ADDED: + break; + case CHANGED: + Config config = (propMap == null) ? null : (Config) propMap.get(Config.ConfigPropName); + if (config != null) { + switch (config.getValue()) { + case Config.ADMIN_DOWN: + log.trace("Port {} is administratively down: uninstalling interested flows", nodeConnector); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector); + break; + case Config.ADMIN_UP: + log.trace("Port {} is administratively up: installing interested flows", nodeConnector); + updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nodeConnector); + break; + case Config.ADMIN_UNDEF: + break; + default: + } + } + break; + case REMOVED: + // This is the case where a switch port is removed from the SDN agent space + log.trace("Port {} was removed from our control: uninstalling interested flows", nodeConnector); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector); + break; + default: + + } + + if (updateStaticFlowCluster) { + refreshClusterStaticFlowsStatus(nodeConnector.getNode()); + } + } + + /* + * It goes through the static flows configuration, it identifies the ones + * which have the specified node connector as input or output port and + * install them on the network node if they are marked to be installed in + * hardware and their status shows they were not installed yet + */ + private boolean installFlowsOnNodeConnectorUp(NodeConnector nodeConnector) { + boolean updated = false; + List flowConfigForNode = getStaticFlows(nodeConnector.getNode()); + for (FlowConfig flowConfig : flowConfigForNode) { + if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) { + if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) { + Status status = this.installFlowEntry(flowConfig.getFlowEntry()); + if (!status.isSuccess()) { + flowConfig.setStatus(status.getDescription()); + } else { + flowConfig.setStatus(StatusCode.SUCCESS.toString()); + } + updated = true; + } + } + } + return updated; + } + /* + * Remove from the network node all the flows which have the specified node + * connector as input or output port. If any of the flow entry is a static + * flow, it updates the correspondent configuration. + */ + private boolean removeFlowsOnNodeConnectorDown(NodeConnector nodeConnector) { + boolean updated = false; + List nodeFlowEntries = nodeFlows.get(nodeConnector.getNode()); + if (nodeFlowEntries == null) { + return updated; + } + for (FlowEntryInstall fei : new ArrayList(nodeFlowEntries)) { + if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nodeConnector)) { + Status status = this.removeEntryInternal(fei, true); + if (!status.isSuccess()) { + continue; + } + /* + * If the flow entry is a static flow, then update its + * configuration + */ + if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { + FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); + if (flowConfig != null) { + flowConfig.setStatus(PORTREMOVED); + updated = true; + } + } + } + } + return updated; } private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) { @@ -2055,17 +2407,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou return true; } - private void usePortGroupConfig(String name) { - PortGroupConfig config = portGroupConfigs.get(name); - if (config == null) { - return; - } - if (portGroupProvider != null) { - Map data = portGroupProvider.getPortGroupData(config); - portGroupData.put(config, data); - } - } - @Override public Map getPortGroupConfigs() { return portGroupConfigs; @@ -2138,7 +2479,6 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * */ void init() { - frmAware = Collections.synchronizedSet(new HashSet()); frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf"; portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf"; @@ -2168,11 +2508,12 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou public void run() { while (!stopping) { try { - FRMEvent event = pendingEvents.take(); + final FRMEvent event = pendingEvents.take(); if (event == null) { log.warn("Dequeued null event"); continue; } + log.trace("Dequeued {} event", event.getClass().getSimpleName()); if (event instanceof NodeUpdateEvent) { NodeUpdateEvent update = (NodeUpdateEvent) event; Node node = update.getNode(); @@ -2189,11 +2530,72 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } else if (event instanceof ErrorReportedEvent) { ErrorReportedEvent errEvent = (ErrorReportedEvent) event; processErrorEvent(errEvent); + } else if (event instanceof WorkOrderEvent) { + /* + * Take care of handling the remote Work request + */ + Runnable r = new Runnable() { + @Override + public void run() { + WorkOrderEvent work = (WorkOrderEvent) event; + FlowEntryDistributionOrder fe = work.getFe(); + if (fe != null) { + logsync.trace("Executing the workOrder {}", fe); + Status gotStatus = null; + FlowEntryInstall feiCurrent = fe.getEntry(); + FlowEntryInstall feiNew = workOrder.get(fe); + switch (fe.getUpType()) { + case ADDED: + gotStatus = addEntriesInternal(feiCurrent, false); + break; + case CHANGED: + gotStatus = modifyEntryInternal(feiCurrent, feiNew, false); + break; + case REMOVED: + gotStatus = removeEntryInternal(feiCurrent, false); + break; + } + // Remove the Order + workOrder.remove(fe); + logsync.trace( + "The workOrder has been executed and now the status is being returned {}", fe); + // Place the status + workStatus.put(fe, gotStatus); + } else { + log.warn("Not expected null WorkOrder", work); + } + } + }; + if(executor != null) { + executor.execute(r); + } + } else if (event instanceof WorkStatusCleanup) { + /* + * Take care of handling the remote Work request + */ + WorkStatusCleanup work = (WorkStatusCleanup) event; + FlowEntryDistributionOrder fe = work.getFe(); + if (fe != null) { + logsync.trace("The workStatus {} is being removed", fe); + workStatus.remove(fe); + } else { + log.warn("Not expected null WorkStatus", work); + } + } else if (event instanceof ContainerFlowChangeEvent) { + /* + * Whether it is an addition or removal, we have to + * recompute the merged flows entries taking into + * account all the current container flows because + * flow merging is not an injective function + */ + updateFlowsContainerFlow(); } else { - log.warn("Dequeued unknown event {}", event.getClass().getSimpleName()); + log.warn("Dequeued unknown event {}", event.getClass() + .getSimpleName()); } } catch (InterruptedException e) { - log.warn("FRM EventHandler thread interrupted", e); + // clear pending events + pendingEvents.clear(); } } } @@ -2207,6 +2609,12 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * */ void destroy() { + // Interrupt the thread + frmEventHandler.interrupt(); + // Clear the pendingEvents queue + pendingEvents.clear(); + frmAware.clear(); + workMonitor.clear(); } /** @@ -2215,9 +2623,20 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * */ void start() { + /* + * If running in default container, need to know if controller is in + * container mode + */ + if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) { + inContainerMode = containerManager.inContainerMode(); + } + // Initialize graceful stop flag stopping = false; + // Allocate the executor service + this.executor = Executors.newFixedThreadPool(maxPoolSize); + // Start event handler thread frmEventHandler.start(); @@ -2237,7 +2656,15 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou */ void stop() { stopping = true; - uninstallAllFlowEntries(); + uninstallAllFlowEntries(false); + // Shutdown executor + this.executor.shutdownNow(); + // Now walk all the workMonitor and wake up the one sleeping because + // destruction is happening + for (FlowEntryDistributionOrder fe : workMonitor.keySet()) { + FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe); + task.cancel(true); + } } public void setFlowProgrammerService(IFlowProgrammerService service) { @@ -2274,19 +2701,36 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } log.trace("Container {}: Updating installed flows because of container flow change: {} {}", container.getName(), t, current); - /* - * Whether it is an addition or removal, we have to recompute the merged - * flows entries taking into account all the current container flows - * because flow merging is not an injective function - */ - updateFlowsContainerFlow(); + ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t); + pendingEvents.offer(ev); } @Override - public void nodeConnectorUpdated(String containerName, NodeConnector p, UpdateType t) { + public void nodeConnectorUpdated(String containerName, NodeConnector nc, UpdateType t) { if (!container.getName().equals(containerName)) { return; } + + boolean updateStaticFlowCluster = false; + + switch (t) { + case REMOVED: + log.trace("Port {} was removed from container: uninstalling interested flows", nc); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nc); + break; + case ADDED: + log.trace("Port {} was added to container: reinstall interested flows", nc); + updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nc); + + break; + case CHANGED: + break; + default: + } + + if (updateStaticFlowCluster) { + refreshClusterStaticFlowsStatus(nc.getNode()); + } } @Override @@ -2297,8 +2741,15 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } switch (update) { case ADDED: + /* + * Controller is moving to container mode. We are in the default + * container context, we need to remove all our non-internal flows + * to prevent any container isolation breakage. We also need to + * preserve our flow so that they can be re-installed if we move + * back to non container mode (no containers). + */ this.inContainerMode = true; - this.uninstallAllFlowEntries(); + this.uninstallAllFlowEntries(true); break; case REMOVED: this.inContainerMode = false; @@ -2357,125 +2808,83 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } } - /* - * OSGI COMMANDS - */ - @Override - public String getHelp() { - StringBuffer help = new StringBuffer(); - help.append("---FRM Matrix Application---\n"); - help.append("\t printMatrixData - Prints the Matrix Configs\n"); - help.append("\t addMatrixConfig \n"); - help.append("\t delMatrixConfig \n"); - help.append("\t useMatrixConfig \n"); - return help.toString(); - } - - public void _printMatrixData(CommandInterpreter ci) { - ci.println("Configs : "); - ci.println("---------"); - ci.println(portGroupConfigs); + private class WorkOrderEvent extends FRMEvent { + private FlowEntryDistributionOrder fe; + private FlowEntryInstall newEntry; - ci.println("Data : "); - ci.println("------"); - ci.println(portGroupData); - } + /** + * @param fe + * @param newEntry + */ + WorkOrderEvent(FlowEntryDistributionOrder fe, FlowEntryInstall newEntry) { + this.fe = fe; + this.newEntry = newEntry; + } - public void _addMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - String regex = ci.nextArgument(); - addPortGroupConfig(name, regex, false); - } + /** + * @return the fe + */ + public FlowEntryDistributionOrder getFe() { + return fe; + } - public void _delMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - delPortGroupConfig(name); + /** + * @return the newEntry + */ + public FlowEntryInstall getNewEntry() { + return newEntry; + } } + private class ContainerFlowChangeEvent extends FRMEvent { + private final ContainerFlow previous; + private final ContainerFlow current; + private final UpdateType type; - public void _useMatrixConfig(CommandInterpreter ci) { - String name = ci.nextArgument(); - usePortGroupConfig(name); - } + public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) { + this.previous = previous; + this.current = current; + this.type = type; + } - public void _arpPunt(CommandInterpreter ci) { - String switchId = ci.nextArgument(); - long swid = HexEncode.stringToLong(switchId); - Node node = NodeCreator.createOFNode(swid); - installImplicitARPReplyPunt(node); - } + public ContainerFlow getPrevious() { + return this.previous; + } - public void _frmaddflow(CommandInterpreter ci) throws UnknownHostException { - Node node = null; - String nodeId = ci.nextArgument(); - if (nodeId == null) { - ci.print("Node id not specified"); - return; + public ContainerFlow getCurrent() { + return this.current; } - try { - node = NodeCreator.createOFNode(Long.valueOf(nodeId)); - } catch (NumberFormatException e) { - ci.print("Node id not a number"); - return; + + public UpdateType getType() { + return this.type; } - ci.println(this.programmer.addFlow(node, getSampleFlow(node))); } - public void _frmremoveflow(CommandInterpreter ci) throws UnknownHostException { - Node node = null; - String nodeId = ci.nextArgument(); - if (nodeId == null) { - ci.print("Node id not specified"); - return; - } - try { - node = NodeCreator.createOFNode(Long.valueOf(nodeId)); - } catch (NumberFormatException e) { - ci.print("Node id not a number"); - return; + + private class WorkStatusCleanup extends FRMEvent { + private FlowEntryDistributionOrder fe; + + /** + * @param fe + */ + WorkStatusCleanup(FlowEntryDistributionOrder fe) { + this.fe = fe; } - ci.println(this.programmer.removeFlow(node, getSampleFlow(node))); - } - - private Flow getSampleFlow(Node node) throws UnknownHostException { - NodeConnector port = NodeConnectorCreator.createOFNodeConnector((short) 24, node); - NodeConnector oport = NodeConnectorCreator.createOFNodeConnector((short) 30, node); - byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, (byte) 0x9a, (byte) 0xbc }; - byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, (byte) 0x5e, (byte) 0x6f }; - InetAddress srcIP = InetAddress.getByName("172.28.30.50"); - InetAddress dstIP = InetAddress.getByName("171.71.9.52"); - InetAddress ipMask = InetAddress.getByName("255.255.255.0"); - InetAddress ipMask2 = InetAddress.getByName("255.0.0.0"); - short ethertype = EtherTypes.IPv4.shortValue(); - short vlan = (short) 27; - byte vlanPr = 3; - Byte tos = 4; - byte proto = IPProtocols.TCP.byteValue(); - short src = (short) 55000; - short dst = 80; - /* - * Create a SAL Flow aFlow + /** + * @return the fe */ - Match match = new Match(); - match.setField(MatchType.IN_PORT, port); - match.setField(MatchType.DL_SRC, srcMac); - match.setField(MatchType.DL_DST, dstMac); - match.setField(MatchType.DL_TYPE, ethertype); - match.setField(MatchType.DL_VLAN, vlan); - match.setField(MatchType.DL_VLAN_PR, vlanPr); - match.setField(MatchType.NW_SRC, srcIP, ipMask); - match.setField(MatchType.NW_DST, dstIP, ipMask2); - match.setField(MatchType.NW_TOS, tos); - match.setField(MatchType.NW_PROTO, proto); - match.setField(MatchType.TP_SRC, src); - match.setField(MatchType.TP_DST, dst); - - List actions = new ArrayList(); - actions.add(new Output(oport)); - actions.add(new PopVlan()); - actions.add(new Flood()); - actions.add(new Controller()); - return new Flow(match, actions); + public FlowEntryDistributionOrder getFe() { + return fe; + } + } + + /* + * OSGI COMMANDS + */ + @Override + public String getHelp() { + StringBuffer help = new StringBuffer(); + return help.toString(); } @Override @@ -2535,6 +2944,36 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou } } + public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException { + Node node = null; + long reqId = 0L; + String nodeId = ci.nextArgument(); + if (nodeId == null) { + ci.print("Node id not specified"); + return; + } + String requestId = ci.nextArgument(); + if (requestId == null) { + ci.print("Request id not specified"); + return; + } + try { + node = NodeCreator.createOFNode(Long.valueOf(nodeId)); + } catch (NumberFormatException e) { + ci.print("Node id not a number"); + return; + } + try { + reqId = Long.parseLong(requestId); + } catch (NumberFormatException e) { + ci.print("Request id not a number"); + return; + } + // null for error object is good enough for now + ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null); + this.processErrorEvent(event); + } + @Override public void flowRemoved(Node node, Flow flow) { log.trace("Received flow removed notification on {} for {}", node, flow); @@ -2561,7 +3000,7 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou if (target != null) { // Update Configuration database target.toggleInstallation(); - target.setStatus(SUCCESS); + target.setStatus(StatusCode.SUCCESS.toString()); staticFlows.put(key, target); } @@ -2587,16 +3026,32 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou * mapping will have to be added in future */ FlowEntryInstall target = null; - for (FlowEntryInstall index : nodeFlows.get(node)) { - FlowEntryInstall entry = installedSwView.get(index); - if (entry.getRequestId() == rid) { - target = entry; - break; + List flowEntryInstallList = nodeFlows.get(node); + // flowEntryInstallList could be null. + // so check for it. + if(flowEntryInstallList != null) { + for (FlowEntryInstall index : flowEntryInstallList) { + FlowEntryInstall entry = installedSwView.get(index); + if(entry != null) { + if (entry.getRequestId() == rid) { + target = entry; + break; + } + } } } if (target != null) { // This was a flow install, update database this.updateLocalDatabase(target, false); + // also update the config + if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) { + ConcurrentMap.Entry staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode()); + // staticFlowEntry should never be null. + // the null check is just an extra defensive check. + if(staticFlowEntry != null) { + staticFlows.remove(staticFlowEntry.getKey()); + } + } } // Notify listeners @@ -2627,4 +3082,121 @@ public class ForwardingRulesManager implements IForwardingRulesManager, PortGrou return rv; } + + public void unsetIConnectionManager(IConnectionManager s) { + if (s == this.connectionManager) { + this.connectionManager = null; + } + } + + public void setIConnectionManager(IConnectionManager s) { + this.connectionManager = s; + } + + public void unsetIContainerManager(IContainerManager s) { + if (s == this.containerManager) { + this.containerManager = null; + } + } + + public void setIContainerManager(IContainerManager s) { + this.containerManager = s; + } + + @Override + public void entryCreated(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } + + @Override + public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + if (originLocal) { + /* + * Local updates are of no interest + */ + return; + } + if (cacheName.equals(WORK_ORDER_CACHE)) { + logsync.trace("Got a WorkOrderCacheUpdate for {}", key); + /* + * This is the case of one workOrder becoming available, so we need + * to dispatch the work to the appropriate handler + */ + FlowEntryDistributionOrder fe = (FlowEntryDistributionOrder) key; + FlowEntryInstall fei = fe.getEntry(); + if (fei == null) { + return; + } + Node n = fei.getNode(); + if (connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) { + logsync.trace("workOrder for fe {} processed locally", fe); + // I'm the controller in charge for the request, queue it for + // processing + pendingEvents.offer(new WorkOrderEvent(fe, (FlowEntryInstall) new_value)); + } + } else if (cacheName.equals(WORK_STATUS_CACHE)) { + logsync.trace("Got a WorkStatusCacheUpdate for {}", key); + /* + * This is the case of one workOrder being completed and a status + * returned + */ + FlowEntryDistributionOrder fe = (FlowEntryDistributionOrder) key; + /* + * Check if the order was initiated by this controller in that case + * we need to actually look at the status returned + */ + if (fe.getRequestorController() + .equals(clusterContainerService.getMyAddress())) { + FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe); + if (fet != null) { + logsync.trace("workStatus response is for us {}", fe); + // Signal we got the status + fet.gotStatus(fe, workStatus.get(fe)); + pendingEvents.offer(new WorkStatusCleanup(fe)); + } + } + } + } + + @Override + public void entryDeleted(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } + + /** + * {@inheritDoc} + */ + @Override + public List getFlowEntriesForNode(Node node) { + List list = new ArrayList(); + if (node != null) { + for (Map.Entry entry : this.originalSwView.entrySet()) { + if (node.equals(entry.getKey().getNode())) { + list.add(entry.getKey().clone()); + } + } + } + return list; + } + + /** + * {@inheritDoc} + */ + @Override + public List getInstalledFlowEntriesForNode(Node node) { + List list = new ArrayList(); + if (node != null) { + List flowEntryInstallList = this.nodeFlows.get(node); + if(flowEntryInstallList != null) { + for(FlowEntryInstall fi: flowEntryInstallList) { + list.add(fi.getInstall().clone()); + } + } + } + return list; + } }