BUG-5841: Enhances bulk-o-matic to stress test Data Store and Openflowplugin RPC
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
13 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
14 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
15 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 public class FlowWriterTxChain implements FlowCounterMBean {
30     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
31     private final DataBroker dataBroker;
32     private final ExecutorService flowPusher;
33     private long startTime;
34     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
35     private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
36     private AtomicLong taskCompletionTime = new AtomicLong(0);
37     private final String UNITS = "ns";
38
39     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
40         this.dataBroker = dataBroker;
41         this.flowPusher = flowPusher;
42         LOG.info("Using Ping Pong Flow Tester Impl");
43     }
44
45     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
46                          int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
47         LOG.info("Using Transaction Chain Flow Writer Impl");
48         countDpnWriteCompletion.set(dpnCount);
49         startTime = System.nanoTime();
50         for (int i = 1; i <= dpnCount; i++) {
51             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
52                     flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
53             flowPusher.execute(task);
54         }
55     }
56
57     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
58                             short startTableId, short endTableId) {
59         LOG.info("Using Transaction Chain Flow Writer Impl");
60         countDpnWriteCompletion.set(dpnCount);
61         for (int i = 1; i <= dpnCount; i++) {
62             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
63                     0, 1, startTableId, endTableId);
64             flowPusher.execute(task);
65         }
66     }
67
68     @Override
69     public long getFlowCount() {
70         return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
71     }
72
73     @Override
74     public int getReadOpStatus() {
75         return BulkOMaticUtils.DEFUALT_STATUS;
76     }
77
78     @Override
79     public int getWriteOpStatus() {
80         return writeOpStatus.get();
81     }
82
83     @Override
84     public long getTaskCompletionTime() {
85         return taskCompletionTime.get();
86     }
87
88     @Override
89     public String getUnits() {
90         return UNITS;
91     }
92
93     private class FlowHandlerTask implements Runnable, TransactionChainListener {
94         private final String dpId;
95         private final boolean add;
96         private final int flowsPerDpn;
97         private final int batchSize;
98         private final int sleepAfter;
99         private final int sleepMillis;
100         private final short startTableId;
101         private final short endTableId;
102         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
103
104         BindingTransactionChain txChain;
105
106         public FlowHandlerTask(final String dpId,
107                                final int flowsPerDpn,
108                                final boolean add,
109                                final int batchSize,
110                                final int sleepMillis,
111                                final int sleepAfter,
112                                final short startTableId,
113                                final short endTableId){
114             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
115             this.add = add;
116             this.flowsPerDpn = flowsPerDpn;
117             this.batchSize = batchSize;
118             this.sleepMillis = sleepMillis;
119             this.sleepAfter = sleepAfter;
120             this.startTableId = startTableId;
121             this.endTableId = endTableId;
122             remainingTxReturn.set(flowsPerDpn/batchSize);
123         }
124
125         @Override
126         public void run() {
127             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
128             short tableId = startTableId;
129             int numSubmits = flowsPerDpn/batchSize;
130             int sourceIp = 1;
131             int newBatchSize = batchSize;
132             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
133
134             txChain = dataBroker.createTransactionChain(this);
135             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
136
137             for (int i = 1; i <= numSubmits; i++) {
138                 WriteTransaction writeTransaction;
139                 try {
140                     writeTransaction = txChain.newWriteOnlyTransaction();
141                 } catch (Exception e) {
142                     LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e);
143                     break;
144                 }
145                 short k = tableId;
146                 for (; sourceIp <= newBatchSize; sourceIp++) {
147                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
148                     Flow flow = null;
149                     if (add) {
150                         Match match = BulkOMaticUtils.getMatch(sourceIp);
151                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
152                     }
153
154                     writeTxToDs(writeTransaction, flowId,
155                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
156
157                     if (sourceIp < newBatchSize) {
158                         short a = 1;
159                         short b = (short) (endTableId - startTableId + 1);
160                         k = (short) (((k + a) % b) + startTableId);
161                     }
162                 }
163                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1);
164                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
165                 // Wrap around
166                 tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId);
167                 newBatchSize += batchSize;
168                 if (((i % sleepAfter) == 0) && (sleepMillis > 0)) {
169                     try {
170                         Thread.sleep(sleepMillis);
171                     } catch (InterruptedException e) {
172                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
173                     }
174                 }
175             }
176             LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
177         }
178
179         @Override
180         public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
181             LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
182                     asyncTransaction.getIdentifier(), throwable);
183             transactionChain.close();
184         }
185
186         @Override
187         public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
188             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
189         }
190
191         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
192             if (add) {
193                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
194                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
195             } else {
196                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
197                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
198             }
199         }
200
201         private class DsCallBack implements FutureCallback {
202             private String dpId;
203             private int sourceIp;
204             private short endTableId;
205             private short beginTableId;
206
207             public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
208                 this.dpId = dpId;
209                 this.sourceIp = sourceIp;
210                 this.endTableId = endTableId;
211                 this.beginTableId = beginTableId;
212             }
213
214             @Override
215             public void onSuccess(Object o) {
216                 if (remainingTxReturn.decrementAndGet() <= 0) {
217                     long dur = System.nanoTime() - startTime;
218                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
219                             dur);
220                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
221                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
222                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
223                         taskCompletionTime.set(dur);
224                     }
225                     txChain.close();
226                 }
227             }
228
229             public void onFailure(Throwable error) {
230                 if (remainingTxReturn.decrementAndGet() <= 0) {
231                     long dur = System.nanoTime() - startTime;
232                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
233                             dur);
234                 }
235                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
236                         "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
237                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
238             }
239         }
240     }
241 }