X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fservices%2FSalFlowServiceImpl.java;h=3912e027980f73e6f9fdedcb1341cb3e7b4cd1dc;hb=a20de7a6effc5e4a37940e0ef5db507ad0730953;hp=55114109d089e711724315fc88eb7c367bbd32af;hpb=074ea6426dc95ebe9475cc146d534bf6d6c38541;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java index 55114109d0..3912e02798 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java @@ -10,19 +10,17 @@ package org.opendaylight.openflowplugin.impl.services; import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; -import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowHash; @@ -48,6 +46,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -65,6 +64,9 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService @Override public Future> addFlow(final AddFlowInput input) { getMessageSpy().spyMessage(input.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENTERED); + + final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, getVersion(), getDatapathId()); + final ListenableFuture> future = processFlowModInputBuilders(ofFlowModInputs); final FlowId flowId; if (null != input.getFlowRef()) { flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId(); @@ -72,18 +74,15 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService flowId = FlowUtil.createAlienFlowId(input.getTableId()); } - final DeviceContext deviceContext = getDeviceContext(); final FlowHash flowHash = FlowHashFactory.create(input, deviceContext.getPrimaryConnectionContext().getFeatures().getVersion()); final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId); + deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor); + Futures.addCallback(future, new FutureCallback>() { - final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, getVersion(), getDatapathId()); - final ListenableFuture> future = processFlowModInputBuilders(ofFlowModInputs); - Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(final RpcResult rpcResult) { - deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor); if (rpcResult.isSuccessful()) { LOG.debug("flow add finished without error, id={}", flowId.getValue()); } else { @@ -104,15 +103,15 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService @Override public Future> removeFlow(final RemoveFlowInput input) { LOG.trace("Calling remove flow for flow with ID ={}.", input.getFlowRef()); - return this.handleServiceCall(new Function, ListenableFuture>>() { + return this.handleServiceCall(new Function, ListenableFuture>>() { @Override - public ListenableFuture> apply(final DataCrate data) { + public ListenableFuture> apply(final RequestContext requestContext) { final FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, getVersion(), getDatapathId()); - final ListenableFuture> future = createResultForFlowMod(data, ofFlowModInput); - Futures.addCallback(future, new FutureCallback() { + final ListenableFuture> future = createResultForFlowMod(requestContext, ofFlowModInput); + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(final Object o) { + public void onSuccess(final RpcResult o) { final DeviceContext deviceContext = getDeviceContext(); getMessageSpy().spyMessage(input.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS); FlowHash flowHash = FlowHashFactory.create(input, deviceContext.getPrimaryConnectionContext().getFeatures().getVersion()); @@ -123,7 +122,7 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService public void onFailure(final Throwable throwable) { getMessageSpy().spyMessage(input.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE); LOG.trace("Flow modification failed..", throwable); - StringBuffer errors = new StringBuffer(); + StringBuilder errors = new StringBuilder(); try { RpcResult result = future.get(); Collection rpcErrors = result.getErrors(); @@ -169,10 +168,10 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService } allFlowMods.addAll(ofFlowModInputs); - ListenableFuture future = processFlowModInputBuilders(allFlowMods); - Futures.addCallback(future, new FutureCallback() { + ListenableFuture> future = processFlowModInputBuilders(allFlowMods); + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(final Object o) { + public void onSuccess(final RpcResult o) { final DeviceContext deviceContext = getDeviceContext(); getMessageSpy().spyMessage(input.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS); final short version = deviceContext.getPrimaryConnectionContext().getFeatures().getVersion(); @@ -194,117 +193,64 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService return future; } - private ListenableFuture> processFlowModInputBuilders( - final List ofFlowModInputs) { + private ListenableFuture> processFlowModInputBuilders(final List ofFlowModInputs) { + final List>> partialFutures = new ArrayList<>(); - for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) { - DataCrateBuilder dataCrateBuilder = DataCrateBuilder.builder().setFlowModInputBuilder(flowModInputBuilder); + for (final FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) { ListenableFuture> partialFuture = handleServiceCall( - getPrimaryConnection(), - new Function, ListenableFuture>>() { + new Function, ListenableFuture>>() { @Override - public ListenableFuture> apply(final DataCrate data) { - return createResultForFlowMod(data); + public ListenableFuture> apply(final RequestContext requestContext) { + return createResultForFlowMod(requestContext, flowModInputBuilder); } - }, - dataCrateBuilder - ); + }); partialFutures.add(partialFuture); } - // processing of final (optionally composite future) final ListenableFuture>> allFutures = Futures.successfulAsList(partialFutures); final SettableFuture> finalFuture = SettableFuture.create(); Futures.addCallback(allFutures, new FutureCallback>>() { @Override - public void onSuccess(List> results) { - List rpcErrorLot = new ArrayList<>(); - RpcResultBuilder resultBuilder; - - Iterator flowModInputBldIterator = ofFlowModInputs.iterator(); - Iterator> resultIterator = results.iterator(); - - for (ListenableFuture> partFutureFromRqCtx : partialFutures) { - FlowModInputBuilder flowModInputBld = flowModInputBldIterator.next(); - RpcResult result = resultIterator.next(); - Long xid = flowModInputBld.getXid(); - - - LOG.trace("flowMod future processing [{}], result={}", xid, result); - if (partFutureFromRqCtx.isCancelled()) { // one and only positive case - if (LOG.isTraceEnabled()) { - LOG.trace("flow future result was cancelled [{}] = barrier passed it without error", xid); - } - } else { // all negative cases - if (result == null) { // there is exception or null value set - try { - partFutureFromRqCtx.get(); - } catch (Exception e) { - rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "", - "flow future result [" + xid + "] failed with exception", - OFConstants.APPLICATION_TAG, e.getMessage(), e)); - - // xid might be not available in case requestContext not even stored - if (xid != null) { - final DeviceContext deviceContext = getDeviceContext(); - deviceContext.unhookRequestCtx(new Xid(xid)); - } - } - } else { - if (result.isSuccessful()) { // positive confirmation - never happens - LOG.warn("Positive confirmation of flow push is not supported by OF-spec"); - LOG.warn("flow future result was successful [{}] = this should have never happen", - xid); - rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "", - "flow future result was successful [" + xid + "] = this should have never happen")); - } else { // standard error occurred - LOG.trace("passing original rpcErrors [{}]", xid); - if (LOG.isTraceEnabled()) { - for (RpcError rpcError : result.getErrors()) { - LOG.trace("passed rpcError [{}]: {}", xid, rpcError); - } - } - rpcErrorLot.addAll(result.getErrors()); - } - } - } - } - - if (rpcErrorLot.isEmpty()) { - resultBuilder = RpcResultBuilder.success(); - } else { - resultBuilder = RpcResultBuilder.failed().withRpcErrors(rpcErrorLot); - } - - finalFuture.set(resultBuilder.build()); + public void onSuccess(final List> results) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + finalFuture.set(rpcResultBuilder.build()); } @Override - public void onFailure(Throwable t) { - LOG.trace("Flow mods chained future failed."); - RpcResultBuilder resultBuilder = RpcResultBuilder.failed() - .withError(ErrorType.APPLICATION, "", t.getMessage()); - finalFuture.set(resultBuilder.build()); + public void onFailure(final Throwable t) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed(); + finalFuture.set(rpcResultBuilder.build()); } }); return finalFuture; } - protected ListenableFuture> createResultForFlowMod(final DataCrate data) { - return createResultForFlowMod(data, data.getFlowModInputBuilder()); - } - - protected ListenableFuture> createResultForFlowMod(final DataCrate data, final FlowModInputBuilder flowModInputBuilder) { - final Xid xid = data.getRequestContext().getXid(); - flowModInputBuilder.setXid(xid.getValue()); + protected ListenableFuture> createResultForFlowMod(final RequestContext requestContext, final FlowModInputBuilder flowModInputBuilder) { + final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider(); + final long xid = requestContext.getXid().getValue(); + flowModInputBuilder.setXid(xid); final FlowModInput flowModInput = flowModInputBuilder.build(); - Future> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod( - flowModInput); - final ListenableFuture> result = JdkFutureAdapters.listenInPoolThread(flowModResult); - return result; + final SettableFuture> settableFuture = SettableFuture.create(); + outboundQueue.commitEntry(xid, flowModInput, new FutureCallback() { + @Override + public void onSuccess(final OfHeader ofHeader) { + RequestContextUtil.closeRequstContext(requestContext); + getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS); + + settableFuture.set(SUCCESSFUL_RPCRESULT); + } + + @Override + public void onFailure(final Throwable throwable) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed().withError(ErrorType.APPLICATION, throwable.getMessage(), throwable); + RequestContextUtil.closeRequstContext(requestContext); + settableFuture.set(rpcResultBuilder.build()); + } + }); + return settableFuture; } }