import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
logger.debug("Number of flows to push to switch: {}", ofFlowModInputs.size());
- for (FlowModInputBuilder ofFlowModInput : ofFlowModInputs) {
- final Long xId = getSession().getNextXid();
- ofFlowModInput.setXid(xId);
- logger.debug("Flow Insert xid:{}", xId);
+ Long xId = getSession().getNextXid();
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
- getMessageService().flowMod(ofFlowModInput.build(), getCookie());
- result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+ result = chainFlowMods(ofFlowModInputs, 0, getTaskContext(), getCookie());
- OFRpcTaskUtil.hookFutureNotification(this, result,
- getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
- }
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(),
+ createFlowAddedNotification(xId, getInput()));
}
return result;
}
return task;
}
+ /**
+ * Recursive helper method for {@link OFRpcTaskFactory#createAddFlowTask()}
+ * and {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results
+ * of multiple flowmods.
+ * The next flowmod gets executed if the earlier one is successful.
+ * All the flowmods should have the same xid, in-order to cross-reference
+ * the notification
+ */
+ private static ListenableFuture<RpcResult<UpdateFlowOutput>> chainFlowMods(
+ final List<FlowModInputBuilder> ofFlowModInputs, final int index,
+ final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) {
+
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ createResultForFlowMod(taskContext, ofFlowModInputs.get(index), cookie);
+
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ if(ofFlowModInputs.size() > index + 1) {
+ // there are more flowmods to chain
+ return Futures.transform(result,
+ new AsyncFunction<RpcResult<UpdateFlowOutput>, RpcResult<UpdateFlowOutput>>() {
+ @Override
+ public ListenableFuture<RpcResult<UpdateFlowOutput>> apply(RpcResult<UpdateFlowOutput> input) throws Exception {
+ if (input.isSuccessful()) {
+ return chainFlowMods(ofFlowModInputs, index + 1, taskContext, cookie);
+ } else {
+ logger.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}",
+ taskContext.getSession().getFeatures().getXid());
+ return Futures.immediateFuture(input);
+ }
+ }
+ }
+ );
+ } else {
+ return result;
+ }
+ }
+
+ private static Future<RpcResult<UpdateFlowOutput>> createResultForFlowMod(
+ OFRpcTaskContext taskContext, FlowModInputBuilder flowModInput,
+ SwitchConnectionDistinguisher cookie) {
+ flowModInput.setXid(taskContext.getSession().getFeatures().getXid());
+ return taskContext.getMessageService().flowMod(flowModInput.build(), cookie);
+ }
+
+
/**
* @param xId
* @return
OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
} else {
- Flow flow = null;
Long xId = getSession().getNextXid();
boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) &&
(getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority()));
+ List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
+ List<FlowModInputBuilder> ofFlowModInputs =
+ FlowConvertor.toFlowModInputs(getInput().getUpdatedFlow(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+
if (updatedFlow == false) {
// if neither match nor priority matches, then we would need to remove the flow and add it
//remove flow
RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow());
FlowModInputBuilder ofFlowRemoveInput = FlowConvertor.toFlowModInput(removeflow.build(),
- getVersion(),getSession().getFeatures().getDatapathId());
+ getVersion(),getSession().getFeatures().getDatapathId());
ofFlowRemoveInput.setXid(xId);
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLibRemove = getMessageService().
- flowMod(ofFlowRemoveInput.build(), getCookie());
-
- result = Futures.transform(JdkFutureAdapters.listenInPoolThread(resultFromOFLibRemove),
- decodeRemoveFlowAndCreateFlow(taskContext, getCookie()));
- } else {
- //update flow
- flow = getInput().getUpdatedFlow();
- result = JdkFutureAdapters.listenInPoolThread(createResultForAddFlow(
- taskContext, flow, getCookie()));
+ // remove flow should be the first
+ allFlowMods.add(ofFlowRemoveInput);
}
+
+ allFlowMods.addAll(ofFlowModInputs);
+ logger.debug("Number of flows to push to switch: {}", allFlowMods.size());
+ result = chainFlowMods(allFlowMods, 0, getTaskContext(), getCookie());
+
OFRpcTaskUtil.hookFutureNotification(this, result,
- getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
+ getRpcNotificationProviderService(),
+ createFlowUpdatedNotification(xId, getInput()));
}
return result;
}
return task;
}
- /**
- * Helper method for {@link OFRpcTaskFactory#createUpdateFlowTask()}. Decides whether flow
- * removing ends successfully and if yes, it performs adding of new flow.
- *
- * @param taskContext
- * @param cookie
- * @return asyncFunction
- */
- protected static AsyncFunction<RpcResult<UpdateFlowOutput>, RpcResult<UpdateFlowOutput>>
- decodeRemoveFlowAndCreateFlow(final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) {
- return new AsyncFunction<RpcResult<UpdateFlowOutput>, RpcResult<UpdateFlowOutput>>() {
- @Override
- public ListenableFuture<RpcResult<UpdateFlowOutput>> apply(
- RpcResult<UpdateFlowOutput> input) throws Exception {
- if(input.isSuccessful()) {
- return JdkFutureAdapters.listenInPoolThread(createResultForAddFlow(taskContext, null, cookie));
- } else {
- return Futures.immediateFuture(input);
- }
- }
- };
- }
-
- /**
- * Helper method for {@link OFRpcTaskFactory#createUpdateFlowTask()}. It performs adding of new flow.
- *
- * @param taskContext
- * @param flow
- * @param cookie
- * @return future
- */
- protected static Future<RpcResult<UpdateFlowOutput>> createResultForAddFlow(OFRpcTaskContext taskContext,
- Flow flow,
- SwitchConnectionDistinguisher cookie) {
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(flow,
- taskContext.getSession().getFeatures().getVersion(),
- taskContext.getSession().getFeatures().getDatapathId());
- ofFlowModInput.setXid(taskContext.getSession().getFeatures().getXid());
- return taskContext.getMessageService().flowMod(ofFlowModInput.build(), cookie);
- }
-
+
/**
* @param xId
* @param input