Netconf testtool stress client 80/18580/3
authorMaros Marsalek <mmarsale@cisco.com>
Sat, 18 Apr 2015 17:44:46 +0000 (19:44 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Mon, 20 Apr 2015 12:50:46 +0000 (14:50 +0200)
Change-Id: I4bd351fe17e596e9153a48bf233432b8c8b19ea9
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcherImpl.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java
opendaylight/netconf/netconf-testtool/edit.txt [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/pom.xml
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java [new file with mode: 0644]
opendaylight/netconf/pom.xml

index 5d584f3b987f90ca65920a9befcb945616b1b8c7..039900327b388c9981d318420b3f1a7c9ab1cb7a 100644 (file)
@@ -32,6 +32,10 @@ public class NetconfClientDispatcherImpl extends AbstractDispatcher<NetconfClien
         this.timer = timer;
     }
 
+    protected Timer getTimer() {
+        return timer;
+    }
+
     @Override
     public Future<NetconfClientSession> createClient(final NetconfClientConfiguration clientConfiguration) {
         switch (clientConfiguration.getProtocol()) {
index ac13729d885fbf5b83ff21d4e9a90e91ab2dfa64..4c5fd1d1ec040ef6218e36d619c47409cddbb8d8 100644 (file)
@@ -33,11 +33,22 @@ import org.slf4j.LoggerFactory;
 
 public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
 
-    public static final Set<String> CLIENT_CAPABILITIES = ImmutableSet.of(
+    public static final Set<String> EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
 
+    public static final Set<String> LEGACY_EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+
+    public static final Set<String> DEFAULT_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
+    public static final Set<String> LEGACY_FRAMING_CLIENT_CAPABILITIES = ImmutableSet.of(
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0);
+
     private static final Logger LOG = LoggerFactory.getLogger(NetconfClientSessionNegotiatorFactory.class);
     private static final String START_EXI_MESSAGE_ID = "default-start-exi";
     private static final EXIOptions DEFAULT_OPTIONS;
@@ -61,19 +72,35 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF
         DEFAULT_OPTIONS = opts;
     }
 
+    private final Set<String> clientCapabilities;
+
     public NetconfClientSessionNegotiatorFactory(final Timer timer,
                                                  final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
                                                  final long connectionTimeoutMillis) {
         this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS);
     }
 
+    public NetconfClientSessionNegotiatorFactory(final Timer timer,
+                                                 final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+                                                 final long connectionTimeoutMillis, final Set<String> capabilities) {
+        this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS, capabilities);
+
+    }
+
     public NetconfClientSessionNegotiatorFactory(final Timer timer,
                                                  final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
                                                  final long connectionTimeoutMillis, final EXIOptions exiOptions) {
+        this(timer, additionalHeader, connectionTimeoutMillis, exiOptions, EXI_CLIENT_CAPABILITIES);
+    }
+
+    public NetconfClientSessionNegotiatorFactory(final Timer timer,
+                                                 final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+                                                 final long connectionTimeoutMillis, final EXIOptions exiOptions, final Set<String> capabilities) {
         this.timer = Preconditions.checkNotNull(timer);
         this.additionalHeader = additionalHeader;
         this.connectionTimeoutMillis = connectionTimeoutMillis;
         this.options = exiOptions;
+        this.clientCapabilities = capabilities;
     }
 
     @Override
@@ -84,9 +111,9 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF
         NetconfMessage startExiMessage = NetconfStartExiMessage.create(options, START_EXI_MESSAGE_ID);
         NetconfHelloMessage helloMessage = null;
         try {
-            helloMessage = NetconfHelloMessage.createClientHello(CLIENT_CAPABILITIES, additionalHeader);
+            helloMessage = NetconfHelloMessage.createClientHello(clientCapabilities, additionalHeader);
         } catch (NetconfDocumentedException e) {
-            LOG.error("Unable to create client hello message with capabilities {} and additional handler {}",CLIENT_CAPABILITIES,additionalHeader);
+            LOG.error("Unable to create client hello message with capabilities {} and additional handler {}", clientCapabilities,additionalHeader);
             throw new IllegalStateException(e);
         }
 
diff --git a/opendaylight/netconf/netconf-testtool/edit.txt b/opendaylight/netconf/netconf-testtool/edit.txt
new file mode 100644 (file)
index 0000000..1e7bbb6
--- /dev/null
@@ -0,0 +1,7 @@
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+<module>
+<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
+<name>name{MSG_ID}</name>
+<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
+</module>
+</modules>
\ No newline at end of file
index 6548d87b49bf827d20c94e5ea60316b65b78f8ef..f15dcc341714c2e6f22e729bd585d094bb8a193f 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>netconf-connector-config</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-netconf-connector</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>logback-config</artifactId>
                             <shadedClassifierName>executable</shadedClassifierName>
                         </configuration>
                     </execution>
+
+                    <execution>
+                        <id>stress-client</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <shadedArtifactId>stress-client</shadedArtifactId>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.opendaylight.controller.netconf.test.tool.client.stress.StressClient</mainClass>
+                                </transformer>
+                            </transformers>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>executable</shadedClassifierName>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java
new file mode 100644 (file)
index 0000000..7b60a17
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncExecutionStrategy implements ExecutionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
+
+    private final Parameters params;
+    private final List<NetconfMessage> preparedMessages;
+    private final NetconfDeviceCommunicator sessionListener;
+    private final List<Integer> editBatches;
+
+    public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
+        this.params = params;
+        this.preparedMessages = editConfigMsgs;
+        this.sessionListener = sessionListener;
+        this.editBatches = countEditBatchSizes(params);
+    }
+
+    private static List<Integer> countEditBatchSizes(final Parameters params) {
+        final List<Integer> editBatches = Lists.newArrayList();
+        if (params.editBatchSize != params.editCount) {
+            final int fullBatches = params.editCount / params.editBatchSize;
+            for (int i = 0; i < fullBatches; i++) {
+                editBatches.add(params.editBatchSize);
+            }
+
+            if (params.editCount % params.editBatchSize != 0) {
+                editBatches.add(params.editCount % params.editBatchSize);
+            }
+        } else {
+            editBatches.add(params.editBatchSize);
+        }
+        return editBatches;
+    }
+
+    @Override
+    public void invoke() {
+        final AtomicInteger responseCounter = new AtomicInteger(0);
+        final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
+
+        int batchI = 0;
+        for (final Integer editBatch : editBatches) {
+            for (int i = 0; i < editBatch; i++) {
+                final int msgId = i + (batchI * params.editBatchSize);
+                final NetconfMessage msg = preparedMessages.get(msgId);
+                LOG.debug("Sending message {}", msgId);
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+                }
+                final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+                        sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+                futures.add(netconfMessageFuture);
+            }
+            batchI++;
+            LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+            futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+        }
+
+        LOG.info("All batches sent. Waiting for responses");
+        // Wait for every future
+        for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
+            try {
+                final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS);
+                if(netconfMessageRpcResult.isSuccessful()) {
+                    responseCounter.incrementAndGet();
+                    LOG.debug("Received response {}", responseCounter.get());
+                } else {
+                    LOG.warn("Request failed {}", netconfMessageRpcResult);
+                }
+            } catch (final InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (final ExecutionException | TimeoutException e) {
+                throw new RuntimeException("Request not finished", e);
+            }
+        }
+
+        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java
new file mode 100644 (file)
index 0000000..2d96d8f
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.Set;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+
+public class ConfigurableClientDispatcher extends NetconfClientDispatcherImpl {
+
+    private final Set<String> capabilities;
+
+    private ConfigurableClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer, final Set<String> capabilities) {
+        super(bossGroup, workerGroup, timer);
+        this.capabilities = capabilities;
+    }
+
+    /**
+     * EXI + chunked framing
+     */
+    public static ConfigurableClientDispatcher createChunkedExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.EXI_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * EXI + ]]>]]> framing
+     */
+    public static ConfigurableClientDispatcher createLegacyExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_EXI_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * Chunked framing
+     */
+    public static ConfigurableClientDispatcher createChunked(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.DEFAULT_CLIENT_CAPABILITIES);
+    }
+
+    /**
+     * ]]>]]> framing
+     */
+    public static ConfigurableClientDispatcher createLegacy(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+        return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_FRAMING_CLIENT_CAPABILITIES);
+    }
+
+    @Override
+    protected NetconfClientSessionNegotiatorFactory getNegotiatorFactory(final NetconfClientConfiguration cfg) {
+        return new NetconfClientSessionNegotiatorFactory(getTimer(), cfg.getAdditionalHeader(), cfg.getConnectionTimeoutMillis(), capabilities);
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/ExecutionStrategy.java
new file mode 100644 (file)
index 0000000..dc2ddf1
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+/**
+ * Created by mmarsale on 18.4.2015.
+ */
+public interface ExecutionStrategy {
+    void invoke();
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java
new file mode 100644 (file)
index 0000000..6648bd4
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+
+public class Parameters {
+
+    @Arg(dest = "ip")
+    public String ip;
+
+    @Arg(dest = "port")
+    public int port;
+
+    @Arg(dest = "edit-count")
+    public int editCount;
+
+    @Arg(dest = "edit-content")
+    public File editContent;
+
+    @Arg(dest = "edit-batch-size")
+    public int editBatchSize;
+
+    @Arg(dest = "debug")
+    public boolean debug;
+
+    @Arg(dest = "legacy-framing")
+    public boolean legacyFraming;
+
+    @Arg(dest = "exi")
+    public boolean exi;
+
+    @Arg(dest = "async")
+    public boolean async;
+
+    @Arg(dest = "ssh")
+    public boolean ssh;
+
+    @Arg(dest = "msg-timeout")
+    public long msgTimeout;
+
+    static ArgumentParser getParser() {
+        final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
+
+        parser.description("Netconf stress client");
+
+        parser.addArgument("--ip")
+                .type(String.class)
+                .setDefault("127.0.0.1")
+                .type(String.class)
+                .help("Netconf server IP")
+                .dest("ip");
+
+        parser.addArgument("--port")
+                .type(Integer.class)
+                .setDefault(2830)
+                .type(Integer.class)
+                .help("Netconf server port")
+                .dest("port");
+
+        parser.addArgument("--edits")
+                .type(Integer.class)
+                .setDefault(50000)
+                .type(Integer.class)
+                .help("Netconf edit rpcs to be sent")
+                .dest("edit-count");
+
+        parser.addArgument("--edit-content")
+                .type(File.class)
+                .setDefault(new File("edit.txt"))
+                .type(File.class)
+                .dest("edit-content");
+
+        parser.addArgument("--edit-batch-size")
+                .type(Integer.class)
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .dest("edit-batch-size");
+
+        parser.addArgument("--debug")
+                .type(Boolean.class)
+                .setDefault(false)
+                .help("Whether to use debug log level instead of INFO")
+                .dest("debug");
+
+        parser.addArgument("--legacy-framing")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("legacy-framing");
+
+        parser.addArgument("--exi")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("exi");
+
+        parser.addArgument("--async-requests")
+                .type(Boolean.class)
+                .setDefault(true)
+                .dest("async");
+
+        parser.addArgument("--msg-timeout")
+                .type(Integer.class)
+                .setDefault(60)
+                .dest("msg-timeout");
+
+        parser.addArgument("--ssh")
+                .type(Boolean.class)
+                .setDefault(false)
+                .dest("ssh");
+
+        // TODO add get-config option instead of edit + commit
+        // TODO different edit config content
+
+        return parser;
+    }
+
+    void validate() {
+        Preconditions.checkArgument(port > 0, "Port =< 0");
+        Preconditions.checkArgument(editCount > 0, "Edit count =< 0");
+        if (editBatchSize == -1) {
+            editBatchSize = editCount;
+        } else {
+            Preconditions.checkArgument(editBatchSize <= editCount, "Edit count =< 0");
+        }
+
+        Preconditions.checkArgument(editContent.exists(), "Edit content file missing");
+        Preconditions.checkArgument(editContent.isDirectory() == false, "Edit content file is a dir");
+        Preconditions.checkArgument(editContent.canRead(), "Edit content file is unreadable");
+        // TODO validate
+    }
+
+    public InetSocketAddress getInetAddress() {
+        try {
+            return new InetSocketAddress(InetAddress.getByName(ip), port);
+        } catch (final UnknownHostException e) {
+            throw new IllegalArgumentException("Unknown ip", e);
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java
new file mode 100644 (file)
index 0000000..fe0a0bc
--- /dev/null
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import ch.qos.logback.classic.Level;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.SAXException;
+
+public final class StressClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StressClient.class);
+
+    static final QName COMMIT_QNAME = QName.create(CommitInput.QNAME, "commit");
+    public static final NetconfMessage COMMIT_MSG;
+
+    static {
+        try {
+            COMMIT_MSG = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"commit-batch\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                    "    <commit/>\n" +
+                    "</rpc>"));
+        } catch (SAXException | IOException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    static final QName EDIT_QNAME = QName.create(EditConfigInput.QNAME, "edit-config");
+    static final org.w3c.dom.Document editBlueprint;
+
+    static {
+        try {
+            editBlueprint = XmlUtil.readXmlToDocument(
+                    "<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                            "    <edit-config xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                            "        <target>\n" +
+                            "            <candidate/>\n" +
+                            "        </target>\n" +
+                            "        <config/>\n" +
+                            "    </edit-config>\n" +
+                            "</rpc>");
+        } catch (SAXException | IOException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
+    private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+
+    public static void main(final String[] args) {
+        final Parameters params = parseArgs(args, Parameters.getParser());
+        params.validate();
+
+        // TODO remove
+        try {
+            Thread.sleep(10000);
+        } catch (final InterruptedException e) {
+//            e.printStackTrace();
+        }
+
+        final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+        root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
+
+        LOG.info("Preparing messages");
+        // Prepare all msgs up front
+        final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+
+        final String editContentString;
+        boolean needsModification = false;
+        try {
+            editContentString = Files.toString(params.editContent, Charsets.UTF_8);
+            if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
+                needsModification = true;
+            };
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot read content of " + params.editContent);
+        }
+
+        for (int i = 0; i < params.editCount; i++) {
+            final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+            msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
+            final NetconfMessage netconfMessage = new NetconfMessage(msg);
+
+            final Element editContentElement;
+            try {
+                // Insert message id where needed
+                final String specificEditContent = needsModification ?
+                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
+                        editContentString;
+
+                editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+                final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+                        getElementsByTagName("config").item(0);
+                config.appendChild(msg.importNode(editContentElement, true));
+            } catch (final IOException | SAXException e) {
+                throw new IllegalArgumentException("Edit content file is unreadable", e);
+            }
+
+            preparedMessages.add(netconfMessage);
+
+        }
+
+
+        final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
+        final Timer timer = new HashedWheelTimer();
+
+        final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
+
+        final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+
+        final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+
+        LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
+        final NetconfClientSession netconfClientSession;
+        try {
+            netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException("Unable to connect", e);
+        }
+
+        LOG.info("Starting stress test");
+        final Stopwatch started = Stopwatch.createStarted();
+        getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
+        started.stop();
+
+        LOG.info("FINISHED. Execution time: {}", started);
+        LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
+
+        // Cleanup
+        netconfClientSession.close();
+        timer.stop();
+        try {
+            nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Unable to close executor properly", e);
+        }
+    }
+
+    private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+        if(params.async) {
+            return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
+        } else {
+            return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+        }
+    }
+
+    private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
+        final NetconfClientDispatcherImpl netconfClientDispatcher;
+        if(params.exi) {
+            if(params.legacyFraming) {
+                netconfClientDispatcher= ConfigurableClientDispatcher.createLegacyExi(nioGroup, nioGroup, timer);
+            } else {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createChunkedExi(nioGroup, nioGroup, timer);
+            }
+        } else {
+            if(params.legacyFraming) {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createLegacy(nioGroup, nioGroup, timer);
+            } else {
+                netconfClientDispatcher = ConfigurableClientDispatcher.createChunked(nioGroup, nioGroup, timer);
+            }
+        }
+        return netconfClientDispatcher;
+    }
+
+    private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
+        final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
+        netconfClientConfigurationBuilder.withSessionListener(sessionListener);
+        netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+        netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
+        netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
+        netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+        return netconfClientConfigurationBuilder.build();
+    }
+
+    static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
+        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+    }
+
+    private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
+        final Parameters opt = new Parameters();
+        try {
+            parser.parseArgs(args, opt);
+            return opt;
+        } catch (final ArgumentParserException e) {
+            parser.handleError(e);
+        }
+
+        System.exit(1);
+        return null;
+    }
+
+
+    private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+        @Override
+        public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
+            LOG.info("Session established");
+        }
+
+        @Override
+        public void onRemoteSessionDown() {
+            LOG.info("Session down");
+        }
+
+        @Override
+        public void onRemoteSessionFailed(final Throwable throwable) {
+            LOG.info("Session failed");
+        }
+
+        @Override
+        public void onNotification(final NetconfMessage notification) {
+            LOG.info("Notification received: {}", notification.toString());
+        }
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java
new file mode 100644 (file)
index 0000000..34142a7
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO reuse code from org.opendaylight.controller.netconf.test.tool.client.stress.AsyncExecutionStrategy
+class SyncExecutionStrategy implements ExecutionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
+
+    private final Parameters params;
+    private final List<NetconfMessage> preparedMessages;
+    private final NetconfDeviceCommunicator sessionListener;
+    private final List<Integer> editBatches;
+
+    public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+        this.params = params;
+        this.preparedMessages = preparedMessages;
+        this.sessionListener = sessionListener;
+        editBatches = countEditBatchSizes(params);
+    }
+
+    private static List<Integer> countEditBatchSizes(final Parameters params) {
+        final List<Integer> editBatches = Lists.newArrayList();
+        if (params.editBatchSize != params.editCount) {
+            final int fullBatches = params.editCount / params.editBatchSize;
+            for (int i = 0; i < fullBatches; i++) {
+                editBatches.add(params.editBatchSize);
+            }
+
+            if (params.editCount % params.editBatchSize != 0) {
+                editBatches.add(params.editCount % params.editBatchSize);
+            }
+        } else {
+            editBatches.add(params.editBatchSize);
+        }
+        return editBatches;
+    }
+
+    public void invoke() {
+        final AtomicInteger responseCounter = new AtomicInteger(0);
+
+        int batchI = 0;
+        for (final Integer editBatch : editBatches) {
+            for (int i = 0; i < editBatch; i++) {
+                final int msgId = i + (batchI * params.editBatchSize);
+                final NetconfMessage msg = preparedMessages.get(msgId);
+                LOG.debug("Sending message {}", msgId);
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+                }
+                final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+                        sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+                // Wait for response
+                waitForResponse(responseCounter, netconfMessageFuture);
+
+            }
+            batchI++;
+            LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+
+            // Commit batch sync
+            waitForResponse(responseCounter,
+                    sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+        }
+
+        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+    }
+
+    private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
+        try {
+            final RpcResult<NetconfMessage> netconfMessageRpcResult =
+                    netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
+            if (netconfMessageRpcResult.isSuccessful()) {
+                responseCounter.incrementAndGet();
+                LOG.debug("Received response {}", responseCounter.get());
+            } else {
+                LOG.warn("Request failed {}", netconfMessageRpcResult);
+            }
+
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException | TimeoutException e) {
+            throw new RuntimeException("Request not finished", e);
+        }
+    }
+}
index a990b5c6cb935d1139f54353a5282329f9fd3b0e..ab92128dbd2a30e914d58cf1b4e53dedf6c1d995 100644 (file)
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <sourceDirectory>${project.basedir}</sourceDirectory>
           <includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat,**\/*.yang</includes>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java</excludes>
+          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java, **\/netconf\/test\/tool\/client\/stress\/StressClient.java</excludes>
         </configuration>
         <dependencies>
           <dependency>