X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fimpl%2FConcurrentClientsTest.java;h=8e8a73b914cdf79ca7ad7e24d98ab31a859b0f44;hp=659743ab6df605e528aa1bc9c7163f0a182d3396;hb=d2d4cefa4d8d662554b7f2fc0b0dd568d0db1180;hpb=79df3ca27a571091af4d8cf766ef81864aa70c2f diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java index 659743ab6d..8e8a73b914 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java @@ -16,27 +16,47 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.GlobalEventExecutor; import java.io.DataOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; - +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; -import org.opendaylight.controller.netconf.client.test.TestingNetconfClient; +import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; +import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener; +import org.opendaylight.controller.netconf.client.TestingNetconfClient; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl; import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService; import org.opendaylight.controller.netconf.mapping.api.Capability; @@ -45,34 +65,50 @@ import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; +import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; -import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage; import org.opendaylight.controller.netconf.util.test.XmlFileLoader; import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; +@RunWith(Parameterized.class) +public class ConcurrentClientsTest { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class); -import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.HashedWheelTimer; + private static ExecutorService clientExecutor; -public class ConcurrentClientsTest { + private static final int CONCURRENCY = 32; + private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); - private static final int CONCURRENCY = 64; - public static final int NETTY_THREADS = 4; + private int nettyThreads; + private Class clientRunnable; + private Set serverCaps; - private EventLoopGroup nettyGroup; - private NetconfClientDispatcher netconfClientDispatcher; + public ConcurrentClientsTest(int nettyThreads, Class clientRunnable, Set serverCaps) { + this.nettyThreads = nettyThreads; + this.clientRunnable = clientRunnable; + this.serverCaps = serverCaps; + } - private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); + @Parameterized.Parameters() + public static Collection data() { + return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + // empty set of capabilities = only base 1.0 netconf capability + {4, TestingNetconfClientRunnable.class, Collections.emptySet()}, + {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()}, + {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()}, + {4, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + {1, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + }); + } - static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); + private EventLoopGroup nettyGroup; + private NetconfClientDispatcher netconfClientDispatcher; private DefaultCommitNotificationProducer commitNot; @@ -86,29 +122,41 @@ public class ConcurrentClientsTest { return monitoring; } - // TODO refactor and test with different configurations + @BeforeClass + public static void setUpClientExecutor() { + clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() { + int i = 1; + + @Override + public Thread newThread(final Runnable r) { + Thread thread = new Thread(r); + thread.setName("client-" + i++); + thread.setDaemon(true); + return thread; + } + }); + } @Before public void setUp() throws Exception { - - nettyGroup = new NioEventLoopGroup(NETTY_THREADS); - NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"); - netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000); + hashedWheelTimer = new HashedWheelTimer(); + nettyGroup = new NioEventLoopGroup(nettyThreads); + netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer); NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl(); + testingNetconfOperation = new TestingNetconfOperation(); - factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation)); + factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation)); SessionIdProvider idProvider = new SessionIdProvider(); - hashedWheelTimer = new HashedWheelTimer(); NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory( - hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService()); + hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps); commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer()); - NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory); - final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup); + NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory); + final NetconfServerDispatcherImpl dispatch = new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup); ChannelFuture s = dispatch.createServer(netconfAddress); s.await(); @@ -116,74 +164,64 @@ public class ConcurrentClientsTest { @After public void tearDown(){ + commitNot.close(); hashedWheelTimer.stop(); - nettyGroup.shutdownGracefully(); + try { + nettyGroup.shutdownGracefully().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Ignoring exception while cleaning up after test", e); + } } - private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) { - return new TestingOperationServiceFactory(operations); + @AfterClass + public static void tearDownClientExecutor() { + clientExecutor.shutdownNow(); } - @After - public void cleanUp() throws Exception { - commitNot.close(); - } + @Test(timeout = CONCURRENCY * 1000) + public void testConcurrentClients() throws Exception { - @Test(timeout = 30 * 1000) - public void multipleClients() throws Exception { - List threads = new ArrayList<>(); + List> futures = Lists.newArrayListWithCapacity(CONCURRENCY); - final int attempts = 5; for (int i = 0; i < CONCURRENCY; i++) { - TestingThread thread = new TestingThread(String.valueOf(i), attempts); - threads.add(thread); - thread.start(); + futures.add(clientExecutor.submit(getInstanceOfClientRunnable())); } - for (TestingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + LOG.error("Thread for testing client failed", e); + fail("Client failed: " + e.getMessage()); } } assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount()); } - /** - * Cannot handle CHUNK, make server configurable - */ - @Ignore - @Test(timeout = 30 * 1000) - public void synchronizationTest() throws Exception { - new BlockingThread("foo").run2(); + public static Set getOnlyExiServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 + ); } - /** - * Cannot handle CHUNK, make server configurable - */ - @Ignore - @Test(timeout = 30 * 1000) - public void multipleBlockingClients() throws Exception { - List threads = new ArrayList<>(); - for (int i = 0; i < CONCURRENCY; i++) { - BlockingThread thread = new BlockingThread(String.valueOf(i)); - threads.add(thread); - thread.start(); - } + public static Set getOnlyChunkServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1 + ); + } - for (BlockingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); - } - } + public Runnable getInstanceOfClientRunnable() throws Exception { + return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this); } + /** + * Responds to all operations except start-exi and counts all requests + */ private static class TestingNetconfOperation implements NetconfOperation { private final AtomicLong counter = new AtomicLong(); @@ -198,7 +236,7 @@ public class ConcurrentClientsTest { @Override public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException { try { - logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage)); + LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage)); counter.getAndIncrement(); return XmlUtil.readXmlToDocument(""); } catch (Exception e) { @@ -211,6 +249,9 @@ public class ConcurrentClientsTest { } } + /** + * Hardcoded operation service factory + */ private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory { private final NetconfOperation[] operations; @@ -228,30 +269,26 @@ public class ConcurrentClientsTest { @Override public Set getNetconfOperations() { - return Sets. newHashSet(operations); + return Sets.newHashSet(operations); } @Override - public void close() { - } + public void close() {} }; } } - class BlockingThread extends Thread { - private Optional thrownException; - - public BlockingThread(String name) { - super("client-" + name); - } + /** + * Pure socket based blocking client + */ + public final class BlockingClientRunnable implements Runnable { @Override public void run() { try { run2(); - thrownException = Optional.absent(); } catch (Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } @@ -268,7 +305,7 @@ public class ConcurrentClientsTest { while (sb.toString().endsWith("]]>]]>") == false) { sb.append((char) inFromServer.read()); } - logger.info(sb.toString()); + LOG.info(sb.toString()); outToServer.write(IOUtils.toByteArray(clientHello)); outToServer.write("]]>]]>".getBytes()); @@ -282,45 +319,49 @@ public class ConcurrentClientsTest { while (sb.toString().endsWith("]]>]]>") == false) { sb.append((char) inFromServer.read()); } - logger.info(sb.toString()); + LOG.info(sb.toString()); clientSocket.close(); } } - class TestingThread extends Thread { - - private final String clientId; - private final int attempts; - private Optional thrownException; - - TestingThread(String clientId, int attempts) { - this.clientId = clientId; - this.attempts = attempts; - setName("client-" + clientId); - } + /** + * TestingNetconfClient based runnable + */ + public final class TestingNetconfClientRunnable implements Runnable { @Override public void run() { try { - final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher); + final TestingNetconfClient netconfClient = + new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig()); long sessionId = netconfClient.getSessionId(); - logger.info("Client with sessionid {} hello exchanged", sessionId); + LOG.info("Client with session id {}: hello exchanged", sessionId); final NetconfMessage getMessage = XmlFileLoader .xmlFileToNetconfMessage("netconfMessages/getConfig.xml"); NetconfMessage result = netconfClient.sendRequest(getMessage).get(); - logger.info("Client with sessionid {} got result {}", sessionId, result); + LOG.info("Client with session id {}: got result {}", sessionId, result); Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false, - "Received error response: " + XmlUtil.toString(result.getDocument()) + - " to request: " + XmlUtil.toString(getMessage.getDocument())); + "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: " + + XmlUtil.toString(getMessage.getDocument())); netconfClient.close(); - logger.info("Client with session id {} ended", sessionId); - thrownException = Optional.absent(); + LOG.info("Client with session id {}: ended", sessionId); } catch (final Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } + + private NetconfClientConfiguration getClientConfig() { + final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create(); + b.withAddress(netconfAddress); + b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", + "client")); + b.withSessionListener(new SimpleNetconfClientSessionListener()); + b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, + NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS)); + return b.build(); + } } }