*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.base.Function;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import com.google.common.util.concurrent.AsyncFunction;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
-import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskFactory;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
super(rpcContext);
}
+ private class DataCrate {
+ final BigInteger iDConnection;
+ final FlowModInputBuilder flowModInputBuilder;
+
+ public DataCrate(final BigInteger iDConnection, final FlowModInputBuilder flowModInputBuilder) {
+ this.iDConnection = iDConnection;
+ this.flowModInputBuilder = flowModInputBuilder;
+ }
+
+ /**
+ * @return the flowModInputBuilder
+ */
+ public FlowModInputBuilder getFlowModInputBuilder() {
+ return flowModInputBuilder;
+ }
+
+ /**
+ * @return the iDConnection
+ */
+ public BigInteger getiDConnection() {
+ return iDConnection;
+ }
+ }
+
+ <T extends DataObject, F> ListenableFuture<RpcResult<T>> handleServiceCall(final DataCrate dataCrate,
+ final Function<DataCrate, Future<RpcResult<F>>> function) {
+ LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
+
+ final RequestContext<T> requestContext = rpcContext.createRequestContext();
+ final SettableFuture<RpcResult<T>> result = rpcContext.storeOrFail(requestContext);
+
+ if (!result.isDone()) {
+ final Future<RpcResult<F>> resultFromOFLib = function.apply(dataCrate);
+
+ final RpcResultConvertor<T> rpcResultConvertor = new RpcResultConvertor<>(requestContext, deviceContext);
+ rpcResultConvertor.processResultFromOfJava(resultFromOFLib);
+
+ } else {
+ RequestContextUtil.closeRequstContext(requestContext);
+ }
+ return result;
+ }
+
@Override
public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
- return this.<AddFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
- new Function<BigInteger, Future<RpcResult<Void>>>() {
- @Override
- public ListenableFuture<RpcResult<Void>> apply(final BigInteger IDConnection) {
- final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version,
- datapathId);
- return chainFlowMods(ofFlowModInputs, 0, IDConnection);
- }
- });
+ final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId);
+ return processFlowModInputBuilders(ofFlowModInputs);
}
@Override
}
allFlowMods.addAll(ofFlowModInputs);
- LOG.debug("Number of flows to push to switch: {}", allFlowMods.size());
- Collections.<String> emptyList();
- return this.<UpdateFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
- new Function<BigInteger, Future<RpcResult<Void>>>() {
- @Override
- public Future<RpcResult<Void>> apply(final BigInteger cookie) {
- return chainFlowMods(allFlowMods, 0, cookie);
- }
- });
+ return processFlowModInputBuilders(allFlowMods);
}
- /**
- * Recursive helper method for
- * {@link OFRpcTaskFactory#chainFlowMods(java.util.List, int, org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext, org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher)}
- * {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results of multiple flowmods. The next flowmod gets
- * executed if the earlier one is successful. All the flowmods should have the same xid, in-order to cross-reference
- * the notification
- */
- protected ListenableFuture<RpcResult<Void>> chainFlowMods(final List<FlowModInputBuilder> ofFlowModInputs,
- final int index, final BigInteger cookie) {
-
- final Future<RpcResult<Void>> resultFromOFLib = createResultForFlowMod(ofFlowModInputs.get(index), cookie);
-
- final ListenableFuture<RpcResult<Void>> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
-
- if (ofFlowModInputs.size() > index + 1) {
- // there are more flowmods to chain
- return Futures.transform(result, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+ private <T extends DataObject> Future<RpcResult<T>> processFlowModInputBuilders(final List<FlowModInputBuilder> ofFlowModInputs) {
+ final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
+ for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
+ ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(new DataCrate(
+ PRIMARY_CONNECTION, flowModInputBuilder), new Function<DataCrate, Future<RpcResult<Void>>>() {
@Override
- public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
- if (input.isSuccessful()) {
- return chainFlowMods(ofFlowModInputs, index + 1, cookie);
- } else {
- LOG.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}", ofFlowModInputs.get(index)
- .getXid());
- return Futures.immediateFuture(input);
- }
+ public ListenableFuture<RpcResult<Void>> apply(final DataCrate dataCrate) {
+ return createResultForFlowMod(dataCrate.getFlowModInputBuilder(), dataCrate.getiDConnection());
}
});
- } else {
- return result;
+ partialFutures.add(partialFuture);
}
+
+ ListenableFuture<List<RpcResult<T>>> allFutures = Futures.allAsList(partialFutures);
+ final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
+ Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
+ @Override
+ public void onSuccess(List<RpcResult<T>> result) {
+ for (RpcResult<T> rpcResult : result) {
+ if (rpcResult.isSuccessful()) {
+ //TODO: AddFlowOutput has getTransactionId() - shouldn't it have some value?
+ finalFuture.set(RpcResultBuilder.<T> success().build());
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ finalFuture.set(RpcResultBuilder.<T> failed()
+ .withError(ErrorType.APPLICATION, "", t.getMessage()).build());
+ }
+ });
+
+ return finalFuture;
}
- protected Future<RpcResult<Void>> createResultForFlowMod(final FlowModInputBuilder flowModInput,
+ protected ListenableFuture<RpcResult<Void>> createResultForFlowMod(final FlowModInputBuilder flowModInput,
final BigInteger cookie) {
flowModInput.setXid(deviceContext.getNextXid().getValue());
- return provideConnectionAdapter(cookie).flowMod(flowModInput.build());
+ Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(cookie).flowMod(flowModInput.build());
+ return JdkFutureAdapters.listenInPoolThread(flowModResult);
}
}