package org.opendaylight.openflowplugin.api.openflow.device;
+
/**
* Request context handles all requests on device. Number of requests is limited by request quota. When this quota is
* exceeded all rpc's will end up with exception.
*/
void setWaitTimeout(long waitTimeout);
+ @Override
+ void close();
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowplugin.api.openflow.device;
-import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import javax.annotation.Nullable;
/**
* Created by Martin Bobak <mbobak@cisco.com> on 1.4.2015.
*/
public interface RequestContextStack {
-
- <T> void forgetRequestContext(RequestContext<T> requestContext);
-
- /**
- * Method adds request to request queue which has limited quota. After number of requests exceeds quota limit future
- * will be done immediately and will contain information about exceeded request quota.
- *
- * @param data
- */
- <T> SettableFuture<RpcResult<T>> storeOrFail(RequestContext<T> data);
-
/**
* Method returns new request context for current request.
*
- * @return
+ * @return A request context, or null if one cannot be created.
*/
- <T> RequestContext<T> createRequestContext();
-
+ @Nullable <T> RequestContext<T> createRequestContext();
}
* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015.
*/
public interface RpcContext extends RequestContextStack, AutoCloseable, DeviceDisconnectedHandler {
-
<S extends RpcService> void registerRpcServiceImplementation(Class<S> serviceClass, S serviceInstance);
-
-
- /**
- * Method for setting request quota value. When the Request Context quota is exceeded, incoming RPCs fail
- * immediately, with a well-defined error.
- *
- * @param maxRequestsPerDevice
- */
- void setRequestContextQuota(int maxRequestsPerDevice);
-
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import java.math.BigInteger;
import java.util.ArrayList;
import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl;
import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
-import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
this.messageIntelligenceAgency = messageIntelligenceAgency;
emptyRequestContextStack = new RequestContextStack() {
- @Override
- public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
- //NOOP
- }
-
- @Override
- public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> data) {
- return data.getFuture();
- }
-
@Override
public <T> RequestContext<T> createRequestContext() {
- return new RequestContextImpl<>(this);
+ return new AbstractRequestContext<T>() {
+ @Override
+ public void close() {
+ //NOOP
+ }
+ };
}
};
}
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.yangtools.yang.common.RpcResult;
-/**
- * @author joe
- */
-public class RequestContextImpl<T> implements RequestContext<T> {
-
- private final RequestContextStack requestContextStack;
+public abstract class AbstractRequestContext<T> implements RequestContext<T> {
private SettableFuture<RpcResult<T>> rpcResultFuture;
private long waitTimeout;
private Xid xid;
- public RequestContextImpl(RequestContextStack requestContextStack) {
- this.requestContextStack = requestContextStack;
- }
+ protected AbstractRequestContext() {
- @Override
- public void close() {
- requestContextStack.forgetRequestContext(this);
}
@Override
}
@Override
- public void setXid(Xid xid) {
+ public void setXid(final Xid xid) {
this.xid = xid;
}
}
@Override
- public void setWaitTimeout(long waitTimeout) {
+ public void setWaitTimeout(final long waitTimeout) {
this.waitTimeout = waitTimeout;
}
}
*/
package org.opendaylight.openflowplugin.impl.rpc;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.Collection;
import java.util.HashSet;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.Semaphore;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.RpcService;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RpcContextImpl implements RpcContext {
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(RpcContextImpl.class);
- private MessageSpy messagSpy;
- final RpcProviderRegistry rpcProviderRegistry;
+ private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class);
+ private final RpcProviderRegistry rpcProviderRegistry;
+ private final MessageSpy messageSpy;
+ private final Semaphore tracker;
// TODO: add private Sal salBroker
private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
private final Collection<RoutedRpcRegistration<?>> rpcRegistrations = new HashSet<>();
-
- @GuardedBy("requestsList")
- private final Collection<RequestContext<?>> requestsList = new HashSet<RequestContext<?>>();
-
- private int maxRequestsPerDevice;
-
- public RpcContextImpl(final MessageSpy messagSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier) {
- this.messagSpy = messagSpy;
+ public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier, final int maxRequests) {
+ this.messageSpy = messageSpy;
this.rpcProviderRegistry = rpcProviderRegistry;
this.nodeInstanceIdentifier = nodeInstanceIdentifier;
+ tracker = new Semaphore(maxRequests, true);
}
/**
LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
}
- @Override
- public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> requestContext) {
- final SettableFuture<RpcResult<T>> rpcResultFuture = requestContext.getFuture();
-
- final boolean success;
- // FIXME: use a fixed-size collection, with lockless reserve/set queue
- synchronized (requestsList) {
- if (requestsList.size() < maxRequestsPerDevice) {
- requestsList.add(requestContext);
- success = true;
- } else {
- success = false;
- }
- }
-
- if (!success) {
- final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
- .withError(RpcError.ErrorType.APPLICATION, "", "Device's request queue is full.").build();
- rpcResultFuture.set(rpcResult);
- }
-
- return rpcResultFuture;
- }
-
/**
* Unregisters all services.
*
* @see java.lang.AutoCloseable#close()
*/
@Override
- public void close() throws Exception {
+ public void close() {
for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations) {
rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
rpcRegistration.close();
}
}
- /**
- * @see org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext#setRequestContextQuota(int)
- */
- @Override
- public void setRequestContextQuota(final int maxRequestsPerDevice) {
- this.maxRequestsPerDevice = maxRequestsPerDevice;
- }
-
@Override
- public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
- synchronized (requestsList) {
- requestsList.remove(requestContext);
- LOG.trace("Removed request context with xid {}. Context request in list {}.",
- requestContext.getXid().getValue(), requestsList.size());
- messagSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
+ public <T> RequestContext<T> createRequestContext() {
+ if (!tracker.tryAcquire()) {
+ LOG.trace("Device queue {} at capacity", this);
+ return null;
}
- }
- @Override
- public <T> RequestContext<T> createRequestContext() {
- return new RequestContextImpl<T>(this);
+ return new AbstractRequestContext<T>() {
+ @Override
+ public void close() {
+ tracker.release();
+ LOG.trace("Removed request context with xid {}", getXid().getValue());
+ messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
+ }
+ };
}
@Override
for (RoutedRpcRegistration<?> registration : rpcRegistrations) {
registration.close();
}
-
- synchronized (requestsList) {
- requestsList.clear();
- }
}
}
@Override
public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
- final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier());
- rpcContext.setRequestContextQuota(maxRequestsQuota.intValue());
+ final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext.getDeviceState().getNodeInstanceIdentifier(), maxRequestsQuota.intValue());
deviceContext.setDeviceDisconnectedHandler(rpcContext);
MdSalRegistratorUtils.registerServices(rpcContext, deviceContext);
// finish device initialization cycle back to DeviceManager
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
-import java.util.concurrent.Future;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
public abstract class CommonService {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CommonService.class);
private static final long WAIT_TIME = 2000;
- private final static Future<RpcResult<Void>> ERROR_RPC_RESULT = Futures.immediateFuture(RpcResultBuilder
- .<Void>failed().withError(ErrorType.APPLICATION, "", "Request quota exceeded.").build());
-
-
private static final BigInteger PRIMARY_CONNECTION = BigInteger.ZERO;
private final short version;
private final MessageSpy messageSpy;
- public CommonService(final RequestContextStack requestContextStack, DeviceContext deviceContext) {
+ public CommonService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
this.requestContextStack = requestContextStack;
this.deviceContext = deviceContext;
final FeaturesReply features = this.deviceContext.getPrimaryConnectionContext().getFeatures();
final DataCrateBuilder<T> dataCrateBuilder) {
LOG.trace("Handling general service call");
- final RequestContext<T> requestContext = requestContextStack.createRequestContext();
- final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
- if (result.isDone()) {
+ final RequestContext<T> requestContext = createRequestContext();
+ if (requestContext == null) {
LOG.trace("Request context refused.");
- deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
- return result;
+ deviceContext.getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
+ return failedFuture();
}
Long reservedXid = deviceContext.getReservedXid();
//retry
reservedXid = deviceContext.getReservedXid();
if (null == reservedXid) {
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_RESERVATION_REJECTED);
- return result;
+ return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
}
}
final Xid xid = new Xid(reservedXid);
messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
function.apply(dataCrate);
- return result;
+ return requestContext.getFuture();
}
+ protected final <T> RequestContext<T> createRequestContext() {
+ return requestContextStack.createRequestContext();
+ }
+
+ protected static <T> ListenableFuture<RpcResult<T>> failedFuture() {
+ final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "", "Request quota exceeded").build();
+ return Futures.immediateFuture(rpcResult);
+ }
}
*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
}
@Override
- public Future<RpcResult<Void>> sendBarrier(SendBarrierInput input) {
+ public Future<RpcResult<Void>> sendBarrier(final SendBarrierInput input) {
final RequestContext<Void> requestContext = getRequestContextStack().createRequestContext();
- final SettableFuture<RpcResult<Void>> sendBarrierOutput = getRequestContextStack()
- .storeOrFail(requestContext);
- if (!sendBarrierOutput.isDone()) {
- final DeviceContext deviceContext = getDeviceContext();
- final Long reservedXid = deviceContext.getReservedXid();
- if (null == reservedXid){
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- return sendBarrierOutput;
- }
- final Xid xid = new Xid(reservedXid);
- requestContext.setXid(xid);
-
- final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
- barrierInputOFJavaBuilder.setVersion(getVersion());
- barrierInputOFJavaBuilder.setXid(xid.getValue());
+ if (requestContext == null) {
+ getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+ return failedFuture();
+ }
- LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+ final DeviceContext deviceContext = getDeviceContext();
+ final Long reservedXid = deviceContext.getReservedXid();
+ if (null == reservedXid) {
+ return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+ }
+ final Xid xid = new Xid(reservedXid);
+ requestContext.setXid(xid);
- final BarrierInput barrierInputOFJava = barrierInputOFJavaBuilder.build();
+ final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
+ barrierInputOFJavaBuilder.setVersion(getVersion());
+ barrierInputOFJavaBuilder.setXid(xid.getValue());
- final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
- .barrier(barrierInputOFJava);
- LOG.debug("Barrier with xid {} was sent from controller.", xid);
+ LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
+ deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
- ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
- .listenInPoolThread(barrierOutputOFJava);
+ final BarrierInput barrierInputOFJava = barrierInputOFJavaBuilder.build();
- // callback on OF JAVA future
- SuccessCallback<BarrierOutput, Void> successCallback = new SuccessCallback<BarrierOutput, Void>(
- deviceContext, requestContext, listenableBarrierOutputOFJava) {
+ // FIXME: should be submitted through OutboundQueue
+ final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
+ .barrier(barrierInputOFJava);
+ LOG.debug("Barrier with xid {} was sent from controller.", xid);
- @Override
- public RpcResult<Void> transform(RpcResult<BarrierOutput> rpcResult) {
- //no transformation, because output for request context is Void
- LOG.debug("Barrier reply with xid {} was obtained by controller.", rpcResult.getResult().getXid());
- return RpcResultBuilder.<Void>success().build();
- }
- };
- Futures.addCallback(listenableBarrierOutputOFJava, successCallback);
- } else {
- getMessageSpy().spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
- }
+ ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
+ .listenInPoolThread(barrierOutputOFJava);
- //callback on request context future
- Futures.addCallback(sendBarrierOutput, new FutureCallback<RpcResult<Void>>() {
+ // callback on OF JAVA future
+ SuccessCallback<BarrierOutput, Void> successCallback = new SuccessCallback<BarrierOutput, Void>(
+ deviceContext, requestContext, listenableBarrierOutputOFJava) {
@Override
- public void onSuccess(RpcResult<Void> result) {
+ public RpcResult<Void> transform(final RpcResult<BarrierOutput> rpcResult) {
+ //no transformation, because output for request context is Void
+ LOG.debug("Barrier reply with xid {} was obtained by controller.", rpcResult.getResult().getXid());
+ return RpcResultBuilder.<Void>success().build();
}
+ };
+ Futures.addCallback(listenableBarrierOutputOFJava, successCallback);
- @Override
- public void onFailure(Throwable t) {
- if (sendBarrierOutput.isCancelled()) {
- requestContext.getFuture().set(
- RpcResultBuilder.<Void>failed()
- .withError(ErrorType.APPLICATION, "Barrier response wasn't obtained until barrier.")
- .build());
- LOG.debug("Barrier reply with xid {} wasn't obtained by controller.", requestContext.getXid());
-
- }
- }
- });
-
- return sendBarrierOutput;
-
+ return requestContext.getFuture();
}
-
}
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder;
import org.opendaylight.yangtools.yang.common.RpcResult;
-/**
- * @author joe
- */
public class NodeConfigServiceImpl extends CommonService implements NodeConfigService {
- private final RequestContextStack requestContextStack;
+ // FIXME: should be only in CommonService
private final DeviceContext deviceContext;
public NodeConfigServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
super(requestContextStack, deviceContext);
- this.requestContextStack = requestContextStack;
this.deviceContext = deviceContext;
}
-
@Override
public Future<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
- final RequestContext requestContext = requestContextStack.createRequestContext();
- final SettableFuture<RpcResult<SetConfigOutput>> result = requestContextStack.storeOrFail(requestContext);
- if (!result.isDone()) {
- SetConfigInputBuilder builder = new SetConfigInputBuilder();
- SwitchConfigFlag flag = SwitchConfigFlag.valueOf(input.getFlag());
- final Long reserverXid = deviceContext.getReservedXid();
- if (null == reserverXid){
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- return result;
- }
+ final RequestContext<SetConfigOutput> requestContext = createRequestContext();
+ if (requestContext == null) {
+ return failedFuture();
+ }
+
+ SetConfigInputBuilder builder = new SetConfigInputBuilder();
+ SwitchConfigFlag flag = SwitchConfigFlag.valueOf(input.getFlag());
+ final Long reserverXid = deviceContext.getReservedXid();
+ if (null == reserverXid) {
+ return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+ }
- final Xid xid = new Xid(reserverXid);
- builder.setXid(xid.getValue());
- builder.setFlags(flag);
- builder.setMissSendLen(input.getMissSearchLength());
- builder.setVersion(getVersion());
- ListenableFuture<RpcResult<Void>> futureResultFromOfLib;
- synchronized (deviceContext) {
- futureResultFromOfLib = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter().setConfig(builder.build()));
- }
- OFJResult2RequestCtxFuture<SetConfigOutput> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
- OFJResult2RequestCtxFuture.processResultFromOfJava(futureResultFromOfLib);
- } else {
- RequestContextUtil.closeRequstContext(requestContext);
+ final Xid xid = new Xid(reserverXid);
+ builder.setXid(xid.getValue());
+ builder.setFlags(flag);
+ builder.setMissSendLen(input.getMissSearchLength());
+ builder.setVersion(getVersion());
+ ListenableFuture<RpcResult<Void>> futureResultFromOfLib;
+ synchronized (deviceContext) {
+ futureResultFromOfLib = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter().setConfig(builder.build()));
}
- return result;
+ OFJResult2RequestCtxFuture<SetConfigOutput> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
+ OFJResult2RequestCtxFuture.processResultFromOfJava(futureResultFromOfLib);
+ return requestContext.getFuture();
}
}
*/
package org.opendaylight.openflowplugin.impl.services;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
}
- public static void closeRequestContextWithRpcError(final RequestContext<?> requestContext, String errorMessage) {
-
- RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
+ public static <T> SettableFuture<RpcResult<T>> closeRequestContextWithRpcError(final RequestContext<T> requestContext, final String errorMessage) {
+ RpcResultBuilder<T> rpcResultBuilder = RpcResultBuilder.<T>failed().withRpcError(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "", errorMessage));
requestContext.getFuture().set(rpcResultBuilder.build());
closeRequstContext(requestContext);
+ return requestContext.getFuture();
}
public static void closeRequstContext(final RequestContext<?> requestContext) {
*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
@Override
public Future<RpcResult<SendEchoOutput>> sendEcho(final SendEchoInput sendEchoInput) {
final RequestContext<SendEchoOutput> requestContext = getRequestContextStack().createRequestContext();
- final SettableFuture<RpcResult<SendEchoOutput>> sendEchoOutput = getRequestContextStack()
- .storeOrFail(requestContext);
- if (!sendEchoOutput.isDone()) {
- final DeviceContext deviceContext = getDeviceContext();
- Long reserverXid = deviceContext.getReservedXid();
- if (null == reserverXid) {
- if (null == reserverXid) {
- reserverXid = deviceContext.getReservedXid();
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- return sendEchoOutput;
- }
- }
- final Xid xid = new Xid(reserverXid);
- requestContext.setXid(xid);
-
- LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+ if (requestContext == null) {
+ getMessageSpy().spyMessage(null, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+ return failedFuture();
+ }
- final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
- echoInputOFJavaBuilder.setVersion(getVersion());
- echoInputOFJavaBuilder.setXid(xid.getValue());
- echoInputOFJavaBuilder.setData(sendEchoInput.getData());
- final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
- final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
- .echo(echoInputOFJava);
- LOG.debug("Echo with xid {} was sent from controller", xid);
+ final DeviceContext deviceContext = getDeviceContext();
+ Long reserverXid = deviceContext.getReservedXid();
+ if (null == reserverXid) {
+ reserverXid = deviceContext.getReservedXid();
+ return RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+ }
+ final Xid xid = new Xid(reserverXid);
+ requestContext.setXid(xid);
- ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
- .listenInPoolThread(rpcEchoOutputOFJava);
+ LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
+ deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
- // callback on OF JAVA future
- SuccessCallback<EchoOutput, SendEchoOutput> successCallback = new SuccessCallback<EchoOutput, SendEchoOutput>(
- deviceContext, requestContext, listenableRpcEchoOutputOFJava) {
+ final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
+ echoInputOFJavaBuilder.setVersion(getVersion());
+ echoInputOFJavaBuilder.setXid(xid.getValue());
+ echoInputOFJavaBuilder.setData(sendEchoInput.getData());
+ final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
- @Override
- public RpcResult<SendEchoOutput> transform(RpcResult<EchoOutput> rpcResult) {
- EchoOutput echoOutputOFJava = rpcResult.getResult();
- SendEchoOutputBuilder sendEchoOutputBuilder = new SendEchoOutputBuilder();
- sendEchoOutputBuilder.setData(echoOutputOFJava.getData());
+ // FIXME: should be submitted via OutboundQueue
+ final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
+ .echo(echoInputOFJava);
+ LOG.debug("Echo with xid {} was sent from controller", xid);
- LOG.debug("Echo with xid {} was received by controller.", rpcResult.getResult().getXid());
- return RpcResultBuilder.success(sendEchoOutputBuilder.build()).build();
- }
- };
- Futures.addCallback(listenableRpcEchoOutputOFJava, successCallback);
- } else {
- getMessageSpy().spyMessage(requestContext, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
- }
+ ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
+ .listenInPoolThread(rpcEchoOutputOFJava);
- // callback on request context future
- Futures.addCallback(sendEchoOutput, new FutureCallback<RpcResult<SendEchoOutput>>() {
+ // callback on OF JAVA future
+ SuccessCallback<EchoOutput, SendEchoOutput> successCallback = new SuccessCallback<EchoOutput, SendEchoOutput>(
+ deviceContext, requestContext, listenableRpcEchoOutputOFJava) {
@Override
- public void onSuccess(RpcResult<SendEchoOutput> result) {
- }
+ public RpcResult<SendEchoOutput> transform(final RpcResult<EchoOutput> rpcResult) {
+ EchoOutput echoOutputOFJava = rpcResult.getResult();
+ SendEchoOutputBuilder sendEchoOutputBuilder = new SendEchoOutputBuilder();
+ sendEchoOutputBuilder.setData(echoOutputOFJava.getData());
- @Override
- public void onFailure(Throwable t) {
- if (sendEchoOutput.isCancelled()) {
- requestContext.getFuture().set(
- RpcResultBuilder.<SendEchoOutput>failed()
- .withError(ErrorType.APPLICATION, "Echo response wasn't obtained until barrier.")
- .build());
- LOG.debug("Echo reply with xid {} wasn't received by controller until barrier.",
- requestContext.getXid());
- }
+ LOG.debug("Echo with xid {} was received by controller.", rpcResult.getResult().getXid());
+ return RpcResultBuilder.success(sendEchoOutputBuilder.build()).build();
}
- });
+ };
+ Futures.addCallback(listenableRpcEchoOutputOFJava, successCallback);
- return sendEchoOutput;
+ return requestContext.getFuture();
}
-
}
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StatisticsContextImpl implements StatisticsContext {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
- public static final String CONNECTION_CLOSED = "Connection closed.";
+ private static final String CONNECTION_CLOSED = "Connection closed.";
private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
private final DeviceContext deviceContext;
-
private final StatisticsGatheringService statisticsGatheringService;
public StatisticsContextImpl(final DeviceContext deviceContext) {
this.deviceContext = deviceContext;
statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
-
}
@Override
return resultingFuture;
}
- @Override
- public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
- requestContexts.remove(requestContext);
- }
-
- @Override
- public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> data) {
- requestContexts.add(data);
- return data.getFuture();
- }
-
@Override
public <T> RequestContext<T> createRequestContext() {
- return new RequestContextImpl<>(this);
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>() {
+ @Override
+ public void close() {
+ requestContexts.remove(this);
+ }
+ };
+ requestContexts.add(ret);
+ return ret;
}
@Override
- public void close() throws Exception {
- for (final RequestContext requestContext : requestContexts) {
+ public void close() {
+ for (final RequestContext<?> requestContext : requestContexts) {
RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
}
}
*/
package org.opendaylight.openflowplugin.api.openflow.device;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.concurrent.Future;
+import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.impl.rpc.RpcContextImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
/**
* @author joe
@Mock
private MessageSpy messageSpy;
- private RpcContext rpcContext;
-
- private static final String QUEUE_IS_FULL = "Device's request queue is full.";
+ private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
@Before
public void setup() {
NodeId nodeId = new NodeId("openflow:1");
- KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
-
- rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier);
+ nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
}
@Test
@Test
public void testStoreOrFail() throws Exception {
+ final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 100);
RequestContext requestContext = rpcContext.createRequestContext();
- rpcContext.setRequestContextQuota(100);
- Future<RpcResult<UpdateFlowOutput>> resultFuture = rpcContext.storeOrFail(requestContext);
- assertNotNull(resultFuture);
- assertFalse(resultFuture.isDone());
+ assertNotNull(requestContext);
+
}
@Test
public void testStoreOrFailThatFails() throws Exception {
+ final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, nodeInstanceIdentifier, 0);
RequestContext requestContext = rpcContext.createRequestContext();
- rpcContext.setRequestContextQuota(0);
- Future<RpcResult<UpdateFlowOutput>> resultFuture = rpcContext.storeOrFail(requestContext);
- assertNotNull(resultFuture);
- assertTrue(resultFuture.isDone());
- RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult = resultFuture.get();
- assertNotNull(updateFlowOutputRpcResult);
- assertEquals(1, updateFlowOutputRpcResult.getErrors().size());
- Iterator<RpcError> iterator = updateFlowOutputRpcResult.getErrors().iterator();
- assertEquals(QUEUE_IS_FULL, iterator.next().getMessage());
+ assertNull(requestContext);
}
-
}
// TODO: how to invoke service remotely?
NodeId nodeId = new NodeId("openflow:1");
KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
- final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier);
+ final RpcContextImpl rpcContext = new RpcContextImpl(messageSpy, mockedProviderContext, nodeInstanceIdentifier, capacity);
when(mockedProviderContext.getRpcService(SalFlowService.class)).thenReturn(new SalFlowServiceImpl(rpcContext, mockedDeviceContext));
- rpcContext.setRequestContextQuota(capacity);
final SalFlowService salFlowService = mockedProviderContext.getRpcService(SalFlowService.class);
final Future<RpcResult<AddFlowOutput>> addedFlow = salFlowService.addFlow(prepareTestingAddFlow());
package org.opendaylight.openflowplugin.impl.rpc;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
@RunWith(MockitoJUnitRunner.class)
-public class RequestContextImplTest {
-
- @Mock
- RpcContext rpcContext;
-
- RequestContext requestContext;
+public class AbstractRequestContextTest {
+ private AbstractRequestContext<Object> requestContext;
@Before
public void setup() {
- requestContext = new RequestContextImpl<>(rpcContext);
+ requestContext = new AbstractRequestContext<Object>() {
+ @Override
+ public void close() {
+ // No-op
+ }
+ };
}
@Test
public void testCreateRequestFuture() throws Exception {
- SettableFuture future = requestContext.getFuture();
+ Future<?> future = requestContext.getFuture();
assertNotNull(future);
}
-
- @Test
- public void testClose() throws Exception {
- requestContext.close();
- verify(rpcContext).forgetRequestContext(Matchers.any(RequestContext.class));
- }
-
}
\ No newline at end of file