+ /**
+ * Recursive helper method for {@link OFRpcTaskFactory#createAddFlowTask()}
+ * and {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results
+ * of multiple flowmods.
+ * The next flowmod gets executed if the earlier one is successful.
+ * All the flowmods should have the same xid, in-order to cross-reference
+ * the notification
+ */
+ private static ListenableFuture<RpcResult<UpdateFlowOutput>> chainFlowMods(
+ final List<FlowModInputBuilder> ofFlowModInputs, final int index,
+ final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) {
+
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ createResultForFlowMod(taskContext, ofFlowModInputs.get(index), cookie);
+
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ if(ofFlowModInputs.size() > index + 1) {
+ // there are more flowmods to chain
+ return Futures.transform(result,
+ new AsyncFunction<RpcResult<UpdateFlowOutput>, RpcResult<UpdateFlowOutput>>() {
+ @Override
+ public ListenableFuture<RpcResult<UpdateFlowOutput>> apply(RpcResult<UpdateFlowOutput> input) throws Exception {
+ if (input.isSuccessful()) {
+ return chainFlowMods(ofFlowModInputs, index + 1, taskContext, cookie);
+ } else {
+ logger.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}",
+ taskContext.getSession().getFeatures().getXid());
+ return Futures.immediateFuture(input);
+ }
+ }
+ }
+ );
+ } else {
+ return result;
+ }
+ }
+
+ private static Future<RpcResult<UpdateFlowOutput>> createResultForFlowMod(
+ OFRpcTaskContext taskContext, FlowModInputBuilder flowModInput,
+ SwitchConnectionDistinguisher cookie) {
+ flowModInput.setXid(taskContext.getSession().getFeatures().getXid());
+ return taskContext.getMessageService().flowMod(flowModInput.build(), cookie);
+ }
+
+