Add AuthHandler to stresstool
[controller.git] / opendaylight / netconf / netconf-testtool / src / main / java / org / opendaylight / controller / netconf / test / tool / client / stress / StressClient.java
index fe0a0bcd523a8249e45a9c9f66f687e72611e97c..e03de59f2c71ea6694683b17c70ad27aa6e42900 100644 (file)
@@ -11,31 +11,31 @@ package org.opendaylight.controller.netconf.test.tool.client.stress;
 import ch.qos.logback.classic.Level;
 import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.security.Provider;
+import java.security.Security;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
-import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandler;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -74,6 +74,7 @@ public final class StressClient {
                             "        <target>\n" +
                             "            <candidate/>\n" +
                             "        </target>\n" +
+                            "        <default-operation>none</default-operation>" +
                             "        <config/>\n" +
                             "    </edit-config>\n" +
                             "</rpc>");
@@ -82,106 +83,130 @@ public final class StressClient {
         }
     }
 
-    private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
     private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+    private static final String PHYS_ADDR_PLACEHOLDER = "{PHYS_ADDR}";
+
+    private static long macStart = 0xAABBCCDD0000L;
 
     public static void main(final String[] args) {
+
         final Parameters params = parseArgs(args, Parameters.getParser());
         params.validate();
 
-        // TODO remove
-        try {
-            Thread.sleep(10000);
-        } catch (final InterruptedException e) {
-//            e.printStackTrace();
-        }
-
         final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
         root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
 
+        final int threadAmount = params.threadAmount;
+        LOG.info("thread amount: " + threadAmount);
+        final int requestsPerThread = params.editCount / params.threadAmount;
+        LOG.info("requestsPerThread: " + requestsPerThread);
+        final int leftoverRequests = params.editCount % params.threadAmount;
+        LOG.info("leftoverRequests: " + leftoverRequests);
+
+
         LOG.info("Preparing messages");
         // Prepare all msgs up front
-        final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+        final List<List<NetconfMessage>> allPreparedMessages = new ArrayList<>(threadAmount);
+        for (int i = 0; i < threadAmount; i++) {
+            if (i != threadAmount - 1) {
+                allPreparedMessages.add(new ArrayList<NetconfMessage>(requestsPerThread));
+            } else {
+                allPreparedMessages.add(new ArrayList<NetconfMessage>(requestsPerThread + leftoverRequests));
+            }
+        }
+
 
         final String editContentString;
-        boolean needsModification = false;
         try {
             editContentString = Files.toString(params.editContent, Charsets.UTF_8);
-            if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
-                needsModification = true;
-            };
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new IllegalArgumentException("Cannot read content of " + params.editContent);
         }
 
-        for (int i = 0; i < params.editCount; i++) {
-            final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
-            msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
-            final NetconfMessage netconfMessage = new NetconfMessage(msg);
-
-            final Element editContentElement;
-            try {
-                // Insert message id where needed
-                final String specificEditContent = needsModification ?
-                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
-                        editContentString;
-
-                editContentElement = XmlUtil.readXmlToElement(specificEditContent);
-                final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
-                        getElementsByTagName("config").item(0);
-                config.appendChild(msg.importNode(editContentElement, true));
-            } catch (final IOException | SAXException e) {
-                throw new IllegalArgumentException("Edit content file is unreadable", e);
+        for (int i = 0; i < threadAmount; i++) {
+            final List<NetconfMessage> preparedMessages = allPreparedMessages.get(i);
+            int padding = 0;
+            if (i == threadAmount - 1) {
+                padding = leftoverRequests;
+            }
+            for (int j = 0; j < requestsPerThread + padding; j++) {
+                LOG.debug("id: " + (i * requestsPerThread + j));
+                preparedMessages.add(prepareMessage(i * requestsPerThread + j, editContentString));
             }
-
-            preparedMessages.add(netconfMessage);
-
         }
 
-
         final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
         final Timer timer = new HashedWheelTimer();
 
         final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
 
-        final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+        final List<StressClientCallable> callables = new ArrayList<>(threadAmount);
+        for (final List<NetconfMessage> messages : allPreparedMessages) {
+            callables.add(new StressClientCallable(params, netconfClientDispatcher, messages));
+        }
 
-        final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+        final ExecutorService executorService = Executors.newFixedThreadPool(threadAmount);
 
-        LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
-        final NetconfClientSession netconfClientSession;
+        LOG.info("Starting stress test");
+        final Stopwatch started = Stopwatch.createStarted();
         try {
-            netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+            final List<Future<Boolean>> futures = executorService.invokeAll(callables);
+            for (final Future<Boolean> future : futures) {
+                try {
+                    future.get(4L, TimeUnit.MINUTES);
+                } catch (ExecutionException | TimeoutException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            executorService.shutdownNow();
         } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        } catch (final ExecutionException e) {
-            throw new RuntimeException("Unable to connect", e);
+            throw new RuntimeException("Unable to execute requests", e);
         }
-
-        LOG.info("Starting stress test");
-        final Stopwatch started = Stopwatch.createStarted();
-        getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
         started.stop();
 
         LOG.info("FINISHED. Execution time: {}", started);
         LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
 
         // Cleanup
-        netconfClientSession.close();
         timer.stop();
         try {
             nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             LOG.warn("Unable to close executor properly", e);
         }
+        //stop the underlying ssh thread that gets spawned if we use ssh
+        if (params.ssh) {
+            AsyncSshHandler.DEFAULT_CLIENT.stop();
+        }
     }
 
-    private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
-        if(params.async) {
-            return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
-        } else {
-            return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+    static NetconfMessage prepareMessage(final int id, final String editContentString) {
+        final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+        msg.getDocumentElement().setAttribute("message-id", Integer.toString(id));
+        final NetconfMessage netconfMessage = new NetconfMessage(msg);
+
+        final Element editContentElement;
+        try {
+            // Insert message id where needed
+            String specificEditContent = editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(id));
+
+            final StringBuilder stringBuilder = new StringBuilder(specificEditContent);
+            int idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER);
+            while (idx!= -1) {
+                stringBuilder.replace(idx, idx + PHYS_ADDR_PLACEHOLDER.length(), getMac(macStart++));
+                idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER);
+            }
+            specificEditContent = stringBuilder.toString();
+
+            editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+            final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+                    getElementsByTagName("config").item(0);
+            config.appendChild(msg.importNode(editContentElement, true));
+        } catch (final IOException | SAXException e) {
+            throw new IllegalArgumentException("Edit content file is unreadable", e);
         }
+
+        return netconfMessage;
     }
 
     private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
@@ -202,19 +227,18 @@ public final class StressClient {
         return netconfClientDispatcher;
     }
 
-    private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
-        final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
-        netconfClientConfigurationBuilder.withSessionListener(sessionListener);
-        netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
-        netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
-        netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
-        netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
-        return netconfClientConfigurationBuilder.build();
-    }
+    public static String getMac(long mac) {
+        StringBuilder m = new StringBuilder(Long.toString(mac, 16));
+
+        for (int i = m.length(); i < 12; i++) {
+            m.insert(0, "0");
+        }
+
+        for (int j = m.length() - 2; j >= 2; j-=2) {
+            m.insert(j, ":");
+        }
 
-    static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
-        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
-        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+        return m.toString();
     }
 
     private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
@@ -231,7 +255,7 @@ public final class StressClient {
     }
 
 
-    private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+    static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
         @Override
         public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
             LOG.info("Session established");