All callers end up talking to the underlying device to acquire an XID.
Since all RequestContextStack implementations already have (potential)
access to a DeviceContext, there is no need to do this.
Thus make sure RequestContext has an XID allocated by the underlying
device, which unifies code and makes callers simpler. If we fail to
allocate one, the caller will see that XID as null.
We also take this opportunity to make Xid take a Long instead of a
primitive type, as that's what we need for DataObjects and that's what
we get from the Device anyway -- thus preventing autoboxing operations.
Change-Id: I7f553a9a1283d1c82ba7e287d0f21cf4147779e2
Signed-off-by: Robert Varga <rovarga@cisco.com>
void commitOperationsGatheredInOneTransaction();
- public MultiMsgCollector getMultiMsgCollector();
+ MultiMsgCollector getMultiMsgCollector();
Long getReservedXid();
*/
package org.opendaylight.openflowplugin.api.openflow.device;
-
+import javax.annotation.Nullable;
/**
* Request context handles all requests on device. Number of requests is limited by request quota. When this quota is
* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015.
*/
public interface RequestContext<T> extends RequestFutureContext<T>, AutoCloseable {
-
/**
- * Returns xid generated for this request.
+ * Returns XID generated for this request.
*
- * @return
+ * @return Allocated XID, or null if the device has disconnected.
*/
- Xid getXid();
+ @Nullable Xid getXid();
- /**
- * Sets xid generated for this request.
- */
- void setXid(Xid xid);
+ @Override
+ void close();
/**
* Returns request timeout value.
*/
long getWaitTimeout();
-
/**
* Sets request timeout value.
*/
void setWaitTimeout(long waitTimeout);
-
- @Override
- void close();
}
package org.opendaylight.openflowplugin.api.openflow.device;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.yangtools.yang.common.RpcResult;
/**
* @param <T>
*/
public interface RequestFutureContext<T> {
-
/**
* Method returns future to be used for handling device requests.
*
* @return
*/
- SettableFuture<RpcResult<T>> getFuture();
+ ListenableFuture<RpcResult<T>> getFuture();
+
+ void setResult(RpcResult<T> result);
}
package org.opendaylight.openflowplugin.api.openflow.device;
+import com.google.common.base.Preconditions;
+
/**
* Created by Martin Bobak <mbobak@cisco.com> on 26.2.2015.
*/
-public class Xid {
-
- private long value;
+public final class Xid {
+ private final Long value;
- public Xid(final long value) {
- this.value = value;
+ public Xid(final Long value) {
+ this.value = Preconditions.checkNotNull(value);
}
- public long getValue() {
+ public Long getValue() {
return value;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + value.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Xid)) {
+ return false;
+ }
+ return value.equals(((Xid) obj).value);
+ }
+
+ @Override
+ public String toString() {
+ return "Xid [value=" + value + "]";
+ }
}
}
}
LOG.trace("OF Java result for XID {} was not successful. Errors : {}", getRequestContext().getXid().getValue(), rpcErrors.toString());
-
+
}
- getRequestContext().getFuture().set(
+ getRequestContext().setResult(
RpcResultBuilder.<O>failed().withRpcErrors(fRpcResult.getErrors()).build());
RequestContextUtil.closeRequstContext(getRequestContext());
} else {
deviceContext.getMessageSpy().spyMessage(getRequestContext().getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS_NO_RESPONSE);
LOG.trace("Asymmetric message - no response from OF Java expected for XID {}. Closing as successful.", getRequestContext().getXid().getValue());
- getRequestContext().getFuture().set(RpcResultBuilder.<O>success().build());
+ getRequestContext().setResult(RpcResultBuilder.<O>success().build());
} else {
deviceContext.getMessageSpy().spyMessage(getRequestContext().getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_ERROR);
LOG.trace("Exception occured while processing OF Java response for XID {}.", getRequestContext().getXid().getValue(), throwable);
- getRequestContext().getFuture().set(
+ getRequestContext().setResult(
RpcResultBuilder.<O>failed()
.withError(RpcError.ErrorType.APPLICATION, "OF JAVA operation failed.", throwable)
.build());
public abstract class SuccessCallback<I, O> extends BaseCallback<I, O> {
- public SuccessCallback(DeviceContext deviceContext, RequestContext<O> requestContext,
- ListenableFuture<RpcResult<I>> futureResultFromOfLib) {
+ public SuccessCallback(final DeviceContext deviceContext, final RequestContext<O> requestContext,
+ final ListenableFuture<RpcResult<I>> futureResultFromOfLib) {
super(deviceContext, requestContext, futureResultFromOfLib);
}
+ @Override
protected void processSuccess(final RpcResult<I> rpcResult) {
- getRequestContext().getFuture().set(transform(rpcResult));
+ getRequestContext().setResult(transform(rpcResult));
}
-
abstract public RpcResult<O> transform(RpcResult<I> rpcResult);
}
*/
package org.opendaylight.openflowplugin.impl.connection.listener;
-import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
*/
public class SystemNotificationsListenerImpl implements SystemNotificationsListener {
- private ConnectionContext connectionContext;
+ private final ConnectionContext connectionContext;
HandshakeContext handshakeContext;
private static final Logger LOG = LoggerFactory.getLogger(SystemNotificationsListenerImpl.class);
@VisibleForTesting
static final long MAX_ECHO_REPLY_TIMEOUT = 2000;
- public SystemNotificationsListenerImpl(final ConnectionContext connectionContext,
+ public SystemNotificationsListenerImpl(final ConnectionContext connectionContext,
final HandshakeContext handshakeContext) {
this.connectionContext = connectionContext;
this.handshakeContext = handshakeContext;
}
@Override
- public void onDisconnectEvent(DisconnectEvent notification) {
+ public void onDisconnectEvent(final DisconnectEvent notification) {
disconnect();
}
@Override
- public void onSwitchIdleEvent(SwitchIdleEvent notification) {
+ public void onSwitchIdleEvent(final SwitchIdleEvent notification) {
new Thread(new Runnable() {
@Override
public void run() {
connectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.TIMEOUTING);
EchoInputBuilder builder = new EchoInputBuilder();
builder.setVersion(features.getVersion());
- Xid xid = new Xid(0);
+ Xid xid = new Xid(0L);
builder.setXid(xid.getValue());
Future<RpcResult<EchoOutput>> echoReplyFuture = connectionContext.getConnectionAdapter()
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.math.BigInteger;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationRejectedException;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
public void processReply(final OfHeader ofHeader) {
final RequestContext requestContext = requests.remove(ofHeader.getXid());
if (null != requestContext) {
- final SettableFuture replyFuture = requestContext.getFuture();
RpcResult<OfHeader> rpcResult;
if (ofHeader instanceof Error) {
//TODO : this is the point, where we can discover that add flow operation failed and where we should
messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
}
- replyFuture.set(rpcResult);
+ requestContext.setResult(rpcResult);
try {
requestContext.close();
} catch (final Exception e) {
requestContext = requests.remove(xid.getValue());
}
if (null != requestContext) {
- final SettableFuture replyFuture = requestContext.getFuture();
final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
.<List<MultipartReply>>success()
.withResult(ofHeaderList)
.build();
- replyFuture.set(rpcResult);
+ requestContext.setResult(rpcResult);
for (final MultipartReply multipartReply : ofHeaderList) {
messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
}
final RequestContext requestContext = requests.remove(xid.getValue());
if (null != requestContext) {
- final SettableFuture replyFuture = requestContext.getFuture();
final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
.<List<OfHeader>>failed()
.withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
.build();
- replyFuture.set(rpcResult);
+ requestContext.setResult(rpcResult);
messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
try {
requestContext.close();
tx.submit();
this.messageIntelligenceAgency = messageIntelligenceAgency;
-
- emptyRequestContextStack = new RequestContextStack() {
- @Override
- public <T> RequestContext<T> createRequestContext() {
- return new AbstractRequestContext<T>() {
- @Override
- public void close() {
- //NOOP
- }
- };
- }
- };
}
@Override
deviceContext.setTranslatorLibrary(translatorLibrary);
deviceContext.addDeviceContextClosedHandler(this);
+ emptyRequestContextStack = new RequestContextStack() {
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+ @Override
+ public void close() {
+ //NOOP
+ }
+ };
+ }
+ };
+
final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
connectionContext.getConnectionAdapter(), deviceContext);
final Xid xid = new Xid(reservedXid);
final RequestContext<List<MultipartReply>> requestContext = emptyRequestContextStack.createRequestContext();
- requestContext.setXid(xid);
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
@Override
public void onDeviceContextClosed(final DeviceContext deviceContext) {
deviceContexts.remove(deviceContext);
+ emptyRequestContextStack = null;
}
@Override
Preconditions.checkArgument(featuresReply != null);
featuresOutput = new GetFeaturesOutputBuilder(featuresReply).build();
this.nodeId = Preconditions.checkNotNull(nodeId);
+ // FIXME: use builder, as we will be using this identifier often
nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
version = featuresReply.getVersion();
}
}
@Override
- public void setMeterAvailable(boolean available) {
+ public void setMeterAvailable(final boolean available) {
meterIsAvailable = available;
}
}
@Override
- public void setGroupAvailable(boolean available) {
+ public void setGroupAvailable(final boolean available) {
groupIsAvailable = available;
}
*/
package org.opendaylight.openflowplugin.impl.rpc;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.yangtools.yang.common.RpcResult;
public abstract class AbstractRequestContext<T> implements RequestContext<T> {
- private SettableFuture<RpcResult<T>> rpcResultFuture;
+ private final SettableFuture<RpcResult<T>> rpcResultFuture = SettableFuture.create();
+ private final Xid xid;
private long waitTimeout;
- private Xid xid;
-
- protected AbstractRequestContext() {
+ protected AbstractRequestContext(final Long xid) {
+ this.xid = xid == null ? null : new Xid(xid);
}
@Override
- public SettableFuture<RpcResult<T>> getFuture() {
- if (null == rpcResultFuture) {
- rpcResultFuture = SettableFuture.create();
- }
+ public final ListenableFuture<RpcResult<T>> getFuture() {
return rpcResultFuture;
}
@Override
- public Xid getXid() {
- return xid;
+ public final void setResult(final RpcResult<T> result) {
+ rpcResultFuture.set(result);
}
@Override
- public void setXid(final Xid xid) {
- this.xid = xid;
+ public final Xid getXid() {
+ return xid;
}
@Override
- public long getWaitTimeout() {
+ public final long getWaitTimeout() {
return waitTimeout;
}
@Override
- public void setWaitTimeout(final long waitTimeout) {
+ public final void setWaitTimeout(final long waitTimeout) {
this.waitTimeout = waitTimeout;
}
}
*/
package org.opendaylight.openflowplugin.impl.rpc;
+import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Semaphore;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcContextImpl implements RpcContext {
private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class);
private final RpcProviderRegistry rpcProviderRegistry;
+ private final DeviceContext deviceContext;
private final MessageSpy messageSpy;
private final Semaphore tracker;
// TODO: add private Sal salBroker
- private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
private final Collection<RoutedRpcRegistration<?>> rpcRegistrations = new HashSet<>();
- public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier, final int maxRequests) {
+ public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext, final int maxRequests) {
this.messageSpy = messageSpy;
this.rpcProviderRegistry = rpcProviderRegistry;
- this.nodeInstanceIdentifier = nodeInstanceIdentifier;
+ this.deviceContext = Preconditions.checkNotNull(deviceContext);
tracker = new Semaphore(maxRequests, true);
}
public <S extends RpcService> void registerRpcServiceImplementation(final Class<S> serviceClass,
final S serviceInstance) {
final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
- routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier);
+ routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
rpcRegistrations.add(routedRpcReg);
- LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
+ LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
}
/**
@Override
public void close() {
for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations) {
- rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
+ rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
rpcRegistration.close();
}
}
return null;
}
- return new AbstractRequestContext<T>() {
+ return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
@Override
public void close() {
tracker.release();
@Override
public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
- final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier(), maxRequestsQuota.intValue());
+ final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext, maxRequestsQuota.intValue());
deviceContext.setDeviceDisconnectedHandler(rpcContext);
MdSalRegistratorUtils.registerServices(rpcContext, deviceContext);
// finish device initialization cycle back to DeviceManager
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
import org.opendaylight.yangtools.yang.common.RpcError;
return failedFuture();
}
- Long reservedXid = deviceContext.getReservedXid();
- if (null == reservedXid) {
+ if (requestContext.getXid() == null) {
deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_RESERVATION_REJECTED);
return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
}
- final Xid xid = new Xid(reservedXid);
- requestContext.setXid(xid);
final ListenableFuture<RpcResult<F>> resultFromOFLib;
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(xid, requestContext);
+ deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
function.apply(requestContext);
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
}
final DeviceContext deviceContext = getDeviceContext();
- final Long reservedXid = deviceContext.getReservedXid();
- if (null == reservedXid) {
- return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- }
- final Xid xid = new Xid(reservedXid);
- requestContext.setXid(xid);
final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
barrierInputOFJavaBuilder.setVersion(getVersion());
- barrierInputOFJavaBuilder.setXid(xid.getValue());
+ barrierInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
// FIXME: should be submitted through OutboundQueue
final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
.barrier(barrierInputOFJava);
- LOG.debug("Barrier with xid {} was sent from controller.", xid);
+ LOG.debug("Barrier with xid {} was sent from controller.", requestContext.getXid());
ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
.listenInPoolThread(barrierOutputOFJava);
*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
}
- public static <T> SettableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
+ public static <T> ListenableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
RpcResultBuilder<T> rpcResultBuilder = RpcResultBuilder.<T>failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
- requestContext.getFuture().set(rpcResultBuilder.build());
+ requestContext.setResult(rpcResultBuilder.build());
closeRequstContext(requestContext);
return requestContext.getFuture();
}
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
final DeviceContext deviceContext = getDeviceContext();
- Long reserverXid = deviceContext.getReservedXid();
- if (null == reserverXid) {
- reserverXid = deviceContext.getReservedXid();
- return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- }
- final Xid xid = new Xid(reserverXid);
- requestContext.setXid(xid);
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
echoInputOFJavaBuilder.setVersion(getVersion());
- echoInputOFJavaBuilder.setXid(xid.getValue());
+ echoInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
echoInputOFJavaBuilder.setData(sendEchoInput.getData());
final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
// FIXME: should be submitted via OutboundQueue
final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
.echo(echoInputOFJava);
- LOG.debug("Echo with xid {} was sent from controller", xid);
+ LOG.debug("Echo with xid {} was sent from controller", requestContext.getXid());
ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
.listenInPoolThread(rpcEchoOutputOFJava);
@Override
public <T> RequestContext<T> createRequestContext() {
- final AbstractRequestContext<T> ret = new AbstractRequestContext<T>() {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
@Override
public void close() {
requestContexts.remove(this);
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@Mock
private BindingAwareBroker.ProviderContext mockedRpcProviderRegistry;
-
+ @Mock
+ private DeviceState deviceState;
@Mock
private DeviceContext deviceContext;
@Mock
public void setup() {
NodeId nodeId = new NodeId("openflow:1");
nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
+
+ when(deviceState.getNodeInstanceIdentifier()).thenReturn(nodeInstanceIdentifier);
+ when(deviceContext.getDeviceState()).thenReturn(deviceState);
}
@Test
@Test
public void testStoreOrFail() throws Exception {
- final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 100);
+ final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 100);
RequestContext requestContext = rpcContext.createRequestContext();
assertNotNull(requestContext);
@Test
public void testStoreOrFailThatFails() throws Exception {
- final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 0);
+ final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 0);
RequestContext requestContext = rpcContext.createRequestContext();
assertNull(requestContext);
}
// TODO: how to invoke service remotely?
NodeId nodeId = new NodeId("openflow:1");
KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
- final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier, capacity);
+ final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, mockedDeviceContext, capacity);
when(mockedProviderContext.getRpcService(SalFlowService.class)).thenReturn(new SalFlowServiceImpl(rpcContext, mockedDeviceContext));
final SalFlowService salFlowService = mockedProviderContext.getRpcService(SalFlowService.class);
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
+import static org.mockito.Matchers.any;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@Mock
OutboundQueueProvider outboundQueueProvider;
- private AtomicLong atomicLong = new AtomicLong(0);
+ private final AtomicLong atomicLong = new AtomicLong(0);
@Before
public void setUp() {
Mockito.when(dataBroker.createTransactionChain(Mockito.any(TransactionChainManager.class))).thenReturn(txChainFactory);
final SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
final SettableFuture<RpcResult<MultipartReply>> settableFutureMultiReply = SettableFuture.create();
Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
+ Mockito.doAnswer(new Answer<Object>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object answer(final InvocationOnMock invocation) {
+ settableFuture.set((RpcResult<GetAsyncReply>) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(requestContext).setResult(any(RpcResult.class));
+
Mockito.when(requestContextMultiReply.getFuture()).thenReturn(settableFutureMultiReply);
+ Mockito.doAnswer(new Answer<Object>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object answer(final InvocationOnMock invocation) {
+ settableFutureMultiReply.set((RpcResult<MultipartReply>) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(requestContextMultiReply).setResult(any(RpcResult.class));
Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
*/
@Test
public void testAddMultipartMsgOne() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
*/
@Test
public void testAddMultipartMsgTwo() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
*/
@Test
public void testAddMultipartMsgNotExpectedXid() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
Mockito.verify(deviceProcessor).processException(xidCaptor.capture(), ddeCaptor.capture());
*/
@Test
public void testAddMultipartMsgWrongType1() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false)
*/
@Test
public void testAddMultipartMsgWrongType2() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
*/
@Test
public void testAddMultipartMsgWrongType3() {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
*/
@Test
public void testAddMultipartMsgExpiration() throws InterruptedException {
- final long xid = 1L;
+ final Long xid = 1L;
collector.registerMultipartXid(xid);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
@Before
public void setup() {
- requestContext = new AbstractRequestContext<Object>() {
+ requestContext = new AbstractRequestContext<Object>(1L) {
@Override
public void close() {
// No-op