--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool.client.stress;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+
+abstract class AbstractExecutionStrategy implements ExecutionStrategy {
+ private final Parameters params;
+ private final List<NetconfMessage> preparedMessages;
+ private final NetconfDeviceCommunicator sessionListener;
+ private final List<Integer> editBatches;
+ private final int editAmount;
+
+ public AbstractExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
+ editAmount = editConfigMsgs.size();
+ this.params = params;
+ this.preparedMessages = editConfigMsgs;
+ this.sessionListener = sessionListener;
+ this.editBatches = countEditBatchSizes(params, editConfigMsgs.size());
+ }
+
+ private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
+ final List<Integer> editBatches = Lists.newArrayList();
+ if (params.editBatchSize != amount) {
+ final int fullBatches = amount / params.editBatchSize;
+ for (int i = 0; i < fullBatches; i++) {
+ editBatches.add(params.editBatchSize);
+ }
+
+ if (amount % params.editBatchSize != 0) {
+ editBatches.add(amount % params.editBatchSize);
+ }
+ } else {
+ editBatches.add(params.editBatchSize);
+ }
+ return editBatches;
+ }
+
+
+ protected Parameters getParams() {
+ return params;
+ }
+
+ protected List<NetconfMessage> getPreparedMessages() {
+ return preparedMessages;
+ }
+
+ protected NetconfDeviceCommunicator getSessionListener() {
+ return sessionListener;
+ }
+
+ protected List<Integer> getEditBatches() {
+ return editBatches;
+ }
+
+ protected int getEditAmount() {
+ return editAmount;
+ }
+}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class AsyncExecutionStrategy implements ExecutionStrategy {
+class AsyncExecutionStrategy extends AbstractExecutionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
- private final Parameters params;
- private final List<NetconfMessage> preparedMessages;
- private final NetconfDeviceCommunicator sessionListener;
- private final List<Integer> editBatches;
- private final int editAmount;
-
public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
- this.params = params;
- this.preparedMessages = editConfigMsgs;
- this.sessionListener = sessionListener;
- this.editBatches = countEditBatchSizes(params, editConfigMsgs.size());
- editAmount = editConfigMsgs.size();
- }
-
- private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
- final List<Integer> editBatches = Lists.newArrayList();
- if (params.editBatchSize != amount) {
- final int fullBatches = amount / params.editBatchSize;
- for (int i = 0; i < fullBatches; i++) {
- editBatches.add(params.editBatchSize);
- }
-
- if (amount % params.editBatchSize != 0) {
- editBatches.add(amount % params.editBatchSize);
- }
- } else {
- editBatches.add(params.editBatchSize);
- }
- return editBatches;
+ super(params, editConfigMsgs, sessionListener);
}
@Override
final List<ListenableFuture<RpcResult<NetconfMessage>>> futures = Lists.newArrayList();
int batchI = 0;
- for (final Integer editBatch : editBatches) {
+ for (final Integer editBatch : getEditBatches()) {
for (int i = 0; i < editBatch; i++) {
- final int msgId = i + (batchI * params.editBatchSize);
- final NetconfMessage msg = preparedMessages.get(msgId);
+ final int msgId = i + (batchI * getParams().editBatchSize);
+ final NetconfMessage msg = getPreparedMessages().get(msgId);
LOG.debug("Sending message {}", msgId);
if(LOG.isDebugEnabled()) {
LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
}
final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
- sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ getSessionListener().sendRequest(msg, StressClient.EDIT_QNAME);
futures.add(netconfMessageFuture);
}
batchI++;
LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
- if (params.candidateDatastore) {
- futures.add(sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ if (getParams().candidateDatastore) {
+ futures.add(getSessionListener().sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
}
}
// Wait for every future
for (final ListenableFuture<RpcResult<NetconfMessage>> future : futures) {
try {
- final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(params.msgTimeout, TimeUnit.SECONDS);
+ final RpcResult<NetconfMessage> netconfMessageRpcResult = future.get(getParams().msgTimeout, TimeUnit.SECONDS);
if(netconfMessageRpcResult.isSuccessful()) {
responseCounter.incrementAndGet();
LOG.debug("Received response {}", responseCounter.get());
}
}
- Preconditions.checkState(responseCounter.get() == editAmount + (params.candidateDatastore ? editBatches.size() : 0),
- "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ Preconditions.checkState(responseCounter.get() == getEditAmount() + (getParams().candidateDatastore ? getEditBatches().size() : 0),
+ "Not all responses were received, only %s from %s", responseCounter.get(), getParams().editCount + getEditBatches().size());
}
}
package org.opendaylight.netconf.test.tool.client.stress;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO reuse code from AsyncExecutionStrategy
-class SyncExecutionStrategy implements ExecutionStrategy {
+class SyncExecutionStrategy extends AbstractExecutionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
- private final Parameters params;
- private final List<NetconfMessage> preparedMessages;
- private final NetconfDeviceCommunicator sessionListener;
- private final List<Integer> editBatches;
- private final int editAmount;
-
public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
- this.params = params;
- this.preparedMessages = preparedMessages;
- this.sessionListener = sessionListener;
- this.editBatches = countEditBatchSizes(params, preparedMessages.size());
- editAmount = preparedMessages.size();
- }
-
- private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
- final List<Integer> editBatches = Lists.newArrayList();
- if (params.editBatchSize != amount) {
- final int fullBatches = amount / params.editBatchSize;
- for (int i = 0; i < fullBatches; i++) {
- editBatches.add(params.editBatchSize);
- }
-
- if (amount % params.editBatchSize != 0) {
- editBatches.add(amount % params.editBatchSize);
- }
- } else {
- editBatches.add(params.editBatchSize);
- }
- return editBatches;
+ super(params, preparedMessages, sessionListener);
}
public void invoke() {
final AtomicInteger responseCounter = new AtomicInteger(0);
int batchI = 0;
- for (final Integer editBatch : editBatches) {
+ for (final Integer editBatch : getEditBatches()) {
for (int i = 0; i < editBatch; i++) {
- final int msgId = i + (batchI * params.editBatchSize);
- final NetconfMessage msg = preparedMessages.get(msgId);
+ final int msgId = i + (batchI * getParams().editBatchSize);
+ final NetconfMessage msg = getPreparedMessages().get(msgId);
LOG.debug("Sending message {}", msgId);
if(LOG.isDebugEnabled()) {
LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
}
final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
- sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
+ getSessionListener().sendRequest(msg, StressClient.EDIT_QNAME);
// Wait for response
waitForResponse(responseCounter, netconfMessageFuture);
LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
// Commit batch sync
- if (params.candidateDatastore) {
+ if (getParams().candidateDatastore) {
waitForResponse(responseCounter,
- sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
+ getSessionListener().sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
}
}
- Preconditions.checkState(responseCounter.get() == editAmount + (params.candidateDatastore ? editBatches.size() : 0),
- "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+ Preconditions.checkState(responseCounter.get() == getEditAmount() + (getParams().candidateDatastore ? getEditBatches().size() : 0),
+ "Not all responses were received, only %s from %s", responseCounter.get(), getParams().editCount + getEditBatches().size());
}
private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
try {
final RpcResult<NetconfMessage> netconfMessageRpcResult =
- netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
+ netconfMessageFuture.get(getParams().msgTimeout, TimeUnit.SECONDS);
if (netconfMessageRpcResult.isSuccessful()) {
responseCounter.incrementAndGet();
LOG.debug("Received response {}", responseCounter.get());