BUG 5634 : Implement concurrent message limit 45/37245/22
authormiroslav.kovac <miroslav.kovac@pantheon.tech>
Tue, 29 Mar 2016 14:44:08 +0000 (16:44 +0200)
committerJakub Morvay <jmorvay@cisco.com>
Thu, 16 Jun 2016 13:50:47 +0000 (13:50 +0000)
This adds optional feature of limiting maximum concurrently sent rpc
requests. When we reach limit, next request will fail. Queue is freed
when reply message is received.

Change-Id: I5c49206ab929300ac973b12d5ccee58c70536c41
Signed-off-by: miroslav.kovac <miroslav.kovac@pantheon.tech>
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
12 files changed:
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java
netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicatorTest.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang
netconf/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang
netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java
netconf/tools/netconf-cli/src/main/java/org/opendaylight/netconf/cli/NetconfDeviceConnectionManager.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/client/stress/Parameters.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/client/stress/StressClientCallable.java

index 477c3b661f36c544873cafb06c529ccf0cf12151..6d5cdf898b68539ecef02d8ed7fc64e214651a2d 100644 (file)
@@ -88,6 +88,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
     protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
     protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
     private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
@@ -325,13 +326,18 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
 
         final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
 
         return new NetconfConnectorDTO(
                 userCapabilities.isPresent() ?
                         new NetconfDeviceCommunicator(
-                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
-                        new NetconfDeviceCommunicator(remoteDeviceId, device)
-                , salFacade);
+                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit):
+                        new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
index 1612c7d9fccb23af2e7238145a91d232bc722772..ec6e4fec333b8921c00d4d6e57d12de75674dc51 100644 (file)
@@ -161,7 +161,14 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
                 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
 
-        return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
+
+        return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService, rpcMessageLimit), salFacade);
     }
 
     @Override
index 0b191974d761e352de8b35e20a01c5e2251df3de..061024c1caa9478fc2fc7c50303e3409ab97a86b 100644 (file)
@@ -26,8 +26,8 @@ public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicato
     private final ArrayList<NetconfClientSessionListener> netconfClientSessionListeners = new ArrayList<>();
     private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
 
-    public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) {
-        super(id, remoteDevice);
+    public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService, final int rpcMessageLimit) {
+        super(id, remoteDevice, rpcMessageLimit);
         this.ownershipService = ownershipService;
     }
 
index 64fe5fad502207fce11d1ce62b81216a7d0bc703..aa6b69da51e5ccc3727e01f0c629aaa4e8c8eb65 100644 (file)
@@ -57,7 +57,7 @@ public class ClusteredNetconfDeviceCommunicatorTest {
         doReturn(ownershipListenerRegistration).when(ownershipService).registerListener(
                 "netconf-node/" + REMOTE_DEVICE_ID.getName(), remoteDevice);
 
-        communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService);
+        communicator = new ClusteredNetconfDeviceCommunicator(REMOTE_DEVICE_ID, remoteDevice, ownershipService, 10);
         communicator.registerNetconfClientSessionListener(listener1);
         communicator.registerNetconfClientSessionListener(listener2);
     }
index 47da05f36be0ce3c1af0858b776f7a61f6d3045a..79a439c0ebc5984f00c0a70616fcbd7cc9920833 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.connector.netconf;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
 
@@ -294,10 +295,14 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
                 .setSalFacade(salFacade)
                 .build();
 
+        if (getConcurrentRpcLimit() < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", id);
+        }
+
         final NetconfDeviceCommunicator listener = userCapabilities.isPresent() ?
                 new NetconfDeviceCommunicator(id, device,
-                        new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride())):
-                new NetconfDeviceCommunicator(id, device);
+                        new UserPreferences(userCapabilities.get(), getYangModuleCapabilities().getOverride()), getConcurrentRpcLimit()):
+                new NetconfDeviceCommunicator(id, device, getConcurrentRpcLimit());
 
         if (shouldSendKeepalive()) {
             ((KeepaliveSalFacade) salFacade).setListener(listener);
index e979071d873f3c7c2770c545cfa9cbfc65480cc9..4fbd6f624145eac163c8bdb2cc61b3384782907d 100644 (file)
@@ -20,6 +20,7 @@ import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.opendaylight.controller.config.util.xml.XmlElement;
@@ -53,7 +54,9 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     protected final RemoteDeviceId id;
     private final Lock sessionLock = new ReentrantLock();
 
-    // TODO implement concurrent message limit
+    private final Semaphore semaphore;
+    private final int concurentRpcMsgs;
+
     private final Queue<Request> requests = new ArrayDeque<>();
     private NetconfClientSession session;
 
@@ -61,21 +64,24 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     private SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
 
     public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
-            final UserPreferences NetconfSessionPreferences) {
-        this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
+            final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) {
+        this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit);
     }
 
     public NetconfDeviceCommunicator(final RemoteDeviceId id,
-                                     final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
-        this(id, remoteDevice, Optional.<UserPreferences>absent());
+                                     final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+                                     final int rpcMessageLimit) {
+        this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
     }
 
     private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
-            final Optional<UserPreferences> overrideNetconfCapabilities) {
+                                      final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
+        this.concurentRpcMsgs = rpcMessageLimit;
         this.id = id;
         this.remoteDevice = remoteDevice;
         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
         this.firstConnectionFuture = SettableFuture.create();
+        this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
     }
 
     @Override
@@ -245,6 +251,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
             request = requests.peek();
             if (request != null && request.future.isUncancellable()) {
                 requests.poll();
+                // we have just removed one request from the queue
+                // we can also release one permit
+                if(semaphore != null) {
+                    semaphore.release();
+                }
             } else {
                 request = null;
                 LOG.warn("{}: Ignoring unsolicited message {}", id,
@@ -302,8 +313,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     @Override
     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
         sessionLock.lock();
+
+        if (semaphore != null && !semaphore.tryAcquire()) {
+            LOG.warn("Limit of concurrent rpc messages was reached (limit :" +
+                    concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
+            sessionLock.unlock();
+            return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" +
+                    concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
+        }
+
         try {
-            return sendRequestWithLock( message, rpc );
+            return sendRequestWithLock(message, rpc);
         } finally {
             sessionLock.unlock();
         }
index 2c327d495e7781e7dfa91df84b48d0297be2fd36..94ce4e792102e92686fbeddd66c67d1236c52e0d 100644 (file)
@@ -118,6 +118,14 @@ module netconf-node-topology {
             description "Netconf connector sends keepalive RPCs while the session is idle, this delay specifies the delay between keepalive RPC in seconds
                          If a value <1 is provided, no keepalives will be sent";
         }
+
+        leaf concurrent-rpc-limit {
+            config true;
+            type uint16;
+            default 0;
+            description "Limit of concurrent messages that can be send before reply messages are received.
+                         If value <1 is provided, no limit will be enforced";
+        }
     }
 
     grouping netconf-node-connection-status {
index a7bb0827148efc97b99e6b96469c2086c382baa7..846c8fe65a9a10e96d134dc198b3a166da29f9f5 100644 (file)
@@ -63,6 +63,13 @@ module odl-sal-netconf-connector-cfg {
                 type string;
             }
 
+            leaf concurrent-rpc-limit {
+                type uint16;
+                default 0;
+                description "Limit of concurrent messages that can be send before reply messages are received.
+                             If value less than 1 is provided, no limit will be enforced";
+            }
+
             leaf schema-cache-directory {
                 type string;
                 default "schema";
index 6f69c84e32530a64ef8e0f180229c151a6cfd14f..7938f429a65c3b16bf0c6d85329777b04f0661e5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.sal.connect.netconf.listener;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
@@ -37,6 +38,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.ByteArrayInputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
@@ -84,7 +86,7 @@ public class NetconfDeviceCommunicatorTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks( this );
 
-        communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice);
+        communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice, 10);
     }
 
     void setupSession() {
@@ -95,11 +97,11 @@ public class NetconfDeviceCommunicatorTest {
     }
 
     private ListenableFuture<RpcResult<NetconfMessage>> sendRequest() throws Exception {
-        return sendRequest( UUID.randomUUID().toString() );
+        return sendRequest( UUID.randomUUID().toString(), true );
     }
 
     @SuppressWarnings("unchecked")
-    private ListenableFuture<RpcResult<NetconfMessage>> sendRequest( final String messageID ) throws Exception {
+    private ListenableFuture<RpcResult<NetconfMessage>> sendRequest( final String messageID, final boolean doLastTest ) throws Exception {
         Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
         Element element = doc.createElement( "request" );
         element.setAttribute( "message-id", messageID );
@@ -113,8 +115,9 @@ public class NetconfDeviceCommunicatorTest {
 
         ListenableFuture<RpcResult<NetconfMessage>> resultFuture =
                                       communicator.sendRequest( message, QName.create( "mock rpc" ) );
-
-        assertNotNull( "ListenableFuture is null", resultFuture );
+        if(doLastTest) {
+            assertNotNull("ListenableFuture is null", resultFuture);
+        }
         return resultFuture;
     }
 
@@ -301,13 +304,13 @@ public class NetconfDeviceCommunicatorTest {
         setupSession();
 
         String messageID1 = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1, true );
 
         String messageID2 = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2, true );
 
         String messageID3 = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture3 = sendRequest( messageID3 );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture3 = sendRequest( messageID3, true );
 
         //response messages 1,2 are omitted
         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID3 ) );
@@ -320,10 +323,10 @@ public class NetconfDeviceCommunicatorTest {
         setupSession();
 
         String messageID1 = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1, true );
 
         String messageID2 = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2, true );
 
         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID1 ) );
         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID2 ) );
@@ -337,7 +340,7 @@ public class NetconfDeviceCommunicatorTest {
         setupSession();
 
         String messageID = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID, true );
 
         communicator.onMessage( mockSession, createErrorResponseMessage( messageID ) );
 
@@ -381,7 +384,7 @@ public class NetconfDeviceCommunicatorTest {
         final EventLoopGroup group = new NioEventLoopGroup();
         final Timer time = new HashedWheelTimer();
         try {
-            final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device);
+            final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device, 10);
             final NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
                     .withAddress(new InetSocketAddress("localhost", 65000))
                     .withReconnectStrategy(reconnectStrategy)
@@ -411,7 +414,7 @@ public class NetconfDeviceCommunicatorTest {
         setupSession();
 
         String messageID = UUID.randomUUID().toString();
-        ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID, true );
 
         communicator.onMessage( mockSession, createSuccessResponseMessage( UUID.randomUUID().toString() ) );
 
@@ -428,6 +431,27 @@ public class NetconfDeviceCommunicatorTest {
                       errorInfo.contains( "expected-message-id" ) );
     }
 
+    @Test
+    public void testConcurrentMessageLimit() throws Exception {
+        setupSession();
+        ArrayList<String> messageID = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            messageID.add(UUID.randomUUID().toString());
+            ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest(messageID.get(i), false);
+            assertEquals("ListenableFuture is null", true, resultFuture instanceof UncancellableFuture);
+        }
+
+        final String notWorkingMessageID = UUID.randomUUID().toString();
+        ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest(notWorkingMessageID, false);
+        assertEquals("ListenableFuture is null", false, resultFuture instanceof UncancellableFuture);
+
+        communicator.onMessage(mockSession, createSuccessResponseMessage(messageID.get(0)));
+
+        resultFuture = sendRequest(messageID.get(0), false);
+        assertNotNull("ListenableFuture is null", resultFuture);
+    }
+
     private static NetconfMessage createErrorResponseMessage( final String messageID ) throws Exception {
         String xmlStr =
             "<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"" +
index 2a73412d2c20166e51166f1cbb2701761aeeaef0..4bbbc11dfcdf62b2fcbc69e40718e9583c41565a 100644 (file)
@@ -48,6 +48,7 @@ public class NetconfDeviceConnectionManager implements Closeable {
     private final NetconfClientDispatcherImpl netconfClientDispatcher;
 
     private static final String CACHE = "cache/schema";
+    private static final int RPC_LIMIT = 0;
 
     // Connection
     private NetconfDeviceConnectionHandler handler;
@@ -92,7 +93,8 @@ public class NetconfDeviceConnectionManager implements Closeable {
                 .setId(deviceId)
                 .setSalFacade(handler)
                 .build();
-        listener = new NetconfDeviceCommunicator(deviceId, device);
+        listener = new NetconfDeviceCommunicator(deviceId, device, RPC_LIMIT);
+
         configBuilder.withSessionListener(listener);
         listener.initializeRemoteConnection(netconfClientDispatcher, configBuilder.build());
     }
index 77a0a60a301fd66128cf9a31dcf0b6cf4e0f4a68..fdd2202678dac0c0e6d2122e00f5a2fad46ea2c4 100644 (file)
@@ -67,6 +67,9 @@ public class Parameters {
     @Arg(dest = "thread-amount")
     public int threadAmount;
 
+    @Arg(dest = "concurrent-message-limit")
+    public int concurrentMessageLimit;
+
     static ArgumentParser getParser() {
         final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
 
@@ -163,6 +166,12 @@ public class Parameters {
                 .setDefault(1)
                 .dest("thread-amount");
 
+        parser.addArgument("--concurrent-message-limit")
+                .type(Integer.class)
+                .setDefault(0)
+                .help("Number of rpc messages that can be sent before receiving reply to them.")
+                .dest("concurrent-message-limit");
+
         return parser;
     }
 
index 501ef8c71e70f245f11958188e1c394878cf1c65..39161c3769737a191ae9b269e049f98bbdb749c0 100644 (file)
@@ -43,7 +43,7 @@ public class StressClientCallable implements Callable<Boolean>{
                                 final NetconfClientDispatcherImpl netconfClientDispatcher,
                                 final List<NetconfMessage> preparedMessages) {
         this.params = params;
-        this.sessionListener = getSessionListener(params.getInetAddress());
+        this.sessionListener = getSessionListener(params.getInetAddress(), params.concurrentMessageLimit);
         this.netconfClientDispatcher = netconfClientDispatcher;
         cfg = getNetconfClientConfiguration(this.params, this.sessionListener);
 
@@ -73,9 +73,9 @@ public class StressClientCallable implements Callable<Boolean>{
         }
     }
 
-    private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+    private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress, final int messageLimit) {
         final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new StressClient.LoggingRemoteDevice();
-        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice, messageLimit);
     }
 
     private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {