From: miroslav.kovac Date: Tue, 29 Mar 2016 14:44:08 +0000 (+0200) Subject: BUG 5634 : Implement concurrent message limit X-Git-Tag: release/boron~67^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=fa157675e0352725df2af31a75a9f49bf4e6c2b7;p=netconf.git BUG 5634 : Implement concurrent message limit This adds optional feature of limiting maximum concurrently sent rpc requests. When we reach limit, next request will fail. Queue is freed when reply message is received. Change-Id: I5c49206ab929300ac973b12d5ccee58c70536c41 Signed-off-by: miroslav.kovac Signed-off-by: Jakub Morvay --- diff --git a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java index 477c3b661f..6d5cdf898b 100644 --- a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java +++ b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java @@ -88,6 +88,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L; protected static final int DEFAULT_KEEPALIVE_DELAY = 0; protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false; + protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0; private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0; private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000; private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L; @@ -325,13 +326,18 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin } final Optional userCapabilities = getUserCapabilities(node); + final int rpcMessageLimit = + node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit(); + + if (rpcMessageLimit < 1) { + LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId); + } return new NetconfConnectorDTO( userCapabilities.isPresent() ? new NetconfDeviceCommunicator( - remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())): - new NetconfDeviceCommunicator(remoteDeviceId, device) - , salFacade); + remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit): + new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade); } protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) { diff --git a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java index 1612c7d9fc..ec6e4fec33 100644 --- a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java +++ b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java @@ -161,7 +161,14 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade, processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context()); - return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade); + final int rpcMessageLimit = + node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit(); + + if (rpcMessageLimit < 1) { + LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId); + } + + return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService, rpcMessageLimit), salFacade); } @Override diff --git a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java index 0b191974d7..061024c1ca 100644 --- a/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java +++ b/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java @@ -26,8 +26,8 @@ public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicato private final ArrayList netconfClientSessionListeners = new ArrayList<>(); private EntityOwnershipListenerRegistration ownershipListenerRegistration = null; - public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) { - super(id, remoteDevice); + public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService, final int rpcMessageLimit) { + super(id, remoteDevice, rpcMessageLimit); this.ownershipService = ownershipService; } diff --git a/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicatorTest.java b/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicatorTest.java index 64fe5fad50..aa6b69da51 100644 --- a/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicatorTest.java +++ b/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicatorTest.java @@ -57,7 +57,7 @@ public class ClusteredNetconfDeviceCommunicatorTest { doReturn(ownershipListenerRegistration).when(ownershipService).registerListener( "netconf-node/" + REMOTE_DEVICE_ID.getName(), remoteDevice); - communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService); + communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService, 10); communicator.registerNetconfClientSessionListener(listener1); communicator.registerNetconfClientSessionListener(listener2); } diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java index 47da05f36b..79a439c0eb 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.config.yang.md.sal.connector.netconf; +import static com.google.common.base.Preconditions.checkArgument; import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition; import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull; @@ -294,10 +295,14 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co .setSalFacade(salFacade) .build(); + if (getConcurrentRpcLimit() < 1) { + LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", id); + } + final NetconfDeviceCommunicator listener = userCapabilities.isPresent() ? new NetconfDeviceCommunicator(id, device, - new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride())): - new NetconfDeviceCommunicator(id, device); + new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride()), getConcurrentRpcLimit()): + new NetconfDeviceCommunicator(id, device, getConcurrentRpcLimit()); if (shouldSendKeepalive()) { ((KeepaliveSalFacade) salFacade).setListener(listener); diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index e979071d87..4fbd6f6241 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -20,6 +20,7 @@ import java.util.ArrayDeque; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.config.util.xml.XmlElement; @@ -53,7 +54,9 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, protected final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); - // TODO implement concurrent message limit + private final Semaphore semaphore; + private final int concurentRpcMsgs; + private final Queue requests = new ArrayDeque<>(); private NetconfClientSession session; @@ -61,21 +64,24 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private SettableFuture firstConnectionFuture; public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final UserPreferences NetconfSessionPreferences) { - this(id, remoteDevice, Optional.of(NetconfSessionPreferences)); + final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) { + this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit); } public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { - this(id, remoteDevice, Optional.absent()); + final RemoteDevice remoteDevice, + final int rpcMessageLimit) { + this(id, remoteDevice, Optional.absent(), rpcMessageLimit); } private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities) { + final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { + this.concurentRpcMsgs = rpcMessageLimit; this.id = id; this.remoteDevice = remoteDevice; this.overrideNetconfCapabilities = overrideNetconfCapabilities; this.firstConnectionFuture = SettableFuture.create(); + this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null; } @Override @@ -245,6 +251,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, request = requests.peek(); if (request != null && request.future.isUncancellable()) { requests.poll(); + // we have just removed one request from the queue + // we can also release one permit + if(semaphore != null) { + semaphore.release(); + } } else { request = null; LOG.warn("{}: Ignoring unsolicited message {}", id, @@ -302,8 +313,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { sessionLock.lock(); + + if (semaphore != null && !semaphore.tryAcquire()) { + LOG.warn("Limit of concurrent rpc messages was reached (limit :" + + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName()); + sessionLock.unlock(); + return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" + + concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName())); + } + try { - return sendRequestWithLock( message, rpc ); + return sendRequestWithLock(message, rpc); } finally { sessionLock.unlock(); } diff --git a/netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang b/netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang index 2c327d495e..94ce4e7921 100644 --- a/netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang +++ b/netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang @@ -118,6 +118,14 @@ module netconf-node-topology { description "Netconf connector sends keepalive RPCs while the session is idle, this delay specifies the delay between keepalive RPC in seconds If a value <1 is provided, no keepalives will be sent"; } + + leaf concurrent-rpc-limit { + config true; + type uint16; + default 0; + description "Limit of concurrent messages that can be send before reply messages are received. + If value <1 is provided, no limit will be enforced"; + } } grouping netconf-node-connection-status { diff --git a/netconf/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang b/netconf/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang index a7bb082714..846c8fe65a 100644 --- a/netconf/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang +++ b/netconf/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang @@ -63,6 +63,13 @@ module odl-sal-netconf-connector-cfg { type string; } + leaf concurrent-rpc-limit { + type uint16; + default 0; + description "Limit of concurrent messages that can be send before reply messages are received. + If value less than 1 is provided, no limit will be enforced"; + } + leaf schema-cache-directory { type string; default "schema"; diff --git a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java index 6f69c84e32..7938f429a6 100644 --- a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java +++ b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java @@ -11,6 +11,7 @@ package org.opendaylight.netconf.sal.connect.netconf.listener; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -37,6 +38,7 @@ import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.ByteArrayInputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.UUID; @@ -84,7 +86,7 @@ public class NetconfDeviceCommunicatorTest { public void setUp() throws Exception { MockitoAnnotations.initMocks( this ); - communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice); + communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice, 10); } void setupSession() { @@ -95,11 +97,11 @@ public class NetconfDeviceCommunicatorTest { } private ListenableFuture> sendRequest() throws Exception { - return sendRequest( UUID.randomUUID().toString() ); + return sendRequest( UUID.randomUUID().toString(), true ); } @SuppressWarnings("unchecked") - private ListenableFuture> sendRequest( final String messageID ) throws Exception { + private ListenableFuture> sendRequest( final String messageID, final boolean doLastTest ) throws Exception { Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument(); Element element = doc.createElement( "request" ); element.setAttribute( "message-id", messageID ); @@ -113,8 +115,9 @@ public class NetconfDeviceCommunicatorTest { ListenableFuture> resultFuture = communicator.sendRequest( message, QName.create( "mock rpc" ) ); - - assertNotNull( "ListenableFuture is null", resultFuture ); + if(doLastTest) { + assertNotNull("ListenableFuture is null", resultFuture); + } return resultFuture; } @@ -301,13 +304,13 @@ public class NetconfDeviceCommunicatorTest { setupSession(); String messageID1 = UUID.randomUUID().toString(); - ListenableFuture> resultFuture1 = sendRequest( messageID1 ); + ListenableFuture> resultFuture1 = sendRequest( messageID1, true ); String messageID2 = UUID.randomUUID().toString(); - ListenableFuture> resultFuture2 = sendRequest( messageID2 ); + ListenableFuture> resultFuture2 = sendRequest( messageID2, true ); String messageID3 = UUID.randomUUID().toString(); - ListenableFuture> resultFuture3 = sendRequest( messageID3 ); + ListenableFuture> resultFuture3 = sendRequest( messageID3, true ); //response messages 1,2 are omitted communicator.onMessage( mockSession, createSuccessResponseMessage( messageID3 ) ); @@ -320,10 +323,10 @@ public class NetconfDeviceCommunicatorTest { setupSession(); String messageID1 = UUID.randomUUID().toString(); - ListenableFuture> resultFuture1 = sendRequest( messageID1 ); + ListenableFuture> resultFuture1 = sendRequest( messageID1, true ); String messageID2 = UUID.randomUUID().toString(); - ListenableFuture> resultFuture2 = sendRequest( messageID2 ); + ListenableFuture> resultFuture2 = sendRequest( messageID2, true ); communicator.onMessage( mockSession, createSuccessResponseMessage( messageID1 ) ); communicator.onMessage( mockSession, createSuccessResponseMessage( messageID2 ) ); @@ -337,7 +340,7 @@ public class NetconfDeviceCommunicatorTest { setupSession(); String messageID = UUID.randomUUID().toString(); - ListenableFuture> resultFuture = sendRequest( messageID ); + ListenableFuture> resultFuture = sendRequest( messageID, true ); communicator.onMessage( mockSession, createErrorResponseMessage( messageID ) ); @@ -381,7 +384,7 @@ public class NetconfDeviceCommunicatorTest { final EventLoopGroup group = new NioEventLoopGroup(); final Timer time = new HashedWheelTimer(); try { - final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device); + final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device, 10); final NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create() .withAddress(new InetSocketAddress("localhost", 65000)) .withReconnectStrategy(reconnectStrategy) @@ -411,7 +414,7 @@ public class NetconfDeviceCommunicatorTest { setupSession(); String messageID = UUID.randomUUID().toString(); - ListenableFuture> resultFuture = sendRequest( messageID ); + ListenableFuture> resultFuture = sendRequest( messageID, true ); communicator.onMessage( mockSession, createSuccessResponseMessage( UUID.randomUUID().toString() ) ); @@ -428,6 +431,27 @@ public class NetconfDeviceCommunicatorTest { errorInfo.contains( "expected-message-id" ) ); } + @Test + public void testConcurrentMessageLimit() throws Exception { + setupSession(); + ArrayList messageID = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + messageID.add(UUID.randomUUID().toString()); + ListenableFuture> resultFuture = sendRequest(messageID.get(i), false); + assertEquals("ListenableFuture is null", true, resultFuture instanceof UncancellableFuture); + } + + final String notWorkingMessageID = UUID.randomUUID().toString(); + ListenableFuture> resultFuture = sendRequest(notWorkingMessageID, false); + assertEquals("ListenableFuture is null", false, resultFuture instanceof UncancellableFuture); + + communicator.onMessage(mockSession, createSuccessResponseMessage(messageID.get(0))); + + resultFuture = sendRequest(messageID.get(0), false); + assertNotNull("ListenableFuture is null", resultFuture); + } + private static NetconfMessage createErrorResponseMessage( final String messageID ) throws Exception { String xmlStr = "{ final NetconfClientDispatcherImpl netconfClientDispatcher, final List preparedMessages) { this.params = params; - this.sessionListener = getSessionListener(params.getInetAddress()); + this.sessionListener = getSessionListener(params.getInetAddress(), params.concurrentMessageLimit); this.netconfClientDispatcher = netconfClientDispatcher; cfg = getNetconfClientConfiguration(this.params, this.sessionListener); @@ -73,9 +73,9 @@ public class StressClientCallable implements Callable{ } } - private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) { + private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress, final int messageLimit) { final RemoteDevice loggingRemoteDevice = new StressClient.LoggingRemoteDevice(); - return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice); + return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice, messageLimit); } private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {