X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fimpl%2FConcurrentClientsTest.java;h=0a0cd2208891982efce68832bb66d46e29fe9cb3;hp=c0d2687a8a092b376f498e5cccadcaadfb816ab4;hb=a8c1a6e9cb8570fb7baac0f147a9143826c1a826;hpb=cdeca662c72dd78f39d7503c3c06ccfde9e4c51b diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java index c0d2687a8a..0a0cd22088 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java @@ -8,11 +8,16 @@ package org.opendaylight.controller.netconf.impl; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import io.netty.channel.ChannelFuture; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import java.io.DataOutputStream; import java.io.InputStream; @@ -20,210 +25,271 @@ import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; -import javax.management.ObjectName; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.config.util.ConfigRegistryJMXClient; -import org.opendaylight.controller.config.util.ConfigTransactionJMXClient; -import org.opendaylight.controller.config.yang.store.api.YangStoreService; -import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfOperationRouter; -import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.client.test.TestingNetconfClient; import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl; +import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService; import org.opendaylight.controller.netconf.mapping.api.Capability; import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; -import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; +import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage; import org.opendaylight.controller.netconf.util.test.XmlFileLoader; +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import com.google.common.collect.Sets; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.nio.NioEventLoopGroup; + +@RunWith(Parameterized.class) public class ConcurrentClientsTest { + private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); + + private static ExecutorService clientExecutor; - private static final int CONCURRENCY = 16; - private static EventLoopGroup nettyGroup = new NioEventLoopGroup(); - public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher( nettyGroup, nettyGroup); + private static final int CONCURRENCY = 32; + private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); - @Mock - private YangStoreService yangStoreService; - @Mock - private ConfigRegistryJMXClient jmxClient; + private int nettyThreads; + private Class clientRunnable; + private Set serverCaps; - private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); + public ConcurrentClientsTest(int nettyThreads, Class clientRunnable, Set serverCaps) { + this.nettyThreads = nettyThreads; + this.clientRunnable = clientRunnable; + this.serverCaps = serverCaps; + } + + @Parameterized.Parameters() + public static Collection data() { + return Arrays.asList(new Object[][]{ + {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + // empty set of capabilities = only base 1.0 netconf capability + {4, TestingNetconfClientRunnable.class, Collections.emptySet()}, + {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()}, + {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()}, + + {4, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + {1, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + }); + } - static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); + private EventLoopGroup nettyGroup; + private NetconfClientDispatcher netconfClientDispatcher; private DefaultCommitNotificationProducer commitNot; - private NetconfServerDispatcher dispatch; + HashedWheelTimer hashedWheelTimer; + private TestingNetconfOperation testingNetconfOperation; + + public static SessionMonitoringService createMockedMonitoringService() { + SessionMonitoringService monitoring = mock(SessionMonitoringService.class); + doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class)); + doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class)); + return monitoring; + } + + @BeforeClass + public static void setUpClientExecutor() { + clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() { + int i = 1; + + @Override + public Thread newThread(final Runnable r) { + Thread thread = new Thread(r); + thread.setName("client-" + i++); + thread.setDaemon(true); + return thread; + } + }); + } @Before public void setUp() throws Exception { - { // init mocks - MockitoAnnotations.initMocks(this); - final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class); - doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot(); - doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap(); - doReturn(Collections.emptyMap()).when(yStore).getModuleMap(); - - final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class); - doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class)); - doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans(); - } + nettyGroup = new NioEventLoopGroup(nettyThreads); + NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"); + netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000); NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl(); - factoriesListener.onAddNetconfOperationServiceFactory(mockOpF()); + + testingNetconfOperation = new TestingNetconfOperation(); + factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation)); SessionIdProvider idProvider = new SessionIdProvider(); + hashedWheelTimer = new HashedWheelTimer(); + NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory( - new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider); + hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps); commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer()); - NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory( - factoriesListener, commitNot, idProvider); - NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory); - dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup); + NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory); + final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup); ChannelFuture s = dispatch.createServer(netconfAddress); s.await(); } - @AfterClass - public static void tearDownStatic() { - nettyGroup.shutdownGracefully(); + @After + public void tearDown(){ + commitNot.close(); + hashedWheelTimer.stop(); + try { + nettyGroup.shutdownGracefully().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Ignoring exception while cleaning up after test", e); + } } - private NetconfOperationServiceFactory mockOpF() { - return new NetconfOperationServiceFactory() { - @Override - public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) { - return new NetconfOperationService() { - @Override - public Set getCapabilities() { - return Collections.emptySet(); - } - - @Override - public Set getNetconfOperations() { - return Sets. newHashSet(new NetconfOperation() { - @Override - public HandlingPriority canHandle(Document message) { - return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE); - } - - @Override - public Document handle(Document message, NetconfOperationRouter operationRouter) - throws NetconfDocumentedException { - try { - return XmlUtil.readXmlToDocument(""); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - @Override - public Set getFilters() { - return Collections.emptySet(); - } - - @Override - public void close() { - } - }; - } - }; + @AfterClass + public static void tearDownClientExecutor() { + clientExecutor.shutdownNow(); } - @After - public void cleanUp() throws Exception { - commitNot.close(); - } + @Test(timeout = CONCURRENCY * 1000) + public void testConcurrentClients() throws Exception { - @Test - public void multipleClients() throws Exception { - List threads = new ArrayList<>(); + List> futures = Lists.newArrayListWithCapacity(CONCURRENCY); - final int attempts = 5; for (int i = 0; i < CONCURRENCY; i++) { - TestingThread thread = new TestingThread(String.valueOf(i), attempts); - threads.add(thread); - thread.start(); + futures.add(clientExecutor.submit(getInstanceOfClientRunnable())); } - for (TestingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + logger.error("Thread for testing client failed", e); + fail("Client failed: " + e.getMessage()); } } + + assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount()); } - @Test - public void synchronizationTest() throws Exception { - new BlockingThread("foo").run2(); + public static Set getOnlyExiServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 + ); } - @Test - public void multipleBlockingClients() throws Exception { - List threads = new ArrayList<>(); - for (int i = 0; i < CONCURRENCY; i++) { - BlockingThread thread = new BlockingThread(String.valueOf(i)); - threads.add(thread); - thread.start(); + public static Set getOnlyChunkServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1 + ); + } + + public Runnable getInstanceOfClientRunnable() throws Exception { + return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this); + } + + /** + * Responds to all operations except start-exi and counts all requests + */ + private static class TestingNetconfOperation implements NetconfOperation { + + private final AtomicLong counter = new AtomicLong(); + + @Override + public HandlingPriority canHandle(Document message) { + return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ? + HandlingPriority.CANNOT_HANDLE : + HandlingPriority.HANDLE_WITH_MAX_PRIORITY; } - for (BlockingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + @Override + public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException { + try { + logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage)); + counter.getAndIncrement(); + return XmlUtil.readXmlToDocument(""); + } catch (Exception e) { + throw new RuntimeException(e); } } + + public long getMessageCount() { + return counter.get(); + } } - class BlockingThread extends Thread { - private Optional thrownException; + /** + * Hardcoded operation service factory + */ + private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory { + private final NetconfOperation[] operations; - public BlockingThread(String name) { - super("client-" + name); + public TestingOperationServiceFactory(final NetconfOperation... operations) { + this.operations = operations; } + @Override + public NetconfOperationService createService(String netconfSessionIdForReporting) { + return new NetconfOperationService() { + @Override + public Set getCapabilities() { + return Collections.emptySet(); + } + + @Override + public Set getNetconfOperations() { + return Sets.newHashSet(operations); + } + + @Override + public void close() {} + }; + } + } + + /** + * Pure socket based blocking client + */ + public final class BlockingClientRunnable implements Runnable { + @Override public void run() { try { run2(); - thrownException = Optional.absent(); } catch (Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } @@ -259,34 +325,32 @@ public class ConcurrentClientsTest { } } - class TestingThread extends Thread { - - private final String clientId; - private final int attempts; - private Optional thrownException; - - TestingThread(String clientId, int attempts) { - this.clientId = clientId; - this.attempts = attempts; - setName("client-" + clientId); - } + /** + * TestingNetconfClient based runnable + */ + public final class TestingNetconfClientRunnable implements Runnable { @Override public void run() { try { - final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER); + final TestingNetconfClient netconfClient = new TestingNetconfClient(Thread.currentThread().getName(), + netconfAddress, netconfClientDispatcher); long sessionId = netconfClient.getSessionId(); - logger.info("Client with sessionid {} hello exchanged", sessionId); + logger.info("Client with session id {}: hello exchanged", sessionId); final NetconfMessage getMessage = XmlFileLoader .xmlFileToNetconfMessage("netconfMessages/getConfig.xml"); - NetconfMessage result = netconfClient.sendMessage(getMessage); - logger.info("Client with sessionid {} got result {}", sessionId, result); + NetconfMessage result = netconfClient.sendRequest(getMessage).get(); + logger.info("Client with session id {}: got result {}", sessionId, result); + + Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false, + "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: " + + XmlUtil.toString(getMessage.getDocument())); + netconfClient.close(); - logger.info("Client with session id {} ended", sessionId); - thrownException = Optional.absent(); + logger.info("Client with session id {}: ended", sessionId); } catch (final Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } }