import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import org.eclipse.osgi.framework.console.CommandInterpreter;
-import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.configuration.ConfigurationObject;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.connection.ConnectionLocality;
+import org.opendaylight.controller.configuration.IConfigurationContainerService;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
import org.opendaylight.controller.forwardingrulesmanager.implementation.data.FlowEntryDistributionOrder;
import org.opendaylight.controller.sal.action.Action;
import org.opendaylight.controller.sal.action.ActionType;
-import org.opendaylight.controller.sal.action.Controller;
-import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
-import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.match.MatchType;
import org.opendaylight.controller.sal.utils.EtherTypes;
import org.opendaylight.controller.sal.utils.GlobalConstants;
-import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.IObjectReader;
-import org.opendaylight.controller.sal.utils.IPProtocols;
-import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
-import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.sal.utils.ObjectReader;
-import org.opendaylight.controller.sal.utils.ObjectWriter;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.switchmanager.ISwitchManagerAware;
import org.opendaylight.controller.switchmanager.Subnet;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ForwardingRulesManager implements
IForwardingRulesManager,
PortGroupChangeListener,
- IContainerListener,
+ IContainerLocalListener,
ISwitchManagerAware,
IConfigurationContainerAware,
IInventoryListener,
IObjectReader,
ICacheUpdateAware<Object,Object>,
- CommandProvider,
IFlowProgrammerListener {
- private static final String NODEDOWN = "Node is Down";
- private static final String SUCCESS = StatusCode.SUCCESS.toString();
+
private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
- private static final String PORTREMOVED = "Port removed";
private static final Logger logsync = LoggerFactory.getLogger("FRMsync");
- private String frmFileName;
- private String portGroupFileName;
+ private static final String PORT_REMOVED = "Port removed";
+ private static final String NODE_DOWN = "Node is Down";
+ private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry";
+ private static final String STATIC_FLOWS_FILE_NAME = "frm_staticflows.conf";
+ private static final String PORT_GROUP_FILE_NAME = "portgroup.conf";
private ConcurrentMap<Integer, FlowConfig> staticFlows;
private ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
+ private IContainerManager containerManager;
+ private IConfigurationContainerService configurationService;
private boolean inContainerMode; // being used by global instance only
- private boolean stopping;
+ protected boolean stopping;
/*
* Flow database. It's the software view of what was requested to install
* necessity non-transactional as long as need to be able to synchronize
* states also while a transaction is in progress
*/
- static final String WORKORDERCACHE = "frm.workOrder";
- static final String WORKSTATUSCACHE = "frm.workStatus";
+ static final String WORK_ORDER_CACHE = "frm.workOrder";
+ static final String WORK_STATUS_CACHE = "frm.workStatus";
+ static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView";
+ static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView";
/*
* Data structure responsible for distributing the FlowEntryInstall requests
* not picked by anyone, which is always a case can happen especially on
* Node disconnect cases.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
/*
* Data structure responsible for retrieving the results of the workOrder
* TODO: The workStatus entries need to have a lifetime associated in case
* of requestor controller leaving the cluster.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
/*
* Local Map used to hold the Future which a caller can use to monitor for
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Max pool size for the executor
+ */
+ private static final int maxPoolSize = 10;
+
/**
* @param e
* Entry being installed/updated/removed
private Status addEntry(FlowEntry flowEntry, boolean async) {
// Sanity Check
- if (flowEntry == null || flowEntry.getNode() == null) {
- String msg = "Invalid FlowEntry";
- String logMsg = msg + ": {}";
+ if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) {
+ String logMsg = INVALID_FLOW_ENTRY + ": {}";
log.warn(logMsg, flowEntry);
- return new Status(StatusCode.NOTACCEPTABLE, msg);
+ return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY);
+ }
+
+ /*
+ * Redundant Check: Check if the request is a redundant one from the
+ * same application the flowEntry is equal to an existing one. Given we
+ * do not have an application signature in the requested FlowEntry yet,
+ * we are here detecting the above condition by comparing the flow
+ * names, if set. If they are equal to the installed flow, most likely
+ * this is a redundant installation request from the same application
+ * and we can silently return success
+ *
+ * TODO: in future a sort of application reference list mechanism will
+ * be added to the FlowEntry so that exact flow can be used by different
+ * applications.
+ */
+ FlowEntry present = this.originalSwView.get(flowEntry);
+ if (present != null) {
+ boolean sameFlow = present.getFlow().equals(flowEntry.getFlow());
+ boolean sameApp = present.getFlowName() != null && present.getFlowName().equals(flowEntry.getFlowName());
+ if (sameFlow && sameApp) {
+ log.trace("Skipping redundant request for flow {} on node {}", flowEntry.getFlowName(),
+ flowEntry.getNode());
+ return new Status(StatusCode.SUCCESS, "Entry is already present");
+ }
}
/*
for (FlowEntryInstall installEntry : toInstallSafe) {
// Install and update database
- Status ret = addEntriesInternal(installEntry, async);
+ Status ret = addEntryInternal(installEntry, async);
if (ret.isSuccess()) {
oneSucceded = true;
succeded = ret;
} else {
error = ret;
- log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
+ log.trace("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
}
}
// Sanity checks
if (currentFlowEntry == null || currentFlowEntry.getNode() == null || newFlowEntry == null
- || newFlowEntry.getNode() == null) {
- String msg = "Modify: Invalid FlowEntry";
+ || newFlowEntry.getNode() == null || newFlowEntry.getFlow() == null) {
+ String msg = "Modify: " + INVALID_FLOW_ENTRY;
String logMsg = msg + ": {} or {}";
log.warn(logMsg, currentFlowEntry, newFlowEntry);
return new Status(StatusCode.NOTACCEPTABLE, msg);
Status succeeded = null;
boolean decouple = false;
if (installedList.size() != toInstallList.size()) {
- log.info("Modify: New flow entry does not satisfy the same "
+ log.trace("Modify: New flow entry does not satisfy the same "
+ "number of container flows as the original entry does");
decouple = true;
}
*/
FlowEntryInstall sameMatchEntry = installedSwView.get(installEntry);
if (sameMatchEntry != null && !sameMatchEntry.getOriginal().equals(currentFlowEntry)) {
- log.info("Modify: new container flow merged flow entry clashes with existing flow");
+ log.trace("Modify: new container flow merged flow entry clashes with existing flow");
decouple = true;
} else {
toInstallSafe.add(installEntry);
}
// Install new entries
for (FlowEntryInstall newEntry : toInstallSafe) {
- succeeded = this.addEntriesInternal(newEntry, async);
+ succeeded = this.addEntryInternal(newEntry, async);
}
} else {
/*
/**
* This is the function that modifies the final container flows merged
* entries on the network node and update the database. It expects that all
- * the validity checks are passed
+ * the validity checks are passed.
+ * This function is supposed to be called only on the controller on which
+ * the IFRM call is executed.
*
* @param currentEntries
* @param newEntries
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
+ Status status = new Status(StatusCode.UNDEFINED);
FlowEntryDistributionOrderFutureTask futureStatus =
distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
if (futureStatus != null) {
- Status retStatus = new Status(StatusCode.UNDEFINED);
try {
- retStatus = futureStatus.get();
- if (retStatus.getCode()
+ status = futureStatus.get();
+ if (status.getCode()
.equals(StatusCode.TIMEOUT)) {
// A timeout happened, lets cleanup the workMonitor
workMonitor.remove(futureStatus.getOrder());
} catch (ExecutionException e) {
log.error("", e);
}
- return retStatus;
} else {
// Modify the flow on the network node
- Status status = async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall()
- .getFlow(), newEntries.getInstall()
- .getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries.getInstall()
- .getFlow(), newEntries.getInstall()
- .getFlow());
+ status = modifyEntryInHw(currentEntries, newEntries, async);
+ }
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
- status.getDescription());
- return status;
- }
+ if (!status.isSuccess()) {
+ log.trace("{} SDN Plugin failed to program the flow: {}. The failure is: {}",
+ (futureStatus != null) ? "Remote" : "Local", newEntries.getInstall(), status.getDescription());
+ return status;
+ }
- log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall());
+ log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall());
- // Update DB
- newEntries.setRequestId(status.getRequestId());
- updateLocalDatabase(currentEntries, false);
- updateLocalDatabase(newEntries, true);
+ // Update DB
+ newEntries.setRequestId(status.getRequestId());
+ updateSwViews(currentEntries, false);
+ updateSwViews(newEntries, true);
- return status;
- }
+ return status;
+ }
+
+ private Status modifyEntryInHw(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
+ return async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall().getFlow(),
+ newEntries.getInstall().getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries
+ .getInstall().getFlow(), newEntries.getInstall().getFlow());
}
/**
Status error = new Status(null, null);
// Sanity Check
- if (flowEntry == null || flowEntry.getNode() == null) {
- String msg = "Invalid FlowEntry";
- String logMsg = msg + ": {}";
+ if (flowEntry == null || flowEntry.getNode() == null || flowEntry.getFlow() == null) {
+ String logMsg = INVALID_FLOW_ENTRY + ": {}";
log.warn(logMsg, flowEntry);
- return new Status(StatusCode.NOTACCEPTABLE, msg);
+ return new Status(StatusCode.NOTACCEPTABLE, INVALID_FLOW_ENTRY);
}
// Derive the container flows merged installed entries
if (!ret.isSuccess()) {
error = ret;
- log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
+ log.trace("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
if (installedList.size() == 1) {
// If we had only one entry to remove, this is fatal failure
return error;
* This is the function that removes the final container flows merged entry
* from the network node and update the database. It expects that all the
* validity checks are passed
+ * This function is supposed to be called only on the controller on which
+ * the IFRM call is executed.
*
* @param entry
* the flow entry to remove
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
+ Status status = new Status(StatusCode.UNDEFINED);
FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
if (futureStatus != null) {
- Status retStatus = new Status(StatusCode.UNDEFINED);
try {
- retStatus = futureStatus.get();
- if (retStatus.getCode()
- .equals(StatusCode.TIMEOUT)) {
+ status = futureStatus.get();
+ if (status.getCode().equals(StatusCode.TIMEOUT)) {
// A timeout happened, lets cleanup the workMonitor
workMonitor.remove(futureStatus.getOrder());
}
} catch (ExecutionException e) {
log.error("", e);
}
- return retStatus;
} else {
// Mark the entry to be deleted (for CC just in case we fail)
entry.toBeDeleted();
// Remove from node
- Status status = async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall()
- .getFlow()) : programmer.removeFlow(entry.getNode(), entry.getInstall()
- .getFlow());
-
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
- status.getDescription());
- return status;
- }
- log.trace("Removed {}", entry.getInstall());
-
- // Update DB
- updateLocalDatabase(entry, false);
+ status = removeEntryInHw(entry, async);
+ }
+ if (!status.isSuccess()) {
+ log.trace("{} SDN Plugin failed to remove the flow: {}. The failure is: {}",
+ (futureStatus != null) ? "Remote" : "Local", entry.getInstall(), status.getDescription());
return status;
}
+
+ log.trace("Removed {}", entry.getInstall());
+
+ // Update DB
+ updateSwViews(entry, false);
+
+ return status;
+ }
+
+ private Status removeEntryInHw(FlowEntryInstall entry, boolean async) {
+ return async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer
+ .removeFlow(entry.getNode(), entry.getInstall().getFlow());
}
/**
* on the network node and updates the database. It expects that all the
* validity and conflict checks are passed. That means it does not check
* whether this flow would conflict or overwrite an existing one.
+ * This function is supposed to be called only on the controller on which
+ * the IFRM call is executed.
*
* @param entry
* the flow entry to install
* @return the status of this request. In case of asynchronous call, it will
* contain the unique id assigned to this request
*/
- private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
+ private Status addEntryInternal(FlowEntryInstall entry, boolean async) {
+ Status status = new Status(StatusCode.UNDEFINED);
FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
if (futureStatus != null) {
- Status retStatus = new Status(StatusCode.UNDEFINED);
try {
- retStatus = futureStatus.get();
- if (retStatus.getCode()
- .equals(StatusCode.TIMEOUT)) {
+ status = futureStatus.get();
+ if (status.getCode().equals(StatusCode.TIMEOUT)) {
// A timeout happened, lets cleanup the workMonitor
workMonitor.remove(futureStatus.getOrder());
}
} catch (ExecutionException e) {
log.error("", e);
}
- return retStatus;
} else {
- // Install the flow on the network node
- Status status = async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall()
- .getFlow()) : programmer.addFlow(entry.getNode(), entry.getInstall()
- .getFlow());
+ status = addEntryInHw(entry, async);
+ }
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
- status.getDescription());
- return status;
- }
+ if (!status.isSuccess()) {
+ log.trace("{} SDN Plugin failed to program the flow: {}. The failure is: {}",
+ (futureStatus != null) ? "Remote" : "Local", entry.getInstall(), status.getDescription());
+ return status;
+ }
- log.trace("Added {}", entry.getInstall());
+ log.trace("Added {}", entry.getInstall());
- // Update DB
- entry.setRequestId(status.getRequestId());
- updateLocalDatabase(entry, true);
+ // Update DB
+ entry.setRequestId(status.getRequestId());
+ updateSwViews(entry, true);
- return status;
- }
+ return status;
+ }
+
+ private Status addEntryInHw(FlowEntryInstall entry, boolean async) {
+ // Install the flow on the network node
+ return async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer.addFlow(
+ entry.getNode(), entry.getInstall().getFlow());
}
/**
return null;
}
- private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
- // Update the software view
- updateSwViewes(entry, add);
-
+ private void updateIndexDatabase(FlowEntryInstall entry, boolean add) {
// Update node indexed flow database
updateNodeFlowsDB(entry, add);
/*
* Update the node mapped flows database
*/
- private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) {
+ private void updateSwViews(FlowEntryInstall flowEntries, boolean add) {
if (add) {
originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal());
installedSwView.put(flowEntries, flowEntries);
}
if (add) {
+ // there may be an already existing entry.
+ // remove it before adding the new one.
+ // This is necessary since we have observed that in some cases
+ // Infinispan does aggregation for operations (eg:- remove and then put a different value)
+ // related to the same key within the same transaction.
+ // Need this defensive code as the new FlowEntryInstall may be different
+ // than the old one even though the equals method returns true. This is because
+ // the equals method does not take into account the action list.
+ if(nodeIndeces.contains(flowEntries)) {
+ nodeIndeces.remove(flowEntries);
+ }
nodeIndeces.add(flowEntries);
} else {
nodeIndeces.remove(flowEntries);
}
if (add) {
+ // same comments in the similar code section in
+ // updateNodeFlowsDB method apply here too
+ if(indices.contains(flowEntries)) {
+ indices.remove(flowEntries);
+ }
indices.add(flowEntries);
} else {
indices.remove(flowEntries);
// Update DB
if (status.isSuccess()) {
- updateLocalDatabase(target, false);
+ updateSwViews(target, false);
} else {
// log the error
- log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
+ log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
status.getDescription());
}
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ protected void updateFlowsContainerFlow() {
Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
// First remove all installed entries
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
private void nonClusterObjectCreate() {
originalSwView = new ConcurrentHashMap<FlowEntry, FlowEntry>();
installedSwView = new ConcurrentHashMap<FlowEntryInstall, FlowEntryInstall>();
- nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
- groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
TSPolicies = new ConcurrentHashMap<String, Object>();
staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
inactiveFlows = new ConcurrentHashMap<FlowEntry, FlowEntry>();
}
- private void registerWithOSGIConsole() {
- BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
- bundleContext.registerService(CommandProvider.class.getName(), this, null);
- }
-
@Override
public void setTSPolicyData(String policyname, Object o, boolean add) {
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().clone());
+ list.add(entry.getValue().clone());
}
}
}
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntryInstall, FlowEntryInstall> entry : this.installedSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().getInstall().clone());
+ list.add(entry.getValue().getInstall().clone());
}
}
}
}
Status error = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (error.isSuccess()) {
- log.info("Ports {} added to FlowEntry {}", portList, flowName);
+ log.trace("Ports {} added to FlowEntry {}", portList, flowName);
} else {
log.warn("Failed to add ports {} to Flow entry {}. The failure is: {}", portList,
currentFlowEntry.toString(), error.getDescription());
}
Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
- log.info("Ports {} removed from FlowEntry {}", portList, flowName);
+ log.trace("Ports {} removed from FlowEntry {}", portList, flowName);
} else {
log.warn("Failed to remove ports {} from Flow entry {}. The failure is: {}", portList,
currentFlowEntry.toString(), status.getDescription());
Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
- log.info("Output port replaced with {} for flow {} on node {}", outPort, flowName, node);
+ log.trace("Output port replaced with {} for flow {} on node {}", outPort, flowName, node);
} else {
log.warn("Failed to replace output port for flow {} on node {}. The failure is: {}", flowName, node,
status.getDescription());
log.debug("Allocating caches for Container {}", container.getName());
try {
- clusterContainerService.createCache("frm.originalSwView",
+ clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.installedSwView",
+ clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.inactiveFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.nodeFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterContainerService.createCache("frm.groupFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlowsOrdinal",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.TSPolicies",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ clusterContainerService.createCache(WORK_STATUS_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
- clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ clusterContainerService.createCache(WORK_ORDER_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
log.debug("Retrieving Caches for Container {}", container.getName());
- map = clusterContainerService.getCache("frm.originalSwView");
+ map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE);
if (map != null) {
originalSwView = (ConcurrentMap<FlowEntry, FlowEntry>) map;
} else {
log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.installedSwView");
+ map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE);
if (map != null) {
installedSwView = (ConcurrentMap<FlowEntryInstall, FlowEntryInstall>) map;
} else {
log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.nodeFlows");
- if (map != null) {
- nodeFlows = (ConcurrentMap<Node, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of cache failed for Container {}", container.getName());
- }
-
- map = clusterContainerService.getCache("frm.groupFlows");
- if (map != null) {
- groupFlows = (ConcurrentMap<String, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName());
- }
-
map = clusterContainerService.getCache("frm.staticFlows");
if (map != null) {
staticFlows = (ConcurrentMap<Integer, FlowConfig>) map;
log.error("Retrieval of frm.TSPolicies cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache(WORKORDERCACHE);
+ map = clusterContainerService.getCache(WORK_ORDER_CACHE);
if (map != null) {
workOrder = (ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall>) map;
} else {
- log.error("Retrieval of " + WORKORDERCACHE + " cache failed for Container {}", container.getName());
+ log.error("Retrieval of " + WORK_ORDER_CACHE + " cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache(WORKSTATUSCACHE);
+ map = clusterContainerService.getCache(WORK_STATUS_CACHE);
if (map != null) {
workStatus = (ConcurrentMap<FlowEntryDistributionOrder, Status>) map;
} else {
- log.error("Retrieval of " + WORKSTATUSCACHE + " cache failed for Container {}", container.getName());
+ log.error("Retrieval of " + WORK_STATUS_CACHE + " cache failed for Container {}", container.getName());
}
}
boolean multipleFlowPush = false;
String error;
Status status;
- config.setStatus(SUCCESS);
+ config.setStatus(StatusCode.SUCCESS.toString());
// Presence check
if (flowConfigExists(config)) {
continue;
}
if (config.getNode().equals(node)) {
- if (config.installInHw() && !config.getStatus().equals(SUCCESS)) {
+ if (config.installInHw() && !config.getStatus().equals(StatusCode.SUCCESS.toString())) {
Status status = this.installFlowEntryAsync(config.getFlowEntry());
config.setStatus(status.getDescription());
}
// Take note of this controller generated static flow
toRemove.add(entry.getKey());
} else {
- config.setStatus(NODEDOWN);
+ config.setStatus(NODE_DOWN);
}
}
}
config.setStatus("Removed from node because in container mode");
break;
case REMOVED:
- config.setStatus(SUCCESS);
+ config.setStatus(StatusCode.SUCCESS.toString());
break;
default:
}
// Do not attempt to reinstall the flow, warn user
if (newFlowConfig.equals(oldFlowConfig)) {
String msg = "No modification detected";
- log.info("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig);
+ log.trace("Static flow modification skipped. New flow and old flow are the same: {}", newFlowConfig);
return new Status(StatusCode.SUCCESS, msg);
}
.installFlowEntry(target.getFlowEntry());
if (status.isSuccess()) {
// Update Configuration database
- target.setStatus(SUCCESS);
+ target.setStatus(StatusCode.SUCCESS.toString());
target.toggleInstallation();
staticFlows.put(key, target);
}
* inactive list
*/
private void uninstallAllFlowEntries(boolean preserveFlowEntries) {
- log.info("Uninstalling all non-internal flows");
+ log.trace("Uninstalling all non-internal flows");
List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>();
if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
Status status = this.removeEntryInternal(flowEntryHw, false);
if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+ log.trace("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
}
} else {
log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job",
* default container instance of FRM only when the last container is deleted
*/
private void reinstallAllFlowEntries() {
- log.info("Reinstalling all inactive flows");
+ log.trace("Reinstalling all inactive flows");
for (FlowEntry flowEntry : this.inactiveFlows.keySet()) {
this.addEntry(flowEntry, false);
@Override
public List<FlowConfig> getStaticFlows() {
- return getStaticFlowsOrderedList(staticFlows, staticFlowsOrdinal.get(0).intValue());
- }
-
- // TODO: need to come out with a better algorithm for maintaining the order
- // of the configuration entries
- // with actual one, index associated to deleted entries cannot be reused and
- // map grows...
- private List<FlowConfig> getStaticFlowsOrderedList(ConcurrentMap<Integer, FlowConfig> flowMap, int maxKey) {
- List<FlowConfig> orderedList = new ArrayList<FlowConfig>();
- for (int i = 0; i <= maxKey; i++) {
- FlowConfig entry = flowMap.get(i);
- if (entry != null) {
- orderedList.add(entry);
- }
- }
- return orderedList;
+ return new ArrayList<FlowConfig>(staticFlows.values());
}
@Override
return new ArrayList<Node>(set);
}
- @SuppressWarnings("unchecked")
private void loadFlowConfiguration() {
- ObjectReader objReader = new ObjectReader();
- ConcurrentMap<Integer, FlowConfig> confList = (ConcurrentMap<Integer, FlowConfig>) objReader.read(this,
- frmFileName);
-
- ConcurrentMap<String, PortGroupConfig> pgConfig = (ConcurrentMap<String, PortGroupConfig>) objReader.read(this,
- portGroupFileName);
-
- if (pgConfig != null) {
- for (ConcurrentMap.Entry<String, PortGroupConfig> entry : pgConfig.entrySet()) {
- addPortGroupConfig(entry.getKey(), entry.getValue().getMatchString(), true);
- }
- }
-
- if (confList == null) {
- return;
- }
-
- int maxKey = 0;
- for (Integer key : confList.keySet()) {
- if (key.intValue() > maxKey) {
- maxKey = key.intValue();
- }
+ for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, PORT_GROUP_FILE_NAME)) {
+ addPortGroupConfig(((PortGroupConfig) conf).getName(), ((PortGroupConfig) conf).getMatchString(), true);
}
- for (FlowConfig conf : getStaticFlowsOrderedList(confList, maxKey)) {
- addStaticFlowInternal(conf, true);
+ for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, STATIC_FLOWS_FILE_NAME)) {
+ addStaticFlowInternal((FlowConfig) conf, true);
}
}
}
private Status saveConfigInternal() {
- ObjectWriter objWriter = new ObjectWriter();
- ConcurrentMap<Integer, FlowConfig> nonDynamicFlows = new ConcurrentHashMap<Integer, FlowConfig>();
+ List<ConfigurationObject> nonDynamicFlows = new ArrayList<ConfigurationObject>();
+
for (Integer ordinal : staticFlows.keySet()) {
FlowConfig config = staticFlows.get(ordinal);
// Do not save dynamic and controller generated static flows
if (config.isDynamic() || config.isInternalFlow()) {
continue;
}
- nonDynamicFlows.put(ordinal, config);
+ nonDynamicFlows.add(config);
}
- objWriter.write(nonDynamicFlows, frmFileName);
- objWriter.write(new ConcurrentHashMap<String, PortGroupConfig>(portGroupConfigs), portGroupFileName);
- return new Status(StatusCode.SUCCESS, null);
+
+ configurationService.persistConfiguration(nonDynamicFlows, STATIC_FLOWS_FILE_NAME);
+ configurationService.persistConfiguration(new ArrayList<ConfigurationObject>(portGroupConfigs.values()),
+ PORT_GROUP_FILE_NAME);
+
+ return new Status(StatusCode.SUCCESS);
}
@Override
public void subnetNotify(Subnet sub, boolean add) {
}
- private void installImplicitARPReplyPunt(Node node) {
-
- if (node == null) {
- return;
+ private boolean programInternalFlow(boolean proactive, FlowConfig fc) {
+ boolean retVal = true; // program flows unless determined otherwise
+ if(proactive) {
+ // if the flow already exists do not program
+ if(flowConfigExists(fc)) {
+ retVal = false;
+ }
+ } else {
+ // if the flow does not exist do not program
+ if(!flowConfigExists(fc)) {
+ retVal = false;
+ }
}
-
- List<String> puntAction = new ArrayList<String>();
- puntAction.add(ActionType.CONTROLLER.toString());
-
- FlowConfig allowARP = new FlowConfig();
- allowARP.setInstallInHw(true);
- allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP Reply" + FlowConfig.INTERNALSTATICFLOWEND);
- allowARP.setPriority("500");
- allowARP.setNode(node);
- allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase());
- allowARP.setDstMac(HexEncode.bytesToHexString(switchManager.getControllerMAC()));
- allowARP.setActions(puntAction);
- addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name
+ return retVal;
}
/**
dropAllConfig.setActions(dropAction);
defaultConfigs.add(dropAllConfig);
- log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
+ log.trace("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
for (FlowConfig fc : defaultConfigs) {
- Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
- if (status.isSuccess()) {
- log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+ // check if the frm really needs to act on the notification.
+ // this is to check against duplicate notifications
+ if(programInternalFlow(proactive, fc)) {
+ Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
+ if (status.isSuccess()) {
+ log.trace("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+ } else {
+ log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
+ fc.getName());
+ }
} else {
- log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
- fc.getName());
+ log.debug("Got redundant install request for internal flow: {} on node: {}. Request not sent to FRM.", fc.getName(), node);
}
}
return new Status(StatusCode.SUCCESS);
* @param node
*/
private void cleanDatabaseForNode(Node node) {
- log.info("Cleaning Flow database for Node {}", node);
+ log.trace("Cleaning Flow database for Node {}", node);
if (nodeFlows.containsKey(node)) {
List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>(nodeFlows.get(node));
for (FlowEntryInstall entry : toRemove) {
- updateLocalDatabase(entry, false);
+ updateSwViews(entry, false);
}
}
}
List<FlowConfig> flowConfigForNode = getStaticFlows(nodeConnector.getNode());
for (FlowConfig flowConfig : flowConfigForNode) {
if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) {
- if (flowConfig.installInHw() && !flowConfig.getStatus().equals(SUCCESS)) {
+ if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) {
Status status = this.installFlowEntry(flowConfig.getFlowEntry());
if (!status.isSuccess()) {
flowConfig.setStatus(status.getDescription());
} else {
- flowConfig.setStatus(SUCCESS);
+ flowConfig.setStatus(StatusCode.SUCCESS.toString());
}
updated = true;
}
if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
if (flowConfig != null) {
- flowConfig.setStatus(PORTREMOVED);
+ flowConfig.setStatus(PORT_REMOVED);
updated = true;
}
}
@Override
public void portGroupChanged(PortGroupConfig config, Map<Node, PortGroup> data, boolean add) {
- log.info("PortGroup Changed for: {} Data: {}", config, portGroupData);
+ log.trace("PortGroup Changed for: {} Data: {}", config, portGroupData);
Map<Node, PortGroup> existingData = portGroupData.get(config);
if (existingData != null) {
for (Map.Entry<Node, PortGroup> entry : data.entrySet()) {
return true;
}
- private void usePortGroupConfig(String name) {
- PortGroupConfig config = portGroupConfigs.get(name);
- if (config == null) {
- return;
- }
- if (portGroupProvider != null) {
- Map<Node, PortGroup> data = portGroupProvider.getPortGroupData(config);
- portGroupData.put(config, data);
- }
- }
-
@Override
public Map<String, PortGroupConfig> getPortGroupConfigs() {
return portGroupConfigs;
}
}
+ public void setConfigurationContainerService(IConfigurationContainerService service) {
+ log.trace("Got configuration service set request {}", service);
+ this.configurationService = service;
+ }
+
+ public void unsetConfigurationContainerService(IConfigurationContainerService service) {
+ log.trace("Got configuration service UNset request");
+ this.configurationService = null;
+ }
+
@Override
public PortGroupProvider getPortGroupProvider() {
return portGroupProvider;
*
*/
void init() {
- frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf";
- portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf";
inContainerMode = false;
portGroupProvider.registerPortGroupChange(this);
}
- cacheStartup();
+ nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
+ groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
- registerWithOSGIConsole();
+ cacheStartup();
/*
* If we are not the first cluster node to come up, do not initialize
public void run() {
while (!stopping) {
try {
- FRMEvent event = pendingEvents.take();
+ final FRMEvent event = pendingEvents.take();
if (event == null) {
log.warn("Dequeued null event");
continue;
}
+ log.trace("Dequeued {} event", event.getClass().getSimpleName());
if (event instanceof NodeUpdateEvent) {
NodeUpdateEvent update = (NodeUpdateEvent) event;
Node node = update.getNode();
/*
* Take care of handling the remote Work request
*/
- WorkOrderEvent work = (WorkOrderEvent) event;
- FlowEntryDistributionOrder fe = work.getFe();
- if (fe != null) {
- logsync.trace("Executing the workOrder {}", fe);
- Status gotStatus = null;
- FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
- switch (fe.getUpType()) {
- case ADDED:
- /*
- * TODO: Not still sure how to handle the
- * sync entries
- */
- gotStatus = addEntriesInternal(feiCurrent, true);
- break;
- case CHANGED:
- gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
- break;
- case REMOVED:
- gotStatus = removeEntryInternal(feiCurrent, true);
- break;
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe);
+ switch (fe.getUpType()) {
+ case ADDED:
+ gotStatus = addEntryInHw(feiCurrent, false);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInHw(feiCurrent, feiNew, false);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInHw(feiCurrent, false);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
}
- // Remove the Order
- workOrder.remove(fe);
- logsync.trace(
- "The workOrder has been executed and now the status is being returned {}", fe);
- // Place the status
- workStatus.put(fe, gotStatus);
- } else {
- log.warn("Not expected null WorkOrder", work);
+ };
+ if(executor != null) {
+ executor.execute(r);
}
} else if (event instanceof WorkStatusCleanup) {
/*
* flow merging is not an injective function
*/
updateFlowsContainerFlow();
+ } else if (event instanceof UpdateIndexDBs) {
+ UpdateIndexDBs update = (UpdateIndexDBs)event;
+ updateIndexDatabase(update.getFei(), update.isAddition());
} else {
- log.warn("Dequeued unknown event {}", event.getClass()
- .getSimpleName());
+ log.warn("Dequeued unknown event {}", event.getClass().getSimpleName());
}
} catch (InterruptedException e) {
// clear pending events
*
*/
void start() {
+ /*
+ * If running in default container, need to know if controller is in
+ * container mode
+ */
+ if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+ inContainerMode = containerManager.inContainerMode();
+ }
+
// Initialize graceful stop flag
stopping = false;
// Allocate the executor service
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newFixedThreadPool(maxPoolSize);
// Start event handler thread
frmEventHandler.start();
+ // replay the installedSwView data structure to populate
+ // node flows and group flows
+ for (FlowEntryInstall fei : installedSwView.values()) {
+ pendingEvents.offer(new UpdateIndexDBs(fei, true));
+ }
+
/*
- * Read startup and build database if we have not already gotten the
- * configurations synced from another node
+ * Read startup and build database if we are the coordinator
*/
- if (staticFlows.isEmpty()) {
- loadFlowConfiguration();
- }
+ loadFlowConfiguration();
+ }
+
+ /**
+ * Function called by the dependency manager before Container is Stopped and Destroyed.
+ */
+ public void containerStop() {
+ uninstallAllFlowEntries(false);
}
/**
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries(false);
// Shutdown executor
this.executor.shutdownNow();
// Now walk all the workMonitor and wake up the one sleeping because
}
}
- /*
- * OSGI COMMANDS
- */
- @Override
- public String getHelp() {
- StringBuffer help = new StringBuffer();
- help.append("---FRM Matrix Application---\n");
- help.append("\t printMatrixData - Prints the Matrix Configs\n");
- help.append("\t addMatrixConfig <name> <regex>\n");
- help.append("\t delMatrixConfig <name>\n");
- help.append("\t useMatrixConfig <name>\n");
- return help.toString();
- }
-
- public void _printMatrixData(CommandInterpreter ci) {
- ci.println("Configs : ");
- ci.println("---------");
- ci.println(portGroupConfigs);
-
- ci.println("Data : ");
- ci.println("------");
- ci.println(portGroupData);
- }
-
- public void _addMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- String regex = ci.nextArgument();
- addPortGroupConfig(name, regex, false);
- }
-
- public void _delMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- delPortGroupConfig(name);
- }
-
- public void _useMatrixConfig(CommandInterpreter ci) {
- String name = ci.nextArgument();
- usePortGroupConfig(name);
- }
-
- public void _arpPunt(CommandInterpreter ci) {
- String switchId = ci.nextArgument();
- long swid = HexEncode.stringToLong(switchId);
- Node node = NodeCreator.createOFNode(swid);
- installImplicitARPReplyPunt(node);
- }
+ private class UpdateIndexDBs extends FRMEvent {
+ private FlowEntryInstall fei;
+ private boolean add;
- public void _frmaddflow(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
+ /**
+ *
+ * @param fei the flow entry which was installed/removed on the netwrok node
+ * @param update
+ */
+ UpdateIndexDBs(FlowEntryInstall fei, boolean add) {
+ this.fei = fei;
+ this.add = add;
}
- ci.println(this.programmer.addFlow(node, getSampleFlow(node)));
- }
- public void _frmremoveflow(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
+
+ /**
+ * @return the flowEntryInstall object which was added/removed
+ * to/from the installed software view cache
+ */
+ public FlowEntryInstall getFei() {
+ return fei;
}
- ci.println(this.programmer.removeFlow(node, getSampleFlow(node)));
- }
-
- private Flow getSampleFlow(Node node) throws UnknownHostException {
- NodeConnector port = NodeConnectorCreator.createOFNodeConnector((short) 24, node);
- NodeConnector oport = NodeConnectorCreator.createOFNodeConnector((short) 30, node);
- byte srcMac[] = { (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78, (byte) 0x9a, (byte) 0xbc };
- byte dstMac[] = { (byte) 0x1a, (byte) 0x2b, (byte) 0x3c, (byte) 0x4d, (byte) 0x5e, (byte) 0x6f };
- InetAddress srcIP = InetAddress.getByName("172.28.30.50");
- InetAddress dstIP = InetAddress.getByName("171.71.9.52");
- InetAddress ipMask = InetAddress.getByName("255.255.255.0");
- InetAddress ipMask2 = InetAddress.getByName("255.0.0.0");
- short ethertype = EtherTypes.IPv4.shortValue();
- short vlan = (short) 27;
- byte vlanPr = 3;
- Byte tos = 4;
- byte proto = IPProtocols.TCP.byteValue();
- short src = (short) 55000;
- short dst = 80;
- /*
- * Create a SAL Flow aFlow
+ /**
+ *
+ * @return whether this was an flow addition or removal
*/
- Match match = new Match();
- match.setField(MatchType.IN_PORT, port);
- match.setField(MatchType.DL_SRC, srcMac);
- match.setField(MatchType.DL_DST, dstMac);
- match.setField(MatchType.DL_TYPE, ethertype);
- match.setField(MatchType.DL_VLAN, vlan);
- match.setField(MatchType.DL_VLAN_PR, vlanPr);
- match.setField(MatchType.NW_SRC, srcIP, ipMask);
- match.setField(MatchType.NW_DST, dstIP, ipMask2);
- match.setField(MatchType.NW_TOS, tos);
- match.setField(MatchType.NW_PROTO, proto);
- match.setField(MatchType.TP_SRC, src);
- match.setField(MatchType.TP_DST, dst);
-
- List<Action> actions = new ArrayList<Action>();
- actions.add(new Output(oport));
- actions.add(new PopVlan());
- actions.add(new Flood());
- actions.add(new Controller());
- return new Flow(match, actions);
+ public boolean isAddition() {
+ return add;
+ }
}
@Override
return saveConfig();
}
- public void _frmNodeFlows(CommandInterpreter ci) {
- String nodeId = ci.nextArgument();
- Node node = Node.fromString(nodeId);
- if (node == null) {
- ci.println("frmNodeFlows <node> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equals("true");
- }
-
- if (!nodeFlows.containsKey(node)) {
- return;
- }
- // Dump per node database
- for (FlowEntryInstall entry : nodeFlows.get(node)) {
- if (!verbose) {
- ci.println(node + " " + installedSwView.get(entry).getFlowName());
- } else {
- ci.println(node + " " + installedSwView.get(entry).toString());
- }
- }
- }
-
- public void _frmGroupFlows(CommandInterpreter ci) {
- String group = ci.nextArgument();
- if (group == null) {
- ci.println("frmGroupFlows <group> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equalsIgnoreCase("true");
- }
-
- if (!groupFlows.containsKey(group)) {
- return;
- }
- // Dump per node database
- ci.println("Group " + group + ":\n");
- for (FlowEntryInstall flowEntry : groupFlows.get(group)) {
- if (!verbose) {
- ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName());
- } else {
- ci.println(flowEntry.getNode() + " " + flowEntry.toString());
- }
- }
- }
-
- public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- long reqId = 0L;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- String requestId = ci.nextArgument();
- if (requestId == null) {
- ci.print("Request id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
- }
- try {
- reqId = Long.parseLong(requestId);
- } catch (NumberFormatException e) {
- ci.print("Request id not a number");
- return;
- }
- // null for error object is good enough for now
- ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null);
- this.processErrorEvent(event);
- }
-
@Override
public void flowRemoved(Node node, Flow flow) {
log.trace("Received flow removed notification on {} for {}", node, flow);
}
if (target != null) {
// Update Configuration database
- target.toggleInstallation();
- target.setStatus(SUCCESS);
+ if (target.getHardTimeout() != null || target.getIdleTimeout() != null) {
+ /*
+ * No need for checking if actual values: these strings were
+ * validated at configuration creation. Also, after a switch
+ * down scenario, no use to reinstall a timed flow. Mark it as
+ * "do not install". User can manually toggle it.
+ */
+ target.toggleInstallation();
+ }
+ target.setStatus(StatusCode.GONE.toString());
staticFlows.put(key, target);
}
// Update software views
- this.updateLocalDatabase(installedEntry, false);
+ this.updateSwViews(installedEntry, false);
}
@Override
}
if (target != null) {
// This was a flow install, update database
- this.updateLocalDatabase(target, false);
+ this.updateSwViews(target, false);
// also update the config
if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
// staticFlowEntry should never be null.
// the null check is just an extra defensive check.
if(staticFlowEntry != null) {
- staticFlows.remove(staticFlowEntry.getKey());
+ // Modify status and update cluster cache
+ log.debug("Updating static flow configuration on async error event");
+ String status = String.format("Cannot be installed on node. reason: %s", errorString);
+ staticFlowEntry.getValue().setStatus(status);
+ refreshClusterStaticFlowsStatus(node);
}
}
}
this.connectionManager = s;
}
+ public void unsetIContainerManager(IContainerManager s) {
+ if (s == this.containerManager) {
+ this.containerManager = null;
+ }
+ }
+
+ public void setIContainerManager(IContainerManager s) {
+ this.containerManager = s;
+ }
+
@Override
public void entryCreated(Object key, String cacheName, boolean originLocal) {
/*
@Override
public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ /*
+ * Streamline the updates for the per node and per group index databases
+ */
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)new_value, true));
+ }
+
if (originLocal) {
/*
* Local updates are of no interest
*/
return;
}
- if (cacheName.equals(WORKORDERCACHE)) {
+ if (cacheName.equals(WORK_ORDER_CACHE)) {
logsync.trace("Got a WorkOrderCacheUpdate for {}", key);
/*
* This is the case of one workOrder becoming available, so we need
// processing
pendingEvents.offer(new WorkOrderEvent(fe, (FlowEntryInstall) new_value));
}
- } else if (cacheName.equals(WORKSTATUSCACHE)) {
+ } else if (cacheName.equals(WORK_STATUS_CACHE)) {
logsync.trace("Got a WorkStatusCacheUpdate for {}", key);
/*
* This is the case of one workOrder being completed and a status
@Override
public void entryDeleted(Object key, String cacheName, boolean originLocal) {
/*
- * Do nothing
+ * Streamline the updates for the per node and per group index databases
*/
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FlowEntry> getFlowEntriesForNode(Node node) {
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (node != null) {
+ for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
+ if (node.equals(entry.getKey().getNode())) {
+ list.add(entry.getValue().clone());
+ }
+ }
+ }
+ return list;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FlowEntry> getInstalledFlowEntriesForNode(Node node) {
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (node != null) {
+ List<FlowEntryInstall> flowEntryInstallList = this.nodeFlows.get(node);
+ if(flowEntryInstallList != null) {
+ for(FlowEntryInstall fi: flowEntryInstallList) {
+ list.add(fi.getInstall().clone());
+ }
+ }
+ }
+ return list;
}
}