This way NioEventLoopGroup instance can be reused by multiple clients.
Dispatcher created in NetconfCLient's constructor spawned an instance of NioEventLoopGroup (in AbstractDispatcher in bgpcep repo).
Instance of NioEventLoopGroup spawns new threads and with multiple concurrent clients system could get out of resources e.g. Too many open files, Unable to create new native thread.
Change-Id: I7dffed243d62ea0be4786d067a16d4e58ab1530c
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
ModuleIdentifier name = entry.getKey();
try {
logger.debug("About to commit {} in transaction {}",
- transactionIdentifier, name);
+ name, transactionIdentifier);
module.getInstance();
} catch (Exception e) {
logger.error("Commit failed on {} in transaction {}", name,
import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
+import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
private final InetSocketAddress address;
+ private final NetconfClientDispatcher dispatcher;
private NetconfClient netconfClient;
this.address = address;
this.mbeanServer = mbeanServer;
this.timeout = timeout;
+ this.dispatcher = new NetconfClientDispatcher(Optional.<SSLContext>absent());
}
public void init() throws InterruptedException {
attempt++;
try {
- netconfClient = new NetconfClient(this.toString(), address, delay);
+ netconfClient = new NetconfClient(this.toString(), address, delay, dispatcher);
// TODO is this correct ex to catch ?
} catch (IllegalStateException e) {
logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
}
}
+ try {
+ dispatcher.close();
+ } catch (Exception e) {
+ logger.warn("Unable to close netconf client dispatcher {}", dispatcher, e);
+ }
+
// unregister from JMX
try {
if (mbeanServer.isRegistered(on)) {
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
// TODO test reconnecting constructor
public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectionAttempts,
- int attemptMsTimeout) throws InterruptedException {
- this(clientLabelForLogging, address, getReconnectStrategy(connectionAttempts, attemptMsTimeout), Optional
- .<SSLContext> absent());
+ int attemptMsTimeout, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
+ this(clientLabelForLogging, address, getReconnectStrategy(connectionAttempts, attemptMsTimeout),
+ netconfClientDispatcher);
}
- private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat,
- Optional<SSLContext> maybeSSLContext) throws InterruptedException {
+ private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
this.label = clientLabelForLogging;
- dispatch = new NetconfClientDispatcher(maybeSSLContext);
+ dispatch = netconfClientDispatcher;
sessionListener = new NetconfClientSessionListener();
Future<NetconfClientSession> clientFuture = dispatch.createClient(address, sessionListener, strat);
}
public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectTimeoutMs,
- Optional<SSLContext> maybeSSLContext) throws InterruptedException {
+ NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
this(clientLabelForLogging, address,
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), maybeSSLContext);
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), netconfClientDispatcher);
}
- public NetconfClient(String clientLabelForLogging, InetSocketAddress address, int connectTimeoutMs)
- throws InterruptedException {
- this(clientLabelForLogging, address,
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, connectTimeoutMs), Optional
- .<SSLContext> absent());
- }
-
- public NetconfClient(String clientLabelForLogging, InetSocketAddress address) throws InterruptedException {
- this(clientLabelForLogging, address, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
- DEFAULT_CONNECT_TIMEOUT), Optional.<SSLContext> absent());
- }
-
- public NetconfClient(String clientLabelForLogging, InetSocketAddress address, Optional<SSLContext> maybeSSLContext)
- throws InterruptedException {
+ public NetconfClient(String clientLabelForLogging, InetSocketAddress address,
+ NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
this(clientLabelForLogging, address, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
- DEFAULT_CONNECT_TIMEOUT), maybeSSLContext);
+ DEFAULT_CONNECT_TIMEOUT), netconfClientDispatcher);
}
public NetconfMessage sendMessage(NetconfMessage message) {
@Override
public void close() throws IOException {
clientSession.close();
- dispatch.close();
}
private static ReconnectStrategy getReconnectStrategy(int connectionAttempts, int attemptMsTimeout) {
import io.netty.util.HashedWheelTimer;
import org.apache.commons.io.IOUtils;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
public class ConcurrentClientsTest {
private static final int CONCURRENCY = 16;
+ public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(Optional.<SSLContext>absent());
@Mock
private YangStoreService yangStoreService;
@Mock
s.await();
}
+ @AfterClass
+ public static void tearDownStatic() {
+ NETCONF_CLIENT_DISPATCHER.close();
+ }
+
private NetconfOperationServiceFactory mockOpF() {
return new NetconfOperationServiceFactory() {
@Override
@Override
public void run() {
try {
- final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress);
+ final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
long sessionId = netconfClient.getSessionId();
logger.info("Client with sessionid {} hello exchanged", sessionId);
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.it;
+
+import com.google.common.base.Optional;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.HashedWheelTimer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
+import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
+import org.opendaylight.controller.config.spi.ModuleFactory;
+import org.opendaylight.controller.config.yang.store.api.YangStoreException;
+import org.opendaylight.controller.config.yang.store.impl.HardcodedYangStoreService;
+import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
+import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
+import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
+import org.opendaylight.controller.netconf.impl.NetconfServerSessionListenerFactory;
+import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.impl.SessionIdProvider;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
+import org.opendaylight.protocol.util.SSLUtil;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class NetconfITSecureTest extends AbstractConfigTest {
+
+ private static final InetSocketAddress tlsAddress = new InetSocketAddress("127.0.0.1", 12024);
+
+ private DefaultCommitNotificationProducer commitNot;
+ private NetconfServerDispatcher dispatchS;
+
+ @Before
+ public void setUp() throws Exception {
+ super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(getModuleFactories().toArray(
+ new ModuleFactory[0])));
+
+ NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+ factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
+
+ commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
+
+ dispatchS = createDispatcher(Optional.of(getSslContext()), factoriesListener);
+ ChannelFuture s = dispatchS.createServer(tlsAddress);
+ s.await();
+ }
+
+ private NetconfServerDispatcher createDispatcher(Optional<SSLContext> sslC,
+ NetconfOperationServiceFactoryListenerImpl factoriesListener) {
+ SessionIdProvider idProvider = new SessionIdProvider();
+ NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
+ new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
+
+ NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
+ factoriesListener, commitNot, idProvider);
+
+ return new NetconfServerDispatcher(sslC, serverNegotiatorFactory, listenerFactory);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ commitNot.close();
+ dispatchS.close();
+ }
+
+ private SSLContext getSslContext() throws KeyStoreException, NoSuchAlgorithmException, CertificateException,
+ IOException, UnrecoverableKeyException, KeyManagementException {
+ final InputStream keyStore = getClass().getResourceAsStream("/keystore.jks");
+ final InputStream trustStore = getClass().getResourceAsStream("/keystore.jks");
+ SSLContext sslContext = SSLUtil.initializeSecureContext("password", keyStore, trustStore, KeyManagerFactory.getDefaultAlgorithm());
+ keyStore.close();
+ trustStore.close();
+ return sslContext;
+ }
+
+ private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
+ final Collection<InputStream> yangDependencies = NetconfITTest.getBasicYangs();
+ return new HardcodedYangStoreService(yangDependencies);
+ }
+
+ protected List<ModuleFactory> getModuleFactories() {
+ return NetconfITTest.getModuleFactoriesS();
+ }
+
+ @Test
+ public void testSecure() throws Exception {
+ try (NetconfClientDispatcher dispatch = new NetconfClientDispatcher(Optional.of(getSslContext()));
+ NetconfClient netconfClient = new NetconfClient("tls-client", tlsAddress, 4000, dispatch)) {
+
+ }
+ }
+}
import io.netty.channel.ChannelFuture;
import io.netty.util.HashedWheelTimer;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.controller.config.yang.test.impl.TestImplModuleFactory;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.util.SSLUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.xml.sax.SAXException;
import javax.management.ObjectName;
-import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
// LoggerFactory.getLogger(NetconfITTest.class);
//
private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
- private static final InetSocketAddress tlsAddress = new InetSocketAddress("127.0.0.1", 12024);
private NetconfMessage getConfig, getConfigCandidate, editConfig, closeSession;
private DefaultCommitNotificationProducer commitNot;
- private NetconfServerDispatcher dispatch, dispatchS;
+ private NetconfServerDispatcher dispatch;
+
+ private static NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(Optional.<SSLContext>absent());
@Before
public void setUp() throws Exception {
dispatch = createDispatcher(Optional.<SSLContext> absent(), factoriesListener);
ChannelFuture s = dispatch.createServer(tcpAddress);
s.await();
-
- dispatchS = createDispatcher(Optional.of(getSslContext()), factoriesListener);
- s = dispatchS.createServer(tlsAddress);
- s.await();
}
private NetconfServerDispatcher createDispatcher(Optional<SSLContext> sslC,
public void tearDown() throws Exception {
commitNot.close();
dispatch.close();
- dispatchS.close();
}
- private SSLContext getSslContext() throws KeyStoreException, NoSuchAlgorithmException, CertificateException,
- IOException, UnrecoverableKeyException, KeyManagementException {
- final InputStream keyStore = getClass().getResourceAsStream("/keystore.jks");
- final InputStream trustStore = getClass().getResourceAsStream("/keystore.jks");
- SSLContext sslContext = SSLUtil.initializeSecureContext("password", keyStore, trustStore, KeyManagerFactory.getDefaultAlgorithm());
- keyStore.close();
- trustStore.close();
- return sslContext;
+ @AfterClass
+ public static void tearDownStatic() {
+ NETCONF_CLIENT_DISPATCHER.close();
}
private void loadMessages() throws IOException, SAXException, ParserConfigurationException {
return new HardcodedYangStoreService(yangDependencies);
}
- private Collection<InputStream> getBasicYangs() throws IOException {
+ static Collection<InputStream> getBasicYangs() throws IOException {
List<String> paths = Arrays.asList("/META-INF/yang/config.yang", "/META-INF/yang/rpc-context.yang",
"/META-INF/yang/config-test.yang", "/META-INF/yang/config-test-impl.yang",
"/META-INF/yang/ietf-inet-types.yang");
final Collection<InputStream> yangDependencies = new ArrayList<>();
for (String path : paths) {
- final InputStream is = checkNotNull(getClass().getResourceAsStream(path), path + " not found");
+ final InputStream is = checkNotNull(NetconfITTest.class.getResourceAsStream(path), path + " not found");
yangDependencies.add(is);
}
return yangDependencies;
@Test
public void testNetconfClientDemonstration() throws Exception {
- try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000)) {
+ try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER)) {
Set<String> capabilitiesFromNetconfServer = netconfClient.getCapabilities();
long sessionId = netconfClient.getSessionId();
@Test
public void testTwoSessions() throws Exception {
- try (NetconfClient netconfClient = new NetconfClient("1", tcpAddress, 4000)) {
- try (NetconfClient netconfClient2 = new NetconfClient("2", tcpAddress, 4000)) {
+ try (NetconfClient netconfClient = new NetconfClient("1", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER)) {
+ try (NetconfClient netconfClient2 = new NetconfClient("2", tcpAddress, 4000, NETCONF_CLIENT_DISPATCHER)) {
}
}
}
h.init();
}
- @Test
- public void testSecure() throws Exception {
- try (NetconfClient netconfClient = new NetconfClient("1", tlsAddress, 4000, Optional.of(getSslContext()))) {
-
- }
- }
-
@Ignore
@Test
public void waitingTest() throws Exception {
// final InputStream resourceAsStream =
// AbstractListenerTest.class.getResourceAsStream(fileName);
// assertNotNull(resourceAsStream);
- try (NetconfClient netconfClient = new NetconfClient("test", tcpAddress, 5000)) {
+ try (NetconfClient netconfClient = new NetconfClient("test", tcpAddress, 5000, NETCONF_CLIENT_DISPATCHER)) {
// IOUtils.copy(resourceAsStream, netconfClient.getStream());
// netconfClient.getOutputStream().write(NetconfMessageFactory.endOfMessage);
// server should not write anything back
}
private NetconfClient createSession(final InetSocketAddress address, final String expected) throws Exception {
- final NetconfClient netconfClient = new NetconfClient("test " + address.toString(), address, 5000);
+ final NetconfClient netconfClient = new NetconfClient("test " + address.toString(), address, 5000, NETCONF_CLIENT_DISPATCHER);
assertEquals(expected, Long.toString(netconfClient.getSessionId()));