BUG-848 Make base capabilities from netconf server configurable 44/6544/2
authorMaros Marsalek <mmarsale@cisco.com>
Mon, 28 Apr 2014 13:41:35 +0000 (15:41 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Tue, 29 Apr 2014 07:21:33 +0000 (07:21 +0000)
Refactor ConcurrentClientsTest, add cases for different netconf server configuration (EXI/CHUNK)

Change-Id: Ie84ea1ff716b11b0da49e0498127a0defc86f41f
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java

index e40ee57..d5a34d1 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.netconf.impl;
 
 import static org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider.NetconfOperationProviderUtil.getNetconfSessionIdForReporting;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import java.util.Set;
 
@@ -35,8 +36,7 @@ import org.slf4j.LoggerFactory;
 
 public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfHelloMessage, NetconfServerSession, NetconfServerSessionListener> {
 
-    // TODO make this configurable
-    private static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
+    public static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
@@ -50,18 +50,42 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF
     private final DefaultCommitNotificationProducer commitNotificationProducer;
     private final SessionMonitoringService monitoringService;
     private static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiatorFactory.class);
+    private final Set<String> baseCapabilities;
 
     // TODO too many params, refactor
     public NetconfServerSessionNegotiatorFactory(Timer timer, NetconfOperationProvider netconfOperationProvider,
                                                  SessionIdProvider idProvider, long connectionTimeoutMillis,
                                                  DefaultCommitNotificationProducer commitNot,
                                                  SessionMonitoringService monitoringService) {
+        this(timer, netconfOperationProvider, idProvider, connectionTimeoutMillis, commitNot, monitoringService, DEFAULT_BASE_CAPABILITIES);
+    }
+
+    // TODO too many params, refactor
+    public NetconfServerSessionNegotiatorFactory(Timer timer, NetconfOperationProvider netconfOperationProvider,
+                                                 SessionIdProvider idProvider, long connectionTimeoutMillis,
+                                                 DefaultCommitNotificationProducer commitNot,
+                                                 SessionMonitoringService monitoringService, Set<String> baseCapabilities) {
         this.timer = timer;
         this.netconfOperationProvider = netconfOperationProvider;
         this.idProvider = idProvider;
         this.connectionTimeoutMillis = connectionTimeoutMillis;
         this.commitNotificationProducer = commitNot;
         this.monitoringService = monitoringService;
+        this.baseCapabilities = validateBaseCapabilities(baseCapabilities);
+    }
+
+    private ImmutableSet<String> validateBaseCapabilities(final Set<String> baseCapabilities) {
+        // Check base capabilities to be supported by the server
+        Sets.SetView<String> unknownBaseCaps = Sets.difference(baseCapabilities, DEFAULT_BASE_CAPABILITIES);
+        Preconditions.checkArgument(unknownBaseCaps.isEmpty(),
+                "Base capabilities that will be supported by netconf server have to be subset of %s, unknown base capabilities: %s",
+                DEFAULT_BASE_CAPABILITIES, unknownBaseCaps);
+
+        ImmutableSet.Builder<String> b = ImmutableSet.builder();
+        b.addAll(baseCapabilities);
+        // Base 1.0 capability is supported by default
+        b.add(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0);
+        return b.build();
     }
 
     /**
@@ -99,7 +123,7 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF
     }
 
     private NetconfHelloMessage createHelloMessage(long sessionId, CapabilityProvider capabilityProvider) throws NetconfDocumentedException {
-        return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), DEFAULT_BASE_CAPABILITIES), sessionId);
+        return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), baseCapabilities), sessionId);
     }
 
 }
index 659743a..0a0cd22 100644 (file)
@@ -16,23 +16,35 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
 import java.io.DataOutputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
@@ -49,30 +61,53 @@ import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAddi
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
 
+@RunWith(Parameterized.class)
 public class ConcurrentClientsTest {
+    private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
 
-    private static final int CONCURRENCY = 64;
-    public static final int NETTY_THREADS = 4;
+    private static ExecutorService clientExecutor;
 
-    private EventLoopGroup nettyGroup;
-    private NetconfClientDispatcher netconfClientDispatcher;
+    private static final int CONCURRENCY = 32;
+    private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
 
-    private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
+    private int nettyThreads;
+    private Class<? extends Runnable> clientRunnable;
+    private Set<String> serverCaps;
 
-    static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
+    public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
+        this.nettyThreads = nettyThreads;
+        this.clientRunnable = clientRunnable;
+        this.serverCaps = serverCaps;
+    }
+
+    @Parameterized.Parameters()
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][]{
+                {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
+                {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
+                // empty set of capabilities = only base 1.0 netconf capability
+                {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
+                {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
+                {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
+
+                {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
+                {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
+        });
+    }
+
+    private EventLoopGroup nettyGroup;
+    private NetconfClientDispatcher netconfClientDispatcher;
 
     private DefaultCommitNotificationProducer commitNot;
 
@@ -86,24 +121,38 @@ public class ConcurrentClientsTest {
         return monitoring;
     }
 
-    // TODO refactor and test with different configurations
+    @BeforeClass
+    public static void setUpClientExecutor() {
+        clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
+            int i = 1;
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setName("client-" + i++);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
 
     @Before
     public void setUp() throws Exception {
 
-        nettyGroup = new NioEventLoopGroup(NETTY_THREADS);
+        nettyGroup = new NioEventLoopGroup(nettyThreads);
         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
 
         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+
         testingNetconfOperation = new TestingNetconfOperation();
-        factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation));
+        factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
 
         SessionIdProvider idProvider = new SessionIdProvider();
         hashedWheelTimer = new HashedWheelTimer();
 
         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
-                hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
+                hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
 
         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
 
@@ -116,74 +165,64 @@ public class ConcurrentClientsTest {
 
     @After
     public void tearDown(){
+        commitNot.close();
         hashedWheelTimer.stop();
-        nettyGroup.shutdownGracefully();
+        try {
+            nettyGroup.shutdownGracefully().get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.warn("Ignoring exception while cleaning up after test", e);
+        }
     }
 
-    private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) {
-        return new TestingOperationServiceFactory(operations);
+    @AfterClass
+    public static void tearDownClientExecutor() {
+        clientExecutor.shutdownNow();
     }
 
-    @After
-    public void cleanUp() throws Exception {
-        commitNot.close();
-    }
+    @Test(timeout = CONCURRENCY * 1000)
+    public void testConcurrentClients() throws Exception {
 
-    @Test(timeout = 30 * 1000)
-    public void multipleClients() throws Exception {
-        List<TestingThread> threads = new ArrayList<>();
+        List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
 
-        final int attempts = 5;
         for (int i = 0; i < CONCURRENCY; i++) {
-            TestingThread thread = new TestingThread(String.valueOf(i), attempts);
-            threads.add(thread);
-            thread.start();
+            futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
         }
 
-        for (TestingThread thread : threads) {
-            thread.join();
-            if(thread.thrownException.isPresent()) {
-                Exception exception = thread.thrownException.get();
-                logger.error("Thread for testing client failed", exception);
-                fail("Client thread " + thread + " failed: " + exception.getMessage());
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                throw new IllegalStateException(e);
+            } catch (ExecutionException e) {
+                logger.error("Thread for testing client failed", e);
+                fail("Client failed: " + e.getMessage());
             }
         }
 
         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
     }
 
-    /**
-     * Cannot handle CHUNK, make server configurable
-     */
-    @Ignore
-    @Test(timeout = 30 * 1000)
-    public void synchronizationTest() throws Exception {
-        new BlockingThread("foo").run2();
+    public static Set<String> getOnlyExiServerCaps() {
+        return Sets.newHashSet(
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+        );
     }
 
-    /**
-     * Cannot handle CHUNK, make server configurable
-     */
-    @Ignore
-    @Test(timeout = 30 * 1000)
-    public void multipleBlockingClients() throws Exception {
-        List<BlockingThread> threads = new ArrayList<>();
-        for (int i = 0; i < CONCURRENCY; i++) {
-            BlockingThread thread = new BlockingThread(String.valueOf(i));
-            threads.add(thread);
-            thread.start();
-        }
+    public static Set<String> getOnlyChunkServerCaps() {
+        return Sets.newHashSet(
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
+        );
+    }
 
-        for (BlockingThread thread : threads) {
-            thread.join();
-            if(thread.thrownException.isPresent()) {
-                Exception exception = thread.thrownException.get();
-                logger.error("Thread for testing client failed", exception);
-                fail("Client thread " + thread + " failed: " + exception.getMessage());
-            }
-        }
+    public Runnable getInstanceOfClientRunnable() throws Exception {
+        return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
     }
 
+    /**
+     * Responds to all operations except start-exi and counts all requests
+     */
     private static class TestingNetconfOperation implements NetconfOperation {
 
         private final AtomicLong counter = new AtomicLong();
@@ -211,6 +250,9 @@ public class ConcurrentClientsTest {
         }
     }
 
+    /**
+     * Hardcoded operation service factory
+     */
     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
         private final NetconfOperation[] operations;
 
@@ -228,30 +270,26 @@ public class ConcurrentClientsTest {
 
                 @Override
                 public Set<NetconfOperation> getNetconfOperations() {
-                    return Sets.<NetconfOperation> newHashSet(operations);
+                    return Sets.newHashSet(operations);
                 }
 
                 @Override
-                public void close() {
-                }
+                public void close() {}
             };
         }
     }
 
-    class BlockingThread extends Thread {
-        private Optional<Exception> thrownException;
-
-        public BlockingThread(String name) {
-            super("client-" + name);
-        }
+    /**
+     * Pure socket based blocking client
+     */
+    public final class BlockingClientRunnable implements Runnable {
 
         @Override
         public void run() {
             try {
                 run2();
-                thrownException = Optional.absent();
             } catch (Exception e) {
-                thrownException = Optional.of(e);
+                throw new IllegalStateException(Thread.currentThread().getName(), e);
             }
         }
 
@@ -287,39 +325,32 @@ public class ConcurrentClientsTest {
         }
     }
 
-    class TestingThread extends Thread {
-
-        private final String clientId;
-        private final int attempts;
-        private Optional<Exception> thrownException;
-
-        TestingThread(String clientId, int attempts) {
-            this.clientId = clientId;
-            this.attempts = attempts;
-            setName("client-" + clientId);
-        }
+    /**
+     * TestingNetconfClient based runnable
+     */
+    public final class TestingNetconfClientRunnable implements Runnable {
 
         @Override
         public void run() {
             try {
-                final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
+                final TestingNetconfClient netconfClient = new TestingNetconfClient(Thread.currentThread().getName(),
+                        netconfAddress, netconfClientDispatcher);
                 long sessionId = netconfClient.getSessionId();
-                logger.info("Client with sessionid {} hello exchanged", sessionId);
+                logger.info("Client with session id {}: hello exchanged", sessionId);
 
                 final NetconfMessage getMessage = XmlFileLoader
                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
-                logger.info("Client with sessionid {} got result {}", sessionId, result);
+                logger.info("Client with session id {}: got result {}", sessionId, result);
 
                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
-                        "Received error response: " + XmlUtil.toString(result.getDocument()) +
-                                " to request: " + XmlUtil.toString(getMessage.getDocument()));
+                        "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
+                                + XmlUtil.toString(getMessage.getDocument()));
 
                 netconfClient.close();
-                logger.info("Client with session id {} ended", sessionId);
-                thrownException = Optional.absent();
+                logger.info("Client with session id {}: ended", sessionId);
             } catch (final Exception e) {
-                thrownException = Optional.of(e);
+                throw new IllegalStateException(Thread.currentThread().getName(), e);
             }
         }
     }