Cleanup RequestContextStack 76/20576/3
authorRobert Varga <rovarga@cisco.com>
Fri, 15 May 2015 21:24:50 +0000 (23:24 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 16 May 2015 01:08:51 +0000 (03:08 +0200)
Instead of exposing the limits as a tweakable API, we should specify the
limit at instantiation as an implementation-specific thing.

We document that RequestContextStack.createRequestContext() can return
null and let callers deal with that.

This change allows us to implement RpcContext throttling using a simple
counting Semaphore, which is more scalable and efficient than an
explicit queue. To do that, we turn RequestContextImpl into AbstractRequestContext,
which is subclassed as needed by the various RequestContextStack
implementations.

Change-Id: Ibbeafad6cd7f740284264cca22412e11696cede8
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/RequestContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/RequestContextStack.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/rpc/RpcContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContext.java [moved from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RequestContextImpl.java with 68% similarity]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/FlowCapableTransactionServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/NodeConfigServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/RequestContextUtil.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalEchoServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/api/openflow/device/RpcContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/api/openflow/device/RpcManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContextTest.java [moved from openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/rpc/RequestContextImplTest.java with 50% similarity]

index d21a46fa04da1e95263260107c6cd76772b97f9d..7ee42252470c7011d51999540ef359cacf928062 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.openflowplugin.api.openflow.device;
 
 
+
 /**
  * Request context handles all requests on device. Number of requests is limited by request quota. When this quota is
  * exceeded all rpc's will end up with exception.
@@ -41,4 +42,6 @@ public interface RequestContext<T> extends RequestFutureContext<T>, AutoCloseabl
      */
     void setWaitTimeout(long waitTimeout);
 
+    @Override
+    void close();
 }
index 7cde91601cc084d2f99f703cd416160ef8c84a17..00a0a586a7bc19a34d7e919cf79333156b4a2f32 100644 (file)
@@ -5,33 +5,18 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import javax.annotation.Nullable;
 
 /**
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 1.4.2015.
  */
 public interface RequestContextStack {
-
-    <T> void forgetRequestContext(RequestContext<T> requestContext);
-
-    /**
-     * Method adds request to request queue which has limited quota. After number of requests exceeds quota limit future
-     * will be done immediately and will contain information about exceeded request quota.
-     *
-     * @param data
-     */
-    <T> SettableFuture<RpcResult<T>> storeOrFail(RequestContext<T> data);
-
     /**
      * Method returns new request context for current request.
      *
-     * @return
+     * @return A request context, or null if one cannot be created.
      */
-    <T> RequestContext<T> createRequestContext();
-
+    @Nullable <T> RequestContext<T> createRequestContext();
 }
index 94ce2b29090bd51db2fc9421e2c0a5d60d8cffff..cb14a136691a7a31826f50df9b3e25f3bdcae3cb 100644 (file)
@@ -19,16 +19,5 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 25.2.2015.
  */
 public interface RpcContext extends RequestContextStack, AutoCloseable, DeviceDisconnectedHandler {
-
     <S extends RpcService> void registerRpcServiceImplementation(Class<S> serviceClass, S serviceInstance);
-
-
-    /**
-     * Method for setting request quota value. When the Request Context quota is exceeded, incoming RPCs fail
-     * immediately, with a well-defined error.
-     *
-     * @param maxRequestsPerDevice
-     */
-    void setRequestContextQuota(int maxRequestsPerDevice);
-
 }
index 0560e945d5034c25fe9ad27e5103ae9e9b316f20..c02764b2f761b30f049b8dde93e5d1ad4589d4ea 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -52,7 +51,7 @@ import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil
 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
 import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl;
 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
-import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
@@ -136,19 +135,14 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         this.messageIntelligenceAgency = messageIntelligenceAgency;
 
         emptyRequestContextStack = new RequestContextStack() {
-            @Override
-            public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
-                //NOOP
-            }
-
-            @Override
-            public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> data) {
-                return data.getFuture();
-            }
-
             @Override
             public <T> RequestContext<T> createRequestContext() {
-                return new RequestContextImpl<>(this);
+                return new AbstractRequestContext<T>() {
+                    @Override
+                    public void close() {
+                        //NOOP
+                    }
+                };
             }
         };
     }
similarity index 68%
rename from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RequestContextImpl.java
rename to openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContext.java
index 5af5880f10fe6e77867516a4235af977abde6872..4208ca9f3d30786d5a864b65a41ea47cc75ab6c9 100644 (file)
@@ -9,27 +9,16 @@ package org.opendaylight.openflowplugin.impl.rpc;
 
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-/**
- * @author joe
- */
-public class RequestContextImpl<T> implements RequestContext<T> {
-
-    private final RequestContextStack requestContextStack;
+public abstract class AbstractRequestContext<T> implements RequestContext<T> {
     private SettableFuture<RpcResult<T>> rpcResultFuture;
     private long waitTimeout;
     private Xid xid;
 
-    public RequestContextImpl(RequestContextStack requestContextStack) {
-        this.requestContextStack = requestContextStack;
-    }
+    protected AbstractRequestContext() {
 
-    @Override
-    public void close() {
-        requestContextStack.forgetRequestContext(this);
     }
 
     @Override
@@ -46,7 +35,7 @@ public class RequestContextImpl<T> implements RequestContext<T> {
     }
 
     @Override
-    public void setXid(Xid xid) {
+    public void setXid(final Xid xid) {
         this.xid = xid;
     }
 
@@ -56,7 +45,7 @@ public class RequestContextImpl<T> implements RequestContext<T> {
     }
 
     @Override
-    public void setWaitTimeout(long waitTimeout) {
+    public void setWaitTimeout(final long waitTimeout) {
         this.waitTimeout = waitTimeout;
     }
 }
index 0c12637e54944f8067266ec83696e51320db8583..ab39f92f3acd2e1f9e65e2beea69832dd3e86899 100644 (file)
@@ -7,10 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collection;
 import java.util.HashSet;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -22,30 +21,24 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RpcContextImpl implements RpcContext {
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(RpcContextImpl.class);
-    private MessageSpy messagSpy;
-    final RpcProviderRegistry rpcProviderRegistry;
+    private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class);
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private final MessageSpy messageSpy;
+    private final Semaphore tracker;
 
     // TODO: add private Sal salBroker
     private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
     private final Collection<RoutedRpcRegistration<?>> rpcRegistrations = new HashSet<>();
 
-
-    @GuardedBy("requestsList")
-    private final Collection<RequestContext<?>> requestsList = new HashSet<RequestContext<?>>();
-
-    private int maxRequestsPerDevice;
-
-    public RpcContextImpl(final MessageSpy messagSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier) {
-        this.messagSpy = messagSpy;
+    public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier, final int maxRequests) {
+        this.messageSpy = messageSpy;
         this.rpcProviderRegistry = rpcProviderRegistry;
         this.nodeInstanceIdentifier = nodeInstanceIdentifier;
+        tracker = new Semaphore(maxRequests, true);
     }
 
     /**
@@ -61,64 +54,34 @@ public class RpcContextImpl implements RpcContext {
         LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
     }
 
-    @Override
-    public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> requestContext) {
-        final SettableFuture<RpcResult<T>> rpcResultFuture = requestContext.getFuture();
-
-        final boolean success;
-        // FIXME: use a fixed-size collection, with lockless reserve/set queue
-        synchronized (requestsList) {
-            if (requestsList.size() < maxRequestsPerDevice) {
-                requestsList.add(requestContext);
-                success = true;
-            } else {
-                success = false;
-            }
-        }
-
-        if (!success) {
-            final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "", "Device's request queue is full.").build();
-            rpcResultFuture.set(rpcResult);
-        }
-
-        return rpcResultFuture;
-    }
-
     /**
      * Unregisters all services.
      *
      * @see java.lang.AutoCloseable#close()
      */
     @Override
-    public void close() throws Exception {
+    public void close() {
         for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations) {
             rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
             rpcRegistration.close();
         }
     }
 
-    /**
-     * @see org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext#setRequestContextQuota(int)
-     */
-    @Override
-    public void setRequestContextQuota(final int maxRequestsPerDevice) {
-        this.maxRequestsPerDevice = maxRequestsPerDevice;
-    }
-
     @Override
-    public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
-        synchronized (requestsList) {
-            requestsList.remove(requestContext);
-            LOG.trace("Removed request context with xid {}. Context request in list {}.",
-                    requestContext.getXid().getValue(), requestsList.size());
-            messagSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
+    public <T> RequestContext<T> createRequestContext() {
+        if (!tracker.tryAcquire()) {
+            LOG.trace("Device queue {} at capacity", this);
+            return null;
         }
-    }
 
-    @Override
-    public <T> RequestContext<T> createRequestContext() {
-        return new RequestContextImpl<T>(this);
+        return new AbstractRequestContext<T>() {
+            @Override
+            public void close() {
+                tracker.release();
+                LOG.trace("Removed request context with xid {}", getXid().getValue());
+                messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
+            }
+        };
     }
 
     @Override
@@ -126,9 +89,5 @@ public class RpcContextImpl implements RpcContext {
         for (RoutedRpcRegistration<?> registration : rpcRegistrations) {
             registration.close();
         }
-
-        synchronized (requestsList) {
-            requestsList.clear();
-        }
     }
 }
index f8e7a2506e6bb676ed323a0e2ed284297300a287..a619261a1ab4f8448bf6811c0d10f6b8ce1d7bbb 100644 (file)
@@ -33,8 +33,7 @@ public class RpcManagerImpl implements RpcManager {
 
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
-        final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier());
-        rpcContext.setRequestContextQuota(maxRequestsQuota.intValue());
+        final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier(), maxRequestsQuota.intValue());
         deviceContext.setDeviceDisconnectedHandler(rpcContext);
         MdSalRegistratorUtils.registerServices(rpcContext, deviceContext);
         // finish device initialization cycle back to DeviceManager
index 3fb28d3450b43dc4f1e9d47a22afda5f45cb9cfb..eff804bc09e2dfad4cdfa244e5e2cee8198adfc0 100644 (file)
@@ -10,9 +10,7 @@ package org.opendaylight.openflowplugin.impl.services;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.math.BigInteger;
-import java.util.concurrent.Future;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -21,7 +19,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -29,10 +27,6 @@ import org.slf4j.Logger;
 public abstract class CommonService {
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CommonService.class);
     private static final long WAIT_TIME = 2000;
-    private final static Future<RpcResult<Void>> ERROR_RPC_RESULT = Futures.immediateFuture(RpcResultBuilder
-            .<Void>failed().withError(ErrorType.APPLICATION, "", "Request quota exceeded.").build());
-
-
     private static final BigInteger PRIMARY_CONNECTION = BigInteger.ZERO;
 
     private final short version;
@@ -43,7 +37,7 @@ public abstract class CommonService {
     private final MessageSpy messageSpy;
 
 
-    public CommonService(final RequestContextStack requestContextStack, DeviceContext deviceContext) {
+    public CommonService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
         this.requestContextStack = requestContextStack;
         this.deviceContext = deviceContext;
         final FeaturesReply features = this.deviceContext.getPrimaryConnectionContext().getFeatures();
@@ -132,12 +126,11 @@ public abstract class CommonService {
                                                                          final DataCrateBuilder<T> dataCrateBuilder) {
 
         LOG.trace("Handling general service call");
-        final RequestContext<T> requestContext = requestContextStack.createRequestContext();
-        final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
-        if (result.isDone()) {
+        final RequestContext<T> requestContext = createRequestContext();
+        if (requestContext == null) {
             LOG.trace("Request context refused.");
-            deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
-            return result;
+            deviceContext.getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
+            return failedFuture();
         }
 
         Long reservedXid = deviceContext.getReservedXid();
@@ -145,9 +138,8 @@ public abstract class CommonService {
             //retry
             reservedXid = deviceContext.getReservedXid();
             if (null == reservedXid) {
-                RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
                 deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_RESERVATION_REJECTED);
-                return result;
+                return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
             }
         }
         final Xid xid = new Xid(reservedXid);
@@ -162,8 +154,17 @@ public abstract class CommonService {
         messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
         function.apply(dataCrate);
 
-        return result;
+        return requestContext.getFuture();
 
     }
 
+    protected final <T> RequestContext<T> createRequestContext() {
+        return requestContextStack.createRequestContext();
+    }
+
+    protected static <T> ListenableFuture<RpcResult<T>> failedFuture() {
+        final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "", "Request quota exceeded").build();
+        return Futures.immediateFuture(rpcResult);
+    }
 }
index 4fc6821f182b04cef874d50bb3a78f9bb980ff51..856c3ad597495a309935fa21fc0412ece3445a69 100644 (file)
@@ -7,11 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
@@ -24,7 +22,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -38,74 +35,51 @@ public class FlowCapableTransactionServiceImpl extends CommonService implements
     }
 
     @Override
-    public Future<RpcResult<Void>> sendBarrier(SendBarrierInput input) {
+    public Future<RpcResult<Void>> sendBarrier(final SendBarrierInput input) {
         final RequestContext<Void> requestContext = getRequestContextStack().createRequestContext();
-        final SettableFuture<RpcResult<Void>> sendBarrierOutput = getRequestContextStack()
-                .storeOrFail(requestContext);
-        if (!sendBarrierOutput.isDone()) {
-            final DeviceContext deviceContext = getDeviceContext();
-            final Long reservedXid = deviceContext.getReservedXid();
-            if (null == reservedXid){
-                RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-                return sendBarrierOutput;
-            }
-            final Xid xid = new Xid(reservedXid);
-            requestContext.setXid(xid);
-
-            final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
-            barrierInputOFJavaBuilder.setVersion(getVersion());
-            barrierInputOFJavaBuilder.setXid(xid.getValue());
+        if (requestContext == null) {
+            getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+            return failedFuture();
+        }
 
-            LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-            deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+        final DeviceContext deviceContext = getDeviceContext();
+        final Long reservedXid = deviceContext.getReservedXid();
+        if (null == reservedXid) {
+            return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+        }
+        final Xid xid = new Xid(reservedXid);
+        requestContext.setXid(xid);
 
-            final BarrierInput barrierInputOFJava = barrierInputOFJavaBuilder.build();
+        final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
+        barrierInputOFJavaBuilder.setVersion(getVersion());
+        barrierInputOFJavaBuilder.setXid(xid.getValue());
 
-            final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
-                    .barrier(barrierInputOFJava);
-            LOG.debug("Barrier with xid {} was sent from controller.", xid);
+        LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
+        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
-            ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
-                    .listenInPoolThread(barrierOutputOFJava);
+        final BarrierInput barrierInputOFJava = barrierInputOFJavaBuilder.build();
 
-            // callback on OF JAVA future
-            SuccessCallback<BarrierOutput, Void> successCallback = new SuccessCallback<BarrierOutput, Void>(
-                    deviceContext, requestContext, listenableBarrierOutputOFJava) {
+        // FIXME: should be submitted through OutboundQueue
+        final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
+                .barrier(barrierInputOFJava);
+        LOG.debug("Barrier with xid {} was sent from controller.", xid);
 
-                @Override
-                public RpcResult<Void> transform(RpcResult<BarrierOutput> rpcResult) {
-                    //no transformation, because output for request context is Void
-                    LOG.debug("Barrier reply with xid {} was obtained by controller.", rpcResult.getResult().getXid());
-                    return RpcResultBuilder.<Void>success().build();
-                }
-            };
-            Futures.addCallback(listenableBarrierOutputOFJava, successCallback);
-        } else {
-            getMessageSpy().spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
-        }
+        ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
+                .listenInPoolThread(barrierOutputOFJava);
 
-        //callback on request context future
-        Futures.addCallback(sendBarrierOutput, new FutureCallback<RpcResult<Void>>() {
+        // callback on OF JAVA future
+        SuccessCallback<BarrierOutput, Void> successCallback = new SuccessCallback<BarrierOutput, Void>(
+                deviceContext, requestContext, listenableBarrierOutputOFJava) {
 
             @Override
-            public void onSuccess(RpcResult<Void> result) {
+            public RpcResult<Void> transform(final RpcResult<BarrierOutput> rpcResult) {
+                //no transformation, because output for request context is Void
+                LOG.debug("Barrier reply with xid {} was obtained by controller.", rpcResult.getResult().getXid());
+                return RpcResultBuilder.<Void>success().build();
             }
+        };
+        Futures.addCallback(listenableBarrierOutputOFJava, successCallback);
 
-            @Override
-            public void onFailure(Throwable t) {
-                if (sendBarrierOutput.isCancelled()) {
-                    requestContext.getFuture().set(
-                            RpcResultBuilder.<Void>failed()
-                                    .withError(ErrorType.APPLICATION, "Barrier response wasn't obtained until barrier.")
-                                    .build());
-                    LOG.debug("Barrier reply with xid {} wasn't obtained by controller.", requestContext.getXid());
-
-                }
-            }
-        });
-
-        return sendBarrierOutput;
-
+        return requestContext.getFuture();
     }
-
 }
index 9d9a84e7b7311fbc02cf752118b373071c61d17f..c7adfc5fa00d756a3d919203e8e74027238b769e 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.impl.services;
 
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
@@ -22,48 +21,41 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-/**
- * @author joe
- */
 public class NodeConfigServiceImpl extends CommonService implements NodeConfigService {
 
-    private final RequestContextStack requestContextStack;
+    // FIXME: should be only in CommonService
     private final DeviceContext deviceContext;
 
     public NodeConfigServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
         super(requestContextStack, deviceContext);
-        this.requestContextStack = requestContextStack;
         this.deviceContext = deviceContext;
     }
 
-
     @Override
     public Future<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
-        final RequestContext requestContext = requestContextStack.createRequestContext();
-        final SettableFuture<RpcResult<SetConfigOutput>> result = requestContextStack.storeOrFail(requestContext);
-        if (!result.isDone()) {
-            SetConfigInputBuilder builder = new SetConfigInputBuilder();
-            SwitchConfigFlag flag = SwitchConfigFlag.valueOf(input.getFlag());
-            final Long reserverXid = deviceContext.getReservedXid();
-            if (null == reserverXid){
-                RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-                return result;
-            }
+        final RequestContext<SetConfigOutput> requestContext = createRequestContext();
+        if (requestContext == null) {
+            return failedFuture();
+        }
+
+        SetConfigInputBuilder builder = new SetConfigInputBuilder();
+        SwitchConfigFlag flag = SwitchConfigFlag.valueOf(input.getFlag());
+        final Long reserverXid = deviceContext.getReservedXid();
+        if (null == reserverXid) {
+            return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+        }
 
-            final Xid xid = new Xid(reserverXid);
-            builder.setXid(xid.getValue());
-            builder.setFlags(flag);
-            builder.setMissSendLen(input.getMissSearchLength());
-            builder.setVersion(getVersion());
-            ListenableFuture<RpcResult<Void>> futureResultFromOfLib;
-            synchronized (deviceContext) {
-                futureResultFromOfLib = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter().setConfig(builder.build()));
-            }
-            OFJResult2RequestCtxFuture<SetConfigOutput> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
-            OFJResult2RequestCtxFuture.processResultFromOfJava(futureResultFromOfLib);
-        } else {
-            RequestContextUtil.closeRequstContext(requestContext);
+        final Xid xid = new Xid(reserverXid);
+        builder.setXid(xid.getValue());
+        builder.setFlags(flag);
+        builder.setMissSendLen(input.getMissSearchLength());
+        builder.setVersion(getVersion());
+        ListenableFuture<RpcResult<Void>> futureResultFromOfLib;
+        synchronized (deviceContext) {
+            futureResultFromOfLib = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter().setConfig(builder.build()));
         }
-        return result;
+        OFJResult2RequestCtxFuture<SetConfigOutput> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
+        OFJResult2RequestCtxFuture.processResultFromOfJava(futureResultFromOfLib);
+        return requestContext.getFuture();
     }
 }
index 05d45d25b624512c066083d8fd09289d13b74ba6..fb9969e46832a3405ae470ed5b700faa9d6aecf9 100644 (file)
@@ -7,8 +7,10 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 
@@ -21,11 +23,11 @@ public final class RequestContextUtil {
     }
 
 
-    public static void closeRequestContextWithRpcError(final RequestContext<?> requestContext, String errorMessage) {
-
-        RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
+    public static <T> SettableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
+        RpcResultBuilder<T> rpcResultBuilder = RpcResultBuilder.<T>failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
         requestContext.getFuture().set(rpcResultBuilder.build());
         closeRequstContext(requestContext);
+        return requestContext.getFuture();
     }
 
     public static void closeRequstContext(final RequestContext<?> requestContext) {
index 616c40550ad0d4f407be6f01853683482b457446..c168b4e18a5ac4097ae869c47507f8ba31911f9f 100644 (file)
@@ -7,11 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
@@ -26,7 +24,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.Send
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -43,77 +40,54 @@ public class SalEchoServiceImpl extends CommonService implements SalEchoService
     @Override
     public Future<RpcResult<SendEchoOutput>> sendEcho(final SendEchoInput sendEchoInput) {
         final RequestContext<SendEchoOutput> requestContext = getRequestContextStack().createRequestContext();
-        final SettableFuture<RpcResult<SendEchoOutput>> sendEchoOutput = getRequestContextStack()
-                .storeOrFail(requestContext);
-        if (!sendEchoOutput.isDone()) {
-            final DeviceContext deviceContext = getDeviceContext();
-            Long reserverXid = deviceContext.getReservedXid();
-            if (null == reserverXid) {
-                if (null == reserverXid) {
-                    reserverXid = deviceContext.getReservedXid();
-                    RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-                    return sendEchoOutput;
-                }
-            }
-            final Xid xid = new Xid(reserverXid);
-            requestContext.setXid(xid);
-
-            LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-            deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+        if (requestContext == null) {
+            getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+            return failedFuture();
+        }
 
-            final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
-            echoInputOFJavaBuilder.setVersion(getVersion());
-            echoInputOFJavaBuilder.setXid(xid.getValue());
-            echoInputOFJavaBuilder.setData(sendEchoInput.getData());
-            final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
 
-            final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
-                    .echo(echoInputOFJava);
-            LOG.debug("Echo with xid {} was sent from controller", xid);
+        final DeviceContext deviceContext = getDeviceContext();
+        Long reserverXid = deviceContext.getReservedXid();
+        if (null == reserverXid) {
+            reserverXid = deviceContext.getReservedXid();
+            return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+        }
+        final Xid xid = new Xid(reserverXid);
+        requestContext.setXid(xid);
 
-            ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
-                    .listenInPoolThread(rpcEchoOutputOFJava);
+        LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
+        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
-            // callback on OF JAVA future
-            SuccessCallback<EchoOutput, SendEchoOutput> successCallback = new SuccessCallback<EchoOutput, SendEchoOutput>(
-                    deviceContext, requestContext, listenableRpcEchoOutputOFJava) {
+        final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
+        echoInputOFJavaBuilder.setVersion(getVersion());
+        echoInputOFJavaBuilder.setXid(xid.getValue());
+        echoInputOFJavaBuilder.setData(sendEchoInput.getData());
+        final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
 
-                @Override
-                public RpcResult<SendEchoOutput> transform(RpcResult<EchoOutput> rpcResult) {
-                    EchoOutput echoOutputOFJava = rpcResult.getResult();
-                    SendEchoOutputBuilder sendEchoOutputBuilder = new SendEchoOutputBuilder();
-                    sendEchoOutputBuilder.setData(echoOutputOFJava.getData());
+        // FIXME: should be submitted via OutboundQueue
+        final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
+                .echo(echoInputOFJava);
+        LOG.debug("Echo with xid {} was sent from controller", xid);
 
-                    LOG.debug("Echo with xid {} was received by controller.", rpcResult.getResult().getXid());
-                    return RpcResultBuilder.success(sendEchoOutputBuilder.build()).build();
-                }
-            };
-            Futures.addCallback(listenableRpcEchoOutputOFJava, successCallback);
-        } else {
-            getMessageSpy().spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
-        }
+        ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
+                .listenInPoolThread(rpcEchoOutputOFJava);
 
-        // callback on request context future
-        Futures.addCallback(sendEchoOutput, new FutureCallback<RpcResult<SendEchoOutput>>() {
+        // callback on OF JAVA future
+        SuccessCallback<EchoOutput, SendEchoOutput> successCallback = new SuccessCallback<EchoOutput, SendEchoOutput>(
+                deviceContext, requestContext, listenableRpcEchoOutputOFJava) {
 
             @Override
-            public void onSuccess(RpcResult<SendEchoOutput> result) {
-            }
+            public RpcResult<SendEchoOutput> transform(final RpcResult<EchoOutput> rpcResult) {
+                EchoOutput echoOutputOFJava = rpcResult.getResult();
+                SendEchoOutputBuilder sendEchoOutputBuilder = new SendEchoOutputBuilder();
+                sendEchoOutputBuilder.setData(echoOutputOFJava.getData());
 
-            @Override
-            public void onFailure(Throwable t) {
-                if (sendEchoOutput.isCancelled()) {
-                    requestContext.getFuture().set(
-                            RpcResultBuilder.<SendEchoOutput>failed()
-                                    .withError(ErrorType.APPLICATION, "Echo response wasn't obtained until barrier.")
-                                    .build());
-                    LOG.debug("Echo reply with xid {} wasn't received by controller until barrier.",
-                            requestContext.getXid());
-                }
+                LOG.debug("Echo with xid {} was received by controller.", rpcResult.getResult().getXid());
+                return RpcResultBuilder.success(sendEchoOutputBuilder.build()).build();
             }
-        });
+        };
+        Futures.addCallback(listenableRpcEchoOutputOFJava, successCallback);
 
-        return sendEchoOutput;
+        return requestContext.getFuture();
     }
-
 }
index 7be9e11d1f1e955b925332720cc1ace3972c45c8..27ee72b5cd8e3d2ebc4b61f474db49c1a2e330fc 100644 (file)
@@ -21,11 +21,10 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,17 +34,15 @@ import org.slf4j.LoggerFactory;
 public class StatisticsContextImpl implements StatisticsContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
-    public static final String CONNECTION_CLOSED = "Connection closed.";
+    private static final String CONNECTION_CLOSED = "Connection closed.";
     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
     private final DeviceContext deviceContext;
 
-
     private final StatisticsGatheringService statisticsGatheringService;
 
     public StatisticsContextImpl(final DeviceContext deviceContext) {
         this.deviceContext = deviceContext;
         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
-
     }
 
     @Override
@@ -104,25 +101,21 @@ public class StatisticsContextImpl implements StatisticsContext {
         return resultingFuture;
     }
 
-    @Override
-    public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
-        requestContexts.remove(requestContext);
-    }
-
-    @Override
-    public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> data) {
-        requestContexts.add(data);
-        return data.getFuture();
-    }
-
     @Override
     public <T> RequestContext<T> createRequestContext() {
-        return new RequestContextImpl<>(this);
+        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>() {
+            @Override
+            public void close() {
+                requestContexts.remove(this);
+            }
+        };
+        requestContexts.add(ret);
+        return ret;
     }
 
     @Override
-    public void close() throws Exception {
-        for (final RequestContext requestContext : requestContexts) {
+    public void close() {
+        for (final RequestContext<?> requestContext : requestContexts) {
             RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
         }
     }
index 252a80d9d0324f9637990d68b892910fd70be0a2..cccb2137cfc2387b041271243c3fd33fd8021102 100644 (file)
@@ -7,13 +7,8 @@
  */
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.concurrent.Future;
+import static org.junit.Assert.assertNull;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -23,15 +18,12 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.impl.rpc.RpcContextImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 
 /**
  * @author joe
@@ -47,16 +39,12 @@ public class RpcContextImplTest {
     @Mock
     private MessageSpy messageSpy;
 
-    private RpcContext rpcContext;
-
-    private static final String QUEUE_IS_FULL = "Device's request queue is full.";
+    private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
 
     @Before
     public void setup() {
         NodeId nodeId = new NodeId("openflow:1");
-        KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
-
-        rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier);
+        nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
     }
 
     @Test
@@ -66,25 +54,16 @@ public class RpcContextImplTest {
 
     @Test
     public void testStoreOrFail() throws Exception {
+        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 100);
         RequestContext requestContext = rpcContext.createRequestContext();
-        rpcContext.setRequestContextQuota(100);
-        Future<RpcResult<UpdateFlowOutput>> resultFuture = rpcContext.storeOrFail(requestContext);
-        assertNotNull(resultFuture);
-        assertFalse(resultFuture.isDone());
+        assertNotNull(requestContext);
+
     }
 
     @Test
     public void testStoreOrFailThatFails() throws Exception {
+        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 0);
         RequestContext requestContext = rpcContext.createRequestContext();
-        rpcContext.setRequestContextQuota(0);
-        Future<RpcResult<UpdateFlowOutput>> resultFuture = rpcContext.storeOrFail(requestContext);
-        assertNotNull(resultFuture);
-        assertTrue(resultFuture.isDone());
-        RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult = resultFuture.get();
-        assertNotNull(updateFlowOutputRpcResult);
-        assertEquals(1, updateFlowOutputRpcResult.getErrors().size());
-        Iterator<RpcError> iterator = updateFlowOutputRpcResult.getErrors().iterator();
-        assertEquals(QUEUE_IS_FULL, iterator.next().getMessage());
+        assertNull(requestContext);
     }
-
 }
index 3cd4f4fb0d4cede560bd51f13ddd2da32ede4a12..3fea20faaf7bb15825b23829c8928cd0e7722bdc 100644 (file)
@@ -96,9 +96,8 @@ public class RpcManagerImplTest {
         // TODO: how to invoke service remotely?
         NodeId nodeId = new NodeId("openflow:1");
         KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
-        final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier);
+        final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier, capacity);
         when(mockedProviderContext.getRpcService(SalFlowService.class)).thenReturn(new SalFlowServiceImpl(rpcContext, mockedDeviceContext));
-        rpcContext.setRequestContextQuota(capacity);
 
         final SalFlowService salFlowService = mockedProviderContext.getRpcService(SalFlowService.class);
         final Future<RpcResult<AddFlowOutput>> addedFlow = salFlowService.addFlow(prepareTestingAddFlow());
similarity index 50%
rename from openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/rpc/RequestContextImplTest.java
rename to openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContextTest.java
index ea2c4eb9e3c65df689711ff2a06fb6ac85f00218..a791db22fa8e9eb83d3a34f5077988d68870c2e1 100644 (file)
@@ -9,41 +9,29 @@
 package org.opendaylight.openflowplugin.impl.rpc;
 
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Future;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 
 @RunWith(MockitoJUnitRunner.class)
-public class RequestContextImplTest {
-
-    @Mock
-    RpcContext rpcContext;
-
-    RequestContext requestContext;
+public class AbstractRequestContextTest {
+    private AbstractRequestContext<Object> requestContext;
 
     @Before
     public void setup() {
-        requestContext = new RequestContextImpl<>(rpcContext);
+        requestContext = new AbstractRequestContext<Object>() {
+            @Override
+            public void close() {
+                // No-op
+            }
+        };
     }
 
     @Test
     public void testCreateRequestFuture() throws Exception {
-        SettableFuture future = requestContext.getFuture();
+        Future<?> future = requestContext.getFuture();
         assertNotNull(future);
     }
-
-    @Test
-    public void testClose() throws Exception {
-        requestContext.close();
-        verify(rpcContext).forgetRequestContext(Matchers.any(RequestContext.class));
-    }
-
 }
\ No newline at end of file