NetconfClientDispatcher moved to constructor arguments in NetconfClient 79/2379/3
authorMaros Marsalek <mmarsale@cisco.com>
Tue, 5 Nov 2013 09:19:58 +0000 (10:19 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 5 Nov 2013 16:13:33 +0000 (16:13 +0000)
This way NioEventLoopGroup instance can be reused by multiple clients.

Dispatcher created in NetconfCLient's constructor spawned an instance of NioEventLoopGroup (in AbstractDispatcher in bgpcep repo).
Instance of NioEventLoopGroup spawns new threads and with multiple concurrent clients system could get out of resources e.g. Too many open files, Unable to create new native thread.

Change-Id: I7dffed243d62ea0be4786d067a16d4e58ab1530c
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/config/config-manager/src/main/java/org/opendaylight/controller/config/manager/impl/ConfigTransactionControllerImpl.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java

index e732db9..ce0ca03 100644 (file)
@@ -345,7 +345,7 @@ class ConfigTransactionControllerImpl implements
             ModuleIdentifier name = entry.getKey();
             try {
                 logger.debug("About to commit {} in transaction {}",
-                        transactionIdentifier, name);
+                        name, transactionIdentifier);
                 module.getInstance();
             } catch (Exception e) {
                 logger.error("Commit failed on {} in transaction {}", name,
index 47d5ff6..d390161 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
 import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
 import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
 import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -32,6 +33,7 @@ import javax.management.MBeanServerConnection;
 import javax.management.Notification;
 import javax.management.NotificationListener;
 import javax.management.ObjectName;
+import javax.net.ssl.SSLContext;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,6 +52,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener,
     private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
 
     private final InetSocketAddress address;
+    private final NetconfClientDispatcher dispatcher;
 
     private NetconfClient netconfClient;
 
@@ -73,6 +76,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener,
         this.address = address;
         this.mbeanServer = mbeanServer;
         this.timeout = timeout;
+        this.dispatcher = new NetconfClientDispatcher(Optional.<SSLContext>absent());
     }
 
     public void init() throws InterruptedException {
@@ -117,7 +121,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener,
             attempt++;
 
             try {
-                netconfClient = new NetconfClient(this.toString(), address, delay);
+                netconfClient = new NetconfClient(this.toString(), address, delay, dispatcher);
                 // TODO is this correct ex to catch ?
             } catch (IllegalStateException e) {
                 logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
@@ -309,6 +313,12 @@ public class ConfigPersisterNotificationHandler implements NotificationListener,
             }
         }
 
+        try {
+            dispatcher.close();
+        } catch (Exception e) {
+            logger.warn("Unable to close netconf client dispatcher {}", dispatcher, e);
+        }
+
         // unregister from JMX
         try {
             if (mbeanServer.isRegistered(on)) {
index b8372b6..cc8d987 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.Future;
@@ -20,7 +19,6 @@ import org.opendaylight.protocol.framework.TimedReconnectStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -42,15 +40,14 @@ public class NetconfClient implements Closeable {
 
     // TODO test reconnecting constructor
     public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectionAttempts,
-            int attemptMsTimeout) throws InterruptedException {
-        this(clientLabelForLogging, address, getReconnectStrategy(connectionAttempts, attemptMsTimeout), Optional
-                .<SSLContext> absent());
+            int attemptMsTimeout, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
+        this(clientLabelForLogging, address, getReconnectStrategy(connectionAttempts, attemptMsTimeout),
+                netconfClientDispatcher);
     }
 
-    private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat,
-            Optional<SSLContext> maybeSSLContext) throws InterruptedException {
+    private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
         this.label = clientLabelForLogging;
-        dispatch = new NetconfClientDispatcher(maybeSSLContext);
+        dispatch = netconfClientDispatcher;
 
         sessionListener = new NetconfClientSessionListener();
         Future<NetconfClientSession> clientFuture = dispatch.createClient(address, sessionListener, strat);
@@ -70,27 +67,15 @@ public class NetconfClient implements Closeable {
     }
 
     public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectTimeoutMs,
-            Optional<SSLContext> maybeSSLContext) throws InterruptedException {
+            NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
         this(clientLabelForLogging, address,
-                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), maybeSSLContext);
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), netconfClientDispatcher);
     }
 
-    public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectTimeoutMs)
-            throws InterruptedException {
-        this(clientLabelForLogging, address,
-                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), Optional
-                        .<SSLContext> absent());
-    }
-
-    public NetconfClient(String clientLabelForLogging, InetSocketAddress address) throws InterruptedException {
-        this(clientLabelForLogging, address, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
-                DEFAULT_CONNECT_TIMEOUT), Optional.<SSLContext> absent());
-    }
-
-    public NetconfClient(String clientLabelForLogging, InetSocketAddress address, Optional<SSLContext> maybeSSLContext)
-            throws InterruptedException {
+    public NetconfClient(String clientLabelForLogging, InetSocketAddress address,
+            NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
         this(clientLabelForLogging, address, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
-                DEFAULT_CONNECT_TIMEOUT), maybeSSLContext);
+                DEFAULT_CONNECT_TIMEOUT), netconfClientDispatcher);
     }
 
     public NetconfMessage sendMessage(NetconfMessage message) {
@@ -112,7 +97,6 @@ public class NetconfClient implements Closeable {
     @Override
     public void close() throws IOException {
         clientSession.close();
-        dispatch.close();
     }
 
     private static ReconnectStrategy getReconnectStrategy(int connectionAttempts, int attemptMsTimeout) {
index 121a826..a74a347 100644 (file)
@@ -14,6 +14,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.util.HashedWheelTimer;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -26,6 +27,7 @@ import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
 import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
 import org.opendaylight.controller.netconf.mapping.api.Capability;
 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
@@ -62,6 +64,7 @@ import static org.mockito.Mockito.mock;
 public class ConcurrentClientsTest {
 
     private static final int CONCURRENCY = 16;
+    public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(Optional.<SSLContext>absent());
     @Mock
     private YangStoreService yangStoreService;
     @Mock
@@ -106,6 +109,11 @@ public class ConcurrentClientsTest {
         s.await();
     }
 
+    @AfterClass
+    public static void tearDownStatic() {
+        NETCONF_CLIENT_DISPATCHER.close();
+    }
+
     private NetconfOperationServiceFactory mockOpF() {
         return new NetconfOperationServiceFactory() {
             @Override
@@ -257,7 +265,7 @@ public class ConcurrentClientsTest {
         @Override
         public void run() {
             try {
-                final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress);
+                final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
                 long sessionId = netconfClient.getSessionId();
                 logger.info("Client with sessionid {} hello exchanged", sessionId);
 
diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
new file mode 100644 (file)
index 0000000..9a4bc2a
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.it;
+
+import com.google.common.base.Optional;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.HashedWheelTimer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
+import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
+import org.opendaylight.controller.config.spi.ModuleFactory;
+import org.opendaylight.controller.config.yang.store.api.YangStoreException;
+import org.opendaylight.controller.config.yang.store.impl.HardcodedYangStoreService;
+import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
+import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
+import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
+import org.opendaylight.controller.netconf.impl.NetconfServerSessionListenerFactory;
+import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.impl.SessionIdProvider;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
+import org.opendaylight.protocol.util.SSLUtil;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class NetconfITSecureTest extends AbstractConfigTest {
+
+    private static final InetSocketAddress tlsAddress = new InetSocketAddress("127.0.0.1", 12024);
+
+    private DefaultCommitNotificationProducer commitNot;
+    private NetconfServerDispatcher dispatchS;
+
+    @Before
+    public void setUp() throws Exception {
+        super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(getModuleFactories().toArray(
+                new ModuleFactory[0])));
+
+        NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+        factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
+
+        commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
+
+        dispatchS = createDispatcher(Optional.of(getSslContext()), factoriesListener);
+        ChannelFuture s = dispatchS.createServer(tlsAddress);
+        s.await();
+    }
+
+    private NetconfServerDispatcher createDispatcher(Optional<SSLContext> sslC,
+            NetconfOperationServiceFactoryListenerImpl factoriesListener) {
+        SessionIdProvider idProvider = new SessionIdProvider();
+        NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
+                new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
+
+        NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
+                factoriesListener, commitNot, idProvider);
+
+        return new NetconfServerDispatcher(sslC, serverNegotiatorFactory, listenerFactory);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        commitNot.close();
+        dispatchS.close();
+    }
+
+    private SSLContext getSslContext() throws KeyStoreException, NoSuchAlgorithmException, CertificateException,
+            IOException, UnrecoverableKeyException, KeyManagementException {
+        final InputStream keyStore = getClass().getResourceAsStream("/keystore.jks");
+        final InputStream trustStore = getClass().getResourceAsStream("/keystore.jks");
+        SSLContext sslContext = SSLUtil.initializeSecureContext("password", keyStore, trustStore, KeyManagerFactory.getDefaultAlgorithm());
+        keyStore.close();
+        trustStore.close();
+        return sslContext;
+    }
+
+    private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
+        final Collection<InputStream> yangDependencies = NetconfITTest.getBasicYangs();
+        return new HardcodedYangStoreService(yangDependencies);
+    }
+
+    protected List<ModuleFactory> getModuleFactories() {
+        return NetconfITTest.getModuleFactoriesS();
+    }
+
+    @Test
+    public void testSecure() throws Exception {
+        try (NetconfClientDispatcher dispatch = new NetconfClientDispatcher(Optional.of(getSslContext()));
+             NetconfClient netconfClient = new NetconfClient("tls-client", tlsAddress, 4000, dispatch))  {
+
+        }
+    }
+}
index 4388982..1512d54 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.collect.Sets;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.HashedWheelTimer;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -36,6 +37,7 @@ import org.opendaylight.controller.config.yang.test.impl.NetconfTestImplRuntimeR
 import org.opendaylight.controller.config.yang.test.impl.TestImplModuleFactory;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
 import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
 import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
@@ -47,7 +49,6 @@ import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificat
 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.util.SSLUtil;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
@@ -55,18 +56,12 @@ import org.w3c.dom.Node;
 import org.xml.sax.SAXException;
 
 import javax.management.ObjectName;
-import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.xml.parsers.ParserConfigurationException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -86,11 +81,12 @@ public class NetconfITTest extends AbstractConfigTest {
     // LoggerFactory.getLogger(NetconfITTest.class);
     //
     private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
-    private static final InetSocketAddress tlsAddress = new InetSocketAddress("127.0.0.1", 12024);
 
     private NetconfMessage getConfig, getConfigCandidate, editConfig, closeSession;
     private DefaultCommitNotificationProducer commitNot;
-    private NetconfServerDispatcher dispatch, dispatchS;
+    private NetconfServerDispatcher dispatch;
+
+    private static NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(Optional.<SSLContext>absent());
 
     @Before
     public void setUp() throws Exception {
@@ -107,10 +103,6 @@ public class NetconfITTest extends AbstractConfigTest {
         dispatch = createDispatcher(Optional.<SSLContext> absent(), factoriesListener);
         ChannelFuture s = dispatch.createServer(tcpAddress);
         s.await();
-
-        dispatchS = createDispatcher(Optional.of(getSslContext()), factoriesListener);
-        s = dispatchS.createServer(tlsAddress);
-        s.await();
     }
 
     private NetconfServerDispatcher createDispatcher(Optional<SSLContext> sslC,
@@ -129,17 +121,11 @@ public class NetconfITTest extends AbstractConfigTest {
     public void tearDown() throws Exception {
         commitNot.close();
         dispatch.close();
-        dispatchS.close();
     }
 
-    private SSLContext getSslContext() throws KeyStoreException, NoSuchAlgorithmException, CertificateException,
-            IOException, UnrecoverableKeyException, KeyManagementException {
-        final InputStream keyStore = getClass().getResourceAsStream("/keystore.jks");
-        final InputStream trustStore = getClass().getResourceAsStream("/keystore.jks");
-        SSLContext sslContext = SSLUtil.initializeSecureContext("password", keyStore, trustStore, KeyManagerFactory.getDefaultAlgorithm());
-        keyStore.close();
-        trustStore.close();
-        return sslContext;
+    @AfterClass
+    public static void tearDownStatic() {
+        NETCONF_CLIENT_DISPATCHER.close();
     }
 
     private void loadMessages() throws IOException, SAXException, ParserConfigurationException {
@@ -154,13 +140,13 @@ public class NetconfITTest extends AbstractConfigTest {
         return new HardcodedYangStoreService(yangDependencies);
     }
 
-    private Collection<InputStream> getBasicYangs() throws IOException {
+    static Collection<InputStream> getBasicYangs() throws IOException {
         List<String> paths = Arrays.asList("/META-INF/yang/config.yang", "/META-INF/yang/rpc-context.yang",
                 "/META-INF/yang/config-test.yang", "/META-INF/yang/config-test-impl.yang",
                 "/META-INF/yang/ietf-inet-types.yang");
         final Collection<InputStream> yangDependencies = new ArrayList<>();
         for (String path : paths) {
-            final InputStream is = checkNotNull(getClass().getResourceAsStream(path), path + " not found");
+            final InputStream is = checkNotNull(NetconfITTest.class.getResourceAsStream(path), path + " not found");
             yangDependencies.add(is);
         }
         return yangDependencies;
@@ -176,7 +162,7 @@ public class NetconfITTest extends AbstractConfigTest {
 
     @Test
     public void testNetconfClientDemonstration() throws Exception {
-        try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000)) {
+        try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER)) {
 
             Set<String> capabilitiesFromNetconfServer = netconfClient.getCapabilities();
             long sessionId = netconfClient.getSessionId();
@@ -191,8 +177,8 @@ public class NetconfITTest extends AbstractConfigTest {
 
     @Test
     public void testTwoSessions() throws Exception {
-        try (NetconfClient netconfClient = new NetconfClient("1", tcpAddress, 4000)) {
-            try (NetconfClient netconfClient2 = new NetconfClient("2", tcpAddress, 4000)) {
+        try (NetconfClient netconfClient = new NetconfClient("1", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER))  {
+            try (NetconfClient netconfClient2 = new NetconfClient("2", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER))  {
             }
         }
     }
@@ -206,13 +192,6 @@ public class NetconfITTest extends AbstractConfigTest {
         h.init();
     }
 
-    @Test
-    public void testSecure() throws Exception {
-        try (NetconfClient netconfClient = new NetconfClient("1", tlsAddress, 4000, Optional.of(getSslContext()))) {
-
-        }
-    }
-
     @Ignore
     @Test
     public void waitingTest() throws Exception {
@@ -369,7 +348,7 @@ public class NetconfITTest extends AbstractConfigTest {
         // final InputStream resourceAsStream =
         // AbstractListenerTest.class.getResourceAsStream(fileName);
         // assertNotNull(resourceAsStream);
-        try (NetconfClient netconfClient = new NetconfClient("test", tcpAddress, 5000)) {
+        try (NetconfClient netconfClient = new NetconfClient("test", tcpAddress, 5000, NETCONF_CLIENT_DISPATCHER)) {
             // IOUtils.copy(resourceAsStream, netconfClient.getStream());
             // netconfClient.getOutputStream().write(NetconfMessageFactory.endOfMessage);
             // server should not write anything back
@@ -418,7 +397,7 @@ public class NetconfITTest extends AbstractConfigTest {
     }
 
     private NetconfClient createSession(final InetSocketAddress address, final String expected) throws Exception {
-        final NetconfClient netconfClient = new NetconfClient("test " + address.toString(), address, 5000);
+        final NetconfClient netconfClient = new NetconfClient("test " + address.toString(), address, 5000, NETCONF_CLIENT_DISPATCHER);
 
         assertEquals(expected, Long.toString(netconfClient.getSessionId()));