import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import org.eclipse.osgi.framework.console.CommandInterpreter;
-import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.sal.utils.EtherTypes;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.IObjectReader;
-import org.opendaylight.controller.sal.utils.NodeCreator;
import org.opendaylight.controller.sal.utils.ObjectReader;
import org.opendaylight.controller.sal.utils.ObjectWriter;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.switchmanager.ISwitchManagerAware;
import org.opendaylight.controller.switchmanager.Subnet;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
IInventoryListener,
IObjectReader,
ICacheUpdateAware<Object,Object>,
- CommandProvider,
IFlowProgrammerListener {
private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
private static final Logger logsync = LoggerFactory.getLogger("FRMsync");
- private static final String PORTREMOVED = "Port removed";
- private static final String NODEDOWN = "Node is Down";
+ private static final String PORT_REMOVED = "Port removed";
+ private static final String NODE_DOWN = "Node is Down";
private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry";
private String frmFileName;
private String portGroupFileName;
*/
static final String WORK_ORDER_CACHE = "frm.workOrder";
static final String WORK_STATUS_CACHE = "frm.workStatus";
+ static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView";
+ static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView";
/*
* Data structure responsible for distributing the FlowEntryInstall requests
// Update DB
newEntries.setRequestId(status.getRequestId());
- updateLocalDatabase(currentEntries, false);
- updateLocalDatabase(newEntries, true);
+ updateSwViews(currentEntries, false);
+ updateSwViews(newEntries, true);
return status;
}
log.trace("Removed {}", entry.getInstall());
// Update DB
- updateLocalDatabase(entry, false);
+ updateSwViews(entry, false);
return status;
}
// Update DB
entry.setRequestId(status.getRequestId());
- updateLocalDatabase(entry, true);
+ updateSwViews(entry, true);
return status;
}
return null;
}
- private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
- // Update the software view
- updateSwViewes(entry, add);
-
+ private void updateIndexDatabase(FlowEntryInstall entry, boolean add) {
// Update node indexed flow database
updateNodeFlowsDB(entry, add);
/*
* Update the node mapped flows database
*/
- private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) {
+ private void updateSwViews(FlowEntryInstall flowEntries, boolean add) {
if (add) {
originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal());
installedSwView.put(flowEntries, flowEntries);
}
if (add) {
+ // there may be an already existing entry.
+ // remove it before adding the new one.
+ // This is necessary since we have observed that in some cases
+ // Infinispan does aggregation for operations (eg:- remove and then put a different value)
+ // related to the same key within the same transaction.
+ // Need this defensive code as the new FlowEntryInstall may be different
+ // than the old one even though the equals method returns true. This is because
+ // the equals method does not take into account the action list.
+ if(nodeIndeces.contains(flowEntries)) {
+ nodeIndeces.remove(flowEntries);
+ }
nodeIndeces.add(flowEntries);
} else {
nodeIndeces.remove(flowEntries);
}
if (add) {
+ // same comments in the similar code section in
+ // updateNodeFlowsDB method apply here too
+ if(indices.contains(flowEntries)) {
+ indices.remove(flowEntries);
+ }
indices.add(flowEntries);
} else {
indices.remove(flowEntries);
// Update DB
if (status.isSuccess()) {
- updateLocalDatabase(target, false);
+ updateSwViews(target, false);
} else {
// log the error
log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
private void nonClusterObjectCreate() {
originalSwView = new ConcurrentHashMap<FlowEntry, FlowEntry>();
installedSwView = new ConcurrentHashMap<FlowEntryInstall, FlowEntryInstall>();
- nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
- groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
TSPolicies = new ConcurrentHashMap<String, Object>();
staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
inactiveFlows = new ConcurrentHashMap<FlowEntry, FlowEntry>();
}
- private void registerWithOSGIConsole() {
- BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
- bundleContext.registerService(CommandProvider.class.getName(), this, null);
- }
-
@Override
public void setTSPolicyData(String policyname, Object o, boolean add) {
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().clone());
+ list.add(entry.getValue().clone());
}
}
}
if (policyName != null && !policyName.trim().isEmpty()) {
for (Map.Entry<FlowEntryInstall, FlowEntryInstall> entry : this.installedSwView.entrySet()) {
if (policyName.equals(entry.getKey().getGroupName())) {
- list.add(entry.getKey().getInstall().clone());
+ list.add(entry.getValue().getInstall().clone());
}
}
}
log.debug("Allocating caches for Container {}", container.getName());
try {
- clusterContainerService.createCache("frm.originalSwView",
+ clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.installedSwView",
+ clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.inactiveFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.nodeFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterContainerService.createCache("frm.groupFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
log.debug("Retrieving Caches for Container {}", container.getName());
- map = clusterContainerService.getCache("frm.originalSwView");
+ map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE);
if (map != null) {
originalSwView = (ConcurrentMap<FlowEntry, FlowEntry>) map;
} else {
log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.installedSwView");
+ map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE);
if (map != null) {
installedSwView = (ConcurrentMap<FlowEntryInstall, FlowEntryInstall>) map;
} else {
log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.nodeFlows");
- if (map != null) {
- nodeFlows = (ConcurrentMap<Node, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of cache failed for Container {}", container.getName());
- }
-
- map = clusterContainerService.getCache("frm.groupFlows");
- if (map != null) {
- groupFlows = (ConcurrentMap<String, List<FlowEntryInstall>>) map;
- } else {
- log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName());
- }
-
map = clusterContainerService.getCache("frm.staticFlows");
if (map != null) {
staticFlows = (ConcurrentMap<Integer, FlowConfig>) map;
// Take note of this controller generated static flow
toRemove.add(entry.getKey());
} else {
- config.setStatus(NODEDOWN);
+ config.setStatus(NODE_DOWN);
}
}
}
List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>(nodeFlows.get(node));
for (FlowEntryInstall entry : toRemove) {
- updateLocalDatabase(entry, false);
+ updateSwViews(entry, false);
}
}
}
if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
if (flowConfig != null) {
- flowConfig.setStatus(PORTREMOVED);
+ flowConfig.setStatus(PORT_REMOVED);
updated = true;
}
}
portGroupProvider.registerPortGroupChange(this);
}
- cacheStartup();
+ nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
+ groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
- registerWithOSGIConsole();
+ cacheStartup();
/*
* If we are not the first cluster node to come up, do not initialize
* flow merging is not an injective function
*/
updateFlowsContainerFlow();
+ } else if (event instanceof UpdateIndexDBs) {
+ UpdateIndexDBs update = (UpdateIndexDBs)event;
+ updateIndexDatabase(update.getFei(), update.isAddition());
} else {
- log.warn("Dequeued unknown event {}", event.getClass()
- .getSimpleName());
+ log.warn("Dequeued unknown event {}", event.getClass().getSimpleName());
}
} catch (InterruptedException e) {
// clear pending events
// Start event handler thread
frmEventHandler.start();
+ // replay the installedSwView data structure to populate
+ // node flows and group flows
+ for (FlowEntryInstall fei : installedSwView.values()) {
+ pendingEvents.offer(new UpdateIndexDBs(fei, true));
+ }
+
/*
* Read startup and build database if we have not already gotten the
* configurations synced from another node
}
}
+ /**
+ * Function called by the dependency manager before Container is Stopped and Destroyed.
+ */
+ public void containerStop() {
+ uninstallAllFlowEntries(false);
+ }
+
/**
* Function called by the dependency manager before the services exported by
* the component are unregistered, this will be followed by a "destroy ()"
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries(false);
// Shutdown executor
this.executor.shutdownNow();
// Now walk all the workMonitor and wake up the one sleeping because
}
}
- /*
- * OSGI COMMANDS
- */
- @Override
- public String getHelp() {
- StringBuffer help = new StringBuffer();
- return help.toString();
- }
-
- @Override
- public Status saveConfiguration() {
- return saveConfig();
- }
+ private class UpdateIndexDBs extends FRMEvent {
+ private FlowEntryInstall fei;
+ private boolean add;
- public void _frmNodeFlows(CommandInterpreter ci) {
- String nodeId = ci.nextArgument();
- Node node = Node.fromString(nodeId);
- if (node == null) {
- ci.println("frmNodeFlows <node> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equals("true");
+ /**
+ *
+ * @param fei the flow entry which was installed/removed on the netwrok node
+ * @param update
+ */
+ UpdateIndexDBs(FlowEntryInstall fei, boolean add) {
+ this.fei = fei;
+ this.add = add;
}
- if (!nodeFlows.containsKey(node)) {
- return;
- }
- // Dump per node database
- for (FlowEntryInstall entry : nodeFlows.get(node)) {
- if (!verbose) {
- ci.println(node + " " + installedSwView.get(entry).getFlowName());
- } else {
- ci.println(node + " " + installedSwView.get(entry).toString());
- }
- }
- }
- public void _frmGroupFlows(CommandInterpreter ci) {
- String group = ci.nextArgument();
- if (group == null) {
- ci.println("frmGroupFlows <group> [verbose]");
- return;
- }
- boolean verbose = false;
- String verboseCheck = ci.nextArgument();
- if (verboseCheck != null) {
- verbose = verboseCheck.equalsIgnoreCase("true");
+ /**
+ * @return the flowEntryInstall object which was added/removed
+ * to/from the installed software view cache
+ */
+ public FlowEntryInstall getFei() {
+ return fei;
}
- if (!groupFlows.containsKey(group)) {
- return;
- }
- // Dump per node database
- ci.println("Group " + group + ":\n");
- for (FlowEntryInstall flowEntry : groupFlows.get(group)) {
- if (!verbose) {
- ci.println(flowEntry.getNode() + " " + flowEntry.getFlowName());
- } else {
- ci.println(flowEntry.getNode() + " " + flowEntry.toString());
- }
+ /**
+ *
+ * @return whether this was an flow addition or removal
+ */
+ public boolean isAddition() {
+ return add;
}
}
- public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException {
- Node node = null;
- long reqId = 0L;
- String nodeId = ci.nextArgument();
- if (nodeId == null) {
- ci.print("Node id not specified");
- return;
- }
- String requestId = ci.nextArgument();
- if (requestId == null) {
- ci.print("Request id not specified");
- return;
- }
- try {
- node = NodeCreator.createOFNode(Long.valueOf(nodeId));
- } catch (NumberFormatException e) {
- ci.print("Node id not a number");
- return;
- }
- try {
- reqId = Long.parseLong(requestId);
- } catch (NumberFormatException e) {
- ci.print("Request id not a number");
- return;
- }
- // null for error object is good enough for now
- ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null);
- this.processErrorEvent(event);
+ @Override
+ public Status saveConfiguration() {
+ return saveConfig();
}
@Override
}
if (target != null) {
// Update Configuration database
- target.toggleInstallation();
- target.setStatus(StatusCode.SUCCESS.toString());
+ if (target.getHardTimeout() != null || target.getIdleTimeout() != null) {
+ /*
+ * No need for checking if actual values: these strings were
+ * validated at configuration creation. Also, after a switch
+ * down scenario, no use to reinstall a timed flow. Mark it as
+ * "do not install". User can manually toggle it.
+ */
+ target.toggleInstallation();
+ }
+ target.setStatus(StatusCode.GONE.toString());
staticFlows.put(key, target);
}
// Update software views
- this.updateLocalDatabase(installedEntry, false);
+ this.updateSwViews(installedEntry, false);
}
@Override
}
if (target != null) {
// This was a flow install, update database
- this.updateLocalDatabase(target, false);
+ this.updateSwViews(target, false);
// also update the config
if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
// staticFlowEntry should never be null.
// the null check is just an extra defensive check.
if(staticFlowEntry != null) {
- staticFlows.remove(staticFlowEntry.getKey());
+ // Modify status and update cluster cache
+ log.debug("Updating static flow configuration on async error event");
+ String status = String.format("Cannot be installed on node. reason: %s", errorString);
+ staticFlowEntry.getValue().setStatus(status);
+ refreshClusterStaticFlowsStatus(node);
}
}
}
@Override
public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ /*
+ * Streamline the updates for the per node and per group index databases
+ */
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)new_value, true));
+ }
+
if (originLocal) {
/*
* Local updates are of no interest
@Override
public void entryDeleted(Object key, String cacheName, boolean originLocal) {
/*
- * Do nothing
+ * Streamline the updates for the per node and per group index databases
*/
+ if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+ pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false));
+ }
}
/**
if (node != null) {
for (Map.Entry<FlowEntry, FlowEntry> entry : this.originalSwView.entrySet()) {
if (node.equals(entry.getKey().getNode())) {
- list.add(entry.getKey().clone());
+ list.add(entry.getValue().clone());
}
}
}