import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import org.eclipse.osgi.framework.console.CommandInterpreter;
-import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.connectionmanager.ConnectionLocality;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
import org.opendaylight.controller.forwardingrulesmanager.implementation.data.FlowEntryDistributionOrder;
import org.opendaylight.controller.sal.action.Action;
import org.opendaylight.controller.sal.action.ActionType;
-import org.opendaylight.controller.sal.action.Controller;
-import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
-import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
+import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.match.MatchType;
import org.opendaylight.controller.sal.utils.EtherTypes;
import org.opendaylight.controller.sal.utils.GlobalConstants;
-import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.IObjectReader;
-import org.opendaylight.controller.sal.utils.IPProtocols;
-import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
-import org.opendaylight.controller.sal.utils.NodeCreator;
import org.opendaylight.controller.sal.utils.ObjectReader;
import org.opendaylight.controller.sal.utils.ObjectWriter;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.switchmanager.ISwitchManagerAware;
import org.opendaylight.controller.switchmanager.Subnet;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ForwardingRulesManager implements
IForwardingRulesManager,
PortGroupChangeListener,
- IContainerListener,
+ IContainerLocalListener,
ISwitchManagerAware,
IConfigurationContainerAware,
IInventoryListener,
IObjectReader,
- ICacheUpdateAware,
- CommandProvider,
+ ICacheUpdateAware<Object,Object>,
IFlowProgrammerListener {
- private static final String NODEDOWN = "Node is Down";
- private static final String SUCCESS = StatusCode.SUCCESS.toString();
+
private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
- private static final String PORTREMOVED = "Port removed";
private static final Logger logsync = LoggerFactory.getLogger("FRMsync");
+ private static final String PORT_REMOVED = "Port removed";
+ private static final String NODE_DOWN = "Node is Down";
+ private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry";
private String frmFileName;
private String portGroupFileName;
private ConcurrentMap<Integer, FlowConfig> staticFlows;
private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
+ private IContainerManager containerManager;
private boolean inContainerMode; // being used by global instance only
- private boolean stopping;
+ protected boolean stopping;
/*
* Flow database. It's the software view of what was requested to install
* necessity non-transactional as long as need to be able to synchronize
* states also while a transaction is in progress
*/
- static final String WORKORDERCACHE = "frm.workOrder";
- static final String WORKSTATUSCACHE = "frm.workStatus";
+ static final String WORK_ORDER_CACHE = "frm.workOrder";
+ static final String WORK_STATUS_CACHE = "frm.workStatus";
+ static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView";
+ static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView";
/*
* Data structure responsible for distributing the FlowEntryInstall requests
* not picked by anyone, which is always a case can happen especially on
* Node disconnect cases.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
/*
* Data structure responsible for retrieving the results of the workOrder
* TODO: The workStatus entries need to have a lifetime associated in case
* of requestor controller leaving the cluster.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
/*
* Local Map used to hold the Future which a caller can use to monitor for
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
/*
- * Create an executor pool to create the distributionOrder, this is a stop
- * gap solution caused by an issue with non-transactional caches in the
- * implementation we use, being currently worked on. It has been noticed in
- * fact that when non-transactional caches are being used sometime the key
- * are no distributed to all the nodes properly. To workaround the issue
- * transactional caches are being used, but there was a reason for using
- * non-transactional caches to start with, in fact we needed to be able in
- * the context of a northbound transaction to program the FRM entries
- * irrespective of the fact that transaction would commit or no else we
- * would not be able to achieve the entry programming and implement the
- * scheme for recovery from network element failures. Bottom line, now in
- * order to make sure an update on a transactional cache goes out while in a
- * transaction that need to be initiated by a different thread.
+ * Max pool size for the executor
*/
- private ExecutorService executor;
-
- class DistributeOrderCallable implements Callable<Future<Status>> {
- private FlowEntryInstall e;
- private FlowEntryInstall u;
- private UpdateType t;
- DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
- this.e = e;
- this.u = u;
- this.t = t;
- }
-
- @Override
- public Future<Status> call() throws Exception {
- if (e == null || t == null) {
- logsync.error("Unexpected null Entry up update type");
- return null;
- }
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
- }
- logsync.trace("WorkOrder requested");
- // Now create an Handle to monitor the execution of the operation
- return ret;
- }
- }
+ private static final int maxPoolSize = 10;
/**
* @param e
* @return a Future object for monitoring the progress of the result, or
* null in case the processing should take place locally
*/
- private Future<Status> distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u,
+ UpdateType t) {
// A null entry it's an unexpected condition, anyway it's safe to keep
// the handling local
if (e == null) {
Node n = e.getNode();
if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
- Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
- if (worker != null) {
- Future<Future<Status>> workerRes = this.executor.submit(worker);
- try {
- return workerRes.get();
- } catch (InterruptedException e1) {
- // we where interrupted, not a big deal.
- return null;
- } catch (ExecutionException e1) {
- logsync.error(
- "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
- e);
- return null;
- }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", n, fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
}
+ logsync.trace("WorkOrder requested");
+ // Now return a handle the caller can use to monitor the execution of the operation
+ return ret;
}
logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
private Status addEntry(FlowEntry flowEntry, boolean async) {
// Sanity Check
- if (flowEntry == null || flowEntry.getNode() == null) {
- String msg = "Invalid FlowEntry";
- String logMsg = msg + ": {}";
+ if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) {
+ String logMsg = INVALID_FLOW_ENTRY + ": {}";
log.warn(logMsg, flowEntry);
- return new Status(StatusCode.NOTACCEPTABLE, msg);
+ return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY);
+ }
+
+ /*
+ * Redundant check: detect whether this request is a redundant one, i.e.
+ * it comes from the same application and the flowEntry is equal to an
+ * existing one. Given we do not have an application signature in the
+ * requested FlowEntry yet, we detect the above condition by comparing
+ * the flow names, if set. If they are equal to the installed flow, most
+ * likely this is a redundant installation request from the same
+ * application and we can silently return success.
+ *
+ * TODO: in the future a sort of application reference list mechanism
+ * will be added to the FlowEntry so that the exact same flow can be
+ * used by different applications.
+ */
+ FlowEntry present = this.originalSwView.get(flowEntry);
+ if (present != null) {
+ boolean sameFlow = present.getFlow().equals(flowEntry.getFlow());
+ boolean sameApp = present.getFlowName() != null && present.getFlowName().equals(flowEntry.getFlowName());
+ if (sameFlow && sameApp) {
+ log.trace("Skipping redundant request for flow {} on node {}", flowEntry.getFlowName(),
+ flowEntry.getNode());
+ return new Status(StatusCode.SUCCESS, "Entry is already present");
+ }
}
/*
succeded = ret;
} else {
error = ret;
- log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
+ log.trace("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
}
}
// Sanity checks
if (currentFlowEntry == null || currentFlowEntry.getNode() == null || newFlowEntry == null
- || newFlowEntry.getNode() == null) {
- String msg = "Modify: Invalid FlowEntry";
+ || newFlowEntry.getNode() == null || newFlowEntry.getFlow() == null) {
+ String msg = "Modify: " + INVALID_FLOW_ENTRY;
String logMsg = msg + ": {} or {}";
log.warn(logMsg, currentFlowEntry, newFlowEntry);
return new Status(StatusCode.NOTACCEPTABLE, msg);
Status succeeded = null;
boolean decouple = false;
if (installedList.size() != toInstallList.size()) {
- log.info("Modify: New flow entry does not satisfy the same "
+ log.trace("Modify: New flow entry does not satisfy the same "
+ "number of container flows as the original entry does");
decouple = true;
}
*/
FlowEntryInstall sameMatchEntry = installedSwView.get(installEntry);
if (sameMatchEntry != null && !sameMatchEntry.getOriginal().equals(currentFlowEntry)) {
- log.info("Modify: new container flow merged flow entry clashes with existing flow");
+ log.trace("Modify: new container flow merged flow entry clashes with existing flow");
decouple = true;
} else {
toInstallSafe.add(installEntry);
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
+ FlowEntryDistributionOrderFutureTask futureStatus =
+ distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
+ log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
status.getDescription());
return status;
}
// Update DB
newEntries.setRequestId(status.getRequestId());
- updateLocalDatabase(currentEntries, false);
- updateLocalDatabase(newEntries, true);
+ updateSwViews(currentEntries, false);
+ updateSwViews(newEntries, true);
return status;
}
Status error = new Status(null, null);
// Sanity Check
- if (flowEntry == null || flowEntry.getNode() == null) {
- String msg = "Invalid FlowEntry";
- String logMsg = msg + ": {}";
+ if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) {
+ String logMsg = INVALID_FLOW_ENTRY + ": {}";
log.warn(logMsg, flowEntry);
- return new Status(StatusCode.NOTACCEPTABLE, msg);
+ return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY);
}
// Derive the container flows merged installed entries
if (!ret.isSuccess()) {
error = ret;
- log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
+ log.trace("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
if (installedList.size() == 1) {
// If we had only one entry to remove, this is fatal failure
return error;
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", entry.getInstall(),
status.getDescription());
return status;
}
log.trace("Removed {}", entry.getInstall());
// Update DB
- updateLocalDatabase(entry, false);
+ updateSwViews(entry, false);
return status;
}
* contain the unique id assigned to this request
*/
private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
status.getDescription());
return status;
}
// Update DB
entry.setRequestId(status.getRequestId());
- updateLocalDatabase(entry, true);
+ updateSwViews(entry, true);
return status;
}
return true;
}
- private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
- // Update the software view
- updateSwViewes(entry, add);
+ /**
+  * Look up the static flow database entry whose FlowConfig matches the given
+  * flow name and network node. Performs a linear scan over {@code staticFlows}.
+  *
+  * @param name the flow configuration name to match
+  * @param node the network node the flow configuration refers to
+  * @return the matching (ordinal, FlowConfig) map entry, or null when no
+  *         static flow matches
+  */
+ private ConcurrentMap.Entry<Integer, FlowConfig> getStaticFlowEntry(String name, Node node) {
+ for (ConcurrentMap.Entry<Integer, FlowConfig> flowEntry : staticFlows.entrySet()) {
+ FlowConfig flowConfig = flowEntry.getValue();
+ if (flowConfig.isByNameAndNodeIdEqual(name, node)) {
+ return flowEntry;
+ }
+ }
+ return null;
+ }
+ private void updateIndexDatabase(FlowEntryInstall entry, boolean add) {
// Update node indexed flow database
updateNodeFlowsDB(entry, add);
/*
* Update the node mapped flows database
*/
- private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) {
+ private void updateSwViews(FlowEntryInstall flowEntries, boolean add) {
if (add) {
originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal());
installedSwView.put(flowEntries, flowEntries);
}
if (add) {
+ // There may already be an existing entry; remove it before adding the
+ // new one. This is necessary since we have observed that in some cases
+ // Infinispan aggregates operations (e.g., a remove followed by a put of
+ // a different value) related to the same key within the same transaction.
+ // This defensive code is also needed because the new FlowEntryInstall may
+ // differ from the old one even though the equals method returns true,
+ // since equals does not take the action list into account.
+ if(nodeIndeces.contains(flowEntries)) {
+ nodeIndeces.remove(flowEntries);
+ }
nodeIndeces.add(flowEntries);
} else {
nodeIndeces.remove(flowEntries);
}
if (add) {
+ // same comments in the similar code section in
+ // updateNodeFlowsDB method apply here too
+ if(indices.contains(flowEntries)) {
+ indices.remove(flowEntries);
+ }
indices.add(flowEntries);
} else {
indices.remove(flowEntries);
// Update DB
if (status.isSuccess()) {
- updateLocalDatabase(target, false);
+ updateSwViews(target, false);
} else {
// log the error
- log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
+ log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
status.getDescription());
}
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ protected void updateFlowsContainerFlow() {
Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
// First remove all installed entries
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
private void nonClusterObjectCreate() {
+ // Fall back to plain local ConcurrentHashMaps when the clustering
+ // services (replicated caches) are not available.
originalSwView = new ConcurrentHashMap<FlowEntry, FlowEntry>();
installedSwView = new ConcurrentHashMap<FlowEntryInstall, FlowEntryInstall>();
- nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
- groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
TSPolicies = new ConcurrentHashMap<String, Object>();
staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
inactiveFlows = new ConcurrentHashMap<FlowEntry, FlowEntry>();
}
- private void registerWithOSGIConsole() {
- BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
- bundleContext.registerService(CommandProvider.class.getName(), this, null);
- }
-
@Override
public void setTSPolicyData(String policyname, Object o, boolean add) {
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().clone());
+ list.add(entry.getValue().clone());
}
}
}
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntryInstall, FlowEntryInstall> entry : this.installedSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().getInstall().clone());
+ list.add(entry.getValue().getInstall().clone());
}
}
}
}
Status error = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (error.isSuccess()) {
- log.info("Ports {} added to FlowEntry {}", portList, flowName);
+ log.trace("Ports {} added to FlowEntry {}", portList, flowName);
} else {
log.warn("Failed to add ports {} to Flow entry {}. The failure is: {}", portList,
currentFlowEntry.toString(), error.getDescription());
}
Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
- log.info("Ports {} removed from FlowEntry {}", portList, flowName);
+ log.trace("Ports {} removed from FlowEntry {}", portList, flowName);
} else {
log.warn("Failed to remove ports {} from Flow entry {}. The failure is: {}", portList,
currentFlowEntry.toString(), status.getDescription());
Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
- log.info("Output port replaced with {} for flow {} on node {}", outPort, flowName, node);
+ log.trace("Output port replaced with {} for flow {} on node {}", outPort, flowName, node);
} else {
log.warn("Failed to replace output port for flow {} on node {}. The failure is: {}", flowName, node,
status.getDescription());
retrieveCaches();
}
- @SuppressWarnings("deprecation")
private void allocateCaches() {
if (this.clusterContainerService == null) {
log.warn("Un-initialized clusterContainerService, can't create cache");
log.debug("Allocating caches for Container {}", container.getName());
try {
- clusterContainerService.createCache("frm.originalSwView",
+ clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.installedSwView",
+ clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.inactiveFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.nodeFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterContainerService.createCache("frm.groupFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlowsOrdinal",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.TSPolicies",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ clusterContainerService.createCache(WORK_STATUS_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
- clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ clusterContainerService.createCache(WORK_ORDER_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
}
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
ConcurrentMap<?, ?> map;
log.debug("Retrieving Caches for Container {}", container.getName());
- map = clusterContainerService.getCache("frm.originalSwView");
+ map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE);
if (map != null) {
originalSwView = (ConcurrentMap<FlowEntry, FlowEntry>) map;
} else {
log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.installedSwView");
+ map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE);
if (map != null) {
installedSwView = (ConcurrentMap<FlowEntryInstall, FlowEntryInstall>) map;
} else {
log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.nodeFlows");
- if (map != null) {
- nodeFlows = (ConcurrentMap<Node, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of cache failed for Container {}", container.getName());
- }
-
- map = clusterContainerService.getCache("frm.groupFlows");
- if (map != null) {
- groupFlows = (ConcurrentMap<String, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName());
- }
-
map = clusterContainerService.getCache("frm.staticFlows");
if (map != null) {
staticFlows = (ConcurrentMap<Integer, FlowConfig>) map;
log.error("Retrieval of frm.TSPolicies cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache(WORKORDERCACHE);
+ map = clusterContainerService.getCache(WORK_ORDER_CACHE);
if (map != null) {
workOrder = (ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall>) map;
} else {
- log.error("Retrieval of " + WORKORDERCACHE + " cache failed for Container {}", container.getName());
+ log.error("Retrieval of " + WORK_ORDER_CACHE + " cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache(WORKSTATUSCACHE);
+ map = clusterContainerService.getCache(WORK_STATUS_CACHE);
if (map != null) {
workStatus = (ConcurrentMap<FlowEntryDistributionOrder, Status>) map;
} else {
- log.error("Retrieval of " + WORKSTATUSCACHE + " cache failed for Container {}", container.getName());
+ log.error("Retrieval of " + WORK_STATUS_CACHE + " cache failed for Container {}", container.getName());
}
}
boolean multipleFlowPush = false;
String error;
Status status;
- config.setStatus(SUCCESS);
+ config.setStatus(StatusCode.SUCCESS.toString());
// Presence check
if (flowConfigExists(config)) {
continue;
}
if (config.getNode().equals(node)) {
- if (config.installInHw() && !config.getStatus().equals(SUCCESS)) {
+ if (config.installInHw() && !config.getStatus().equals(StatusCode.SUCCESS.toString())) {
Status status = this.installFlowEntryAsync(config.getFlowEntry());
config.setStatus(status.getDescription());
}
// Take note of this controller generated static flow
toRemove.add(entry.getKey());
} else {
- config.setStatus(NODEDOWN);
+ config.setStatus(NODE_DOWN);
}
}
}
config.setStatus("Removed from node because in container mode");
break;
case REMOVED:
- config.setStatus(SUCCESS);
+ config.setStatus(StatusCode.SUCCESS.toString());
break;
default:
}
// Do not attempt to reinstall the flow, warn user
if (newFlowConfig.equals(oldFlowConfig)) {
String msg = "No modification detected";
- log.info("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig);
+ log.trace("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig);
return new Status(StatusCode.SUCCESS, msg);
}
.installFlowEntry(target.getFlowEntry());
if (status.isSuccess()) {
// Update Configuration database
- target.setStatus(SUCCESS);
+ target.setStatus(StatusCode.SUCCESS.toString());
target.toggleInstallation();
staticFlows.put(key, target);
}
* If requested, a copy of each original flow entry will be stored in the
* inactive list so that it can be re-applied when needed (This is typically
* the case when running in the default container and controller moved to
- * container mode)
+ * container mode). NOTE WELL: Because this routine performs a bulk change,
+ * it operates only on the entries for locally attached nodes, so as to
+ * avoid redundant operations being initiated by multiple cluster nodes.
*
* @param preserveFlowEntries
* if true, a copy of each original entry is stored in the
* inactive list
*/
private void uninstallAllFlowEntries(boolean preserveFlowEntries) {
- log.info("Uninstalling all non-internal flows");
+ log.trace("Uninstalling all non-internal flows");
List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>();
// Now remove the entries
for (FlowEntryInstall flowEntryHw : toRemove) {
- Status status = this.removeEntryInternal(flowEntryHw, false);
- if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+ Node n = flowEntryHw.getNode();
+ if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
+ Status status = this.removeEntryInternal(flowEntryHw, false);
+ if (!status.isSuccess()) {
+ log.trace("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+ }
+ } else {
+ log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job",
+ flowEntryHw);
}
}
}
* default container instance of FRM only when the last container is deleted
*/
private void reinstallAllFlowEntries() {
- log.info("Reinstalling all inactive flows");
+ log.trace("Reinstalling all inactive flows");
for (FlowEntry flowEntry : this.inactiveFlows.keySet()) {
this.addEntry(flowEntry, false);
@Override
public FlowConfig getStaticFlow(String name, Node node) {
- for (ConcurrentMap.Entry<Integer, FlowConfig> entry : staticFlows.entrySet()) {
- if (entry.getValue().isByNameAndNodeIdEqual(name, node)) {
- return entry.getValue();
- }
+ // Delegate the name/node match to the common helper and unwrap the
+ // FlowConfig from the resulting map entry, if one was found.
+ ConcurrentMap.Entry<Integer, FlowConfig> entry = getStaticFlowEntry(name, node);
+ if(entry != null) {
+ return entry.getValue();
}
return null;
}
public void subnetNotify(Subnet sub, boolean add) {
}
- private void installImplicitARPReplyPunt(Node node) {
-
- if (node == null) {
- return;
- }
-
- List<String> puntAction = new ArrayList<String>();
- puntAction.add(ActionType.CONTROLLER.toString());
-
- FlowConfig allowARP = new FlowConfig();
- allowARP.setInstallInHw(true);
- allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP Reply" + FlowConfig.INTERNALSTATICFLOWEND);
- allowARP.setPriority("500");
- allowARP.setNode(node);
- allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase());
- allowARP.setDstMac(HexEncode.bytesToHexString(switchManager.getControllerMAC()));
- allowARP.setActions(puntAction);
- addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name
- }
-
/**
* (non-Javadoc)
*
* pratice to have in it's context operations that can take time,
* hence moving off to a different thread for async processing.
*/
+ private ExecutorService executor;
@Override
public void modeChangeNotify(final Node node, final boolean proactive) {
Callable<Status> modeChangeCallable = new Callable<Status>() {
dropAllConfig.setActions(dropAction);
defaultConfigs.add(dropAllConfig);
- log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
+ log.trace("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
for (FlowConfig fc : defaultConfigs) {
Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
if (status.isSuccess()) {
- log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+ log.trace("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
} else {
log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
fc.getName());
* @param node
*/
private void cleanDatabaseForNode(Node node) {
- log.info("Cleaning Flow database for Node {}", node);
+ log.trace("Cleaning Flow database for Node {}", node);
if (nodeFlows.containsKey(node)) {
+ // NOTE(review): iteration is over a snapshot copy, presumably because
+ // updateSwViews() mutates the live nodeFlows entry while we walk it -
+ // confirm against updateSwViews semantics.
List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>(nodeFlows.get(node));
for (FlowEntryInstall entry : toRemove) {
- updateLocalDatabase(entry, false);
+ updateSwViews(entry, false);
}
}
}
@Override
public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map<String, Property> propMap) {
+ // Tracks whether any static flow configuration status changed, so the
+ // cluster-wide static flows view is refreshed at most once on exit.
+ boolean updateStaticFlowCluster = false;
+ switch (type) {
+ case ADDED:
+ break;
+ case CHANGED:
+ // Only the administrative state (Config property) is acted upon here.
+ Config config = (propMap == null) ? null : (Config) propMap.get(Config.ConfigPropName);
+ if (config != null) {
+ switch (config.getValue()) {
+ case Config.ADMIN_DOWN:
+ log.trace("Port {} is administratively down: uninstalling interested flows", nodeConnector);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+ break;
+ case Config.ADMIN_UP:
+ log.trace("Port {} is administratively up: installing interested flows", nodeConnector);
+ updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nodeConnector);
+ break;
+ case Config.ADMIN_UNDEF:
+ break;
+ default:
+ }
+ }
+ break;
+ case REMOVED:
+ // This is the case where a switch port is removed from the SDN agent space
+ log.trace("Port {} was removed from our control: uninstalling interested flows", nodeConnector);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+ break;
+ default:
+
+ }
+
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nodeConnector.getNode());
+ }
+ }
+
+ /*
+ * It goes through the static flows configuration, it identifies the ones
+ * which have the specified node connector as input or output port and
+ * install them on the network node if they are marked to be installed in
+ * hardware and their status shows they were not installed yet
+ *
+ * Returns true when at least one static flow status was modified, which
+ * signals the caller to refresh the cluster-wide static flows view.
+ */
+ private boolean installFlowsOnNodeConnectorUp(NodeConnector nodeConnector) {
+ boolean updated = false;
+ List<FlowConfig> flowConfigForNode = getStaticFlows(nodeConnector.getNode());
+ for (FlowConfig flowConfig : flowConfigForNode) {
+ if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) {
+ // Skip flows already reporting SUCCESS: they are installed.
+ if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) {
+ Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+ if (!status.isSuccess()) {
+ flowConfig.setStatus(status.getDescription());
+ } else {
+ flowConfig.setStatus(StatusCode.SUCCESS.toString());
+ }
+ updated = true;
+ }
+ }
+ }
+ return updated;
+ }
+
+ /*
+ * Remove from the network node all the flows which have the specified node
+ * connector as input or output port. If any of the flow entry is a static
+ * flow, it updates the correspondent configuration.
+ *
+ * Returns true when at least one static flow configuration was updated,
+ * signalling the caller to refresh the cluster-wide static flows view.
+ */
+ private boolean removeFlowsOnNodeConnectorDown(NodeConnector nodeConnector) {
+ boolean updated = false;
+ List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nodeConnector.getNode());
+ if (nodeFlowEntries == null) {
+ return updated;
+ }
+ // NOTE(review): iterating over a copy, presumably because a successful
+ // removeEntryInternal() mutates the live per-node list - confirm.
+ for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
+ if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nodeConnector)) {
+ Status status = this.removeEntryInternal(fei, true);
+ if (!status.isSuccess()) {
+ continue;
+ }
+ /*
+ * If the flow entry is a static flow, then update its
+ * configuration
+ */
+ if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
+ FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
+ if (flowConfig != null) {
+ flowConfig.setStatus(PORT_REMOVED);
+ updated = true;
+ }
+ }
+ }
+ }
+ return updated;
}
private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) {
@Override
public void portGroupChanged(PortGroupConfig config, Map<Node, PortGroup> data, boolean add) {
- log.info("PortGroup Changed for: {} Data: {}", config, portGroupData);
+ log.trace("PortGroup Changed for: {} Data: {}", config, portGroupData);
Map<Node, PortGroup> existingData = portGroupData.get(config);
if (existingData != null) {
for (Map.Entry<Node, PortGroup> entry : data.entrySet()) {
return true;
}
- private void usePortGroupConfig(String name) {
- PortGroupConfig config = portGroupConfigs.get(name);
- if (config == null) {
- return;
- }
- if (portGroupProvider != null) {
- Map<Node, PortGroup> data = portGroupProvider.getPortGroupData(config);
- portGroupData.put(config, data);
- }
- }
-
@Override
public Map<String, PortGroupConfig> getPortGroupConfigs() {
return portGroupConfigs;
portGroupProvider.registerPortGroupChange(this);
}
- cacheStartup();
+ nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
+ groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
- registerWithOSGIConsole();
+ cacheStartup();
/*
* If we are not the first cluster node to come up, do not initialize
public void run() {
while (!stopping) {
try {
- FRMEvent event = pendingEvents.take();
+ final FRMEvent event = pendingEvents.take();
if (event == null) {
log.warn("Dequeued null event");
continue;
}
+ log.trace("Dequeued {} event", event.getClass().getSimpleName());
if (event instanceof NodeUpdateEvent) {
NodeUpdateEvent update = (NodeUpdateEvent) event;
Node node = update.getNode();
/*
* Take care of handling the remote Work request
*/
- WorkOrderEvent work = (WorkOrderEvent) event;
- FlowEntryDistributionOrder fe = work.getFe();
- if (fe != null) {
- logsync.trace("Executing the workOrder {}", fe);
- Status gotStatus = null;
- FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
- switch (fe.getUpType()) {
- case ADDED:
- /*
- * TODO: Not still sure how to handle the
- * sync entries
- */
- gotStatus = addEntriesInternal(feiCurrent, true);
- break;
- case CHANGED:
- gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
- break;
- case REMOVED:
- gotStatus = removeEntryInternal(feiCurrent, true);
- break;
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe);
+ switch (fe.getUpType()) {
+ case ADDED:
+ gotStatus = addEntriesInternal(feiCurrent, false);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInternal(feiCurrent, false);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
}
- // Remove the Order
- workOrder.remove(fe);
- logsync.trace(
- "The workOrder has been executed and now the status is being returned {}", fe);
- // Place the status
- workStatus.put(fe, gotStatus);
- } else {
- log.warn("Not expected null WorkOrder", work);
+ };
+ if(executor != null) {
+ executor.execute(r);
}
} else if (event instanceof WorkStatusCleanup) {
/*
} else {
log.warn("Not expected null WorkStatus", work);
}
+ } else if (event instanceof ContainerFlowChangeEvent) {
+ /*
+ * Whether it is an addition or removal, we have to
+ * recompute the merged flows entries taking into
+ * account all the current container flows because
+ * flow merging is not an injective function
+ */
+ updateFlowsContainerFlow();
+ } else if (event instanceof UpdateIndexDBs) {
+ UpdateIndexDBs update = (UpdateIndexDBs)event;
+ updateIndexDatabase(update.getFei(), update.isAddition());
} else {
- log.warn("Dequeued unknown event {}", event.getClass()
- .getSimpleName());
+ log.warn("Dequeued unknown event {}", event.getClass().getSimpleName());
}
} catch (InterruptedException e) {
// clear pending events
*
*/
void start() {
+ /*
+ * If running in default container, need to know if controller is in
+ * container mode
+ */
+ if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+ // NOTE(review): containerManager is dereferenced without a null check;
+ // assumes the dependency manager bound it before start() - confirm
+ // the startup ordering guarantees this.
+ inContainerMode = containerManager.inContainerMode();
+ }
+
// Initialize graceful stop flag
stopping = false;
// Allocate the executor service
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newFixedThreadPool(maxPoolSize);
// Start event handler thread
frmEventHandler.start();
+ // replay the installedSwView data structure to populate
+ // node flows and group flows
+ for (FlowEntryInstall fei : installedSwView.values()) {
+ pendingEvents.offer(new UpdateIndexDBs(fei, true));
+ }
+
/*
- * Read startup and build database if we have not already gotten the
- * configurations synced from another node
+ * Read startup and build database if we are the coordinator
*/
- if (staticFlows.isEmpty()) {
+ if ((clusterContainerService != null) && (clusterContainerService.amICoordinator())) {
loadFlowConfiguration();
}
}
+ /**
+ * Function called by the dependency manager before Container is Stopped and Destroyed.
+ * Uninstalls all the flow entries currently programmed on the network
+ * nodes (NOTE(review): the boolean argument's semantics are not visible
+ * here - confirm it preserves the static flow configuration).
+ */
+ public void containerStop() {
+ uninstallAllFlowEntries(false);
+ }
+
/**
* Function called by the dependency manager before the services exported by
* the component are unregistered, this will be followed by a "destroy ()"
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries(false);
+ // Flow uninstall now happens in containerStop(); stop() only halts
+ // event processing and wakes up pending distribution waiters.
// Shutdown executor
this.executor.shutdownNow();
+ // Now walk all the workMonitor and wake up the one sleeping because
+ // destruction is happening
+ for (FlowEntryDistributionOrder fe : workMonitor.keySet()) {
+ FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe);
+ task.cancel(true);
+ }
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
}
log.trace("Container {}: Updating installed flows because of container flow change: {} {}",
container.getName(), t, current);
- /*
- * Whether it is an addition or removal, we have to recompute the merged
- * flows entries taking into account all the current container flows
- * because flow merging is not an injective function
- */
- updateFlowsContainerFlow();
+ ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t);
+ pendingEvents.offer(ev);
}
@Override
switch (t) {
case REMOVED:
-
- List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nc.getNode());
- if (nodeFlowEntries == null) {
- return;
- }
- for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
- if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) {
- Status status = this.removeEntryInternal(fei, true);
- if (!status.isSuccess()) {
- continue;
- }
- /*
- * If the flow entry is a static flow, then update its
- * configuration
- */
- if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
- FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
- if (flowConfig != null) {
- flowConfig.setStatus(PORTREMOVED);
- updateStaticFlowCluster = true;
- }
- }
- }
- }
- if (updateStaticFlowCluster) {
- refreshClusterStaticFlowsStatus(nc.getNode());
- }
+ log.trace("Port {} was removed from container: uninstalling interested flows", nc);
+ updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nc);
break;
case ADDED:
- List<FlowConfig> flowConfigForNode = getStaticFlows(nc.getNode());
- for (FlowConfig flowConfig : flowConfigForNode) {
- if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) {
- if (flowConfig.installInHw()) {
- Status status = this.installFlowEntry(flowConfig.getFlowEntry());
- if (!status.isSuccess()) {
- flowConfig.setStatus(status.getDescription());
- } else {
- flowConfig.setStatus(SUCCESS);
- }
- updateStaticFlowCluster = true;
- }
- }
- }
- if (updateStaticFlowCluster) {
- refreshClusterStaticFlowsStatus(nc.getNode());
- }
+ log.trace("Port {} was added to container: reinstall interested flows", nc);
+ updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nc);
+
break;
case CHANGED:
break;
default:
}
+
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
}
@Override
return newEntry;
}
}
+ /*
+ * FRM event queued on pendingEvents when a container flow is added or
+ * removed; the event-handler thread reacts by recomputing the merged
+ * flow entries, since flow merging is not an injective function.
+ */
+ private class ContainerFlowChangeEvent extends FRMEvent {
+ private final ContainerFlow previous;
+ private final ContainerFlow current;
+ private final UpdateType type;
+
+ public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) {
+ this.previous = previous;
+ this.current = current;
+ this.type = type;
+ }
+
+ /** @return the container flow before the change */
+ public ContainerFlow getPrevious() {
+ return this.previous;
+ }
+
+ /** @return the container flow after the change */
+ public ContainerFlow getCurrent() {
+ return this.current;
+ }
+
+ /** @return whether the container flow was added, changed or removed */
+ public UpdateType getType() {
+ return this.type;
+ }
+ }
+
private class WorkStatusCleanup extends FRMEvent {
private FlowEntryDistributionOrder fe;
}
}
- /*
- * OSGI COMMANDS
- */
- @Override
- public String getHelp() {
- StringBuffer help = new StringBuffer();
- help.append("---FRM Matrix Application---\n");
- help.append("\t printMatrixData - Prints the Matrix Configs\n");
- help.append("\t addMatrixConfig <name> <regex>\n");
- help.append("\t delMatrixConfig <name>\n");
- help.append("\t useMatrixConfig <name>\n");
- return help.toString();
- }
-
- public void _printMatrixData(CommandInterpreter ci) {
- ci.println("Configs : ");
- ci.println("---------");
- ci.println(portGroupConfigs);
-
- ci.println("Data : ");
- ci.println("------");
- ci.println(portGroupData);
- }
-
- public void _addMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- String regex = ci.nextArgument();
- addPortGroupConfig(name, regex, false);
- }
-
- public void _delMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- delPortGroupConfig(name);
- }
-
- public void _useMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- usePortGroupConfig(name);
- }
+ private class UpdateIndexDBs extends FRMEvent {
+ private FlowEntryInstall fei;
+ private boolean add;
- public void _arpPunt(CommandInterpreter ci) {
- String switchId = ci.nextArgument();
- long swid = HexEncode.stringToLong(switchId);
- Node node = NodeCreator.createOFNode(swid);
- installImplicitARPReplyPunt(node);
- }
-
- public void _frmaddflow(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
+ /**
+ *
+ * @param fei the flow entry which was installed/removed on the network node
+ * @param add true if the entry was added, false if it was removed
+ */
+ UpdateIndexDBs(FlowEntryInstall fei, boolean add) {
+ this.fei = fei;
+ this.add = add;
}
- ci.println(this.programmer.addFlow(node, getSampleFlow(node)));
- }
- public void _frmremoveflow(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
+
+ /**
+ * @return the flowEntryInstall object which was added/removed
+ * to/from the installed software view cache
+ */
+ public FlowEntryInstall getFei() {
+ return fei;
}
- ci.println(this.programmer.removeFlow(node, getSampleFlow(node)));
- }
-
- private Flow getSampleFlow(Node node) throws UnknownHostException {
- NodeConnector port = NodeConnectorCreator.createOFNodeConnector((short) 24, node);
- NodeConnector oport = NodeConnectorCreator.createOFNodeConnector((short) 30, node);
- byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, (byte) 0x9a, (byte) 0xbc };
- byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, (byte) 0x5e, (byte) 0x6f };
- InetAddress srcIP = InetAddress.getByName("172.28.30.50");
- InetAddress dstIP = InetAddress.getByName("171.71.9.52");
- InetAddress ipMask = InetAddress.getByName("255.255.255.0");
- InetAddress ipMask2 = InetAddress.getByName("255.0.0.0");
- short ethertype = EtherTypes.IPv4.shortValue();
- short vlan = (short) 27;
- byte vlanPr = 3;
- Byte tos = 4;
- byte proto = IPProtocols.TCP.byteValue();
- short src = (short) 55000;
- short dst = 80;
- /*
- * Create a SAL Flow aFlow
+ /**
+ *
+ * @return whether this was a flow addition or removal
*/
- Match match = new Match();
- match.setField(MatchType.IN_PORT, port);
- match.setField(MatchType.DL_SRC, srcMac);
- match.setField(MatchType.DL_DST, dstMac);
- match.setField(MatchType.DL_TYPE, ethertype);
- match.setField(MatchType.DL_VLAN, vlan);
- match.setField(MatchType.DL_VLAN_PR, vlanPr);
- match.setField(MatchType.NW_SRC, srcIP, ipMask);
- match.setField(MatchType.NW_DST, dstIP, ipMask2);
- match.setField(MatchType.NW_TOS, tos);
- match.setField(MatchType.NW_PROTO, proto);
- match.setField(MatchType.TP_SRC, src);
- match.setField(MatchType.TP_DST, dst);
-
- List<Action> actions = new ArrayList<Action>();
- actions.add(new Output(oport));
- actions.add(new PopVlan());
- actions.add(new Flood());
- actions.add(new Controller());
- return new Flow(match, actions);
+ public boolean isAddition() {
+ return add;
+ }
}
@Override
return saveConfig();
}
- public void _frmNodeFlows(CommandInterpreter ci) {
- String nodeId = ci.nextArgument();
- Node node = Node.fromString(nodeId);
- if (node == null) {
- ci.println("frmNodeFlows <node> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equals("true");
- }
-
- if (!nodeFlows.containsKey(node)) {
- return;
- }
- // Dump per node database
- for (FlowEntryInstall entry : nodeFlows.get(node)) {
- if (!verbose) {
- ci.println(node + " " + installedSwView.get(entry).getFlowName());
- } else {
- ci.println(node + " " + installedSwView.get(entry).toString());
- }
- }
- }
-
- public void _frmGroupFlows(CommandInterpreter ci) {
- String group = ci.nextArgument();
- if (group == null) {
- ci.println("frmGroupFlows <group> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equalsIgnoreCase("true");
- }
-
- if (!groupFlows.containsKey(group)) {
- return;
- }
- // Dump per node database
- ci.println("Group " + group + ":\n");
- for (FlowEntryInstall flowEntry : groupFlows.get(group)) {
- if (!verbose) {
- ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName());
- } else {
- ci.println(flowEntry.getNode() + " " + flowEntry.toString());
- }
- }
- }
-
@Override
public void flowRemoved(Node node, Flow flow) {
log.trace("Received flow removed notification on {} for {}", node, flow);
}
if (target != null) {
// Update Configuration database
- target.toggleInstallation();
- target.setStatus(SUCCESS);
+ if (target.getHardTimeout() != null || target.getIdleTimeout() != null) {
+ /*
+ * No need for checking if actual values: these strings were
+ * validated at configuration creation. Also, after a switch
+ * down scenario, no use to reinstall a timed flow. Mark it as
+ * "do not install". User can manually toggle it.
+ */
+ target.toggleInstallation();
+ }
+ target.setStatus(StatusCode.GONE.toString());
staticFlows.put(key, target);
}
// Update software views
- this.updateLocalDatabase(installedEntry, false);
+ this.updateSwViews(installedEntry, false);
}
@Override
* mapping will have to be added in future
*/
FlowEntryInstall target = null;
- for (FlowEntryInstall index : nodeFlows.get(node)) {
- FlowEntryInstall entry = installedSwView.get(index);
- if (entry.getRequestId() == rid) {
- target = entry;
- break;
+ List<FlowEntryInstall> flowEntryInstallList = nodeFlows.get(node);
+ // flowEntryInstallList could be null.
+ // so check for it.
+ if(flowEntryInstallList != null) {
+ for (FlowEntryInstall index : flowEntryInstallList) {
+ FlowEntryInstall entry = installedSwView.get(index);
+ if(entry != null) {
+ if (entry.getRequestId() == rid) {
+ target = entry;
+ break;
+ }
+ }
}
}
if (target != null) {
// This was a flow install, update database
- this.updateLocalDatabase(target, false);
+ this.updateSwViews(target, false);
+ // also update the config
+ if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
+ ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
+ // staticFlowEntry should never be null.
+ // the null check is just an extra defensive check.
+ if(staticFlowEntry != null) {
+ // Modify status and update cluster cache
+ log.debug("Updating static flow configuration on async error event");
+ String status = String.format("Cannot be installed on node. reason: %s", errorString);
+ staticFlowEntry.getValue().setStatus(status);
+ refreshClusterStaticFlowsStatus(node);
+ }
+ }
}
// Notify listeners
this.connectionManager = s;
}
+ // Dependency manager callback: unbind the container manager service.
+ // The identity check guards against unbinding a different instance.
+ public void unsetIContainerManager(IContainerManager s) {
+ if (s == this.containerManager) {
+ this.containerManager = null;
+ }
+ }
+
+ // Dependency manager callback: bind the container manager service,
+ // consulted at start() to detect container mode.
+ public void setIContainerManager(IContainerManager s) {
+ this.containerManager = s;
+ }
+
@Override
public void entryCreated(Object key, String cacheName, boolean originLocal) {
/*
@Override
public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ /*
+ * Streamline the updates for the per node and per group index databases
+ */
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)new_value, true));
+ }
+
if (originLocal) {
/*
* Local updates are of no interest
*/
return;
}
- if (cacheName.equals(WORKORDERCACHE)) {
+ if (cacheName.equals(WORK_ORDER_CACHE)) {
logsync.trace("Got a WorkOrderCacheUpdate for {}", key);
/*
* This is the case of one workOrder becoming available, so we need
// processing
pendingEvents.offer(new WorkOrderEvent(fe, (FlowEntryInstall) new_value));
}
- } else if (cacheName.equals(WORKSTATUSCACHE)) {
+ } else if (cacheName.equals(WORK_STATUS_CACHE)) {
logsync.trace("Got a WorkStatusCacheUpdate for {}", key);
/*
* This is the case of one workOrder being completed and a status
*/
if (fe.getRequestorController()
.equals(clusterContainerService.getMyAddress())) {
- FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe);
+ FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe);
if (fet != null) {
logsync.trace("workStatus response is for us {}", fe);
// Signal we got the status
@Override
public void entryDeleted(Object key, String cacheName, boolean originLocal) {
/*
- * Do nothing
+ * Streamline the updates for the per node and per group index databases
*/
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ // The cache key is the FlowEntryInstall itself: queue an index-DB
+ // update so the per-node/per-group views drop the deleted entry.
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FlowEntry> getFlowEntriesForNode(Node node) {
+ // Collect the configured (original) flow entries for the given node;
+ // returns an empty list for a null node or when nothing matches.
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (node != null) {
+ for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
+ if (node.equals(entry.getKey().getNode())) {
+ // Clone so callers cannot mutate the cached entry.
+ list.add(entry.getValue().clone());
+ }
+ }
+ }
+ return list;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FlowEntry> getInstalledFlowEntriesForNode(Node node) {
+ // Collect the flow entries installed on the given node; returns an
+ // empty list for a null node or when no flows are installed there.
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (node != null) {
+ List<FlowEntryInstall> flowEntryInstallList = this.nodeFlows.get(node);
+ if(flowEntryInstallList != null) {
+ for(FlowEntryInstall fi: flowEntryInstallList) {
+ // Clone so callers cannot mutate the installed view.
+ list.add(fi.getInstall().clone());
+ }
+ }
+ }
+ return list;
}
}