this.timer = timer;
}
+ protected Timer getTimer() {
+ return timer;
+ }
+
@Override
public Future<NetconfClientSession> createClient(final NetconfClientConfiguration clientConfiguration) {
switch (clientConfiguration.getProtocol()) {
public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
- public static final Set<String> CLIENT_CAPABILITIES = ImmutableSet.of(
+ public static final Set<String> EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+ public static final Set<String> LEGACY_EXI_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+
+ public static final Set<String> DEFAULT_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
+ public static final Set<String> LEGACY_FRAMING_CLIENT_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0);
+
private static final Logger LOG = LoggerFactory.getLogger(NetconfClientSessionNegotiatorFactory.class);
private static final String START_EXI_MESSAGE_ID = "default-start-exi";
private static final EXIOptions DEFAULT_OPTIONS;
DEFAULT_OPTIONS = opts;
}
+ private final Set<String> clientCapabilities;
+
public NetconfClientSessionNegotiatorFactory(final Timer timer,
final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
final long connectionTimeoutMillis) {
this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS);
}
+ public NetconfClientSessionNegotiatorFactory(final Timer timer,
+ final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+ final long connectionTimeoutMillis, final Set<String> capabilities) {
+ this(timer, additionalHeader, connectionTimeoutMillis, DEFAULT_OPTIONS, capabilities);
+
+ }
+
public NetconfClientSessionNegotiatorFactory(final Timer timer,
final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
final long connectionTimeoutMillis, final EXIOptions exiOptions) {
+ this(timer, additionalHeader, connectionTimeoutMillis, exiOptions, EXI_CLIENT_CAPABILITIES);
+ }
+
+ public NetconfClientSessionNegotiatorFactory(final Timer timer,
+ final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader,
+ final long connectionTimeoutMillis, final EXIOptions exiOptions, final Set<String> capabilities) {
this.timer = Preconditions.checkNotNull(timer);
this.additionalHeader = additionalHeader;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.options = exiOptions;
+ this.clientCapabilities = capabilities;
}
@Override
NetconfMessage startExiMessage = NetconfStartExiMessage.create(options, START_EXI_MESSAGE_ID);
NetconfHelloMessage helloMessage = null;
try {
- helloMessage = NetconfHelloMessage.createClientHello(CLIENT_CAPABILITIES, additionalHeader);
+ helloMessage = NetconfHelloMessage.createClientHello(clientCapabilities, additionalHeader);
} catch (NetconfDocumentedException e) {
- LOG.error("Unable to create client hello message with capabilities {} and additional handler {}",CLIENT_CAPABILITIES,additionalHeader);
+ LOG.error("Unable to create client hello message with capabilities {} and additional handler {}", clientCapabilities,additionalHeader);
throw new IllegalStateException(e);
}
--- /dev/null
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+<module>
+<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
+<name>name{MSG_ID}</name>
+<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
+</module>
+</modules>
\ No newline at end of file
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-connector-config</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-netconf-connector</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>logback-config</artifactId>
<shadedClassifierName>executable</shadedClassifierName>
</configuration>
</execution>
+
+ <execution>
+ <id>stress-client</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <shadedArtifactId>stress-client</shadedArtifactId>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.opendaylight.controller.netconf.test.tool.client.stress.StressClient</mainClass>
+ </transformer>
+ </transformers>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>executable</shadedClassifierName>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncExecutionStrategy implements ExecutionStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
+
+ private final Parameters params;
+ private final List<NetconfMessage> preparedMessages;
+ private final NetconfDeviceCommunicator sessionListener;
+ private final List<Integer> editBatches;
+
+ public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
+ this.params = params;
+ this.preparedMessages = editConfigMsgs;
+ this.sessionListener = sessionListener;
+ this.editBatches = countEditBatchSizes(params);
+ }
+
+ private static List<Integer> countEditBatchSizes(final Parameters params) {
+ final List<Integer> editBatches = Lists.newArrayList();
+ if (params.editBatchSize != params.editCount) {
+ final int fullBatches = params.editCount / params.editBatchSize;
+ for (int i = 0; i < fullBatches; i++) {
+ editBatches.add(params.editBatchSize);
+ }
+
+ if (params.editCount % params.editBatchSize != 0) {
+ editBatches.add(params.editCount % params.editBatchSize);
+ }
+ } else {
+ editBatches.add(params.editBatchSize);
+ }
+ return editBatches;
+ }
+
+ @Override
+ public void invoke() {
+ final AtomicInteger responseCounter = new AtomicInteger(0);
+ final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
+
+ int batchI = 0;
+ for (final Integer editBatch : editBatches) {
+ for (int i = 0; i < editBatch; i++) {
+ final int msgId = i + (batchI * params.editBatchSize);
+ final NetconfMessage msg = preparedMessages.get(msgId);
+ LOG.debug("Sending message {}", msgId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+ }
+ final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+ sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ futures.add(netconfMessageFuture);
+ }
+ batchI++;
+ LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+ futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ }
+
+ LOG.info("All batches sent. Waiting for responses");
+ // Wait for every future
+ for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
+ try {
+ final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS);
+ if(netconfMessageRpcResult.isSuccessful()) {
+ responseCounter.incrementAndGet();
+ LOG.debug("Received response {}", responseCounter.get());
+ } else {
+ LOG.warn("Request failed {}", netconfMessageRpcResult);
+ }
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Request not finished", e);
+ }
+ }
+
+ Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.Set;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionNegotiatorFactory;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+
+public class ConfigurableClientDispatcher extends NetconfClientDispatcherImpl {
+
+ private final Set<String> capabilities;
+
+ private ConfigurableClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer, final Set<String> capabilities) {
+ super(bossGroup, workerGroup, timer);
+ this.capabilities = capabilities;
+ }
+
+ /**
+ * EXI + chunked framing
+ */
+ public static ConfigurableClientDispatcher createChunkedExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.EXI_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * EXI + ]]>]]> framing
+ */
+ public static ConfigurableClientDispatcher createLegacyExi(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_EXI_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * Chunked framing
+ */
+ public static ConfigurableClientDispatcher createChunked(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.DEFAULT_CLIENT_CAPABILITIES);
+ }
+
+ /**
+ * ]]>]]> framing
+ */
+ public static ConfigurableClientDispatcher createLegacy(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final Timer timer) {
+ return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer, NetconfClientSessionNegotiatorFactory.LEGACY_FRAMING_CLIENT_CAPABILITIES);
+ }
+
+ @Override
+ protected NetconfClientSessionNegotiatorFactory getNegotiatorFactory(final NetconfClientConfiguration cfg) {
+ return new NetconfClientSessionNegotiatorFactory(getTimer(), cfg.getAdditionalHeader(), cfg.getConnectionTimeoutMillis(), capabilities);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+/**
+ * Created by mmarsale on 18.4.2015.
+ */
+public interface ExecutionStrategy {
+ void invoke();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+
+public class Parameters {
+
+ @Arg(dest = "ip")
+ public String ip;
+
+ @Arg(dest = "port")
+ public int port;
+
+ @Arg(dest = "edit-count")
+ public int editCount;
+
+ @Arg(dest = "edit-content")
+ public File editContent;
+
+ @Arg(dest = "edit-batch-size")
+ public int editBatchSize;
+
+ @Arg(dest = "debug")
+ public boolean debug;
+
+ @Arg(dest = "legacy-framing")
+ public boolean legacyFraming;
+
+ @Arg(dest = "exi")
+ public boolean exi;
+
+ @Arg(dest = "async")
+ public boolean async;
+
+ @Arg(dest = "ssh")
+ public boolean ssh;
+
+ @Arg(dest = "msg-timeout")
+ public long msgTimeout;
+
+ static ArgumentParser getParser() {
+ final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
+
+ parser.description("Netconf stress client");
+
+ parser.addArgument("--ip")
+ .type(String.class)
+ .setDefault("127.0.0.1")
+ .type(String.class)
+ .help("Netconf server IP")
+ .dest("ip");
+
+ parser.addArgument("--port")
+ .type(Integer.class)
+ .setDefault(2830)
+ .type(Integer.class)
+ .help("Netconf server port")
+ .dest("port");
+
+ parser.addArgument("--edits")
+ .type(Integer.class)
+ .setDefault(50000)
+ .type(Integer.class)
+ .help("Netconf edit rpcs to be sent")
+ .dest("edit-count");
+
+ parser.addArgument("--edit-content")
+ .type(File.class)
+ .setDefault(new File("edit.txt"))
+ .type(File.class)
+ .dest("edit-content");
+
+ parser.addArgument("--edit-batch-size")
+ .type(Integer.class)
+ .required(false)
+ .setDefault(-1)
+ .type(Integer.class)
+ .dest("edit-batch-size");
+
+ parser.addArgument("--debug")
+ .type(Boolean.class)
+ .setDefault(false)
+ .help("Whether to use debug log level instead of INFO")
+ .dest("debug");
+
+ parser.addArgument("--legacy-framing")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("legacy-framing");
+
+ parser.addArgument("--exi")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("exi");
+
+ parser.addArgument("--async-requests")
+ .type(Boolean.class)
+ .setDefault(true)
+ .dest("async");
+
+ parser.addArgument("--msg-timeout")
+ .type(Integer.class)
+ .setDefault(60)
+ .dest("msg-timeout");
+
+ parser.addArgument("--ssh")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("ssh");
+
+ // TODO add get-config option instead of edit + commit
+ // TODO different edit config content
+
+ return parser;
+ }
+
+ void validate() {
+ Preconditions.checkArgument(port > 0, "Port =< 0");
+ Preconditions.checkArgument(editCount > 0, "Edit count =< 0");
+ if (editBatchSize == -1) {
+ editBatchSize = editCount;
+ } else {
+ Preconditions.checkArgument(editBatchSize <= editCount, "Edit count =< 0");
+ }
+
+ Preconditions.checkArgument(editContent.exists(), "Edit content file missing");
+ Preconditions.checkArgument(editContent.isDirectory() == false, "Edit content file is a dir");
+ Preconditions.checkArgument(editContent.canRead(), "Edit content file is unreadable");
+ // TODO validate
+ }
+
+ public InetSocketAddress getInetAddress() {
+ try {
+ return new InetSocketAddress(InetAddress.getByName(ip), port);
+ } catch (final UnknownHostException e) {
+ throw new IllegalArgumentException("Unknown ip", e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import ch.qos.logback.classic.Level;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.SAXException;
+
+public final class StressClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StressClient.class);
+
+ static final QName COMMIT_QNAME = QName.create(CommitInput.QNAME, "commit");
+ public static final NetconfMessage COMMIT_MSG;
+
+ static {
+ try {
+ COMMIT_MSG = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"commit-batch\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <commit/>\n" +
+ "</rpc>"));
+ } catch (SAXException | IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ static final QName EDIT_QNAME = QName.create(EditConfigInput.QNAME, "edit-config");
+ static final org.w3c.dom.Document editBlueprint;
+
+ static {
+ try {
+ editBlueprint = XmlUtil.readXmlToDocument(
+ "<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <edit-config xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <target>\n" +
+ " <candidate/>\n" +
+ " </target>\n" +
+ " <config/>\n" +
+ " </edit-config>\n" +
+ "</rpc>");
+ } catch (SAXException | IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
+ private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+
+ public static void main(final String[] args) {
+ final Parameters params = parseArgs(args, Parameters.getParser());
+ params.validate();
+
+ // TODO remove
+ try {
+ Thread.sleep(10000);
+ } catch (final InterruptedException e) {
+// e.printStackTrace();
+ }
+
+ final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+ root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
+
+ LOG.info("Preparing messages");
+ // Prepare all msgs up front
+ final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+
+ final String editContentString;
+ boolean needsModification = false;
+ try {
+ editContentString = Files.toString(params.editContent, Charsets.UTF_8);
+ if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
+ needsModification = true;
+ };
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Cannot read content of " + params.editContent);
+ }
+
+ for (int i = 0; i < params.editCount; i++) {
+ final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+ msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
+ final NetconfMessage netconfMessage = new NetconfMessage(msg);
+
+ final Element editContentElement;
+ try {
+ // Insert message id where needed
+ final String specificEditContent = needsModification ?
+ editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
+ editContentString;
+
+ editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+ final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+ getElementsByTagName("config").item(0);
+ config.appendChild(msg.importNode(editContentElement, true));
+ } catch (final IOException | SAXException e) {
+ throw new IllegalArgumentException("Edit content file is unreadable", e);
+ }
+
+ preparedMessages.add(netconfMessage);
+
+ }
+
+
+ final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
+ final Timer timer = new HashedWheelTimer();
+
+ final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
+
+ final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+
+ final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+
+ LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
+ final NetconfClientSession netconfClientSession;
+ try {
+ netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("Unable to connect", e);
+ }
+
+ LOG.info("Starting stress test");
+ final Stopwatch started = Stopwatch.createStarted();
+ getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
+ started.stop();
+
+ LOG.info("FINISHED. Execution time: {}", started);
+ LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
+
+ // Cleanup
+ netconfClientSession.close();
+ timer.stop();
+ try {
+ nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Unable to close executor properly", e);
+ }
+ }
+
+ private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+ if(params.async) {
+ return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
+ } else {
+ return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+ }
+ }
+
+ private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
+ final NetconfClientDispatcherImpl netconfClientDispatcher;
+ if(params.exi) {
+ if(params.legacyFraming) {
+ netconfClientDispatcher= ConfigurableClientDispatcher.createLegacyExi(nioGroup, nioGroup, timer);
+ } else {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createChunkedExi(nioGroup, nioGroup, timer);
+ }
+ } else {
+ if(params.legacyFraming) {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createLegacy(nioGroup, nioGroup, timer);
+ } else {
+ netconfClientDispatcher = ConfigurableClientDispatcher.createChunked(nioGroup, nioGroup, timer);
+ }
+ }
+ return netconfClientDispatcher;
+ }
+
+ private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
+ final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
+ netconfClientConfigurationBuilder.withSessionListener(sessionListener);
+ netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+ netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
+ netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
+ netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+ return netconfClientConfigurationBuilder.build();
+ }
+
+ static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
+ return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+ }
+
+ private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
+ final Parameters opt = new Parameters();
+ try {
+ parser.parseArgs(args, opt);
+ return opt;
+ } catch (final ArgumentParserException e) {
+ parser.handleError(e);
+ }
+
+ System.exit(1);
+ return null;
+ }
+
+
+ private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+ @Override
+ public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
+ LOG.info("Session established");
+ }
+
+ @Override
+ public void onRemoteSessionDown() {
+ LOG.info("Session down");
+ }
+
+ @Override
+ public void onRemoteSessionFailed(final Throwable throwable) {
+ LOG.info("Session failed");
+ }
+
+ @Override
+ public void onNotification(final NetconfMessage notification) {
+ LOG.info("Notification received: {}", notification.toString());
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO reuse code from org.opendaylight.controller.netconf.test.tool.client.stress.AsyncExecutionStrategy
+class SyncExecutionStrategy implements ExecutionStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
+
+ private final Parameters params;
+ private final List<NetconfMessage> preparedMessages;
+ private final NetconfDeviceCommunicator sessionListener;
+ private final List<Integer> editBatches;
+
+ public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+ this.params = params;
+ this.preparedMessages = preparedMessages;
+ this.sessionListener = sessionListener;
+ editBatches = countEditBatchSizes(params);
+ }
+
+ private static List<Integer> countEditBatchSizes(final Parameters params) {
+ final List<Integer> editBatches = Lists.newArrayList();
+ if (params.editBatchSize != params.editCount) {
+ final int fullBatches = params.editCount / params.editBatchSize;
+ for (int i = 0; i < fullBatches; i++) {
+ editBatches.add(params.editBatchSize);
+ }
+
+ if (params.editCount % params.editBatchSize != 0) {
+ editBatches.add(params.editCount % params.editBatchSize);
+ }
+ } else {
+ editBatches.add(params.editBatchSize);
+ }
+ return editBatches;
+ }
+
+ public void invoke() {
+ final AtomicInteger responseCounter = new AtomicInteger(0);
+
+ int batchI = 0;
+ for (final Integer editBatch : editBatches) {
+ for (int i = 0; i < editBatch; i++) {
+ final int msgId = i + (batchI * params.editBatchSize);
+ final NetconfMessage msg = preparedMessages.get(msgId);
+ LOG.debug("Sending message {}", msgId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
+ }
+ final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
+ sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ // Wait for response
+ waitForResponse(responseCounter, netconfMessageFuture);
+
+ }
+ batchI++;
+ LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
+
+ // Commit batch sync
+ waitForResponse(responseCounter,
+ sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ }
+
+ Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ }
+
+ private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
+ try {
+ final RpcResult<NetconfMessage> netconfMessageRpcResult =
+ netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
+ if (netconfMessageRpcResult.isSuccessful()) {
+ responseCounter.incrementAndGet();
+ LOG.debug("Received response {}", responseCounter.get());
+ } else {
+ LOG.warn("Request failed {}", netconfMessageRpcResult);
+ }
+
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Request not finished", e);
+ }
+ }
+}
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<sourceDirectory>${project.basedir}</sourceDirectory>
<includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat,**\/*.yang</includes>
- <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java</excludes>
+ <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/netconf\/test\/tool\/Main.java, **\/netconf\/test\/tool\/client\/stress\/StressClient.java</excludes>
</configuration>
<dependencies>
<dependency>