From: Maros Marsalek Date: Mon, 28 Apr 2014 13:41:35 +0000 (+0200) Subject: BUG-848 Make base capabilities from netconf server configurable X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~164^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a8c1a6e9cb8570fb7baac0f147a9143826c1a826 BUG-848 Make base capabilities from netconf server configurable Refactor ConcurrentClientsTest, add cases for different netconf server configuration (EXI/CHUNK) Change-Id: Ie84ea1ff716b11b0da49e0498127a0defc86f41f Signed-off-by: Maros Marsalek --- diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java index e40ee577fe..d5a34d11b2 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.netconf.impl; import static org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider.NetconfOperationProviderUtil.getNetconfSessionIdForReporting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import java.util.Set; @@ -35,8 +36,7 @@ import org.slf4j.LoggerFactory; public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory { - // TODO make this configurable - private static final Set DEFAULT_BASE_CAPABILITIES = ImmutableSet.of( + public static final Set DEFAULT_BASE_CAPABILITIES = ImmutableSet.of( XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 @@ -50,18 +50,42 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF private final DefaultCommitNotificationProducer commitNotificationProducer; private final SessionMonitoringService monitoringService; private static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiatorFactory.class); + private final Set baseCapabilities; // TODO too many params, refactor public NetconfServerSessionNegotiatorFactory(Timer timer, NetconfOperationProvider netconfOperationProvider, SessionIdProvider idProvider, long connectionTimeoutMillis, DefaultCommitNotificationProducer commitNot, SessionMonitoringService monitoringService) { + this(timer, netconfOperationProvider, idProvider, connectionTimeoutMillis, commitNot, monitoringService, DEFAULT_BASE_CAPABILITIES); + } + + // TODO too many params, refactor + public NetconfServerSessionNegotiatorFactory(Timer timer, NetconfOperationProvider netconfOperationProvider, + SessionIdProvider idProvider, long connectionTimeoutMillis, + DefaultCommitNotificationProducer commitNot, + SessionMonitoringService monitoringService, Set baseCapabilities) { this.timer = timer; this.netconfOperationProvider = netconfOperationProvider; this.idProvider = idProvider; this.connectionTimeoutMillis = connectionTimeoutMillis; this.commitNotificationProducer = commitNot; this.monitoringService = monitoringService; + this.baseCapabilities = validateBaseCapabilities(baseCapabilities); + } + + private ImmutableSet validateBaseCapabilities(final Set baseCapabilities) { + // Check base capabilities to be supported by the server + Sets.SetView unknownBaseCaps = Sets.difference(baseCapabilities, DEFAULT_BASE_CAPABILITIES); + Preconditions.checkArgument(unknownBaseCaps.isEmpty(), + "Base capabilities that will be supported by netconf server have to be subset of %s, unknown base capabilities: %s", + DEFAULT_BASE_CAPABILITIES, unknownBaseCaps); + + ImmutableSet.Builder b = ImmutableSet.builder(); + b.addAll(baseCapabilities); + // Base 1.0 capability is supported by default + b.add(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0); + return b.build(); } /** @@ -99,7 +123,7 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF } private NetconfHelloMessage createHelloMessage(long sessionId, CapabilityProvider capabilityProvider) throws NetconfDocumentedException { - return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), DEFAULT_BASE_CAPABILITIES), sessionId); + return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), baseCapabilities), sessionId); } } diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java index 659743ab6d..0a0cd22088 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java @@ -16,23 +16,35 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.netty.channel.EventLoopGroup; +import io.netty.util.HashedWheelTimer; import java.io.DataOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; @@ -49,30 +61,53 @@ import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAddi import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage; import org.opendaylight.controller.netconf.util.test.XmlFileLoader; +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; -import com.google.common.base.Optional; import com.google.common.collect.Sets; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.HashedWheelTimer; +@RunWith(Parameterized.class) public class ConcurrentClientsTest { + private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); - private static final int CONCURRENCY = 64; - public static final int NETTY_THREADS = 4; + private static ExecutorService clientExecutor; - private EventLoopGroup nettyGroup; - private NetconfClientDispatcher netconfClientDispatcher; + private static final int CONCURRENCY = 32; + private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); - private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303); + private int nettyThreads; + private Class clientRunnable; + private Set serverCaps; - static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class); + public ConcurrentClientsTest(int nettyThreads, Class clientRunnable, Set serverCaps) { + this.nettyThreads = nettyThreads; + this.clientRunnable = clientRunnable; + this.serverCaps = serverCaps; + } + + @Parameterized.Parameters() + public static Collection data() { + return Arrays.asList(new Object[][]{ + {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES}, + // empty set of capabilities = only base 1.0 netconf capability + {4, TestingNetconfClientRunnable.class, Collections.emptySet()}, + {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()}, + {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()}, + + {4, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + {1, BlockingClientRunnable.class, getOnlyExiServerCaps()}, + }); + } + + private EventLoopGroup nettyGroup; + private NetconfClientDispatcher netconfClientDispatcher; private DefaultCommitNotificationProducer commitNot; @@ -86,24 +121,38 @@ public class ConcurrentClientsTest { return monitoring; } - // TODO refactor and test with different configurations + @BeforeClass + public static void setUpClientExecutor() { + clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() { + int i = 1; + + @Override + public Thread newThread(final Runnable r) { + Thread thread = new Thread(r); + thread.setName("client-" + i++); + thread.setDaemon(true); + return thread; + } + }); + } @Before public void setUp() throws Exception { - nettyGroup = new NioEventLoopGroup(NETTY_THREADS); + nettyGroup = new NioEventLoopGroup(nettyThreads); NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"); netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000); NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl(); + testingNetconfOperation = new TestingNetconfOperation(); - factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation)); + factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation)); SessionIdProvider idProvider = new SessionIdProvider(); hashedWheelTimer = new HashedWheelTimer(); NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory( - hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService()); + hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps); commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer()); @@ -116,74 +165,64 @@ public class ConcurrentClientsTest { @After public void tearDown(){ + commitNot.close(); hashedWheelTimer.stop(); - nettyGroup.shutdownGracefully(); + try { + nettyGroup.shutdownGracefully().get(); + } catch (InterruptedException | ExecutionException e) { + logger.warn("Ignoring exception while cleaning up after test", e); + } } - private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) { - return new TestingOperationServiceFactory(operations); + @AfterClass + public static void tearDownClientExecutor() { + clientExecutor.shutdownNow(); } - @After - public void cleanUp() throws Exception { - commitNot.close(); - } + @Test(timeout = CONCURRENCY * 1000) + public void testConcurrentClients() throws Exception { - @Test(timeout = 30 * 1000) - public void multipleClients() throws Exception { - List threads = new ArrayList<>(); + List> futures = Lists.newArrayListWithCapacity(CONCURRENCY); - final int attempts = 5; for (int i = 0; i < CONCURRENCY; i++) { - TestingThread thread = new TestingThread(String.valueOf(i), attempts); - threads.add(thread); - thread.start(); + futures.add(clientExecutor.submit(getInstanceOfClientRunnable())); } - for (TestingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + logger.error("Thread for testing client failed", e); + fail("Client failed: " + e.getMessage()); } } assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount()); } - /** - * Cannot handle CHUNK, make server configurable - */ - @Ignore - @Test(timeout = 30 * 1000) - public void synchronizationTest() throws Exception { - new BlockingThread("foo").run2(); + public static Set getOnlyExiServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 + ); } - /** - * Cannot handle CHUNK, make server configurable - */ - @Ignore - @Test(timeout = 30 * 1000) - public void multipleBlockingClients() throws Exception { - List threads = new ArrayList<>(); - for (int i = 0; i < CONCURRENCY; i++) { - BlockingThread thread = new BlockingThread(String.valueOf(i)); - threads.add(thread); - thread.start(); - } + public static Set getOnlyChunkServerCaps() { + return Sets.newHashSet( + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1 + ); + } - for (BlockingThread thread : threads) { - thread.join(); - if(thread.thrownException.isPresent()) { - Exception exception = thread.thrownException.get(); - logger.error("Thread for testing client failed", exception); - fail("Client thread " + thread + " failed: " + exception.getMessage()); - } - } + public Runnable getInstanceOfClientRunnable() throws Exception { + return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this); } + /** + * Responds to all operations except start-exi and counts all requests + */ private static class TestingNetconfOperation implements NetconfOperation { private final AtomicLong counter = new AtomicLong(); @@ -211,6 +250,9 @@ public class ConcurrentClientsTest { } } + /** + * Hardcoded operation service factory + */ private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory { private final NetconfOperation[] operations; @@ -228,30 +270,26 @@ public class ConcurrentClientsTest { @Override public Set getNetconfOperations() { - return Sets. newHashSet(operations); + return Sets.newHashSet(operations); } @Override - public void close() { - } + public void close() {} }; } } - class BlockingThread extends Thread { - private Optional thrownException; - - public BlockingThread(String name) { - super("client-" + name); - } + /** + * Pure socket based blocking client + */ + public final class BlockingClientRunnable implements Runnable { @Override public void run() { try { run2(); - thrownException = Optional.absent(); } catch (Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } @@ -287,39 +325,32 @@ public class ConcurrentClientsTest { } } - class TestingThread extends Thread { - - private final String clientId; - private final int attempts; - private Optional thrownException; - - TestingThread(String clientId, int attempts) { - this.clientId = clientId; - this.attempts = attempts; - setName("client-" + clientId); - } + /** + * TestingNetconfClient based runnable + */ + public final class TestingNetconfClientRunnable implements Runnable { @Override public void run() { try { - final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher); + final TestingNetconfClient netconfClient = new TestingNetconfClient(Thread.currentThread().getName(), + netconfAddress, netconfClientDispatcher); long sessionId = netconfClient.getSessionId(); - logger.info("Client with sessionid {} hello exchanged", sessionId); + logger.info("Client with session id {}: hello exchanged", sessionId); final NetconfMessage getMessage = XmlFileLoader .xmlFileToNetconfMessage("netconfMessages/getConfig.xml"); NetconfMessage result = netconfClient.sendRequest(getMessage).get(); - logger.info("Client with sessionid {} got result {}", sessionId, result); + logger.info("Client with session id {}: got result {}", sessionId, result); Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false, - "Received error response: " + XmlUtil.toString(result.getDocument()) + - " to request: " + XmlUtil.toString(getMessage.getDocument())); + "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: " + + XmlUtil.toString(getMessage.getDocument())); netconfClient.close(); - logger.info("Client with session id {} ended", sessionId); - thrownException = Optional.absent(); + logger.info("Client with session id {}: ended", sessionId); } catch (final Exception e) { - thrownException = Optional.of(e); + throw new IllegalStateException(Thread.currentThread().getName(), e); } } }