From 6a97ad9e38ec2ad669c7b51acb86c9b3a5cefad1 Mon Sep 17 00:00:00 2001 From: Timotej Kubas Date: Wed, 22 Apr 2015 11:31:11 +0200 Subject: [PATCH] tidy up addFlow futures Change-Id: Ib29873bb207ff6b5ef63f31926c06578ace63ec8 Signed-off-by: Timotej Kubas --- .../impl/device/BarrierProcessor.java | 4 +- .../impl/rpc/RpcContextImpl.java | 5 +- .../impl/services/CommonService.java | 33 +++- .../services/OFJResult2RequestCtxFuture.java | 2 + .../impl/services/SalFlowServiceImpl.java | 178 ++++++++---------- 5 files changed, 108 insertions(+), 114 deletions(-) diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java index feb19cc87c..cdb2a79a77 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java @@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.impl.device; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor; +import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +42,9 @@ public class BarrierProcessor { LOG.trace("processing barrier response [{}]", barrierXid); RequestContext nextRequestContext; while ((nextRequestContext = messageExtractor.extractNextOutstandingMessage(barrierXid)) != null ) { - LOG.trace("flushing outstanding request [{}]", nextRequestContext.getXid().getValue()); + LOG.trace("flushing outstanding request [{}], closing", nextRequestContext.getXid().getValue()); nextRequestContext.getFuture().cancel(false); + RequestContextUtil.closeRequstContext(nextRequestContext); } } } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java index dbbffd4bd5..5e43c18418 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java @@ -7,8 +7,6 @@ */ package org.opendaylight.openflowplugin.impl.rpc; -import org.slf4j.Logger; - import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collections; @@ -23,6 +21,7 @@ import org.opendaylight.yangtools.yang.binding.RpcService; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.slf4j.Logger; public class RpcContextImpl implements RpcContext { @@ -93,7 +92,7 @@ public class RpcContextImpl implements RpcContext { public void forgetRequestContext(final RequestContext requestContext) { synchronizedRequestsList.remove(requestContext); LOG.trace("Removed request context with xid {}. Context request in list {}.", - requestContext.getXid(), synchronizedRequestsList.size()); + requestContext.getXid().getValue(), synchronizedRequestsList.size()); } @Override diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java index 321b4747e4..7a60761585 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java @@ -80,15 +80,37 @@ public abstract class CommonService { return primaryConnectionAdapter; } - public Future> handleServiceCall(final BigInteger connectionID, - final Function, ListenableFuture>> function) { - LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); + /** + * @param connectionID connection identifier + * @param function data sender + * @param rpc result backend type + * @param final rpc backend type + * @return + */ + public ListenableFuture> handleServiceCall(final BigInteger connectionID, + final Function, ListenableFuture>> function) { + DataCrateBuilder dataCrateBuilder = DataCrateBuilder.builder(); + return handleServiceCall(connectionID, function, dataCrateBuilder); + } + + /** + * @param + * @param + * @param connectionID + * @param function + * @param dataCrateBuilder predefined data + * @return + */ + public ListenableFuture> handleServiceCall(final BigInteger connectionID, + final Function, ListenableFuture>> function, + final DataCrateBuilder dataCrateBuilder) { + LOG.debug("Handling general service call"); final RequestContext requestContext = requestContextStack.createRequestContext(); final SettableFuture> result = requestContextStack.storeOrFail(requestContext); if (!result.isDone()) { - final DataCrate dataCrate = DataCrateBuilder.builder().setiDConnection(connectionID) - .setRequestContext(requestContext).build(); + DataCrate dataCrate = dataCrateBuilder.setiDConnection(connectionID).setRequestContext(requestContext) + .build(); requestContext.setXid(deviceContext.getNextXid()); LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue()); @@ -101,7 +123,6 @@ public abstract class CommonService { } else { messageSpy.spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE); - RequestContextUtil.closeRequstContext(requestContext); } return result; } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/OFJResult2RequestCtxFuture.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/OFJResult2RequestCtxFuture.java index cc2d15a67f..91c5e315e5 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/OFJResult2RequestCtxFuture.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/OFJResult2RequestCtxFuture.java @@ -55,6 +55,7 @@ public class OFJResult2RequestCtxFuture { RpcResultBuilder.failed().withRpcErrors(fRpcResult.getErrors()).build()); RequestContextUtil.closeRequstContext(requestContext); } + // else: message was successfully sent - waiting for callback on requestContext.future to get invoked } @Override @@ -66,6 +67,7 @@ public class OFJResult2RequestCtxFuture { requestContext.getFuture().set(RpcResultBuilder.success().build()); } else { deviceContext.getMessageSpy().spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); + deviceContext.unhookRequestCtx(requestContext.getXid()); LOG.trace("Exception occured while processing OF Java response for XID {}.", requestContext.getXid().getValue(), throwable); requestContext.getFuture().set( RpcResultBuilder.failed() diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java index 916b546a4e..1989511600 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java @@ -13,15 +13,14 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; -import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor; @@ -62,34 +61,6 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService super(requestContextStack, deviceContext); } - ListenableFuture> handleServiceCall(final BigInteger connectionID, - final FlowModInputBuilder flowModInputBuilder, final Function, ListenableFuture>> function) { - LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); - - final RequestContext requestContext = requestContextStack.createRequestContext(); - final SettableFuture> result = requestContextStack.storeOrFail(requestContext); - - if (!result.isDone()) { - - final DataCrate dataCrate = DataCrateBuilder.builder().setiDConnection(connectionID) - .setRequestContext(requestContext).setFlowModInputBuilder(flowModInputBuilder).build(); - - requestContext.setXid(deviceContext.getNextXid()); - - LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue()); - deviceContext.hookRequestCtx(requestContext.getXid(), requestContext); - - final ListenableFuture> resultFromOFLib = function.apply(dataCrate); - - final OFJResult2RequestCtxFuture OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext); - OFJResult2RequestCtxFuture.processResultFromOfJava(resultFromOFLib); - - } else { - RequestContextUtil.closeRequstContext(requestContext); - } - return result; - } - @Override public Future> addFlow(final AddFlowInput input) { final FlowId flowId; @@ -105,13 +76,17 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor); final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId); - final ListenableFuture future = processFlowModInputBuilders(ofFlowModInputs); + final ListenableFuture> future = processFlowModInputBuilders(ofFlowModInputs); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(final Object o) { + public void onSuccess(final RpcResult rpcResult) { messageSpy.spyMessage(input, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS); - LOG.debug("flow add finished without error, id={}", flowId.getValue()); + if (rpcResult.isSuccessful()) { + LOG.debug("flow add finished without error, id={}", flowId.getValue()); + } else { + LOG.debug("flow add failed with error, id={}", flowId.getValue()); + } } @Override @@ -217,55 +192,93 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService private ListenableFuture> processFlowModInputBuilders( final List ofFlowModInputs) { final List>> partialFutures = new ArrayList<>(); + for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) { - ListenableFuture> partialFuture = handleServiceCall(PRIMARY_CONNECTION, flowModInputBuilder, + DataCrateBuilder dataCrateBuilder = DataCrateBuilder.builder().setFlowModInputBuilder(flowModInputBuilder); + ListenableFuture> partialFuture = handleServiceCall( + PRIMARY_CONNECTION, new Function, ListenableFuture>>() { @Override public ListenableFuture> apply(final DataCrate data) { return createResultForFlowMod(data); } - }); + }, + dataCrateBuilder + ); partialFutures.add(partialFuture); } - final ListenableFuture>> allFutures = Futures.allAsList(partialFutures); + // processing of final (optionally composite future) + final ListenableFuture>> allFutures = Futures.successfulAsList(partialFutures); final SettableFuture> finalFuture = SettableFuture.create(); Futures.addCallback(allFutures, new FutureCallback>>() { @Override public void onSuccess(List> results) { - Iterator flowModInputBldIterator = ofFlowModInputs.iterator(); List rpcErrorLot = new ArrayList<>(); - for (RpcResult rpcResult : results) { + RpcResultBuilder resultBuilder; + + Iterator flowModInputBldIterator = ofFlowModInputs.iterator(); + Iterator> resultIterator = results.iterator(); + + for (ListenableFuture> partFutureFromRqCtx : partialFutures) { FlowModInputBuilder flowModInputBld = flowModInputBldIterator.next(); - if (rpcResult.isSuccessful()) { - Long xid = flowModInputBld.getXid(); - LOG.warn("Positive confirmation of flow push is not supported by OF-spec"); - LOG.warn("flow future result was successful [{}] = this should have never happen", - xid); - rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "", - "flow future result was successful ["+xid+"] = this should have never happen")); - } else { - rpcErrorLot.addAll(rpcResult.getErrors()); + RpcResult result = resultIterator.next(); + Long xid = flowModInputBld.getXid(); + + + LOG.trace("flowMod future processing [{}], result={}", xid, result); + if (partFutureFromRqCtx.isCancelled()) { // one and only positive case + if (LOG.isTraceEnabled()) { + LOG.trace("flow future result was cancelled [{}] = barrier passed it without error", xid); + } + } else { // all negative cases + if (result == null) { // there is exception or null value set + try { + partFutureFromRqCtx.get(); + } catch (Exception e) { + rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "", + "flow future result [" + xid + "] failed with exception", + OFConstants.APPLICATION_TAG, e.getMessage(), e)); + + // xid might be not available in case requestContext not even stored + if (xid != null) { + deviceContext.unhookRequestCtx(new Xid(xid)); + } + } + } else { + if (result.isSuccessful()) { // positive confirmation - never happens + LOG.warn("Positive confirmation of flow push is not supported by OF-spec"); + LOG.warn("flow future result was successful [{}] = this should have never happen", + xid); + rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "", + "flow future result was successful [" + xid + "] = this should have never happen")); + } else { // standard error occurred + LOG.trace("passing original rpcErrors [{}]", xid); + if (LOG.isTraceEnabled()) { + for (RpcError rpcError : result.getErrors()) { + LOG.trace("passed rpcError [{}]: {}", xid, rpcError); + } + } + rpcErrorLot.addAll(result.getErrors()); + } + } } } - finalFuture.set(RpcResultBuilder.failed().withRpcErrors(rpcErrorLot).build()); + + if (rpcErrorLot.isEmpty()) { + resultBuilder = RpcResultBuilder.success(); + } else { + resultBuilder = RpcResultBuilder.failed().withRpcErrors(rpcErrorLot); + } + + finalFuture.set(resultBuilder.build()); } @Override public void onFailure(Throwable t) { LOG.trace("Flow mods chained future failed."); - RpcResultBuilder resultBuilder; - if (allFutures.isCancelled()) { - if (LOG.isTraceEnabled()) { - for (FlowModInputBuilder ofFlowModInput : ofFlowModInputs) { - LOG.trace("flow future result was cancelled [{}] = barrier passed it without error", - ofFlowModInput.getXid()); - } - } - resultBuilder = RpcResultBuilder.success(); - } else { - resultBuilder = RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "", t.getMessage()); - } + RpcResultBuilder resultBuilder = RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "", t.getMessage()); finalFuture.set(resultBuilder.build()); } }); @@ -285,49 +298,6 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService flowModInput); final ListenableFuture> result = JdkFutureAdapters.listenInPoolThread(flowModResult); - final RequestContext requestContext = data.getRequestContext(); - - Futures.addCallback(result, new FutureCallback>() { - @Override - public void onSuccess(final RpcResult voidRpcResult) { - if (!voidRpcResult.isSuccessful()) { - // remove current request from request cache in deviceContext - messageSpy.spyMessage(flowModInput, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - deviceContext.unhookRequestCtx(requestContext.getXid()); - // handle requestContext failure - StringBuilder rpcErrors = new StringBuilder(); - if (null != voidRpcResult.getErrors() && voidRpcResult.getErrors().size() > 0) { - for (RpcError error : voidRpcResult.getErrors()) { - rpcErrors.append(error.getMessage()); - } - } - LOG.trace("OF Java result for XID {} was not successful. Errors : {}", requestContext.getXid().getValue(), rpcErrors.toString()); - requestContext.getFuture().set( - RpcResultBuilder.failed().withRpcErrors(voidRpcResult.getErrors()).build()); - RequestContextUtil.closeRequstContext(requestContext); - } - - } - - @Override - public void onFailure(final Throwable throwable) { - if (result.isCancelled()) { - messageSpy.spyMessage(flowModInput, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - LOG.trace("Asymmetric message - no response from OF Java expected for XID {}. Closing as successful.", requestContext.getXid().getValue()); - requestContext.getFuture().set(RpcResultBuilder.success().build()); - } else { - messageSpy.spyMessage(flowModInput, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - LOG.trace("Exception occured while processing OF Java response for XID {}.", requestContext.getXid().getValue(), throwable); - requestContext.getFuture().set( - RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "", "Flow translation to OF JAVA failed.") - .build()); - } - - RequestContextUtil.closeRequstContext(requestContext); - - } - }); return result; } -- 2.36.6