Update MRI projects for Aluminium
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.Transaction;
17 import org.opendaylight.mdsal.binding.api.TransactionChain;
18 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
19 import org.opendaylight.mdsal.binding.api.WriteTransaction;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 public class FlowWriterTxChain implements FlowCounterMBean {
28     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
29     private final DataBroker dataBroker;
30     private final ExecutorService flowPusher;
31     private long startTime;
32     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
33     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
34     private final AtomicLong taskCompletionTime = new AtomicLong();
35
36     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
37         this.dataBroker = dataBroker;
38         this.flowPusher = flowPusher;
39         LOG.info("Using Ping Pong Flow Tester Impl");
40     }
41
42     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
43             short startTableId, short endTableId, boolean isCreateParents) {
44         LOG.info("Using Transaction Chain Flow Writer Impl");
45         countDpnWriteCompletion.set(dpnCount);
46         startTime = System.nanoTime();
47         for (int i = 1; i <= dpnCount; i++) {
48             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
49                     sleepAfter, startTableId, endTableId, isCreateParents);
50             flowPusher.execute(task);
51         }
52     }
53
54     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
55             short endTableId) {
56         LOG.info("Using Transaction Chain Flow Writer Impl");
57         countDpnWriteCompletion.set(dpnCount);
58         for (int i = 1; i <= dpnCount; i++) {
59             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
60                     startTableId, endTableId, false);
61             flowPusher.execute(task);
62         }
63     }
64
65     @Override
66     public int getWriteOpStatus() {
67         return writeOpStatus.get();
68     }
69
70     @Override
71     public long getTaskCompletionTime() {
72         return taskCompletionTime.get();
73     }
74
75     private class FlowHandlerTask implements Runnable, TransactionChainListener {
76         private final String dpId;
77         private final boolean add;
78         private final int flowsPerDpn;
79         private final int batchSize;
80         private final int sleepAfter;
81         private final int sleepMillis;
82         private final short startTableId;
83         private final short endTableId;
84         private final boolean isCreateParents;
85         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
86
87         private TransactionChain txChain;
88
89         FlowHandlerTask(final String dpId,
90                         final int flowsPerDpn,
91                         final boolean add,
92                         final int batchSize,
93                         final int sleepMillis,
94                         final int sleepAfter,
95                         final short startTableId,
96                         final short endTableId,
97                         final boolean isCreateParents) {
98             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
99             this.add = add;
100             this.flowsPerDpn = flowsPerDpn;
101             this.batchSize = batchSize;
102             this.sleepMillis = sleepMillis;
103             this.sleepAfter = sleepAfter;
104             this.startTableId = startTableId;
105             this.endTableId = endTableId;
106             this.isCreateParents = isCreateParents;
107             remainingTxReturn.set(flowsPerDpn / batchSize);
108         }
109
110         @Override
111         public void run() {
112             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
113             short tableId = startTableId;
114             int numSubmits = flowsPerDpn / batchSize;
115             int sourceIp = 1;
116             int newBatchSize = batchSize;
117             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
118
119             txChain = dataBroker.createMergingTransactionChain(this);
120             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
121
122             for (int i = 1; i <= numSubmits; i++) {
123                 WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction();
124                 short calculatedTableId = tableId;
125                 for (; sourceIp <= newBatchSize; sourceIp++) {
126                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
127                     Flow flow = null;
128                     if (add) {
129                         Match match = BulkOMaticUtils.getMatch(sourceIp);
130                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
131                     }
132
133                     writeTxToDs(writeTransaction, flowId,
134                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
135                             flow, sourceIp, calculatedTableId);
136
137                     if (sourceIp < newBatchSize) {
138                         short numberA = 1;
139                         short numberB = (short) (endTableId - startTableId + 1);
140                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
141                     }
142                 }
143                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
144                         tableId, calculatedTableId, sourceIp - 1);
145                 writeTransaction.commit().addCallback(
146                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
147                         MoreExecutors.directExecutor());
148                 // Wrap around
149                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
150                 newBatchSize += batchSize;
151                 if (i % sleepAfter == 0 && sleepMillis > 0) {
152                     try {
153                         Thread.sleep(sleepMillis);
154                     } catch (InterruptedException e) {
155                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
156                     }
157                 }
158             }
159             LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
160         }
161
162         @Override
163         public void onTransactionChainFailed(TransactionChain transactionChain,
164                 Transaction asyncTransaction, Throwable throwable) {
165             LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
166                     asyncTransaction.getIdentifier(), throwable);
167             transactionChain.close();
168         }
169
170         @Override
171         public void onTransactionChainSuccessful(TransactionChain transactionChain) {
172             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
173         }
174
175         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
176                 Flow flow, Integer sourceIp, Short tableId) {
177             if (add) {
178                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
179                 if (isCreateParents) {
180                     writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
181                 } else {
182                     writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
183                 }
184             } else {
185                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
186                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
187             }
188         }
189
190         private class DsCallBack implements FutureCallback<Object> {
191             private final String dpId;
192             private final int sourceIp;
193             private final short endTableId;
194             private final short beginTableId;
195             private final TransactionChain txChain;
196
197             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
198                     TransactionChain txChain) {
199                 this.dpId = dpId;
200                 this.sourceIp = sourceIp;
201                 this.endTableId = endTableId;
202                 this.beginTableId = beginTableId;
203                 this.txChain = txChain;
204             }
205
206             @Override
207             public void onSuccess(Object notUsed) {
208                 if (remainingTxReturn.decrementAndGet() <= 0) {
209                     long dur = System.nanoTime() - startTime;
210                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
211                     if (0 == countDpnWriteCompletion.decrementAndGet()
212                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
213                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
214                         taskCompletionTime.set(dur);
215                     }
216                     txChain.close();
217                 }
218             }
219
220             @Override
221             public void onFailure(Throwable error) {
222                 if (remainingTxReturn.decrementAndGet() <= 0) {
223                     long dur = System.nanoTime() - startTime;
224                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
225                 }
226                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
227                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
228                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
229             }
230         }
231     }
232 }