Decouple config and netconf subsystems.
[controller.git] / opendaylight / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / controller / netconf / test / tool / client / stress / SyncExecutionStrategy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.test.tool.client.stress;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import org.opendaylight.controller.config.util.xml.XmlUtil;
20 import org.opendaylight.controller.netconf.api.NetconfMessage;
21 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
22 import org.opendaylight.yangtools.yang.common.RpcResult;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 // TODO reuse code from org.opendaylight.controller.netconf.test.tool.client.stress.AsyncExecutionStrategy
27 class SyncExecutionStrategy implements ExecutionStrategy {
28     private static final Logger LOG = LoggerFactory.getLogger(SyncExecutionStrategy.class);
29
30     private final Parameters params;
31     private final List<NetconfMessage> preparedMessages;
32     private final NetconfDeviceCommunicator sessionListener;
33     private final List<Integer> editBatches;
34     private final int editAmount;
35
36     public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
37         this.params = params;
38         this.preparedMessages = preparedMessages;
39         this.sessionListener = sessionListener;
40         this.editBatches = countEditBatchSizes(params, preparedMessages.size());
41         editAmount = preparedMessages.size();
42     }
43
44     private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
45         final List<Integer> editBatches = Lists.newArrayList();
46         if (params.editBatchSize != amount) {
47             final int fullBatches = amount / params.editBatchSize;
48             for (int i = 0; i < fullBatches; i++) {
49                 editBatches.add(params.editBatchSize);
50             }
51
52             if (amount % params.editBatchSize != 0) {
53                 editBatches.add(amount % params.editBatchSize);
54             }
55         } else {
56             editBatches.add(params.editBatchSize);
57         }
58         return editBatches;
59     }
60
61     public void invoke() {
62         final AtomicInteger responseCounter = new AtomicInteger(0);
63
64         int batchI = 0;
65         for (final Integer editBatch : editBatches) {
66             for (int i = 0; i < editBatch; i++) {
67                 final int msgId = i + (batchI * params.editBatchSize);
68                 final NetconfMessage msg = preparedMessages.get(msgId);
69                 LOG.debug("Sending message {}", msgId);
70                 if(LOG.isDebugEnabled()) {
71                     LOG.debug("Sending message {}", XmlUtil.toString(msg.getDocument()));
72                 }
73                 final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture =
74                         sessionListener.sendRequest(msg, StressClient.EDIT_QNAME);
75                 // Wait for response
76                 waitForResponse(responseCounter, netconfMessageFuture);
77
78             }
79             batchI++;
80             LOG.info("Batch {} with size {} sent. Committing", batchI, editBatch);
81
82             // Commit batch sync
83             waitForResponse(responseCounter,
84                     sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
85         }
86
87         Preconditions.checkState(responseCounter.get() == editAmount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
88     }
89
90     private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {
91         try {
92             final RpcResult<NetconfMessage> netconfMessageRpcResult =
93                     netconfMessageFuture.get(params.msgTimeout, TimeUnit.SECONDS);
94             if (netconfMessageRpcResult.isSuccessful()) {
95                 responseCounter.incrementAndGet();
96                 LOG.debug("Received response {}", responseCounter.get());
97             } else {
98                 LOG.warn("Request failed {}", netconfMessageRpcResult);
99             }
100
101         } catch (final InterruptedException e) {
102             throw new RuntimeException(e);
103         } catch (final ExecutionException | TimeoutException e) {
104             throw new RuntimeException("Request not finished", e);
105         }
106     }
107 }