package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.io.IOUtils;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mock;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+@RunWith(Parameterized.class)
+public class ConcurrentClientsTest {
+ private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.MockitoAnnotations.initMocks;
+ private static ExecutorService clientExecutor;
-public class ConcurrentClientsTest {
+ private static final int CONCURRENCY = 32;
+ private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
+
+ private int nettyThreads;
+ private Class<? extends Runnable> clientRunnable;
+ private Set<String> serverCaps;
+
+ public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
+ this.nettyThreads = nettyThreads;
+ this.clientRunnable = clientRunnable;
+ this.serverCaps = serverCaps;
+ }
+
+ @Parameterized.Parameters()
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
+ {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
+ // empty set of capabilities = only base 1.0 netconf capability
+ {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
+ {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
+ {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
+
+ {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
+ {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
+ });
+ }
- private static final int CONCURRENCY = 16;
private EventLoopGroup nettyGroup;
private NetconfClientDispatcher netconfClientDispatcher;
- private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
+ private DefaultCommitNotificationProducer commitNot;
- static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
+ HashedWheelTimer hashedWheelTimer;
+ private TestingNetconfOperation testingNetconfOperation;
- private DefaultCommitNotificationProducer commitNot;
- private NetconfServerDispatcher dispatch;
+ public static SessionMonitoringService createMockedMonitoringService() {
+ SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
+ doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
+ doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
+ return monitoring;
+ }
- @Mock
- private SessionMonitoringService monitoring;
+ @BeforeClass
+ public static void setUpClientExecutor() {
+ clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
+ int i = 1;
- HashedWheelTimer hashedWheelTimer;
+ @Override
+ public Thread newThread(final Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName("client-" + i++);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
@Before
public void setUp() throws Exception {
- initMocks(this);
- nettyGroup = new NioEventLoopGroup();
- NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
- netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
+ hashedWheelTimer = new HashedWheelTimer();
+ nettyGroup = new NioEventLoopGroup(nettyThreads);
+ netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
+
+ testingNetconfOperation = new TestingNetconfOperation();
+ factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
SessionIdProvider idProvider = new SessionIdProvider();
- hashedWheelTimer = new HashedWheelTimer();
+
NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
- hashedWheelTimer, factoriesListener, idProvider, 5000);
+ hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
- doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
- doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
-
- NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
- factoriesListener, commitNot, idProvider, monitoring);
- NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
- dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
+ NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
+ final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
ChannelFuture s = dispatch.createServer(netconfAddress);
s.await();
@After
public void tearDown(){
+ commitNot.close();
hashedWheelTimer.stop();
- nettyGroup.shutdownGracefully();
+ try {
+ nettyGroup.shutdownGracefully().get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.warn("Ignoring exception while cleaning up after test", e);
+ }
}
- private NetconfOperationServiceFactory mockOpF() {
- return new NetconfOperationServiceFactory() {
- @Override
- public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
- return new NetconfOperationService() {
- @Override
- public Set<Capability> getCapabilities() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<NetconfOperation> getNetconfOperations() {
- return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
- @Override
- public HandlingPriority canHandle(Document message) {
- return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
- }
-
- @Override
- public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
- try {
- return XmlUtil.readXmlToDocument("<test/>");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- @Override
- public void close() {
- }
- };
- }
- };
+ @AfterClass
+ public static void tearDownClientExecutor() {
+ clientExecutor.shutdownNow();
}
- @After
- public void cleanUp() throws Exception {
- commitNot.close();
- }
+ @Test(timeout = CONCURRENCY * 1000)
+ public void testConcurrentClients() throws Exception {
- @Test
- public void multipleClients() throws Exception {
- List<TestingThread> threads = new ArrayList<>();
+ List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
- final int attempts = 5;
for (int i = 0; i < CONCURRENCY; i++) {
- TestingThread thread = new TestingThread(String.valueOf(i), attempts);
- threads.add(thread);
- thread.start();
+ futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
}
- for (TestingThread thread : threads) {
- thread.join();
- if(thread.thrownException.isPresent()) {
- Exception exception = thread.thrownException.get();
- logger.error("Thread for testing client failed", exception);
- fail("Client thread " + thread + " failed: " + exception.getMessage());
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ logger.error("Thread for testing client failed", e);
+ fail("Client failed: " + e.getMessage());
}
}
+
+ assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
}
- @Test
- public void synchronizationTest() throws Exception {
- new BlockingThread("foo").run2();
+ public static Set<String> getOnlyExiServerCaps() {
+ return Sets.newHashSet(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+ );
}
- @Test
- public void multipleBlockingClients() throws Exception {
- List<BlockingThread> threads = new ArrayList<>();
- for (int i = 0; i < CONCURRENCY; i++) {
- BlockingThread thread = new BlockingThread(String.valueOf(i));
- threads.add(thread);
- thread.start();
+ public static Set<String> getOnlyChunkServerCaps() {
+ return Sets.newHashSet(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
+ );
+ }
+
+ public Runnable getInstanceOfClientRunnable() throws Exception {
+ return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
+ }
+
+ /**
+ * Responds to all operations except start-exi and counts all requests
+ */
+ private static class TestingNetconfOperation implements NetconfOperation {
+
+ private final AtomicLong counter = new AtomicLong();
+
+ @Override
+ public HandlingPriority canHandle(Document message) {
+ return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
+ HandlingPriority.CANNOT_HANDLE :
+ HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
}
- for (BlockingThread thread : threads) {
- thread.join();
- if(thread.thrownException.isPresent()) {
- Exception exception = thread.thrownException.get();
- logger.error("Thread for testing client failed", exception);
- fail("Client thread " + thread + " failed: " + exception.getMessage());
+ @Override
+ public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+ try {
+ logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
+ counter.getAndIncrement();
+ return XmlUtil.readXmlToDocument("<test/>");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
+
+ public long getMessageCount() {
+ return counter.get();
+ }
}
- class BlockingThread extends Thread {
- private Optional<Exception> thrownException;
+ /**
+ * Hardcoded operation service factory
+ */
+ private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
+ private final NetconfOperation[] operations;
+
+ public TestingOperationServiceFactory(final NetconfOperation... operations) {
+ this.operations = operations;
+ }
- public BlockingThread(String name) {
- super("client-" + name);
+ @Override
+ public NetconfOperationService createService(String netconfSessionIdForReporting) {
+ return new NetconfOperationService() {
+ @Override
+ public Set<Capability> getCapabilities() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<NetconfOperation> getNetconfOperations() {
+ return Sets.newHashSet(operations);
+ }
+
+ @Override
+ public void close() {}
+ };
}
+ }
+
+ /**
+ * Pure socket based blocking client
+ */
+ public final class BlockingClientRunnable implements Runnable {
@Override
public void run() {
try {
run2();
- thrownException = Optional.absent();
} catch (Exception e) {
- thrownException = Optional.of(e);
+ throw new IllegalStateException(Thread.currentThread().getName(), e);
}
}
}
}
- class TestingThread extends Thread {
-
- private final String clientId;
- private final int attempts;
- private Optional<Exception> thrownException;
-
- TestingThread(String clientId, int attempts) {
- this.clientId = clientId;
- this.attempts = attempts;
- setName("client-" + clientId);
- }
+ /**
+ * TestingNetconfClient based runnable
+ */
+ public final class TestingNetconfClientRunnable implements Runnable {
@Override
public void run() {
try {
- final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
+ final TestingNetconfClient netconfClient =
+ new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
long sessionId = netconfClient.getSessionId();
- logger.info("Client with sessionid {} hello exchanged", sessionId);
+ logger.info("Client with session id {}: hello exchanged", sessionId);
final NetconfMessage getMessage = XmlFileLoader
.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
- NetconfMessage result = netconfClient.sendMessage(getMessage);
- logger.info("Client with sessionid {} got result {}", sessionId, result);
+ NetconfMessage result = netconfClient.sendRequest(getMessage).get();
+ logger.info("Client with session id {}: got result {}", sessionId, result);
+
+ Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
+ "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
+ + XmlUtil.toString(getMessage.getDocument()));
+
netconfClient.close();
- logger.info("Client with session id {} ended", sessionId);
- thrownException = Optional.absent();
+ logger.info("Client with session id {}: ended", sessionId);
} catch (final Exception e) {
- thrownException = Optional.of(e);
+ throw new IllegalStateException(Thread.currentThread().getName(), e);
}
}
+
+ private NetconfClientConfiguration getClientConfig() {
+ final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
+ b.withAddress(netconfAddress);
+ b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
+ "client"));
+ b.withSessionListener(new SimpleNetconfClientSessionListener());
+ b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
+ NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
+ return b.build();
+ }
}
}