Make sure RequestContext has a constant XID 77/20577/8
authorRobert Varga <rovarga@cisco.com>
Fri, 15 May 2015 22:41:23 +0000 (00:41 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 16 May 2015 10:00:03 +0000 (12:00 +0200)
All callers end up talking to the underlying device to acquire an XID.
Since all RequestContextStack implementations already have (potential)
access to a DeviceContext, there is no need to do this.

Thus make sure RequestContext has an XID allocated by the underlying
device, which unifies code and makes callers simpler. If we fail to
allocate one, the caller will see that XID as null.

We also take this opportunity to make Xid take a Long instead of a
primitive type, as that's what we need for DataObjects and that's what
we get from the Device anyway -- thus preventing autoboxing operations.

Change-Id: I7f553a9a1283d1c82ba7e287d0f21cf4147779e2
Signed-off-by: Robert Varga <rovarga@cisco.com>
23 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/RequestContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/RequestFutureContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/Xid.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/callback/BaseCallback.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/callback/SuccessCallback.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceStateImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/FlowCapableTransactionServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/RequestContextUtil.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalEchoServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/api/openflow/device/RpcContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/api/openflow/device/RpcManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/listener/MultiMsgCollectorImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/rpc/AbstractRequestContextTest.java

index 3d9f3bd917ef5658b914ae18e3609644840f52e4..63a5951696964003b99f357cb44ce1f4e5e2ba99 100644 (file)
@@ -193,7 +193,7 @@ public interface DeviceContext extends AutoCloseable,
 
     void commitOperationsGatheredInOneTransaction();
 
-    public MultiMsgCollector getMultiMsgCollector();
+    MultiMsgCollector getMultiMsgCollector();
 
     Long getReservedXid();
 
index 7ee42252470c7011d51999540ef359cacf928062..0a8f44f8b5b7a736859acce677ee8ae5c2f0553c 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-
+import javax.annotation.Nullable;
 
 /**
  * Request context handles all requests on device. Number of requests is limited by request quota. When this quota is
@@ -16,18 +16,15 @@ package org.opendaylight.openflowplugin.api.openflow.device;
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 25.2.2015.
  */
 public interface RequestContext<T> extends RequestFutureContext<T>, AutoCloseable {
-
     /**
-     * Returns xid generated for this request.
+     * Returns XID generated for this request.
      *
-     * @return
+     * @return Allocated XID, or null if the device has disconnected.
      */
-    Xid getXid();
+    @Nullable Xid getXid();
 
-    /**
-     * Sets xid generated for this request.
-     */
-    void setXid(Xid xid);
+    @Override
+    void close();
 
     /**
      * Returns request timeout value.
@@ -36,12 +33,8 @@ public interface RequestContext<T> extends RequestFutureContext<T>, AutoCloseabl
      */
     long getWaitTimeout();
 
-
     /**
      * Sets request timeout value.
      */
     void setWaitTimeout(long waitTimeout);
-
-    @Override
-    void close();
 }
index 9cb09270d693001a182312cdd508809082aacbf9..6011465c49e0ff6be1c70684fdee5b541edbba62 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 /**
@@ -16,11 +16,12 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
  * @param <T>
  */
 public interface RequestFutureContext<T> {
-
     /**
      * Method returns future to be used for handling device requests.
      *
      * @return
      */
-    SettableFuture<RpcResult<T>> getFuture();
+    ListenableFuture<RpcResult<T>> getFuture();
+
+    void setResult(RpcResult<T> result);
 }
index efd99b536ca1b712d21f08a8e4d63cf979bd2a10..7d7a5dc004f9c316a461a8c7cef77c5bc1ed89d5 100644 (file)
@@ -8,18 +8,43 @@
 
 package org.opendaylight.openflowplugin.api.openflow.device;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 26.2.2015.
  */
-public class Xid {
-
-    private long value;
+public final class Xid {
+    private final Long value;
 
-    public Xid(final long value) {
-        this.value = value;
+    public Xid(final Long value) {
+        this.value = Preconditions.checkNotNull(value);
     }
 
-    public long getValue() {
+    public Long getValue() {
         return value;
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + value.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Xid)) {
+            return false;
+        }
+        return value.equals(((Xid) obj).value);
+    }
+
+    @Override
+    public String toString() {
+        return "Xid [value=" + value + "]";
+    }
 }
index 86b21e1213d36018538f298947d8269a81918a9f..4c9ba580d4c76e107eaee318461bae1823179aba 100644 (file)
@@ -60,10 +60,10 @@ public class BaseCallback<I, O> implements FutureCallback<RpcResult<I>> {
                     }
                 }
                 LOG.trace("OF Java result for XID {} was not successful. Errors : {}", getRequestContext().getXid().getValue(), rpcErrors.toString());
-            
+
             }
 
-            getRequestContext().getFuture().set(
+            getRequestContext().setResult(
                     RpcResultBuilder.<O>failed().withRpcErrors(fRpcResult.getErrors()).build());
             RequestContextUtil.closeRequstContext(getRequestContext());
         } else {
@@ -83,11 +83,11 @@ public class BaseCallback<I, O> implements FutureCallback<RpcResult<I>> {
             deviceContext.getMessageSpy().spyMessage(getRequestContext().getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS_NO_RESPONSE);
 
             LOG.trace("Asymmetric message - no response from OF Java expected for XID {}. Closing as successful.", getRequestContext().getXid().getValue());
-            getRequestContext().getFuture().set(RpcResultBuilder.<O>success().build());
+            getRequestContext().setResult(RpcResultBuilder.<O>success().build());
         } else {
             deviceContext.getMessageSpy().spyMessage(getRequestContext().getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_ERROR);
             LOG.trace("Exception occured while processing OF Java response for XID {}.", getRequestContext().getXid().getValue(), throwable);
-            getRequestContext().getFuture().set(
+            getRequestContext().setResult(
                     RpcResultBuilder.<O>failed()
                             .withError(RpcError.ErrorType.APPLICATION, "OF JAVA operation failed.", throwable)
                             .build());
index 3c66ab006dcc69530e41f41424097357712f2ed4..8f287855a14b2c4b9a6d7a288b18155f029ff7be 100644 (file)
@@ -14,15 +14,15 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 
 public abstract class SuccessCallback<I, O> extends BaseCallback<I, O> {
 
-    public SuccessCallback(DeviceContext deviceContext, RequestContext<O> requestContext,
-            ListenableFuture<RpcResult<I>> futureResultFromOfLib) {
+    public SuccessCallback(final DeviceContext deviceContext, final RequestContext<O> requestContext,
+            final ListenableFuture<RpcResult<I>> futureResultFromOfLib) {
         super(deviceContext, requestContext, futureResultFromOfLib);
     }
 
+    @Override
     protected void processSuccess(final RpcResult<I> rpcResult) {
-        getRequestContext().getFuture().set(transform(rpcResult));
+        getRequestContext().setResult(transform(rpcResult));
     }
 
-
     abstract public RpcResult<O> transform(RpcResult<I> rpcResult);
 }
index d37777e33cfd8ece1628148e7eb060af57346698..91a69e335e4cabb85c1d536b5b7d47fc3ed7a7a0 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.openflowplugin.impl.connection.listener;
 
-import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -18,6 +17,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
@@ -35,25 +35,25 @@ import org.slf4j.LoggerFactory;
  */
 public class SystemNotificationsListenerImpl implements SystemNotificationsListener {
 
-    private ConnectionContext connectionContext;
+    private final ConnectionContext connectionContext;
     HandshakeContext handshakeContext;
     private static final Logger LOG = LoggerFactory.getLogger(SystemNotificationsListenerImpl.class);
     @VisibleForTesting
     static final long MAX_ECHO_REPLY_TIMEOUT = 2000;
 
-    public SystemNotificationsListenerImpl(final ConnectionContext connectionContext, 
+    public SystemNotificationsListenerImpl(final ConnectionContext connectionContext,
             final HandshakeContext handshakeContext) {
         this.connectionContext = connectionContext;
         this.handshakeContext = handshakeContext;
     }
 
     @Override
-    public void onDisconnectEvent(DisconnectEvent notification) {
+    public void onDisconnectEvent(final DisconnectEvent notification) {
         disconnect();
     }
 
     @Override
-    public void onSwitchIdleEvent(SwitchIdleEvent notification) {
+    public void onSwitchIdleEvent(final SwitchIdleEvent notification) {
         new Thread(new Runnable() {
             @Override
             public void run() {
@@ -69,7 +69,7 @@ public class SystemNotificationsListenerImpl implements SystemNotificationsListe
                     connectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.TIMEOUTING);
                     EchoInputBuilder builder = new EchoInputBuilder();
                     builder.setVersion(features.getVersion());
-                    Xid xid = new Xid(0);
+                    Xid xid = new Xid(0L);
                     builder.setXid(xid.getValue());
 
                     Future<RpcResult<EchoOutput>> echoReplyFuture = connectionContext.getConnectionAdapter()
index a49f8e3c4a9d6c3640e0902ac3f68fbedb595962..c19041aaeaac196cc7bc9d718422218d3681999f 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import java.math.BigInteger;
@@ -30,7 +29,6 @@ import java.util.concurrent.ExecutionException;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationRejectedException;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -263,7 +261,6 @@ public class DeviceContextImpl implements DeviceContext {
     public void processReply(final OfHeader ofHeader) {
         final RequestContext requestContext = requests.remove(ofHeader.getXid());
         if (null != requestContext) {
-            final SettableFuture replyFuture = requestContext.getFuture();
             RpcResult<OfHeader> rpcResult;
             if (ofHeader instanceof Error) {
                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
@@ -283,7 +280,7 @@ public class DeviceContextImpl implements DeviceContext {
                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
             }
 
-            replyFuture.set(rpcResult);
+            requestContext.setResult(rpcResult);
             try {
                 requestContext.close();
             } catch (final Exception e) {
@@ -303,12 +300,11 @@ public class DeviceContextImpl implements DeviceContext {
             requestContext = requests.remove(xid.getValue());
         }
         if (null != requestContext) {
-            final SettableFuture replyFuture = requestContext.getFuture();
             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
                     .<List<MultipartReply>>success()
                     .withResult(ofHeaderList)
                     .build();
-            replyFuture.set(rpcResult);
+            requestContext.setResult(rpcResult);
             for (final MultipartReply multipartReply : ofHeaderList) {
                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
             }
@@ -334,12 +330,11 @@ public class DeviceContextImpl implements DeviceContext {
         final RequestContext requestContext = requests.remove(xid.getValue());
 
         if (null != requestContext) {
-            final SettableFuture replyFuture = requestContext.getFuture();
             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
                     .<List<OfHeader>>failed()
                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
                     .build();
-            replyFuture.set(rpcResult);
+            requestContext.setResult(rpcResult);
             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
             try {
                 requestContext.close();
index c02764b2f761b30f049b8dde93e5d1ad4589d4ea..6a4de474a224d9b466e1f22d4aa0533bf1999dc4 100644 (file)
@@ -133,18 +133,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         tx.submit();
 
         this.messageIntelligenceAgency = messageIntelligenceAgency;
-
-        emptyRequestContextStack = new RequestContextStack() {
-            @Override
-            public <T> RequestContext<T> createRequestContext() {
-                return new AbstractRequestContext<T>() {
-                    @Override
-                    public void close() {
-                        //NOOP
-                    }
-                };
-            }
-        };
     }
 
     @Override
@@ -180,6 +168,18 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         deviceContext.setTranslatorLibrary(translatorLibrary);
         deviceContext.addDeviceContextClosedHandler(this);
 
+        emptyRequestContextStack = new RequestContextStack() {
+            @Override
+            public <T> RequestContext<T> createRequestContext() {
+                return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+                    @Override
+                    public void close() {
+                        //NOOP
+                    }
+                };
+            }
+        };
+
         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
                 connectionContext.getConnectionAdapter(), deviceContext);
 
@@ -323,7 +323,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         final Xid xid = new Xid(reservedXid);
 
         final RequestContext<List<MultipartReply>> requestContext = emptyRequestContextStack.createRequestContext();
-        requestContext.setXid(xid);
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
         deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
@@ -506,6 +505,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
     @Override
     public void onDeviceContextClosed(final DeviceContext deviceContext) {
         deviceContexts.remove(deviceContext);
+        emptyRequestContextStack = null;
     }
 
     @Override
index e27235e179a4e68067e7199926a9203f29972b3a..bc9dafd75491d2cf9b41b891f2ff3e9bdcfbc28f 100644 (file)
@@ -52,6 +52,7 @@ class DeviceStateImpl implements DeviceState {
         Preconditions.checkArgument(featuresReply != null);
         featuresOutput = new GetFeaturesOutputBuilder(featuresReply).build();
         this.nodeId = Preconditions.checkNotNull(nodeId);
+        // FIXME: use builder, as we will be using this identifier often
         nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
         version = featuresReply.getVersion();
     }
@@ -92,7 +93,7 @@ class DeviceStateImpl implements DeviceState {
     }
 
     @Override
-    public void setMeterAvailable(boolean available) {
+    public void setMeterAvailable(final boolean available) {
         meterIsAvailable = available;
     }
 
@@ -102,7 +103,7 @@ class DeviceStateImpl implements DeviceState {
     }
 
     @Override
-    public void setGroupAvailable(boolean available) {
+    public void setGroupAvailable(final boolean available) {
         groupIsAvailable = available;
     }
 
index 4208ca9f3d30786d5a864b65a41ea47cc75ab6c9..1556967b6b0fabb7196208f381eaeaf08f8be830 100644 (file)
@@ -7,45 +7,43 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 public abstract class AbstractRequestContext<T> implements RequestContext<T> {
-    private SettableFuture<RpcResult<T>> rpcResultFuture;
+    private final SettableFuture<RpcResult<T>> rpcResultFuture = SettableFuture.create();
+    private final Xid xid;
     private long waitTimeout;
-    private Xid xid;
-
-    protected AbstractRequestContext() {
 
+    protected AbstractRequestContext(final Long xid) {
+        this.xid = xid == null ? null : new Xid(xid);
     }
 
     @Override
-    public SettableFuture<RpcResult<T>> getFuture() {
-        if (null == rpcResultFuture) {
-            rpcResultFuture = SettableFuture.create();
-        }
+    public final ListenableFuture<RpcResult<T>> getFuture() {
         return rpcResultFuture;
     }
 
     @Override
-    public Xid getXid() {
-        return xid;
+    public final void setResult(final RpcResult<T> result) {
+        rpcResultFuture.set(result);
     }
 
     @Override
-    public void setXid(final Xid xid) {
-        this.xid = xid;
+    public final Xid getXid() {
+        return xid;
     }
 
     @Override
-    public long getWaitTimeout() {
+    public final long getWaitTimeout() {
         return waitTimeout;
     }
 
     @Override
-    public void setWaitTimeout(final long waitTimeout) {
+    public final void setWaitTimeout(final long waitTimeout) {
         this.waitTimeout = waitTimeout;
     }
 }
index ab39f92f3acd2e1f9e65e2beea69832dd3e86899..528fde155cbd7e900ff2fede791b4310561d995d 100644 (file)
@@ -7,19 +7,18 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
+import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,17 +26,17 @@ import org.slf4j.LoggerFactory;
 public class RpcContextImpl implements RpcContext {
     private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class);
     private final RpcProviderRegistry rpcProviderRegistry;
+    private final DeviceContext deviceContext;
     private final MessageSpy messageSpy;
     private final Semaphore tracker;
 
     // TODO: add private Sal salBroker
-    private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
     private final Collection<RoutedRpcRegistration<?>> rpcRegistrations = new HashSet<>();
 
-    public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier, final int maxRequests) {
+    public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext, final int maxRequests) {
         this.messageSpy = messageSpy;
         this.rpcProviderRegistry = rpcProviderRegistry;
-        this.nodeInstanceIdentifier = nodeInstanceIdentifier;
+        this.deviceContext = Preconditions.checkNotNull(deviceContext);
         tracker = new Semaphore(maxRequests, true);
     }
 
@@ -49,9 +48,9 @@ public class RpcContextImpl implements RpcContext {
     public <S extends RpcService> void registerRpcServiceImplementation(final Class<S> serviceClass,
                                                                         final S serviceInstance) {
         final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
-        routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier);
+        routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
         rpcRegistrations.add(routedRpcReg);
-        LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
+        LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
     }
 
     /**
@@ -62,7 +61,7 @@ public class RpcContextImpl implements RpcContext {
     @Override
     public void close() {
         for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations) {
-            rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
+            rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
             rpcRegistration.close();
         }
     }
@@ -74,7 +73,7 @@ public class RpcContextImpl implements RpcContext {
             return null;
         }
 
-        return new AbstractRequestContext<T>() {
+        return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
             @Override
             public void close() {
                 tracker.release();
index a619261a1ab4f8448bf6811c0d10f6b8ce1d7bbb..29eda5b168b8591b55dc71c788f3d7c065484fbe 100644 (file)
@@ -33,7 +33,7 @@ public class RpcManagerImpl implements RpcManager {
 
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
-        final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier(), maxRequestsQuota.intValue());
+        final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext, maxRequestsQuota.intValue());
         deviceContext.setDeviceDisconnectedHandler(rpcContext);
         MdSalRegistratorUtils.registerServices(rpcContext, deviceContext);
         // finish device initialization cycle back to DeviceManager
index b6d691f939fc2ad99fb72b38977a3a2e5c720322..706a46d25f8aa5f42f9e6f650334a5eb78809cf4 100644 (file)
@@ -16,7 +16,6 @@ import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -114,17 +113,14 @@ public abstract class CommonService {
             return failedFuture();
         }
 
-        Long reservedXid = deviceContext.getReservedXid();
-        if (null == reservedXid) {
+        if (requestContext.getXid() == null) {
             deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_RESERVATION_REJECTED);
             return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
         }
-        final Xid xid = new Xid(reservedXid);
-        requestContext.setXid(xid);
         final ListenableFuture<RpcResult<F>> resultFromOFLib;
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-        deviceContext.hookRequestCtx(xid, requestContext);
+        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
         messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
         function.apply(requestContext);
index 856c3ad597495a309935fa21fc0412ece3445a69..6c369d55cff9c4e6fb90eca168787f5c288c1d39 100644 (file)
@@ -14,7 +14,6 @@ import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
@@ -43,16 +42,10 @@ public class FlowCapableTransactionServiceImpl extends CommonService implements
         }
 
         final DeviceContext deviceContext = getDeviceContext();
-        final Long reservedXid = deviceContext.getReservedXid();
-        if (null == reservedXid) {
-            return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-        }
-        final Xid xid = new Xid(reservedXid);
-        requestContext.setXid(xid);
 
         final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
         barrierInputOFJavaBuilder.setVersion(getVersion());
-        barrierInputOFJavaBuilder.setXid(xid.getValue());
+        barrierInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
         deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
@@ -62,7 +55,7 @@ public class FlowCapableTransactionServiceImpl extends CommonService implements
         // FIXME: should be submitted through OutboundQueue
         final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
                 .barrier(barrierInputOFJava);
-        LOG.debug("Barrier with xid {} was sent from controller.", xid);
+        LOG.debug("Barrier with xid {} was sent from controller.", requestContext.getXid());
 
         ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
                 .listenInPoolThread(barrierOutputOFJava);
index fb9969e46832a3405ae470ed5b700faa9d6aecf9..108309379c5db9d47dd8320d78d47ed0ae74ee44 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -23,9 +23,9 @@ public final class RequestContextUtil {
     }
 
 
-    public static <T> SettableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
+    public static <T> ListenableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
         RpcResultBuilder<T> rpcResultBuilder = RpcResultBuilder.<T>failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
-        requestContext.getFuture().set(rpcResultBuilder.build());
+        requestContext.setResult(rpcResultBuilder.build());
         closeRequstContext(requestContext);
         return requestContext.getFuture();
     }
index c168b4e18a5ac4097ae869c47507f8ba31911f9f..819e9f972f070a04f47c0c83b7781dbf91cabf5f 100644 (file)
@@ -14,7 +14,6 @@ import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
@@ -47,27 +46,20 @@ public class SalEchoServiceImpl extends CommonService implements SalEchoService
 
 
         final DeviceContext deviceContext = getDeviceContext();
-        Long reserverXid = deviceContext.getReservedXid();
-        if (null == reserverXid) {
-            reserverXid = deviceContext.getReservedXid();
-            return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-        }
-        final Xid xid = new Xid(reserverXid);
-        requestContext.setXid(xid);
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
         deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
         final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
         echoInputOFJavaBuilder.setVersion(getVersion());
-        echoInputOFJavaBuilder.setXid(xid.getValue());
+        echoInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
         echoInputOFJavaBuilder.setData(sendEchoInput.getData());
         final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
 
         // FIXME: should be submitted via OutboundQueue
         final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
                 .echo(echoInputOFJava);
-        LOG.debug("Echo with xid {} was sent from controller", xid);
+        LOG.debug("Echo with xid {} was sent from controller", requestContext.getXid());
 
         ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
                 .listenInPoolThread(rpcEchoOutputOFJava);
index 27ee72b5cd8e3d2ebc4b61f474db49c1a2e330fc..3b19338e46d68159f865453981f0cfc8cbf3991c 100644 (file)
@@ -103,7 +103,7 @@ public class StatisticsContextImpl implements StatisticsContext {
 
     @Override
     public <T> RequestContext<T> createRequestContext() {
-        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>() {
+        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
             @Override
             public void close() {
                 requestContexts.remove(this);
index cccb2137cfc2387b041271243c3fd33fd8021102..4df1a484e6e1520c66de5dd404fe160982fc33bd 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.api.openflow.device;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,7 +34,8 @@ public class RpcContextImplTest {
 
     @Mock
     private BindingAwareBroker.ProviderContext mockedRpcProviderRegistry;
-
+    @Mock
+    private DeviceState deviceState;
     @Mock
     private DeviceContext deviceContext;
     @Mock
@@ -45,6 +47,9 @@ public class RpcContextImplTest {
     public void setup() {
         NodeId nodeId = new NodeId("openflow:1");
         nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
+
+        when(deviceState.getNodeInstanceIdentifier()).thenReturn(nodeInstanceIdentifier);
+        when(deviceContext.getDeviceState()).thenReturn(deviceState);
     }
 
     @Test
@@ -54,7 +59,7 @@ public class RpcContextImplTest {
 
     @Test
     public void testStoreOrFail() throws Exception {
-        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 100);
+        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 100);
         RequestContext requestContext = rpcContext.createRequestContext();
         assertNotNull(requestContext);
 
@@ -62,7 +67,7 @@ public class RpcContextImplTest {
 
     @Test
     public void testStoreOrFailThatFails() throws Exception {
-        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 0);
+        final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 0);
         RequestContext requestContext = rpcContext.createRequestContext();
         assertNull(requestContext);
     }
index 3fea20faaf7bb15825b23829c8928cd0e7722bdc..008453528609db5c62be6cb2345a2d93a6b5c9f1 100644 (file)
@@ -96,7 +96,7 @@ public class RpcManagerImplTest {
         // TODO: how to invoke service remotely?
         NodeId nodeId = new NodeId("openflow:1");
         KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
-        final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier, capacity);
+        final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, mockedDeviceContext, capacity);
         when(mockedProviderContext.getRpcService(SalFlowService.class)).thenReturn(new SalFlowServiceImpl(rpcContext, mockedDeviceContext));
 
         final SalFlowService salFlowService = mockedProviderContext.getRpcService(SalFlowService.class);
index 2bd9bbf590291ad0d1f790a41ff13217ae15a1b6..4557accb23e5d12326350c158c726e0e66409eb1 100644 (file)
@@ -2,7 +2,7 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
+import static org.mockito.Matchers.any;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
@@ -18,7 +18,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -88,7 +90,7 @@ public class DeviceContextImplTest {
     @Mock
     OutboundQueueProvider outboundQueueProvider;
 
-    private AtomicLong atomicLong = new AtomicLong(0);
+    private final AtomicLong atomicLong = new AtomicLong(0);
     @Before
     public void setUp() {
         Mockito.when(dataBroker.createTransactionChain(Mockito.any(TransactionChainManager.class))).thenReturn(txChainFactory);
@@ -96,7 +98,24 @@ public class DeviceContextImplTest {
         final SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
         final SettableFuture<RpcResult<MultipartReply>> settableFutureMultiReply = SettableFuture.create();
         Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
+        Mockito.doAnswer(new Answer<Object>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public Object answer(final InvocationOnMock invocation) {
+                settableFuture.set((RpcResult<GetAsyncReply>) invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(requestContext).setResult(any(RpcResult.class));
+
         Mockito.when(requestContextMultiReply.getFuture()).thenReturn(settableFutureMultiReply);
+        Mockito.doAnswer(new Answer<Object>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public Object answer(final InvocationOnMock invocation) {
+                settableFutureMultiReply.set((RpcResult<MultipartReply>) invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(requestContextMultiReply).setResult(any(RpcResult.class));
         Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
         Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
         Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
index 76b61f4bb2854d20ba6302fc5c3632f9ccc75796..1ce5987edceb57ccb7ae3bc5b5b57c95a9ed8014 100644 (file)
@@ -82,7 +82,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgOne() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
 
@@ -100,7 +100,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgTwo() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
@@ -120,7 +120,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgNotExpectedXid() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
 
         Mockito.verify(deviceProcessor).processException(xidCaptor.capture(), ddeCaptor.capture());
@@ -134,7 +134,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgWrongType1() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false)
@@ -163,7 +163,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgWrongType2() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
@@ -191,7 +191,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgWrongType3() {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
@@ -217,7 +217,7 @@ public class MultiMsgCollectorImplTest {
      */
     @Test
     public void testAddMultipartMsgExpiration() throws InterruptedException {
-        final long xid = 1L;
+        final Long xid = 1L;
         collector.registerMultipartXid(xid);
         collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
 
index a791db22fa8e9eb83d3a34f5077988d68870c2e1..ad31e8000460ff837fc1c8071c27ea7a7a4031a8 100644 (file)
@@ -21,7 +21,7 @@ public class AbstractRequestContextTest {
 
     @Before
     public void setup() {
-        requestContext = new AbstractRequestContext<Object>() {
+        requestContext = new AbstractRequestContext<Object>(1L) {
             @Override
             public void close() {
                 // No-op