X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Ftest%2Ftool%2Fclient%2Fstress%2FStressClient.java;h=0e87604cc5d6216034bf401d3f324ac38065ce0b;hb=315e4af2de2691ed689b9e3bad70b8597935b7cc;hp=fe0a0bcd523a8249e45a9c9f66f687e72611e97c;hpb=f0a3398f9a6598b9b7321ddf7d1a2f9433065d92;p=controller.git diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java index fe0a0bcd52..0e87604cc5 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java @@ -10,32 +10,33 @@ package org.opendaylight.controller.netconf.test.tool.client.stress; import ch.qos.logback.classic.Level; import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; -import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; -import org.opendaylight.controller.netconf.client.NetconfClientSession; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; -import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; -import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput; import org.opendaylight.yangtools.yang.common.QName; @@ -74,6 +75,7 @@ public final class StressClient { " \n" + " \n" + " \n" + + " none" + " \n" + " \n" + ""); @@ -82,92 +84,95 @@ public final class StressClient { } } - private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}"; private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}"; + private static final String PHYS_ADDR_PLACEHOLDER_REGEX = "\\{PHYS_ADDR\\}"; public static void main(final String[] args) { final Parameters params = parseArgs(args, Parameters.getParser()); params.validate(); // TODO remove - try { - Thread.sleep(10000); - } catch (final InterruptedException e) { +// try { +// Thread.sleep(10000); +// } catch (final InterruptedException e) { // e.printStackTrace(); - } +// } final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); root.setLevel(params.debug ? Level.DEBUG : Level.INFO); + int threadAmount = params.threadAmount; + LOG.info("thread amount: " + threadAmount); + int requestsPerThread = params.editCount / params.threadAmount; + LOG.info("requestsPerThread: " + requestsPerThread); + int leftoverRequests = params.editCount % params.threadAmount; + LOG.info("leftoverRequests: " + leftoverRequests); + + LOG.info("Preparing messages"); // Prepare all msgs up front - final List preparedMessages = Lists.newArrayListWithCapacity(params.editCount); + final List> allPreparedMessages = new ArrayList<>(threadAmount); + for (int i = 0; i < threadAmount; i++) { + if (i != threadAmount - 1) { + allPreparedMessages.add(new ArrayList(requestsPerThread)); + } else { + allPreparedMessages.add(new ArrayList(requestsPerThread + leftoverRequests)); + } + } + final String editContentString; - boolean needsModification = false; try { editContentString = Files.toString(params.editContent, Charsets.UTF_8); - if(editContentString.contains(MSG_ID_PLACEHOLDER)) { - needsModification = true; - }; } catch (IOException e) { throw new IllegalArgumentException("Cannot read content of " + params.editContent); } - for (int i = 0; i < params.editCount; i++) { - final Document msg = XmlUtil.createDocumentCopy(editBlueprint); - msg.getDocumentElement().setAttribute("message-id", Integer.toString(i)); - final NetconfMessage netconfMessage = new NetconfMessage(msg); - - final Element editContentElement; - try { - // Insert message id where needed - final String specificEditContent = needsModification ? - editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) : - editContentString; - - editContentElement = XmlUtil.readXmlToElement(specificEditContent); - final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)). - getElementsByTagName("config").item(0); - config.appendChild(msg.importNode(editContentElement, true)); - } catch (final IOException | SAXException e) { - throw new IllegalArgumentException("Edit content file is unreadable", e); + for (int i = 0; i < threadAmount; i++) { + final List preparedMessages = allPreparedMessages.get(i); + int padding = 0; + if (i == threadAmount - 1) { + padding = leftoverRequests; + } + for (int j = 0; j < requestsPerThread + padding; j++) { + LOG.debug("id: " + (i * requestsPerThread + j)); + preparedMessages.add(prepareMessage(i * requestsPerThread + j, editContentString)); } - - preparedMessages.add(netconfMessage); - } - final NioEventLoopGroup nioGroup = new NioEventLoopGroup(); final Timer timer = new HashedWheelTimer(); final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer); - final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress()); - - final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener); - - LOG.info("Connecting to netconf server {}:{}", params.ip, params.port); - final NetconfClientSession netconfClientSession; - try { - netconfClientSession = netconfClientDispatcher.createClient(cfg).get(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } catch (final ExecutionException e) { - throw new RuntimeException("Unable to connect", e); + final List callables = new ArrayList<>(threadAmount); + for (List messages : allPreparedMessages) { + callables.add(new StressClientCallable(params, netconfClientDispatcher, messages)); } + final ExecutorService executorService = Executors.newFixedThreadPool(threadAmount); + LOG.info("Starting stress test"); final Stopwatch started = Stopwatch.createStarted(); - getExecutionStrategy(params, preparedMessages, sessionListener).invoke(); + try { + final List> futures = executorService.invokeAll(callables); + for (Future future : futures) { + try { + future.get(4L, TimeUnit.MINUTES); + } catch (ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + executorService.shutdownNow(); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to execute requests", e); + } started.stop(); LOG.info("FINISHED. Execution time: {}", started); LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS))); // Cleanup - netconfClientSession.close(); timer.stop(); try { nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS); @@ -176,12 +181,28 @@ public final class StressClient { } } - private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List preparedMessages, final NetconfDeviceCommunicator sessionListener) { - if(params.async) { - return new AsyncExecutionStrategy(params, preparedMessages, sessionListener); - } else { - return new SyncExecutionStrategy(params, preparedMessages, sessionListener); + private static NetconfMessage prepareMessage(final int id, final String editContentString) { + final Document msg = XmlUtil.createDocumentCopy(editBlueprint); + msg.getDocumentElement().setAttribute("message-id", Integer.toString(id)); + final NetconfMessage netconfMessage = new NetconfMessage(msg); + + final Element editContentElement; + try { + // Insert message id where needed + String specificEditContent = editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(id)); + + specificEditContent = + specificEditContent.replaceAll(PHYS_ADDR_PLACEHOLDER_REGEX, getMac(id)); + + editContentElement = XmlUtil.readXmlToElement(specificEditContent); + final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)). + getElementsByTagName("config").item(0); + config.appendChild(msg.importNode(editContentElement, true)); + } catch (final IOException | SAXException e) { + throw new IllegalArgumentException("Edit content file is unreadable", e); } + + return netconfMessage; } private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) { @@ -202,19 +223,21 @@ public final class StressClient { return netconfClientDispatcher; } - private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) { - final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create(); - netconfClientConfigurationBuilder.withSessionListener(sessionListener); - netconfClientConfigurationBuilder.withAddress(params.getInetAddress()); - netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP); - netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L); - netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)); - return netconfClientConfigurationBuilder.build(); - } + private static String getMac(final int i) { + final String hex = Integer.toHexString(i); + final Iterable macGroups = Splitter.fixedLength(2).split(hex); - static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) { - final RemoteDevice loggingRemoteDevice = new LoggingRemoteDevice(); - return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice); + final int additional = 6 - Iterables.size(macGroups); + final ArrayList additionalGroups = Lists.newArrayListWithCapacity(additional); + for (int j = 0; j < additional; j++) { + additionalGroups.add("00"); + } + return Joiner.on(':').join(Iterables.concat(Iterables.transform(macGroups, new Function() { + @Override + public String apply(final String input) { + return input.length() == 1 ? input + "0" : input; + } + }), additionalGroups)); } private static Parameters parseArgs(final String[] args, final ArgumentParser parser) { @@ -231,7 +254,7 @@ public final class StressClient { } - private static class LoggingRemoteDevice implements RemoteDevice { + static class LoggingRemoteDevice implements RemoteDevice { @Override public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) { LOG.info("Session established");