From: Jozef Gloncak Date: Mon, 30 Mar 2015 12:13:44 +0000 (+0200) Subject: FlowService - standalone request context for multiflow input X-Git-Tag: release/lithium~591 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F35%2F17335%2F2;p=openflowplugin.git FlowService - standalone request context for multiflow input If input flow is devided to more (2 maybe more) flows then result Future isn't chained but for every flow is created standalone request context with unique Xid. Methods addFlow and updateFlow return future object which will contains global result of all partial futures (if one fail then global fail, if all success then global success) Change-Id: I0406888a0568d6ba3905113cde69e2a96aa78141 Signed-off-by: Jozef Gloncak --- diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java index fcf0dd43d3..eb97f74159 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java @@ -7,19 +7,22 @@ */ package org.opendaylight.openflowplugin.impl.services; -import com.google.common.base.Function; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import com.google.common.util.concurrent.AsyncFunction; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.yangtools.yang.binding.DataObject; +import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.Future; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; -import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskFactory; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor; import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; @@ -45,17 +48,53 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService super(rpcContext); } + private class DataCrate { + final BigInteger iDConnection; + final FlowModInputBuilder flowModInputBuilder; + + public DataCrate(final BigInteger iDConnection, final FlowModInputBuilder flowModInputBuilder) { + this.iDConnection = iDConnection; + this.flowModInputBuilder = flowModInputBuilder; + } + + /** + * @return the flowModInputBuilder + */ + public FlowModInputBuilder getFlowModInputBuilder() { + return flowModInputBuilder; + } + + /** + * @return the iDConnection + */ + public BigInteger getiDConnection() { + return iDConnection; + } + } + + ListenableFuture> handleServiceCall(final DataCrate dataCrate, + final Function>> function) { + LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); + + final RequestContext requestContext = rpcContext.createRequestContext(); + final SettableFuture> result = rpcContext.storeOrFail(requestContext); + + if (!result.isDone()) { + final Future> resultFromOFLib = function.apply(dataCrate); + + final RpcResultConvertor rpcResultConvertor = new RpcResultConvertor<>(requestContext, deviceContext); + rpcResultConvertor.processResultFromOfJava(resultFromOFLib); + + } else { + RequestContextUtil.closeRequstContext(requestContext); + } + return result; + } + @Override public Future> addFlow(final AddFlowInput input) { - return this. handleServiceCall(PRIMARY_CONNECTION, - new Function>>() { - @Override - public ListenableFuture> apply(final BigInteger IDConnection) { - final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, - datapathId); - return chainFlowMods(ofFlowModInputs, 0, IDConnection); - } - }); + final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId); + return processFlowModInputBuilders(ofFlowModInputs); } @Override @@ -96,54 +135,50 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService } allFlowMods.addAll(ofFlowModInputs); - LOG.debug("Number of flows to push to switch: {}", allFlowMods.size()); - Collections. emptyList(); - return this. handleServiceCall(PRIMARY_CONNECTION, - new Function>>() { - @Override - public Future> apply(final BigInteger cookie) { - return chainFlowMods(allFlowMods, 0, cookie); - } - }); + return processFlowModInputBuilders(allFlowMods); } - /** - * Recursive helper method for - * {@link OFRpcTaskFactory#chainFlowMods(java.util.List, int, org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext, org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher)} - * {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results of multiple flowmods. The next flowmod gets - * executed if the earlier one is successful. All the flowmods should have the same xid, in-order to cross-reference - * the notification - */ - protected ListenableFuture> chainFlowMods(final List ofFlowModInputs, - final int index, final BigInteger cookie) { - - final Future> resultFromOFLib = createResultForFlowMod(ofFlowModInputs.get(index), cookie); - - final ListenableFuture> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - if (ofFlowModInputs.size() > index + 1) { - // there are more flowmods to chain - return Futures.transform(result, new AsyncFunction, RpcResult>() { + private Future> processFlowModInputBuilders(final List ofFlowModInputs) { + final List>> partialFutures = new ArrayList<>(); + for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) { + ListenableFuture> partialFuture = handleServiceCall(new DataCrate( + PRIMARY_CONNECTION, flowModInputBuilder), new Function>>() { @Override - public ListenableFuture> apply(final RpcResult input) throws Exception { - if (input.isSuccessful()) { - return chainFlowMods(ofFlowModInputs, index + 1, cookie); - } else { - LOG.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}", ofFlowModInputs.get(index) - .getXid()); - return Futures.immediateFuture(input); - } + public ListenableFuture> apply(final DataCrate dataCrate) { + return createResultForFlowMod(dataCrate.getFlowModInputBuilder(), dataCrate.getiDConnection()); } }); - } else { - return result; + partialFutures.add(partialFuture); } + + ListenableFuture>> allFutures = Futures.allAsList(partialFutures); + final SettableFuture> finalFuture = SettableFuture.create(); + Futures.addCallback(allFutures, new FutureCallback>>() { + @Override + public void onSuccess(List> result) { + for (RpcResult rpcResult : result) { + if (rpcResult.isSuccessful()) { + //TODO: AddFlowOutput has getTransactionId() - shouldn't it have some value? + finalFuture.set(RpcResultBuilder. success().build()); + } + } + } + + @Override + public void onFailure(Throwable t) { + finalFuture.set(RpcResultBuilder. failed() + .withError(ErrorType.APPLICATION, "", t.getMessage()).build()); + } + }); + + return finalFuture; } - protected Future> createResultForFlowMod(final FlowModInputBuilder flowModInput, + protected ListenableFuture> createResultForFlowMod(final FlowModInputBuilder flowModInput, final BigInteger cookie) { flowModInput.setXid(deviceContext.getNextXid().getValue()); - return provideConnectionAdapter(cookie).flowMod(flowModInput.build()); + Future> flowModResult = provideConnectionAdapter(cookie).flowMod(flowModInput.build()); + return JdkFutureAdapters.listenInPoolThread(flowModResult); } }