Switch to MD-SAL APIs
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.Transaction;
17 import org.opendaylight.mdsal.binding.api.TransactionChain;
18 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
19 import org.opendaylight.mdsal.binding.api.WriteTransaction;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 public class FlowWriterTxChain implements FlowCounterMBean {
28     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
29     private final DataBroker dataBroker;
30     private final ExecutorService flowPusher;
31     private long startTime;
32     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
33     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
34     private final AtomicLong taskCompletionTime = new AtomicLong();
35
36     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
37         this.dataBroker = dataBroker;
38         this.flowPusher = flowPusher;
39         LOG.info("Using Ping Pong Flow Tester Impl");
40     }
41
42     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
43             short startTableId, short endTableId, boolean isCreateParents) {
44         LOG.info("Using Transaction Chain Flow Writer Impl");
45         countDpnWriteCompletion.set(dpnCount);
46         startTime = System.nanoTime();
47         for (int i = 1; i <= dpnCount; i++) {
48             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
49                     sleepAfter, startTableId, endTableId, isCreateParents);
50             flowPusher.execute(task);
51         }
52     }
53
54     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
55             short endTableId) {
56         LOG.info("Using Transaction Chain Flow Writer Impl");
57         countDpnWriteCompletion.set(dpnCount);
58         for (int i = 1; i <= dpnCount; i++) {
59             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
60                     startTableId, endTableId, false);
61             flowPusher.execute(task);
62         }
63     }
64
65     @Override
66     public int getWriteOpStatus() {
67         return writeOpStatus.get();
68     }
69
70     @Override
71     public long getTaskCompletionTime() {
72         return taskCompletionTime.get();
73     }
74
75     private class FlowHandlerTask implements Runnable, TransactionChainListener {
76         private final String dpId;
77         private final boolean add;
78         private final int flowsPerDpn;
79         private final int batchSize;
80         private final int sleepAfter;
81         private final int sleepMillis;
82         private final short startTableId;
83         private final short endTableId;
84         private final boolean isCreateParents;
85         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
86
87         private TransactionChain txChain;
88
89         FlowHandlerTask(final String dpId,
90                         final int flowsPerDpn,
91                         final boolean add,
92                         final int batchSize,
93                         final int sleepMillis,
94                         final int sleepAfter,
95                         final short startTableId,
96                         final short endTableId,
97                         final boolean isCreateParents) {
98             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
99             this.add = add;
100             this.flowsPerDpn = flowsPerDpn;
101             this.batchSize = batchSize;
102             this.sleepMillis = sleepMillis;
103             this.sleepAfter = sleepAfter;
104             this.startTableId = startTableId;
105             this.endTableId = endTableId;
106             this.isCreateParents = isCreateParents;
107             remainingTxReturn.set(flowsPerDpn / batchSize);
108         }
109
110         @Override
111         public void run() {
112             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
113             short tableId = startTableId;
114             int numSubmits = flowsPerDpn / batchSize;
115             int sourceIp = 1;
116             int newBatchSize = batchSize;
117             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
118
119             txChain = dataBroker.createTransactionChain(this);
120             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
121
122             for (int i = 1; i <= numSubmits; i++) {
123                 WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction();
124                 short calculatedTableId = tableId;
125                 for (; sourceIp <= newBatchSize; sourceIp++) {
126                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
127                     Flow flow = null;
128                     if (add) {
129                         Match match = BulkOMaticUtils.getMatch(sourceIp);
130                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
131                     }
132
133                     writeTxToDs(writeTransaction, flowId,
134                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
135                             flow, sourceIp, calculatedTableId);
136
137                     if (sourceIp < newBatchSize) {
138                         short numberA = 1;
139                         short numberB = (short) (endTableId - startTableId + 1);
140                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
141                     }
142                 }
143                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
144                         tableId, calculatedTableId, sourceIp - 1);
145                 writeTransaction.commit().addCallback(
146                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
147                         MoreExecutors.directExecutor());
148                 // Wrap around
149                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
150                 newBatchSize += batchSize;
151                 if (i % sleepAfter == 0 && sleepMillis > 0) {
152                     try {
153                         Thread.sleep(sleepMillis);
154                     } catch (InterruptedException e) {
155                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
156                     }
157                 }
158             }
159             LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
160         }
161
162         @Override
163         public void onTransactionChainFailed(TransactionChain transactionChain,
164                 Transaction asyncTransaction, Throwable throwable) {
165             LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
166                     asyncTransaction.getIdentifier(), throwable);
167             transactionChain.close();
168         }
169
170         @Override
171         public void onTransactionChainSuccessful(TransactionChain transactionChain) {
172             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
173         }
174
175         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
176                 Flow flow, Integer sourceIp, Short tableId) {
177             if (add) {
178                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
179                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
180             } else {
181                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
182                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
183             }
184         }
185
186         private class DsCallBack implements FutureCallback<Object> {
187             private final String dpId;
188             private final int sourceIp;
189             private final short endTableId;
190             private final short beginTableId;
191             private final TransactionChain txChain;
192
193             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
194                     TransactionChain txChain) {
195                 this.dpId = dpId;
196                 this.sourceIp = sourceIp;
197                 this.endTableId = endTableId;
198                 this.beginTableId = beginTableId;
199                 this.txChain = txChain;
200             }
201
202             @Override
203             public void onSuccess(Object notUsed) {
204                 if (remainingTxReturn.decrementAndGet() <= 0) {
205                     long dur = System.nanoTime() - startTime;
206                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
207                     if (0 == countDpnWriteCompletion.decrementAndGet()
208                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
209                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
210                         taskCompletionTime.set(dur);
211                     }
212                     txChain.close();
213                 }
214             }
215
216             @Override
217             public void onFailure(Throwable error) {
218                 if (remainingTxReturn.decrementAndGet() <= 0) {
219                     long dur = System.nanoTime() - startTime;
220                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
221                 }
222                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
223                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
224                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
225             }
226         }
227     }
228 }