protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
}
final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+ final int rpcMessageLimit =
+ node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+ if (rpcMessageLimit < 1) {
+ LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+ }
return new NetconfConnectorDTO(
userCapabilities.isPresent() ?
new NetconfDeviceCommunicator(
- remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
- new NetconfDeviceCommunicator(remoteDeviceId, device)
- , salFacade);
+ remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit):
+ new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
}
protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
- return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+ final int rpcMessageLimit =
+ node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+ if (rpcMessageLimit < 1) {
+ LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+ }
+
+ return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService, rpcMessageLimit), salFacade);
}
@Override
private final ArrayList<NetconfClientSessionListener> netconfClientSessionListeners = new ArrayList<>();
private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
- public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) {
- super(id, remoteDevice);
+ public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService, final int rpcMessageLimit) {
+ super(id, remoteDevice, rpcMessageLimit);
this.ownershipService = ownershipService;
}
doReturn(ownershipListenerRegistration).when(ownershipService).registerListener(
"netconf-node/" + REMOTE_DEVICE_ID.getName(), remoteDevice);
- communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService);
+ communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService, 10);
communicator.registerNetconfClientSessionListener(listener1);
communicator.registerNetconfClientSessionListener(listener2);
}
*/
package org.opendaylight.controller.config.yang.md.sal.connector.netconf;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
.setSalFacade(salFacade)
.build();
+ if (getConcurrentRpcLimit() < 1) {
+ LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", id);
+ }
+
final NetconfDeviceCommunicator listener = userCapabilities.isPresent() ?
new NetconfDeviceCommunicator(id, device,
- new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride())):
- new NetconfDeviceCommunicator(id, device);
+ new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride()), getConcurrentRpcLimit()):
+ new NetconfDeviceCommunicator(id, device, getConcurrentRpcLimit());
if (shouldSendKeepalive()) {
((KeepaliveSalFacade) salFacade).setListener(listener);
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.opendaylight.controller.config.util.xml.XmlElement;
protected final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
- // TODO implement concurrent message limit
+ private final Semaphore semaphore;
+ private final int concurentRpcMsgs;
+
private final Queue<Request> requests = new ArrayDeque<>();
private NetconfClientSession session;
private SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final UserPreferences NetconfSessionPreferences) {
- this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
+ final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) {
+ this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit);
}
public NetconfDeviceCommunicator(final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
- this(id, remoteDevice, Optional.<UserPreferences>absent());
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+ final int rpcMessageLimit) {
+ this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
}
private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
- final Optional<UserPreferences> overrideNetconfCapabilities) {
+ final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
+ this.concurentRpcMsgs = rpcMessageLimit;
this.id = id;
this.remoteDevice = remoteDevice;
this.overrideNetconfCapabilities = overrideNetconfCapabilities;
this.firstConnectionFuture = SettableFuture.create();
+ this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
}
@Override
request = requests.peek();
if (request != null && request.future.isUncancellable()) {
requests.poll();
+ // we have just removed one request from the queue
+ // we can also release one permit
+ if(semaphore != null) {
+ semaphore.release();
+ }
} else {
request = null;
LOG.warn("{}: Ignoring unsolicited message {}", id,
@Override
public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
sessionLock.lock();
+
+ if (semaphore != null && !semaphore.tryAcquire()) {
+ LOG.warn("Limit of concurrent rpc messages was reached (limit :" +
+ concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
+ sessionLock.unlock();
+ return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" +
+ concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
+ }
+
try {
- return sendRequestWithLock( message, rpc );
+ return sendRequestWithLock(message, rpc);
} finally {
sessionLock.unlock();
}
description "Netconf connector sends keepalive RPCs while the session is idle, this delay specifies the delay between keepalive RPC in seconds
If a value <1 is provided, no keepalives will be sent";
}
+
+ leaf concurrent-rpc-limit {
+ config true;
+ type uint16;
+ default 0;
+ description "Limit of concurrent messages that can be send before reply messages are received.
+ If value <1 is provided, no limit will be enforced";
+ }
}
grouping netconf-node-connection-status {
type string;
}
+ leaf concurrent-rpc-limit {
+ type uint16;
+ default 0;
+ description "Limit of concurrent messages that can be send before reply messages are received.
+ If value less than 1 is provided, no limit will be enforced";
+ }
+
leaf schema-cache-directory {
type string;
default "schema";
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.ByteArrayInputStream;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
public void setUp() throws Exception {
MockitoAnnotations.initMocks( this );
- communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice);
+ communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice, 10);
}
void setupSession() {
}
private ListenableFuture<RpcResult<NetconfMessage>> sendRequest() throws Exception {
- return sendRequest( UUID.randomUUID().toString() );
+ return sendRequest( UUID.randomUUID().toString(), true );
}
@SuppressWarnings("unchecked")
- private ListenableFuture<RpcResult<NetconfMessage>> sendRequest( final String messageID ) throws Exception {
+ private ListenableFuture<RpcResult<NetconfMessage>> sendRequest( final String messageID, final boolean doLastTest ) throws Exception {
Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
Element element = doc.createElement( "request" );
element.setAttribute( "message-id", messageID );
ListenableFuture<RpcResult<NetconfMessage>> resultFuture =
communicator.sendRequest( message, QName.create( "mock rpc" ) );
-
- assertNotNull( "ListenableFuture is null", resultFuture );
+ if(doLastTest) {
+ assertNotNull("ListenableFuture is null", resultFuture);
+ }
return resultFuture;
}
setupSession();
String messageID1 = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1, true );
String messageID2 = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2, true );
String messageID3 = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture3 = sendRequest( messageID3 );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture3 = sendRequest( messageID3, true );
//response messages 1,2 are omitted
communicator.onMessage( mockSession, createSuccessResponseMessage( messageID3 ) );
setupSession();
String messageID1 = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1, true );
String messageID2 = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2, true );
communicator.onMessage( mockSession, createSuccessResponseMessage( messageID1 ) );
communicator.onMessage( mockSession, createSuccessResponseMessage( messageID2 ) );
setupSession();
String messageID = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID, true );
communicator.onMessage( mockSession, createErrorResponseMessage( messageID ) );
final EventLoopGroup group = new NioEventLoopGroup();
final Timer time = new HashedWheelTimer();
try {
- final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device);
+ final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device, 10);
final NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
.withAddress(new InetSocketAddress("localhost", 65000))
.withReconnectStrategy(reconnectStrategy)
setupSession();
String messageID = UUID.randomUUID().toString();
- ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID, true );
communicator.onMessage( mockSession, createSuccessResponseMessage( UUID.randomUUID().toString() ) );
errorInfo.contains( "expected-message-id" ) );
}
+ @Test
+ public void testConcurrentMessageLimit() throws Exception {
+ setupSession();
+ ArrayList<String> messageID = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ messageID.add(UUID.randomUUID().toString());
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest(messageID.get(i), false);
+ assertEquals("ListenableFuture is null", true, resultFuture instanceof UncancellableFuture);
+ }
+
+ final String notWorkingMessageID = UUID.randomUUID().toString();
+ ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest(notWorkingMessageID, false);
+ assertEquals("ListenableFuture is null", false, resultFuture instanceof UncancellableFuture);
+
+ communicator.onMessage(mockSession, createSuccessResponseMessage(messageID.get(0)));
+
+ resultFuture = sendRequest(messageID.get(0), false);
+ assertNotNull("ListenableFuture is null", resultFuture);
+ }
+
private static NetconfMessage createErrorResponseMessage( final String messageID ) throws Exception {
String xmlStr =
"<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"" +
private final NetconfClientDispatcherImpl netconfClientDispatcher;
private static final String CACHE = "cache/schema";
+ private static final int RPC_LIMIT = 0;
// Connection
private NetconfDeviceConnectionHandler handler;
.setId(deviceId)
.setSalFacade(handler)
.build();
- listener = new NetconfDeviceCommunicator(deviceId, device);
+ listener = new NetconfDeviceCommunicator(deviceId, device, RPC_LIMIT);
+
configBuilder.withSessionListener(listener);
listener.initializeRemoteConnection(netconfClientDispatcher, configBuilder.build());
}
@Arg(dest = "thread-amount")
public int threadAmount;
+ @Arg(dest = "concurrent-message-limit")
+ public int concurrentMessageLimit;
+
static ArgumentParser getParser() {
final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
.setDefault(1)
.dest("thread-amount");
+ parser.addArgument("--concurrent-message-limit")
+ .type(Integer.class)
+ .setDefault(0)
+ .help("Number of rpc messages that can be sent before receiving reply to them.")
+ .dest("concurrent-message-limit");
+
return parser;
}
final NetconfClientDispatcherImpl netconfClientDispatcher,
final List<NetconfMessage> preparedMessages) {
this.params = params;
- this.sessionListener = getSessionListener(params.getInetAddress());
+ this.sessionListener = getSessionListener(params.getInetAddress(), params.concurrentMessageLimit);
this.netconfClientDispatcher = netconfClientDispatcher;
cfg = getNetconfClientConfiguration(this.params, this.sessionListener);
}
}
- private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+ private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress, final int messageLimit) {
final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new StressClient.LoggingRemoteDevice();
- return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+ return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice, messageLimit);
}
private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {