X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-impl%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fimpl%2FConcurrentClientsTest.java;h=b8622d1e911c54e225fb02bc032c5a9f9fae702a;hp=c0d52ad85e87560a6dd0b645a40b809255f663fb;hb=31b7a44c89d1057489338492fcf62a64147bea24;hpb=2e7347fdc0eb7734ff59a4f902227a93ab6afece diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java index c0d52ad85e..b8622d1e91 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java @@ -8,119 +8,160 @@ package org.opendaylight.controller.netconf.impl; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.HashedWheelTimer; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +import java.io.DataOutputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.io.IOUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.config.util.ConfigRegistryJMXClient; -import org.opendaylight.controller.config.util.ConfigTransactionJMXClient; -import org.opendaylight.controller.config.yang.store.api.YangStoreService; -import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfOperationRouter; -import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; +import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; +import org.opendaylight.controller.netconf.client.test.TestingNetconfClient; import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl; import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService; import org.opendaylight.controller.netconf.mapping.api.Capability; import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; -import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; +import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage; import org.opendaylight.controller.netconf.util.test.XmlFileLoader; +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; -import javax.management.ObjectName; -import java.io.DataOutputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.GlobalEventExecutor; +@RunWith(Parameterized.class) public class ConcurrentClientsTest { + private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); - private static final int CONCURRENCY = 16; - private EventLoopGroup nettyGroup; - private NetconfClientDispatcher netconfClientDispatcher; + private static ExecutorService clientExecutor; - @Mock - private YangStoreService yangStoreService; - @Mock - private ConfigRegistryJMXClient jmxClient; + private static final int CONCURRENCY = 32; + private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); - private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); + private int nettyThreads; + private Class clientRunnable; + private Set serverCaps; - static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); + public ConcurrentClientsTest(int nettyThreads, Class clientRunnable, Set serverCaps) { + this.nettyThreads = nettyThreads; + this.clientRunnable = clientRunnable; + this.serverCaps = serverCaps; + } - private DefaultCommitNotificationProducer commitNot; - private NetconfServerDispatcher dispatch; + @Parameterized.Parameters() + public static Collection data() { + return Arrays.asList(new Object[][]{ + {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + // empty set of capabilities = only base 1.0 netconf capability + {4, TestingNetconfClientRunnable.class, Collections.emptySet()}, + {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()}, + {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()}, + + {4, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + {1, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + }); + } - @Mock - private SessionMonitoringService monitoring; + private EventLoopGroup nettyGroup; + private NetconfClientDispatcher netconfClientDispatcher; + + private DefaultCommitNotificationProducer commitNot; HashedWheelTimer hashedWheelTimer; + private TestingNetconfOperation testingNetconfOperation; - @Before - public void setUp() throws Exception { - { // init mocks - MockitoAnnotations.initMocks(this); - final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class); - doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot(); - doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap(); + public static SessionMonitoringService createMockedMonitoringService() { + SessionMonitoringService monitoring = mock(SessionMonitoringService.class); + doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class)); + doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class)); + return monitoring; + } - final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class); - doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class)); + @BeforeClass + public static void setUpClientExecutor() { + clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() { + int i = 1; - doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans(); - } + @Override + public Thread newThread(final Runnable r) { + Thread thread = new Thread(r); + thread.setName("client-" + i++); + thread.setDaemon(true); + return thread; + } + }); + } - nettyGroup = new NioEventLoopGroup(); - NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"); - netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000); + @Before + public void setUp() throws Exception { + hashedWheelTimer = new HashedWheelTimer(); + nettyGroup = new NioEventLoopGroup(nettyThreads); + netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer); NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl(); - factoriesListener.onAddNetconfOperationServiceFactory(mockOpF()); + + testingNetconfOperation = new TestingNetconfOperation(); + factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation)); SessionIdProvider idProvider = new SessionIdProvider(); - hashedWheelTimer = new HashedWheelTimer(); + NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory( - hashedWheelTimer, factoriesListener, idProvider, 5000); + hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps); commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer()); - doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class)); - doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class)); - - NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory( - factoriesListener, commitNot, idProvider, monitoring); - NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory); - dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup); + NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory); + final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup); ChannelFuture s = dispatch.createServer(netconfAddress); s.await(); @@ -128,117 +169,131 @@ public class ConcurrentClientsTest { @After public void tearDown(){ + commitNot.close(); hashedWheelTimer.stop(); - nettyGroup.shutdownGracefully(); + try { + nettyGroup.shutdownGracefully().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Ignoring exception while cleaning up after test", e); + } } - private NetconfOperationServiceFactory mockOpF() { - return new NetconfOperationServiceFactory() { - @Override - public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) { - return new NetconfOperationService() { - @Override - public Set getCapabilities() { - return Collections.emptySet(); - } - - @Override - public Set getNetconfOperations() { - return Sets. newHashSet(new NetconfOperation() { - @Override - public HandlingPriority canHandle(Document message) { - return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE); - } - - @Override - public Document handle(Document message, NetconfOperationRouter operationRouter) - throws NetconfDocumentedException { - try { - return XmlUtil.readXmlToDocument(""); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - @Override - public Set getFilters() { - return Collections.emptySet(); - } - - @Override - public void close() { - } - }; - } - }; + @AfterClass + public static void tearDownClientExecutor() { + clientExecutor.shutdownNow(); } - @After - public void cleanUp() throws Exception { - commitNot.close(); - } + @Test(timeout = CONCURRENCY * 1000) + public void testConcurrentClients() throws Exception { - @Test - public void multipleClients() throws Exception { - List threads = new ArrayList<>(); + List> futures = Lists.newArrayListWithCapacity(CONCURRENCY); - final int attempts = 5; for (int i = 0; i < CONCURRENCY; i++) { - TestingThread thread = new TestingThread(String.valueOf(i), attempts); - threads.add(thread); - thread.start(); + futures.add(clientExecutor.submit(getInstanceOfClientRunnable())); } - for (TestingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + logger.error("Thread for testing client failed", e); + fail("Client failed: " + e.getMessage()); } } + + assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount()); } - @Test - public void synchronizationTest() throws Exception { - new BlockingThread("foo").run2(); + public static Set getOnlyExiServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 + ); } - @Test - public void multipleBlockingClients() throws Exception { - List threads = new ArrayList<>(); - for (int i = 0; i < CONCURRENCY; i++) { - BlockingThread thread = new BlockingThread(String.valueOf(i)); - threads.add(thread); - thread.start(); + public static Set getOnlyChunkServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1 + ); + } + + public Runnable getInstanceOfClientRunnable() throws Exception { + return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this); + } + + /** + * Responds to all operations except start-exi and counts all requests + */ + private static class TestingNetconfOperation implements NetconfOperation { + + private final AtomicLong counter = new AtomicLong(); + + @Override + public HandlingPriority canHandle(Document message) { + return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ? + HandlingPriority.CANNOT_HANDLE : + HandlingPriority.HANDLE_WITH_MAX_PRIORITY; } - for (BlockingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + @Override + public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException { + try { + logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage)); + counter.getAndIncrement(); + return XmlUtil.readXmlToDocument(""); + } catch (Exception e) { + throw new RuntimeException(e); } } + + public long getMessageCount() { + return counter.get(); + } } - class BlockingThread extends Thread { - private Optional thrownException; + /** + * Hardcoded operation service factory + */ + private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory { + private final NetconfOperation[] operations; - public BlockingThread(String name) { - super("client-" + name); + public TestingOperationServiceFactory(final NetconfOperation... operations) { + this.operations = operations; } + @Override + public NetconfOperationService createService(String netconfSessionIdForReporting) { + return new NetconfOperationService() { + @Override + public Set getCapabilities() { + return Collections.emptySet(); + } + + @Override + public Set getNetconfOperations() { + return Sets.newHashSet(operations); + } + + @Override + public void close() {} + }; + } + } + + /** + * Pure socket based blocking client + */ + public final class BlockingClientRunnable implements Runnable { + @Override public void run() { try { run2(); - thrownException = Optional.absent(); } catch (Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } @@ -274,35 +329,44 @@ public class ConcurrentClientsTest { } } - class TestingThread extends Thread { - - private final String clientId; - private final int attempts; - private Optional thrownException; - - TestingThread(String clientId, int attempts) { - this.clientId = clientId; - this.attempts = attempts; - setName("client-" + clientId); - } + /** + * TestingNetconfClient based runnable + */ + public final class TestingNetconfClientRunnable implements Runnable { @Override public void run() { try { - final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher); + final TestingNetconfClient netconfClient = + new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig()); long sessionId = netconfClient.getSessionId(); - logger.info("Client with sessionid {} hello exchanged", sessionId); + logger.info("Client with session id {}: hello exchanged", sessionId); final NetconfMessage getMessage = XmlFileLoader .xmlFileToNetconfMessage("netconfMessages/getConfig.xml"); - NetconfMessage result = netconfClient.sendMessage(getMessage); - logger.info("Client with sessionid {} got result {}", sessionId, result); + NetconfMessage result = netconfClient.sendRequest(getMessage).get(); + logger.info("Client with session id {}: got result {}", sessionId, result); + + Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false, + "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: " + + XmlUtil.toString(getMessage.getDocument())); + netconfClient.close(); - logger.info("Client with session id {} ended", sessionId); - thrownException = Optional.absent(); + logger.info("Client with session id {}: ended", sessionId); } catch (final Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } + + private NetconfClientConfiguration getClientConfig() { + final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create(); + b.withAddress(netconfAddress); + b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", + "client")); + b.withSessionListener(new SimpleNetconfClientSessionListener()); + b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, + NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS)); + return b.build(); + } } }