X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Ftools%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Ftest%2Ftool%2Fclient%2Fstress%2FAsyncExecutionStrategy.java;fp=opendaylight%2Fnetconf%2Ftools%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Ftest%2Ftool%2Fclient%2Fstress%2FAsyncExecutionStrategy.java;h=20f57a75569d6c76da8ff58a2061bd0a6b359dc0;hb=23fe9ca678ada6263fec5dd996f4025e4a32fcf5;hp=0000000000000000000000000000000000000000;hpb=071a641d7c12c0e6112d5ce0afe806b54f116ed2;p=controller.git diff --git a/opendaylight/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java b/opendaylight/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java new file mode 100644 index 0000000000..20f57a7556 --- /dev/null +++ b/opendaylight/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool.client.stress; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.opendaylight.controller.config.util.xml.XmlUtil; +import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncExecutionStrategy implements ExecutionStrategy { + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class); + + private final Parameters params; + private final List preparedMessages; + private final NetconfDeviceCommunicator sessionListener; + private final List editBatches; + private final int editAmount; + + public AsyncExecutionStrategy(final Parameters params, final List editConfigMsgs, final NetconfDeviceCommunicator sessionListener) { + this.params = params; + this.preparedMessages = editConfigMsgs; + this.sessionListener = sessionListener; + this.editBatches = countEditBatchSizes(params, editConfigMsgs.size()); + editAmount = editConfigMsgs.size(); + } + + private static List countEditBatchSizes(final Parameters params, final int amount) { + final List editBatches = Lists.newArrayList(); + if (params.editBatchSize != amount) { + final int fullBatches = amount / params.editBatchSize; + for (int i = 0; i < fullBatches; i++) { + editBatches.add(params.editBatchSize); + } + + if (amount % params.editBatchSize != 0) { + editBatches.add(amount % params.editBatchSize); + } + } else { + editBatches.add(params.editBatchSize); + } + return editBatches; + } + + @Override + public void invoke() { + final AtomicInteger responseCounter = new AtomicInteger(0); + final List>> futures = Lists.newArrayList(); + + int batchI = 0; + for (final Integer editBatch : editBatches) { + for (int i = 0; i < editBatch; i++) { + final int msgId = i + (batchI * params.editBatchSize); + final NetconfMessage msg = preparedMessages.get(msgId); + LOG.debug("Sending message {}", msgId); + if(LOG.isDebugEnabled()) { + LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument())); + } + final ListenableFuture> netconfMessageFuture = + sessionListener.sendRequest(msg, StressClient.EDIT_QNAME); + futures.add(netconfMessageFuture); + } + batchI++; + LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch); + futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME)); + } + + LOG.info("All batches sent. Waiting for responses"); + // Wait for every future + for (final ListenableFuture> future : futures) { + try { + final RpcResult netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS); + if(netconfMessageRpcResult.isSuccessful()) { + responseCounter.incrementAndGet(); + LOG.debug("Received response {}", responseCounter.get()); + } else { + LOG.warn("Request failed {}", netconfMessageRpcResult); + } + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException | TimeoutException e) { + throw new RuntimeException("Request not finished", e); + } + } + + Preconditions.checkState(responseCounter.get() == editAmount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size()); + } +}