X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fservices%2FCommonService.java;h=eff804bc09e2dfad4cdfa244e5e2cee8198adfc0;hb=e4faef067c5ed399ccdd686d26c09a0b7a6cf4b7;hp=11499c90e5d483d4f64601e095dc9ac22cfa9165;hpb=6edfe9da7f62af3a454b505e3cc103290041bd55;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java index 11499c90e5..eff804bc09 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java @@ -8,53 +8,78 @@ package org.opendaylight.openflowplugin.impl.services; import com.google.common.base.Function; -import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.yangtools.yang.binding.DataObject; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.math.BigInteger; -import java.util.concurrent.Future; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; -import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; +import org.opendaylight.openflowplugin.api.openflow.device.Xid; +import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; -abstract class CommonService { +public abstract class CommonService { private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CommonService.class); private static final long WAIT_TIME = 2000; - protected final static Future> ERROR_RPC_RESULT = Futures.immediateFuture(RpcResultBuilder - . failed().withError(ErrorType.APPLICATION, "", "Request quota exceeded.").build()); - protected static final BigInteger PRIMARY_CONNECTION = new BigInteger("0"); - - // protected OFRpcTaskContext rpcTaskContext; - protected short version; - protected BigInteger datapathId; - protected RpcContext rpcContext; - protected DeviceContext deviceContext; - private ConnectionAdapter primaryConnectionAdapter; - - CommonService() { - } + private static final BigInteger PRIMARY_CONNECTION = BigInteger.ZERO; + + private final short version; + private final BigInteger datapathId; + private final RequestContextStack requestContextStack; + private final DeviceContext deviceContext; + private final ConnectionAdapter primaryConnectionAdapter; + private final MessageSpy messageSpy; - public CommonService(final RpcContext rpcContext) { - this.rpcContext = rpcContext; - this.deviceContext = rpcContext.getDeviceContext(); + public CommonService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) { + this.requestContextStack = requestContextStack; + this.deviceContext = deviceContext; final FeaturesReply features = this.deviceContext.getPrimaryConnectionContext().getFeatures(); this.datapathId = features.getDatapathId(); this.version = features.getVersion(); this.primaryConnectionAdapter = deviceContext.getPrimaryConnectionContext().getConnectionAdapter(); + this.messageSpy = deviceContext.getMessageSpy(); + } + + public static BigInteger getPrimaryConnection() { + return PRIMARY_CONNECTION; + } + + public short getVersion() { + return version; + } + + public BigInteger getDatapathId() { + return datapathId; + } + + public RequestContextStack getRequestContextStack() { + return requestContextStack; + } + + public DeviceContext getDeviceContext() { + return deviceContext; + } + + public ConnectionAdapter getPrimaryConnectionAdapter() { + return primaryConnectionAdapter; + } + + public MessageSpy getMessageSpy() { + return messageSpy; } protected long provideWaitTime() { return WAIT_TIME; } + protected ConnectionAdapter provideConnectionAdapter(final BigInteger connectionID) { if (connectionID == null) { return primaryConnectionAdapter; @@ -63,10 +88,8 @@ abstract class CommonService { return primaryConnectionAdapter; } - // TODO uncomment when getAuxiali.... will be merged to APIs - // final ConnectionContext auxiliaryConnectionContext = - // deviceContext.getAuxiliaryConnectionContext(connectionID); - final ConnectionContext auxiliaryConnectionContext = null; + final ConnectionContext auxiliaryConnectionContext = + deviceContext.getAuxiliaryConnectiobContexts(connectionID); if (auxiliaryConnectionContext != null) { return auxiliaryConnectionContext.getConnectionAdapter(); } @@ -74,24 +97,74 @@ abstract class CommonService { return primaryConnectionAdapter; } - Future> handleServiceCall(final BigInteger connectionID, - final Function, Future>> function) { - LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); + /** + * @param connectionID connection identifier + * @param function data sender + * @param rpc result backend type + * @param final rpc backend type + * @return + */ + public ListenableFuture> handleServiceCall(final BigInteger connectionID, + final Function, ListenableFuture>> function) { + DataCrateBuilder dataCrateBuilder = DataCrateBuilder.builder(); + return handleServiceCall(function, dataCrateBuilder); + } + + public ListenableFuture> handleServiceCall(final Function, ListenableFuture>> function) { + DataCrateBuilder dataCrateBuilder = DataCrateBuilder.builder(); + return handleServiceCall(function, dataCrateBuilder); + } - final RequestContext requestContext = rpcContext.createRequestContext(); - final SettableFuture> result = rpcContext.storeOrFail(requestContext); - final DataCrate dataCrate = DataCrateBuilder. builder().setiDConnection(connectionID) - .setRequestContext(requestContext).build(); - if (!result.isDone()) { - final Future> resultFromOFLib = function.apply(dataCrate); + /** + * @param + * @param + * @param function + * @param dataCrateBuilder predefined data + * @return + */ + public final ListenableFuture> handleServiceCall(final Function, ListenableFuture>> function, + final DataCrateBuilder dataCrateBuilder) { - final RpcResultConvertor rpcResultConvertor = new RpcResultConvertor<>(requestContext, deviceContext); - rpcResultConvertor.processResultFromOfJava(resultFromOFLib); + LOG.trace("Handling general service call"); + final RequestContext requestContext = createRequestContext(); + if (requestContext == null) { + LOG.trace("Request context refused."); + deviceContext.getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED); + return failedFuture(); + } - } else { - RequestContextUtil.closeRequstContext(requestContext); + Long reservedXid = deviceContext.getReservedXid(); + if (null == reservedXid) { + //retry + reservedXid = deviceContext.getReservedXid(); + if (null == reservedXid) { + deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_RESERVATION_REJECTED); + return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID."); + } } - return result; + final Xid xid = new Xid(reservedXid); + requestContext.setXid(xid); + DataCrate dataCrate = dataCrateBuilder.setRequestContext(requestContext) + .build(); + final ListenableFuture> resultFromOFLib; + + LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue()); + deviceContext.hookRequestCtx(xid, requestContext); + + messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT); + function.apply(dataCrate); + + return requestContext.getFuture(); + } + protected final RequestContext createRequestContext() { + return requestContextStack.createRequestContext(); + } + + protected static ListenableFuture> failedFuture() { + final RpcResult rpcResult = RpcResultBuilder.failed() + .withError(RpcError.ErrorType.APPLICATION, "", "Request quota exceeded").build(); + return Futures.immediateFuture(rpcResult); + } }