*
* @param flowEntry
* the original flow entry application requested to add
- * @return
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
- private Status addEntry(FlowEntry flowEntry) {
+ private Status addEntry(FlowEntry flowEntry, boolean async) {
+
// Sanity Check
if (flowEntry == null || flowEntry.getNode() == null) {
String msg = "Invalid FlowEntry";
// Try to install an entry at the time
Status error = new Status(null, null);
+ Status succeded = null;
boolean oneSucceded = false;
- for (FlowEntryInstall installEntry : toInstallList) {
+ for (FlowEntryInstall installEntry : toInstallSafe) {
// Install and update database
- Status ret = addEntriesInternal(installEntry);
+ Status ret = addEntriesInternal(installEntry, async);
if (ret.isSuccess()) {
oneSucceded = true;
+ /*
+ * The first successful status response will be returned
+ * For the asynchronous call, we can discard the container flow
+ * complication for now and assume we will always deal with
+ * one flow only per request
+ */
+ succeded = ret;
} else {
error = ret;
log.warn("Failed to install the entry: {}. The failure is: {}",
}
}
- return (oneSucceded) ? new Status(StatusCode.SUCCESS, null) : error;
+ return (oneSucceded) ? succeded : error;
}
/**
*
* @param currentFlowEntry
* @param newFlowEntry
- * @return Success or error string
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
private Status modifyEntry(FlowEntry currentFlowEntry,
- FlowEntry newFlowEntry) {
+ FlowEntry newFlowEntry, boolean async) {
+
Status retExt;
// Sanity checks
* So, for the above two cases, to simplify, let's decouple the modify
* in: 1) remove current entries 2) install new entries
*/
+ Status succeeded = null;
boolean decouple = false;
if (installedList.size() != toInstallList.size()) {
log.info("Modify: New flow entry does not satisfy the same "
if (decouple) {
// Remove current entries
for (FlowEntryInstall currEntry : installedList) {
- this.removeEntryInternal(currEntry);
+ this.removeEntryInternal(currEntry, async);
}
// Install new entries
for (FlowEntryInstall newEntry : toInstallSafe) {
- this.addEntriesInternal(newEntry);
+ succeeded = this.addEntriesInternal(newEntry, async);
}
} else {
/*
* fails, we need to stop, restore the already modified entries, and
* declare failure.
*/
- Status retModify;
+ Status retModify = null;
int i = 0;
int size = toInstallList.size();
while (i < size) {
// Modify and update database
retModify = modifyEntryInternal(installedList.get(i),
- toInstallList.get(i));
+ toInstallList.get(i), async);
if (retModify.isSuccess()) {
i++;
} else {
while (j < i) {
log.info("Attempting to restore initial entries");
retExt = modifyEntryInternal(toInstallList.get(i),
- installedList.get(i));
+ installedList.get(i), async);
if (retExt.isSuccess()) {
j++;
} else {
return new Status(StatusCode.INTERNALERROR, msg);
}
}
+ succeeded = retModify;
}
- return new Status(StatusCode.SUCCESS, null);
+ /*
+ * The first successful status response will be returned.
+ * For the asynchronous call, we can discard the container flow
+ * complication for now and assume we will always deal with
+ * one flow only per request
+ */
+ return succeeded;
}
/**
*
* @param currentEntries
* @param newEntries
- * @return
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries,
- FlowEntryInstall newEntries) {
+ FlowEntryInstall newEntries, boolean async) {
// Modify the flow on the network node
- Status status = programmer.modifyFlow(currentEntries.getNode(),
- currentEntries.getInstall().getFlow(), newEntries.getInstall()
- .getFlow());
+ Status status = (async)?
+ programmer.modifyFlowAsync(currentEntries.getNode(),
+ currentEntries.getInstall().getFlow(), newEntries.getInstall()
+ .getFlow()) :
+ programmer.modifyFlow(currentEntries.getNode(),
+ currentEntries.getInstall().getFlow(), newEntries.getInstall()
+ .getFlow());
+
if (!status.isSuccess()) {
log.warn(
newEntries.getInstall());
// Update DB
+ newEntries.setRequestId(status.getRequestId());
updateLocalDatabase(currentEntries, false);
updateLocalDatabase(newEntries, true);
* (entry or node not present), it return successfully
*
* @param flowEntry
- * @return
+ * the flow entry to remove
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
- private Status removeEntry(FlowEntry flowEntry) {
+ private synchronized Status removeEntry(FlowEntry flowEntry, boolean async) {
Status error = new Status(null, null);
// Sanity Check
flowEntry.clone(), container.getContainerFlows());
Set<FlowEntryInstall> flowsOnNode = nodeFlows.get(flowEntry.getNode());
+ Status succeeded = null;
boolean atLeastOneRemoved = false;
for (FlowEntryInstall entry : installedList) {
if (flowsOnNode == null) {
log.debug(logMsg, flowEntry);
if (installedList.size() == 1) {
// If we had only one entry to remove, we are done
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
} else {
continue;
}
}
// Remove and update DB
- Status ret = removeEntryInternal(entry);
+ Status ret = removeEntryInternal(entry, async);
if (!ret.isSuccess()) {
error = ret;
return error;
}
} else {
+ succeeded = ret;
atLeastOneRemoved = true;
}
}
* of removing the stale entries later, or adjusting the software
* database if not in sync with hardware
*/
- return (atLeastOneRemoved) ? new Status(StatusCode.SUCCESS, null)
- : error;
+ return (atLeastOneRemoved) ? succeeded : error;
}
/**
* validity checks are passed
*
* @param entry
- * the FlowEntryInstall
- * @return "Success" or error string
+ * the flow entry to remove
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
- private Status removeEntryInternal(FlowEntryInstall entry) {
+ private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
// Mark the entry to be deleted (for CC just in case we fail)
entry.toBeDeleted();
// Remove from node
- Status status = programmer.removeFlow(entry.getNode(), entry
- .getInstall().getFlow());
+ Status status = (async)?
+ programmer.removeFlowAsync(entry.getNode(), entry
+ .getInstall().getFlow()) :
+ programmer.removeFlow(entry.getNode(), entry
+ .getInstall().getFlow());
+
if (!status.isSuccess()) {
log.warn(
entry.getInstall(), status.getDescription());
return status;
}
- log.info("Removed {}", entry.getInstall());
+ log.trace("Removed {}", entry.getInstall());
// Update DB
updateLocalDatabase(entry, false);
* whether this flow would conflict or overwrite an existing one.
*
* @param entry
- * the FlowEntryInstall
- * @return "Success" or error string
+ * the flow entry to install
+ * @param async
+ * the flag indicating if this is a asynchronous request
+ * @return the status of this request. In case of asynchronous call, it
+ * will contain the unique id assigned to this request
*/
- private Status addEntriesInternal(FlowEntryInstall entry) {
+ private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
// Install the flow on the network node
- Status status = programmer.addFlow(entry.getNode(), entry.getInstall()
- .getFlow());
+ Status status = (async)?
+ programmer.addFlowAsync(entry.getNode(), entry.getInstall()
+ .getFlow()) :
+ programmer.addFlow(entry.getNode(), entry.getInstall()
+ .getFlow());
+
if (!status.isSuccess()) {
log.warn(
return status;
}
- log.info("Added {}", entry.getInstall());
+ log.trace("Added {}", entry.getInstall());
// Update DB
+ entry.setRequestId(status.getRequestId());
updateLocalDatabase(entry, true);
return status;
/*
* Update the node mapped flows database
*/
- private void updateNodeFlowsDB(FlowEntryInstall flowEntries, boolean add) {
+ private synchronized void updateNodeFlowsDB(FlowEntryInstall flowEntries, boolean add) {
Node node = flowEntries.getNode();
Set<FlowEntryInstall> flowEntrylist = this.nodeFlows.get(node);
* entry is effectively present in the local database
*/
@SuppressWarnings("unused")
- private Status removeEntry(Node node, String flowName) {
+ private synchronized Status removeEntry(Node node, String flowName) {
FlowEntryInstall target = null;
// Find in database
status = new Status(StatusCode.NOTACCEPTABLE, msg);
log.warn(logMsg, flowEntry);
} else {
- status = addEntry(flowEntry);
+ status = addEntry(flowEntry, false);
+ }
+ return status;
+ }
+
+ @Override
+ public Status installFlowEntryAsync(FlowEntry flowEntry) {
+ Status status;
+ if (inContainerMode) {
+ String msg = "Controller in container mode: Install Refused";
+ status = new Status(StatusCode.NOTACCEPTABLE, msg);
+ log.warn(msg);
+ } else {
+ status = addEntry(flowEntry, true);
}
return status;
}
status = new Status(StatusCode.NOTACCEPTABLE, msg);
log.warn(logMsg, entry);
} else {
- status = removeEntry(entry);
+ status = removeEntry(entry, false);
}
return status;
}
+ @Override
+ public Status uninstallFlowEntryAsync(FlowEntry flowEntry) {
+ Status status;
+ if (inContainerMode) {
+ String msg = "Controller in container mode: Uninstall Refused";
+ status = new Status(StatusCode.NOTACCEPTABLE, msg);
+ log.warn(msg);
+ } else {
+ status = removeEntry(flowEntry, true);
+ }
+ return status;
+ }
+
@Override
public Status modifyFlowEntry(FlowEntry currentFlowEntry,
FlowEntry newFlowEntry) {
status = new Status(StatusCode.NOTACCEPTABLE, msg);
log.warn(logMsg, newFlowEntry);
} else {
- status = modifyEntry(currentFlowEntry, newFlowEntry);
+ status = modifyEntry(currentFlowEntry, newFlowEntry, false);
+ }
+ return status;
+ }
+
+ @Override
+ public Status modifyFlowEntryAsync(FlowEntry current, FlowEntry newone) {
+ Status status = null;
+ if (inContainerMode) {
+ String msg = "Controller in container mode: Modify Refused";
+ status = new Status(StatusCode.NOTACCEPTABLE, msg);
+ log.warn(msg);
+ } else {
+ status = modifyEntry(current, newone, true);
}
return status;
}
}
}
+ @Override
+ public Status modifyOrAddFlowEntryAsync(FlowEntry newone) {
+ /*
+ * Run a loose check on the installed entries to decide whether to go
+ * with a add or modify method. A loose check means only check against
+ * the original flow entry requests and not against the installed flow
+ * entries which are the result of the original entry merged with the
+ * container flow(s) (if any). The modifyFlowEntry method in presence of
+ * conflicts with the Container flows (if any) would revert back to a
+ * delete + add pattern
+ */
+ FlowEntryInstall currentFlowEntries = findMatch(newone, true);
+
+ if (currentFlowEntries != null) {
+ return modifyFlowEntryAsync(currentFlowEntries.getOriginal(),
+ newone);
+ } else {
+ return installFlowEntryAsync(newone);
+ }
+ }
+
+
/**
* Try to find in the database if a Flow with the same Match and priority of
* the passed one already exists for the specified network node. Flow,
* @return null if not found, otherwise the FlowEntryInstall which contains
* the existing flow entry
*/
- private FlowEntryInstall findMatch(FlowEntry flowEntry, boolean looseCheck) {
+ private synchronized FlowEntryInstall findMatch(FlowEntry flowEntry, boolean looseCheck) {
Flow flow = flowEntry.getFlow();
Match match = flow.getMatch();
short priority = flow.getPriority();
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ private synchronized void updateFlowsContainerFlow() {
List<FlowEntryInstall> oldCouples = new ArrayList<FlowEntryInstall>();
List<FlowEntry> toReinstall = new ArrayList<FlowEntry>();
for (Entry<Node, Set<FlowEntryInstall>> entry : this.nodeFlows
// Remove the old couples. No validity checks to be run, use the
// internal remove
for (FlowEntryInstall oldCouple : oldCouples) {
- this.removeEntryInternal(oldCouple);
+ this.removeEntryInternal(oldCouple, false);
}
// Reinstall the original flow entries, via the regular path: new
// cFlow merge + validations
@Override
public List<FlowEntry> getFlowEntriesForGroup(String policyName) {
- List<FlowEntry> list = null;
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
if (this.groupFlows != null && this.groupFlows.containsKey(policyName)) {
- list = new ArrayList<FlowEntry>();
for (FlowEntryInstall entries : groupFlows.get(policyName)) {
list.add(entries.getOriginal());
}
- return new ArrayList<FlowEntry>();
}
return list;
}
@Override
- public void addOutputPort(Node node, String flowName,
+ public synchronized void addOutputPort(Node node, String flowName,
List<NodeConnector> portList) {
Set<FlowEntryInstall> flowEntryList = this.nodeFlows.get(node);
for (NodeConnector dstPort : portList) {
newFlowEntry.getFlow().addAction(new Output(dstPort));
}
- Status error = modifyEntry(currentFlowEntry, newFlowEntry);
+ Status error = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (error.isSuccess()) {
log.info("Ports {} added to FlowEntry {}", portList,
flowName);
}
@Override
- public void removeOutputPort(Node node, String flowName,
+ public synchronized void removeOutputPort(Node node, String flowName,
List<NodeConnector> portList) {
Set<FlowEntryInstall> flowEntryList = this.nodeFlows.get(node);
Action action = new Output(dstPort);
newFlowEntry.getFlow().removeAction(action);
}
- Status status = modifyEntry(currentFlowEntry, newFlowEntry);
+ Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
log.info("Ports {} removed from FlowEntry {}", portList,
flowName);
* This function assumes the target flow has only one output port
*/
@Override
- public void replaceOutputPort(Node node, String flowName,
+ public synchronized void replaceOutputPort(Node node, String flowName,
NodeConnector outPort) {
FlowEntry currentFlowEntry = null;
FlowEntry newFlowEntry = null;
newFlowEntry.getFlow().addAction(new Output(outPort));
// Modify on network node
- Status status = modifyEntry(currentFlowEntry, newFlowEntry);
+ Status status = modifyEntry(currentFlowEntry, newFlowEntry, false);
if (status.isSuccess()) {
log.info("Output port replaced with {} for flow {} on node {}",
}
@Override
- public NodeConnector getOutputPort(Node node, String flowName) {
+ public synchronized NodeConnector getOutputPort(Node node, String flowName) {
Set<FlowEntryInstall> flowEntryList = this.nodeFlows.get(node);
for (FlowEntryInstall flow : flowEntryList) {
// Program hw
if (config.installInHw()) {
FlowEntry entry = config.getFlowEntry();
- status = this.addEntry(entry);
+ status = this.addEntry(entry, false);
if (!status.isSuccess()) {
config.setStatus(status.getDescription());
if (!restore) {
portGroupChanged(pgconfig, existingData, true);
}
}
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
}
private void addStaticFlowsToSwitch(Node node) {
if (config.installInHw()
&& !config.getStatus().equals(
StatusCode.SUCCESS.toString())) {
- Status status = this.addEntry(config.getFlowEntry());
+ Status status = this.addEntry(config.getFlowEntry(), false);
config.setStatus(status.getDescription());
}
}
for (Map.Entry<Integer, FlowConfig> entry : staticFlows.entrySet()) {
if (entry.getValue().isByNameAndNodeIdEqual(config)) {
// Program the network node
- Status status = this.removeEntry(config.getFlowEntry());
+ Status status = this.removeEntry(config.getFlowEntry(), false);
// Update configuration database if programming was successful
if (status.isSuccess()) {
staticFlows.remove(entry.getKey());
}
if (!entry.isPortGroupEnabled()) {
// Program the network node
- status = this.removeEntry(entry.getFlowEntry());
+ status = this.removeEntry(entry.getFlowEntry(), false);
}
// Update configuration database if programming was successful
if (status.isSuccess()) {
Status status = new Status(StatusCode.SUCCESS, "Saved in config");
if (oldFlowConfig.installInHw()) {
status = this.modifyEntry(oldFlowConfig.getFlowEntry(),
- newFlowConfig.getFlowEntry());
+ newFlowConfig.getFlowEntry(), false);
}
// Update configuration database if programming was successful
FlowConfig conf = entry.getValue();
if (conf.isByNameAndNodeIdEqual(config)) {
// Program the network node
- Status status = new Status(StatusCode.SUCCESS, null);
+ Status status = new Status(StatusCode.SUCCESS);
if (conf.installInHw()) {
- status = this.removeEntry(conf.getFlowEntry());
+ status = this.removeEntry(conf.getFlowEntry(), false);
} else {
- status = this.addEntry(conf.getFlowEntry());
+ status = this.addEntry(conf.getFlowEntry(), false);
}
if (!status.isSuccess()) {
conf.setStatus(status.getDescription());
// Now remove the entries
for (FlowEntry flowEntry : inactiveFlows) {
- Status status = this.removeEntry(flowEntry);
+ Status status = this.removeEntry(flowEntry, false);
if (!status.isSuccess()) {
log.warn("Failed to remove entry: {}. The failure is: {}",
flowEntry, status.getDescription());
log.info("Reinstalling all inactive flows");
for (FlowEntry flowEntry : this.inactiveFlows) {
- Status status = this.addEntry(flowEntry);
+ Status status = this.addEntry(flowEntry, false);
if (!status.isSuccess()) {
log.warn("Failed to install entry: {}. The failure is: {}",
flowEntry, status.getDescription());
}
@Override
- public void flowErrorReported(Node node, long rid, Object err) {
- log.error("Got error {} for message rid {} from node {}", new Object[] {
- err, rid, node });
+ public synchronized void flowErrorReported(Node node, long rid, Object err) {
+ log.trace("Got error {} for message rid {} from node {}",
+ new Object[] {err, rid, node });
+ /*
+ * If this was for a flow install, remove the corresponding entry
+ * from the software view. If it was a Looking for the rid going through the
+ * software database.
+ * TODO: A more efficient rid <-> FlowEntryInstall mapping will
+ * have to be added in future
+ */
+ Set<FlowEntryInstall> entries = nodeFlows.get(node);
+ if (entries != null) {
+ FlowEntryInstall target = null;
+ for (FlowEntryInstall entry : entries) {
+ if (entry.getRequestId() == rid) {
+ target = entry;
+ break;
+ }
+ }
+ if (target != null) {
+ // This was a flow install, update database
+ this.updateLocalDatabase(target, false);
+ }
+ }
+
+ // Notify listeners
+ if (frmAware != null) {
+ synchronized (frmAware) {
+ for (IForwardingRulesManagerAware frma : frmAware) {
+ try {
+ frma.requestFailed(rid, err.toString());
+ } catch (Exception e) {
+ log.warn("Failed to notify {}", frma);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public Status solicitStatusResponse(Node node, boolean blocking) {
+ Status rv = new Status(StatusCode.INTERNALERROR);
+
+ if (this.programmer != null) {
+ if (blocking) {
+ rv = programmer.syncSendBarrierMessage(node);
+ } else {
+ rv = programmer.asyncSendBarrierMessage(node);
+ }
+ }
+
+ return rv;
}
}