OPNFLWPLUG-1032: Neon-MRI: Bump odlparent, yangtools, mdsal
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import java.util.concurrent.atomic.AtomicLong;
16 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
25 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 public class FlowWriterTxChain implements FlowCounterMBean {
30     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
31     private final DataBroker dataBroker;
32     private final ExecutorService flowPusher;
33     private long startTime;
34     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
35     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
36     private final AtomicLong taskCompletionTime = new AtomicLong();
37
38     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
39         this.dataBroker = dataBroker;
40         this.flowPusher = flowPusher;
41         LOG.info("Using Ping Pong Flow Tester Impl");
42     }
43
44     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
45             short startTableId, short endTableId, boolean isCreateParents) {
46         LOG.info("Using Transaction Chain Flow Writer Impl");
47         countDpnWriteCompletion.set(dpnCount);
48         startTime = System.nanoTime();
49         for (int i = 1; i <= dpnCount; i++) {
50             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
51                     sleepAfter, startTableId, endTableId, isCreateParents);
52             flowPusher.execute(task);
53         }
54     }
55
56     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
57             short endTableId) {
58         LOG.info("Using Transaction Chain Flow Writer Impl");
59         countDpnWriteCompletion.set(dpnCount);
60         for (int i = 1; i <= dpnCount; i++) {
61             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
62                     startTableId, endTableId, false);
63             flowPusher.execute(task);
64         }
65     }
66
67     @Override
68     public int getWriteOpStatus() {
69         return writeOpStatus.get();
70     }
71
72     @Override
73     public long getTaskCompletionTime() {
74         return taskCompletionTime.get();
75     }
76
77     private class FlowHandlerTask implements Runnable, TransactionChainListener {
78         private final String dpId;
79         private final boolean add;
80         private final int flowsPerDpn;
81         private final int batchSize;
82         private final int sleepAfter;
83         private final int sleepMillis;
84         private final short startTableId;
85         private final short endTableId;
86         private final boolean isCreateParents;
87         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
88
89         private BindingTransactionChain txChain;
90
91         FlowHandlerTask(final String dpId,
92                         final int flowsPerDpn,
93                         final boolean add,
94                         final int batchSize,
95                         final int sleepMillis,
96                         final int sleepAfter,
97                         final short startTableId,
98                         final short endTableId,
99                         final boolean isCreateParents) {
100             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
101             this.add = add;
102             this.flowsPerDpn = flowsPerDpn;
103             this.batchSize = batchSize;
104             this.sleepMillis = sleepMillis;
105             this.sleepAfter = sleepAfter;
106             this.startTableId = startTableId;
107             this.endTableId = endTableId;
108             this.isCreateParents = isCreateParents;
109             remainingTxReturn.set(flowsPerDpn / batchSize);
110         }
111
112         @Override
113         public void run() {
114             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
115             short tableId = startTableId;
116             int numSubmits = flowsPerDpn / batchSize;
117             int sourceIp = 1;
118             int newBatchSize = batchSize;
119             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
120
121             txChain = dataBroker.createTransactionChain(this);
122             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
123
124             for (int i = 1; i <= numSubmits; i++) {
125                 WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction();
126                 short calculatedTableId = tableId;
127                 for (; sourceIp <= newBatchSize; sourceIp++) {
128                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
129                     Flow flow = null;
130                     if (add) {
131                         Match match = BulkOMaticUtils.getMatch(sourceIp);
132                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
133                     }
134
135                     writeTxToDs(writeTransaction, flowId,
136                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
137                             flow, sourceIp, calculatedTableId);
138
139                     if (sourceIp < newBatchSize) {
140                         short numberA = 1;
141                         short numberB = (short) (endTableId - startTableId + 1);
142                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
143                     }
144                 }
145                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
146                         tableId, calculatedTableId, sourceIp - 1);
147                 Futures.addCallback(writeTransaction.submit(),
148                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
149                         MoreExecutors.directExecutor());
150                 // Wrap around
151                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
152                 newBatchSize += batchSize;
153                 if (i % sleepAfter == 0 && sleepMillis > 0) {
154                     try {
155                         Thread.sleep(sleepMillis);
156                     } catch (InterruptedException e) {
157                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
158                     }
159                 }
160             }
161             LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
162         }
163
164         @Override
165         public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain,
166                 AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
167             LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
168                     asyncTransaction.getIdentifier(), throwable);
169             transactionChain.close();
170         }
171
172         @Override
173         public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
174             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
175         }
176
177         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
178                 Flow flow, Integer sourceIp, Short tableId) {
179             if (add) {
180                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
181                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
182             } else {
183                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
184                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
185             }
186         }
187
188         private class DsCallBack implements FutureCallback<Void> {
189             private final String dpId;
190             private final int sourceIp;
191             private final short endTableId;
192             private final short beginTableId;
193             private final BindingTransactionChain txChain;
194
195             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
196                     BindingTransactionChain txChain) {
197                 this.dpId = dpId;
198                 this.sourceIp = sourceIp;
199                 this.endTableId = endTableId;
200                 this.beginTableId = beginTableId;
201                 this.txChain = txChain;
202             }
203
204             @Override
205             public void onSuccess(Void notUsed) {
206                 if (remainingTxReturn.decrementAndGet() <= 0) {
207                     long dur = System.nanoTime() - startTime;
208                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
209                     if (0 == countDpnWriteCompletion.decrementAndGet()
210                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
211                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
212                         taskCompletionTime.set(dur);
213                     }
214                     txChain.close();
215                 }
216             }
217
218             @Override
219             public void onFailure(Throwable error) {
220                 if (remainingTxReturn.decrementAndGet() <= 0) {
221                     long dur = System.nanoTime() - startTime;
222                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
223                 }
224                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
225                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
226                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
227             }
228         }
229     }
230 }