*/
package org.opendaylight.openflowplugin.openflow.md.core.sal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Future;
-
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.openflowjava.protocol.api.util.BinContent;
-import org.opendaylight.openflowplugin.openflow.md.OFConstants;
+import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.openflowplugin.openflow.md.util.OpenflowVersion;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
/**
*
*/
public abstract class OFRpcTaskFactory {
+ private static final Logger logger = LoggerFactory.getLogger(OFRpcTaskFactory.class);
/**
* @param taskContext
OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
} else {
// Convert the AddFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
+ List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(getInput(),
getVersion(), getSession().getFeatures().getDatapathId());
- final Long xId = getSession().getNextXid();
- ofFlowModInput.setXid(xId);
-
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
- getMessageService().flowMod(ofFlowModInput.build(), getCookie());
- result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
-
- OFRpcTaskUtil.hookFutureNotification(this, result,
- getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
- }
+ logger.debug("Number of flows to push to switch: {}", ofFlowModInputs.size());
+
+ Long xId = getSession().getNextXid();
+
+ result = chainFlowMods(ofFlowModInputs, 0, getTaskContext(), getCookie());
+
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(),
+ createFlowAddedNotification(xId, getInput()));
+ }
return result;
}
};
-
return task;
}
+ /**
+ * Recursive helper method for {@link OFRpcTaskFactory#createAddFlowTask()}
+ * and {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results
+ * of multiple flowmods.
+ * The next flowmod gets executed if the earlier one is successful.
+ * All the flowmods should have the same xid, in-order to cross-reference
+ * the notification
+ */
+ private static ListenableFuture<RpcResult<UpdateFlowOutput>> chainFlowMods(
+ final List<FlowModInputBuilder> ofFlowModInputs, final int index,
+ final OFRpcTaskContext taskContext, final SwitchConnectionDistinguisher cookie) {
+
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ createResultForFlowMod(taskContext, ofFlowModInputs.get(index), cookie);
+
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ if(ofFlowModInputs.size() > index + 1) {
+ // there are more flowmods to chain
+ return Futures.transform(result,
+ new AsyncFunction<RpcResult<UpdateFlowOutput>, RpcResult<UpdateFlowOutput>>() {
+ @Override
+ public ListenableFuture<RpcResult<UpdateFlowOutput>> apply(RpcResult<UpdateFlowOutput> input) throws Exception {
+ if (input.isSuccessful()) {
+ return chainFlowMods(ofFlowModInputs, index + 1, taskContext, cookie);
+ } else {
+ logger.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}",
+ taskContext.getSession().getFeatures().getXid());
+ return Futures.immediateFuture(input);
+ }
+ }
+ }
+ );
+ } else {
+ return result;
+ }
+ }
+
+ private static Future<RpcResult<UpdateFlowOutput>> createResultForFlowMod(
+ OFRpcTaskContext taskContext, FlowModInputBuilder flowModInput,
+ SwitchConnectionDistinguisher cookie) {
+ flowModInput.setXid(taskContext.getSession().getFeatures().getXid());
+ return taskContext.getMessageService().flowMod(flowModInput.build(), cookie);
+ }
+
+
/**
* @param xId
* @return
* @return UpdateFlow task
*/
public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
- OFRpcTaskContext taskContext, UpdateFlowInput input,
+ final OFRpcTaskContext taskContext, UpdateFlowInput input,
SwitchConnectionDistinguisher cookie) {
OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
} else {
- Flow flow = null;
Long xId = getSession().getNextXid();
boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) &&
(getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority()));
+ List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
+ List<FlowModInputBuilder> ofFlowModInputs =
+ FlowConvertor.toFlowModInputs(getInput().getUpdatedFlow(),
+ getVersion(), getSession().getFeatures().getDatapathId());
if (updatedFlow == false) {
// if neither match nor priority matches, then we would need to remove the flow and add it
//remove flow
RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow());
FlowModInputBuilder ofFlowRemoveInput = FlowConvertor.toFlowModInput(removeflow.build(),
- getVersion(),getSession().getFeatures().getDatapathId());
+ getVersion(),getSession().getFeatures().getDatapathId());
ofFlowRemoveInput.setXid(xId);
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLibRemove = getMessageService().
- flowMod(ofFlowRemoveInput.build(), getCookie());
- //add flow
- AddFlowInputBuilder addFlow = new AddFlowInputBuilder(getInput().getUpdatedFlow());
- flow = addFlow.build();
- } else {
- //update flow
- flow = getInput().getUpdatedFlow();
+ // remove flow should be the first
+ allFlowMods.add(ofFlowRemoveInput);
}
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(flow, getVersion(),
- getSession().getFeatures().getDatapathId());
-
- ofFlowModInput.setXid(xId);
-
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
- getMessageService().flowMod(ofFlowModInput.build(), getCookie());
- result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+ allFlowMods.addAll(ofFlowModInputs);
+ logger.debug("Number of flows to push to switch: {}", allFlowMods.size());
+ result = chainFlowMods(allFlowMods, 0, getTaskContext(), getCookie());
OFRpcTaskUtil.hookFutureNotification(this, result,
- getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
+ getRpcNotificationProviderService(),
+ createFlowUpdatedNotification(xId, getInput()));
}
return result;
}
};
return task;
}
+
/**
* @param xId
// Create multipart request header
MultipartRequestInputBuilder mprInput = createMultipartHeader(MultipartType.OFPMPGROUP,
- taskContext);
+ taskContext, xid);
// Set request body to main multipart request
mprInput.setMultipartRequestBody(caseBuilder.build());
MultipartRequestGroupDescCaseBuilder mprGroupDescCaseBuild =
new MultipartRequestGroupDescCaseBuilder();
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPGROUPDESC, taskContext);
+ createMultipartHeader(MultipartType.OFPMPGROUPDESC, taskContext, xid);
mprInput.setMultipartRequestBody(mprGroupDescCaseBuild.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild =
new MultipartRequestGroupFeaturesCaseBuilder();
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPGROUPFEATURES, taskContext);
+ createMultipartHeader(MultipartType.OFPMPGROUPFEATURES, taskContext, xid);
mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestGroup(mprGroupBuild.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPGROUP, taskContext);
+ createMultipartHeader(MultipartType.OFPMPGROUP, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestMeterConfig(mprMeterConfigBuild.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPMETERCONFIG, taskContext);
+ createMultipartHeader(MultipartType.OFPMPMETERCONFIG, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestMeter(mprMeterBuild.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPMETER, taskContext);
+ createMultipartHeader(MultipartType.OFPMPMETER, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
new MultipartRequestMeterFeaturesCaseBuilder();
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPMETERFEATURES, taskContext);
+ createMultipartHeader(MultipartType.OFPMPMETERFEATURES, taskContext, xid);
mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestMeter(mprMeterBuild.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPMETER, taskContext);
+ createMultipartHeader(MultipartType.OFPMPMETER, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestPortStats(mprPortStatsBuilder.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPPORTSTATS, taskContext);
+ createMultipartHeader(MultipartType.OFPMPPORTSTATS, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
caseBuilder.setMultipartRequestPortStats(mprPortStatsBuilder.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPPORTSTATS, taskContext);
+ createMultipartHeader(MultipartType.OFPMPPORTSTATS, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
.getPrimaryConductor().getVersion(), mprFlowRequestBuilder);
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPFLOW, taskContext);
+ createMultipartHeader(MultipartType.OFPMPFLOW, taskContext, xid);
mprInput.setMultipartRequestBody(multipartRequestFlowCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
.getPrimaryConductor().getVersion(), mprFlowRequestBuilder);
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPFLOW, taskContext);
+ createMultipartHeader(MultipartType.OFPMPFLOW, taskContext, xid);
multipartRequestFlowCaseBuilder.setMultipartRequestFlow(mprFlowRequestBuilder.build());
mprInput.setMultipartRequestBody(multipartRequestFlowCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
// Set request body to main multipart request
multipartRequestFlowCaseBuilder.setMultipartRequestFlow(mprFlowRequestBuilder.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPFLOW, taskContext);
+ createMultipartHeader(MultipartType.OFPMPFLOW, taskContext, xid);
mprInput.setMultipartRequestBody(multipartRequestFlowCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
multipartRequestAggregateCaseBuilder.setMultipartRequestAggregate(mprAggregateRequestBuilder.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPAGGREGATE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPAGGREGATE, taskContext, xid);
mprInput.setMultipartRequestBody(multipartRequestAggregateCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
multipartRequestAggregateCaseBuilder.setMultipartRequestAggregate(mprAggregateRequestBuilder.build());
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPAGGREGATE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPAGGREGATE, taskContext, xid);
mprInput.setMultipartRequestBody(multipartRequestAggregateCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPTABLE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPTABLE, taskContext, xid);
mprInput.setMultipartRequestBody(multipartRequestTableCaseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
// Set request body to main multipart request
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext);
+ createMultipartHeader(MultipartType.OFPMPQUEUE, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()
.multipartRequest(mprInput.build(), getCookie());
}
static MultipartRequestInputBuilder createMultipartHeader(MultipartType multipart,
- OFRpcTaskContext taskContext) {
+ OFRpcTaskContext taskContext, Long xid) {
MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
mprInput.setType(multipart);
mprInput.setVersion(taskContext.getSession().getPrimaryConductor().getVersion());
- mprInput.setXid(taskContext.getSession().getNextXid());
+ mprInput.setXid(xid);
mprInput.setFlags(new MultipartRequestFlags(false));
return mprInput;
}
// Set request body to main multipart request
MultipartRequestInputBuilder mprInput =
- createMultipartHeader(MultipartType.OFPMPTABLEFEATURES, taskContext);
+ createMultipartHeader(MultipartType.OFPMPTABLEFEATURES, taskContext, xid);
mprInput.setMultipartRequestBody(caseBuilder.build());
Future<RpcResult<Void>> resultFromOFLib = getMessageService()