Bug 8293: Add table writer to bulk-o-matic
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 public class FlowWriterTxChain implements FlowCounterMBean {
29     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
30     private final DataBroker dataBroker;
31     private final ExecutorService flowPusher;
32     private long startTime;
33     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
34     private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
35     private AtomicLong taskCompletionTime = new AtomicLong();
36
37     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
38         this.dataBroker = dataBroker;
39         this.flowPusher = flowPusher;
40         LOG.info("Using Ping Pong Flow Tester Impl");
41     }
42
43     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
44                          int sleepMillis, int sleepAfter, short startTableId, short endTableId,
45                          boolean isCreateParents) {
46         LOG.info("Using Transaction Chain Flow Writer Impl");
47         countDpnWriteCompletion.set(dpnCount);
48         startTime = System.nanoTime();
49         for (int i = 1; i <= dpnCount; i++) {
50             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
51                     flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
52             flowPusher.execute(task);
53         }
54     }
55
56     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
57                             short startTableId, short endTableId) {
58         LOG.info("Using Transaction Chain Flow Writer Impl");
59         countDpnWriteCompletion.set(dpnCount);
60         for (int i = 1; i <= dpnCount; i++) {
61             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
62                     0, 1, startTableId, endTableId, false);
63             flowPusher.execute(task);
64         }
65     }
66
67     @Override
68     public int getWriteOpStatus() {
69         return writeOpStatus.get();
70     }
71
72     @Override
73     public long getTaskCompletionTime() {
74         return taskCompletionTime.get();
75     }
76
77     private class FlowHandlerTask implements Runnable, TransactionChainListener {
78         private final String dpId;
79         private final boolean add;
80         private final int flowsPerDpn;
81         private final int batchSize;
82         private final int sleepAfter;
83         private final int sleepMillis;
84         private final short startTableId;
85         private final short endTableId;
86         private final boolean isCreateParents;
87         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
88
89         BindingTransactionChain txChain;
90
91         public FlowHandlerTask(final String dpId,
92                                final int flowsPerDpn,
93                                final boolean add,
94                                final int batchSize,
95                                final int sleepMillis,
96                                final int sleepAfter,
97                                final short startTableId,
98                                final short endTableId,
99                                final boolean isCreateParents){
100             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
101             this.add = add;
102             this.flowsPerDpn = flowsPerDpn;
103             this.batchSize = batchSize;
104             this.sleepMillis = sleepMillis;
105             this.sleepAfter = sleepAfter;
106             this.startTableId = startTableId;
107             this.endTableId = endTableId;
108             this.isCreateParents = isCreateParents;
109             remainingTxReturn.set(flowsPerDpn/batchSize);
110         }
111
112         @Override
113         public void run() {
114             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
115             short tableId = startTableId;
116             int numSubmits = flowsPerDpn/batchSize;
117             int sourceIp = 1;
118             int newBatchSize = batchSize;
119             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
120
121             txChain = dataBroker.createTransactionChain(this);
122             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
123
124             for (int i = 1; i <= numSubmits; i++) {
125                 WriteTransaction writeTransaction;
126                 try {
127                     writeTransaction = txChain.newWriteOnlyTransaction();
128                 } catch (Exception e) {
129                     LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e);
130                     break;
131                 }
132                 short k = tableId;
133                 for (; sourceIp <= newBatchSize; sourceIp++) {
134                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
135                     Flow flow = null;
136                     if (add) {
137                         Match match = BulkOMaticUtils.getMatch(sourceIp);
138                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
139                     }
140
141                     writeTxToDs(writeTransaction, flowId,
142                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
143
144                     if (sourceIp < newBatchSize) {
145                         short a = 1;
146                         short b = (short) (endTableId - startTableId + 1);
147                         k = (short) (((k + a) % b) + startTableId);
148                     }
149                 }
150                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1);
151                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
152                 // Wrap around
153                 tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId);
154                 newBatchSize += batchSize;
155                 if (((i % sleepAfter) == 0) && (sleepMillis > 0)) {
156                     try {
157                         Thread.sleep(sleepMillis);
158                     } catch (InterruptedException e) {
159                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
160                     }
161                 }
162             }
163             LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
164         }
165
166         @Override
167         public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
168             LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
169                     asyncTransaction.getIdentifier(), throwable);
170             transactionChain.close();
171         }
172
173         @Override
174         public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
175             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
176         }
177
178         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
179             if (add) {
180                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
181                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
182             } else {
183                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
184                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
185             }
186         }
187
188         private class DsCallBack implements FutureCallback {
189             private String dpId;
190             private int sourceIp;
191             private short endTableId;
192             private short beginTableId;
193
194             public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
195                 this.dpId = dpId;
196                 this.sourceIp = sourceIp;
197                 this.endTableId = endTableId;
198                 this.beginTableId = beginTableId;
199             }
200
201             @Override
202             public void onSuccess(Object o) {
203                 if (remainingTxReturn.decrementAndGet() <= 0) {
204                     long dur = System.nanoTime() - startTime;
205                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
206                             dur);
207                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
208                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
209                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
210                         taskCompletionTime.set(dur);
211                     }
212                     txChain.close();
213                 }
214             }
215
216             public void onFailure(Throwable error) {
217                 if (remainingTxReturn.decrementAndGet() <= 0) {
218                     long dur = System.nanoTime() - startTime;
219                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
220                             dur);
221                 }
222                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
223                         "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
224                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
225             }
226         }
227     }
228 }