X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Ftest%2Ftool%2Fclient%2Fstress%2FStressClient.java;h=2916ec52f7bed25d4d0065579db166b73c94b4c8;hb=e1fc2efacc0b2da9075b6cd4e5ae149a141c486d;hp=6bf50d2c5a0070e4273d19d11cf843e974d569a4;hpb=a62b23201606691c83c800888bbc36617b604202;p=controller.git diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java index 6bf50d2c5a..2916ec52f7 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java @@ -10,38 +10,28 @@ package org.opendaylight.controller.netconf.test.tool.client.stress; import ch.qos.logback.classic.Level; import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.io.Files; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; -import org.opendaylight.controller.netconf.client.NetconfClientSession; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; -import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; -import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; -import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput; import org.opendaylight.yangtools.yang.common.QName; @@ -80,6 +70,7 @@ public final class StressClient { " \n" + " \n" + " \n" + + " none" + " \n" + " \n" + ""); @@ -89,90 +80,89 @@ public final class StressClient { } private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}"; - private static final String PHYS_ADDR_PLACEHOLDER_REGEX = "\\{PHYS_ADDR\\}"; + private static final String PHYS_ADDR_PLACEHOLDER = "{PHYS_ADDR}"; + + private static long macStart = 0xAABBCCDD0000L; public static void main(final String[] args) { final Parameters params = parseArgs(args, Parameters.getParser()); params.validate(); - // Wait 5 seconds to allow for debugging/profiling - try { - Thread.sleep(5000); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); root.setLevel(params.debug ? Level.DEBUG : Level.INFO); + final int threadAmount = params.threadAmount; + LOG.info("thread amount: " + threadAmount); + final int requestsPerThread = params.editCount / params.threadAmount; + LOG.info("requestsPerThread: " + requestsPerThread); + final int leftoverRequests = params.editCount % params.threadAmount; + LOG.info("leftoverRequests: " + leftoverRequests); + + LOG.info("Preparing messages"); // Prepare all msgs up front - final List preparedMessages = Lists.newArrayListWithCapacity(params.editCount); + final List> allPreparedMessages = new ArrayList<>(threadAmount); + for (int i = 0; i < threadAmount; i++) { + if (i != threadAmount - 1) { + allPreparedMessages.add(new ArrayList(requestsPerThread)); + } else { + allPreparedMessages.add(new ArrayList(requestsPerThread + leftoverRequests)); + } + } + final String editContentString; try { editContentString = Files.toString(params.editContent, Charsets.UTF_8); - } catch (IOException e) { + } catch (final IOException e) { throw new IllegalArgumentException("Cannot read content of " + params.editContent); } - for (int i = 0; i < params.editCount; i++) { - final Document msg = XmlUtil.createDocumentCopy(editBlueprint); - msg.getDocumentElement().setAttribute("message-id", Integer.toString(i)); - final NetconfMessage netconfMessage = new NetconfMessage(msg); - - final Element editContentElement; - try { - // Insert message id where needed - String specificEditContent = - editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)); - - // Insert physical address where needed - specificEditContent = - specificEditContent.replaceAll(PHYS_ADDR_PLACEHOLDER_REGEX, getMac(i)); - - editContentElement = XmlUtil.readXmlToElement(specificEditContent); - final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)). - getElementsByTagName("config").item(0); - config.appendChild(msg.importNode(editContentElement, true)); - } catch (final IOException | SAXException e) { - throw new IllegalArgumentException("Edit content file is unreadable", e); + for (int i = 0; i < threadAmount; i++) { + final List preparedMessages = allPreparedMessages.get(i); + int padding = 0; + if (i == threadAmount - 1) { + padding = leftoverRequests; + } + for (int j = 0; j < requestsPerThread + padding; j++) { + LOG.debug("id: " + (i * requestsPerThread + j)); + preparedMessages.add(prepareMessage(i * requestsPerThread + j, editContentString)); } - - preparedMessages.add(netconfMessage); - } - final NioEventLoopGroup nioGroup = new NioEventLoopGroup(); final Timer timer = new HashedWheelTimer(); final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer); - final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress()); + final List callables = new ArrayList<>(threadAmount); + for (final List messages : allPreparedMessages) { + callables.add(new StressClientCallable(params, netconfClientDispatcher, messages)); + } - final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener); + final ExecutorService executorService = Executors.newFixedThreadPool(threadAmount); - LOG.info("Connecting to netconf server {}:{}", params.ip, params.port); - final NetconfClientSession netconfClientSession; + LOG.info("Starting stress test"); + final Stopwatch started = Stopwatch.createStarted(); try { - netconfClientSession = netconfClientDispatcher.createClient(cfg).get(); + final List> futures = executorService.invokeAll(callables); + for (final Future future : futures) { + try { + future.get(4L, TimeUnit.MINUTES); + } catch (ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + executorService.shutdownNow(); } catch (final InterruptedException e) { - throw new RuntimeException(e); - } catch (final ExecutionException e) { - throw new RuntimeException("Unable to connect", e); + throw new RuntimeException("Unable to execute requests", e); } - - LOG.info("Starting stress test"); - final Stopwatch started = Stopwatch.createStarted(); - getExecutionStrategy(params, preparedMessages, sessionListener).invoke(); started.stop(); LOG.info("FINISHED. Execution time: {}", started); LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS))); // Cleanup - netconfClientSession.close(); timer.stop(); try { nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS); @@ -181,29 +171,33 @@ public final class StressClient { } } - private static String getMac(final int i) { - final String hex = Integer.toHexString(i); - final Iterable macGroups = Splitter.fixedLength(2).split(hex); + static NetconfMessage prepareMessage(final int id, final String editContentString) { + final Document msg = XmlUtil.createDocumentCopy(editBlueprint); + msg.getDocumentElement().setAttribute("message-id", Integer.toString(id)); + final NetconfMessage netconfMessage = new NetconfMessage(msg); - final int additional = 6 - Iterables.size(macGroups); - final ArrayList additionalGroups = Lists.newArrayListWithCapacity(additional); - for (int j = 0; j < additional; j++) { - additionalGroups.add("00"); - } - return Joiner.on(':').join(Iterables.concat(Iterables.transform(macGroups, new Function() { - @Override - public String apply(final String input) { - return input.length() == 1 ? input + "0" : input; + final Element editContentElement; + try { + // Insert message id where needed + String specificEditContent = editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(id)); + + final StringBuilder stringBuilder = new StringBuilder(specificEditContent); + int idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER); + while (idx!= -1) { + stringBuilder.replace(idx, idx + PHYS_ADDR_PLACEHOLDER.length(), getMac(macStart++)); + idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER); } - }), additionalGroups)); - } - - private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List preparedMessages, final NetconfDeviceCommunicator sessionListener) { - if(params.async) { - return new AsyncExecutionStrategy(params, preparedMessages, sessionListener); - } else { - return new SyncExecutionStrategy(params, preparedMessages, sessionListener); + specificEditContent = stringBuilder.toString(); + + editContentElement = XmlUtil.readXmlToElement(specificEditContent); + final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)). + getElementsByTagName("config").item(0); + config.appendChild(msg.importNode(editContentElement, true)); + } catch (final IOException | SAXException e) { + throw new IllegalArgumentException("Edit content file is unreadable", e); } + + return netconfMessage; } private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) { @@ -224,29 +218,18 @@ public final class StressClient { return netconfClientDispatcher; } - private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) { - final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create(); - netconfClientConfigurationBuilder.withSessionListener(sessionListener); - netconfClientConfigurationBuilder.withAddress(params.getInetAddress()); - if(params.tcpHeader != null) { - final String header = params.tcpHeader.replaceAll("\"", "").trim() + "\n"; - netconfClientConfigurationBuilder.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader(null, null, null, null, null) { - @Override - public String toFormattedString() { - LOG.debug("Sending TCP header {}", header); - return header; - } - }); + public static String getMac(long mac) { + StringBuilder m = new StringBuilder(Long.toString(mac, 16)); + + for (int i = m.length(); i < 12; i++) { + m.insert(0, "0"); + } + + for (int j = m.length() - 2; j >= 2; j-=2) { + m.insert(j, ":"); } - netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP); - netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L); - netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)); - return netconfClientConfigurationBuilder.build(); - } - static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) { - final RemoteDevice loggingRemoteDevice = new LoggingRemoteDevice(); - return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice); + return m.toString(); } private static Parameters parseArgs(final String[] args, final ArgumentParser parser) { @@ -263,7 +246,7 @@ public final class StressClient { } - private static class LoggingRemoteDevice implements RemoteDevice { + static class LoggingRemoteDevice implements RemoteDevice { @Override public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) { LOG.info("Session established");