From 9edf847d749c35c9c2ef802a9a0c0e4ebf7bd9a9 Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Tue, 26 Aug 2014 15:27:26 -0700 Subject: [PATCH] Bug-1421 - Chaining of results and updateFlow 1. Chained the flowmod results when more than one flows are added/updated 2. Fixed updateFlow when match/priority were changed. After RemoveFlow, add flow was being passed in as null. 3. Create a common set of methods to be used for both add and update flow tasks Change-Id: I5d3576150bc5da79764c1b311e9c51df387f8c6f Signed-off-by: Kamal Rameshan --- .../md/core/sal/OFRpcTaskFactory.java | 132 +++++++++--------- 1 file changed, 68 insertions(+), 64 deletions(-) diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java index 6eea0497da..5cbb4f9ea2 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java @@ -165,6 +165,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -202,18 +203,13 @@ public abstract class OFRpcTaskFactory { logger.debug("Number of flows to push to switch: {}", ofFlowModInputs.size()); - for (FlowModInputBuilder ofFlowModInput : ofFlowModInputs) { - final Long xId = getSession().getNextXid(); - ofFlowModInput.setXid(xId); - logger.debug("Flow Insert xid:{}", xId); + Long xId = getSession().getNextXid(); - Future> resultFromOFLib = - getMessageService().flowMod(ofFlowModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + result = chainFlowMods(ofFlowModInputs, 0, getTaskContext(), getCookie()); - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput())); - } + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), + createFlowAddedNotification(xId, getInput())); } return result; } @@ -221,6 +217,52 @@ public abstract class OFRpcTaskFactory { return task; } + /** + * Recursive helper method for {@link OFRpcTaskFactory#createAddFlowTask()} + * and {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results + * of multiple flowmods. + * The next flowmod gets executed if the earlier one is successful. + * All the flowmods should have the same xid, in-order to cross-reference + * the notification + */ + private static ListenableFuture> chainFlowMods( + final List ofFlowModInputs, final int index, + final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) { + + Future> resultFromOFLib = + createResultForFlowMod(taskContext, ofFlowModInputs.get(index), cookie); + + ListenableFuture> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + if(ofFlowModInputs.size() > index + 1) { + // there are more flowmods to chain + return Futures.transform(result, + new AsyncFunction, RpcResult>() { + @Override + public ListenableFuture> apply(RpcResult input) throws Exception { + if (input.isSuccessful()) { + return chainFlowMods(ofFlowModInputs, index + 1, taskContext, cookie); + } else { + logger.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}", + taskContext.getSession().getFeatures().getXid()); + return Futures.immediateFuture(input); + } + } + } + ); + } else { + return result; + } + } + + private static Future> createResultForFlowMod( + OFRpcTaskContext taskContext, FlowModInputBuilder flowModInput, + SwitchConnectionDistinguisher cookie) { + flowModInput.setXid(taskContext.getSession().getFeatures().getXid()); + return taskContext.getMessageService().flowMod(flowModInput.build(), cookie); + } + + /** * @param xId * @return @@ -260,31 +302,33 @@ public abstract class OFRpcTaskFactory { OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); } else { - Flow flow = null; Long xId = getSession().getNextXid(); boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) && (getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority())); + List allFlowMods = new ArrayList<>(); + List ofFlowModInputs = + FlowConvertor.toFlowModInputs(getInput().getUpdatedFlow(), + getVersion(), getSession().getFeatures().getDatapathId()); + if (updatedFlow == false) { // if neither match nor priority matches, then we would need to remove the flow and add it //remove flow RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow()); FlowModInputBuilder ofFlowRemoveInput = FlowConvertor.toFlowModInput(removeflow.build(), - getVersion(),getSession().getFeatures().getDatapathId()); + getVersion(),getSession().getFeatures().getDatapathId()); ofFlowRemoveInput.setXid(xId); - Future> resultFromOFLibRemove = getMessageService(). - flowMod(ofFlowRemoveInput.build(), getCookie()); - - result = Futures.transform(JdkFutureAdapters.listenInPoolThread(resultFromOFLibRemove), - decodeRemoveFlowAndCreateFlow(taskContext, getCookie())); - } else { - //update flow - flow = getInput().getUpdatedFlow(); - result = JdkFutureAdapters.listenInPoolThread(createResultForAddFlow( - taskContext, flow, getCookie())); + // remove flow should be the first + allFlowMods.add(ofFlowRemoveInput); } + + allFlowMods.addAll(ofFlowModInputs); + logger.debug("Number of flows to push to switch: {}", allFlowMods.size()); + result = chainFlowMods(allFlowMods, 0, getTaskContext(), getCookie()); + OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput())); + getRpcNotificationProviderService(), + createFlowUpdatedNotification(xId, getInput())); } return result; } @@ -292,47 +336,7 @@ public abstract class OFRpcTaskFactory { return task; } - /** - * Helper method for {@link OFRpcTaskFactory#createUpdateFlowTask()}. Decides whether flow - * removing ends successfully and if yes, it performs adding of new flow. - * - * @param taskContext - * @param cookie - * @return asyncFunction - */ - protected static AsyncFunction, RpcResult> - decodeRemoveFlowAndCreateFlow(final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) { - return new AsyncFunction, RpcResult>() { - @Override - public ListenableFuture> apply( - RpcResult input) throws Exception { - if(input.isSuccessful()) { - return JdkFutureAdapters.listenInPoolThread(createResultForAddFlow(taskContext, null, cookie)); - } else { - return Futures.immediateFuture(input); - } - } - }; - } - - /** - * Helper method for {@link OFRpcTaskFactory#createUpdateFlowTask()}. It performs adding of new flow. - * - * @param taskContext - * @param flow - * @param cookie - * @return future - */ - protected static Future> createResultForAddFlow(OFRpcTaskContext taskContext, - Flow flow, - SwitchConnectionDistinguisher cookie) { - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(flow, - taskContext.getSession().getFeatures().getVersion(), - taskContext.getSession().getFeatures().getDatapathId()); - ofFlowModInput.setXid(taskContext.getSession().getFeatures().getXid()); - return taskContext.getMessageService().flowMod(ofFlowModInput.build(), cookie); - } - + /** * @param xId * @param input -- 2.36.6