Merge "Fix minor issues regarding checkstyle"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 public class FlowWriterConcurrent implements FlowCounterMBean {
26     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
27     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
28     private final DataBroker dataBroker;
29     private final ExecutorService flowPusher;
30     private long startTime;
31     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32     private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
33     private AtomicLong taskCompletionTime = new AtomicLong(0);
34     private static final String UNITS = "ns";
35
36     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
37         this.dataBroker = dataBroker;
38         this.flowPusher = flowPusher;
39         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
40     }
41
42     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
43                          int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
44         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
45         countDpnWriteCompletion.set(dpnCount);
46         startTime = System.nanoTime();
47         for (int i = 1; i <= dpnCount; i++) {
48             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
49                     flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
50             flowPusher.execute(task);
51         }
52     }
53
54     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
55                             short startTableId, short endTableId) {
56         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
57         countDpnWriteCompletion.set(dpnCount);
58         for (int i = 1; i <= dpnCount; i++) {
59             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
60                     0, 1, startTableId, endTableId);
61             flowPusher.execute(task);
62         }
63     }
64
65     @Override
66     public long getFlowCount() {
67         return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
68     }
69
70     @Override
71     public int getReadOpStatus() {
72         return BulkOMaticUtils.DEFUALT_STATUS;
73     }
74
75     @Override
76     public int getWriteOpStatus() {
77         return writeOpStatus.get();
78     }
79
80     @Override
81     public long getTaskCompletionTime() {
82         return taskCompletionTime.get();
83     }
84
85     @Override
86     public String getUnits() {
87         return UNITS;
88     }
89
90     private class FlowHandlerTask implements Runnable {
91         private final String dpId;
92         private final boolean add;
93         private final int flowsPerDpn;
94         private final int batchSize;
95         private final int sleepAfter;
96         private final int sleepMillis;
97         private final short startTableId;
98         private final short endTableId;
99         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
100
101         public FlowHandlerTask(final String dpId,
102                                final int flowsPerDpn,
103                                final boolean add,
104                                final int batchSize,
105                                final int sleepMillis,
106                                final int sleepAfter,
107                                final short startTableId,
108                                final short endTableId){
109             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
110             this.add = add;
111             this.flowsPerDpn = flowsPerDpn;
112             this.batchSize = batchSize;
113             this.sleepMillis = sleepMillis;
114             this.sleepAfter = sleepAfter;
115             this.startTableId = startTableId;
116             this.endTableId = endTableId;
117             remainingTxReturn.set(flowsPerDpn/batchSize);
118         }
119
120         @Override
121         public void run() {
122             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
123             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
124             short tableId = startTableId;
125             int numSubmits = flowsPerDpn/batchSize;
126             int sourceIp = 1;
127             int newBatchSize = batchSize;
128
129             for (int i = 1; i <= numSubmits; i++) {
130                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
131                 short k = tableId;
132                 for (; sourceIp <= newBatchSize; sourceIp++) {
133                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
134                     Flow flow = null;
135                     if (add) {
136                         Match match = BulkOMaticUtils.getMatch(sourceIp);
137                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
138                     }
139
140                     addFlowToTx(writeTransaction, flowId,
141                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
142
143                     if (sourceIp < newBatchSize) {
144                         short a = 1;
145                         short b = (short)(endTableId - startTableId + 1);
146                         k = (short) (((k + a) % b) + startTableId);
147                     }
148                 }
149                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
150                 // Wrap around
151                 tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
152                 newBatchSize += batchSize;
153                 if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
154                     try {
155                         Thread.sleep(sleepMillis);
156                     } catch (InterruptedException e) {
157                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
158                     }
159                 }
160             }
161         }
162
163         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
164             if (add) {
165                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
166                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
167             } else {
168                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
169                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
170             }
171         }
172
173         private class DsCallBack implements FutureCallback {
174             private String dpId;
175             private int sourceIp;
176             private short endTableId;
177             private short beginTableId;
178
179             public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
180                 this.dpId = dpId;
181                 this.sourceIp = sourceIp;
182                 this.endTableId = endTableId;
183                 this.beginTableId = beginTableId;
184             }
185
186             @Override
187             public void onSuccess(Object o) {
188                 if (remainingTxReturn.decrementAndGet() <= 0) {
189                     long dur = System.nanoTime() - startTime;
190                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
191                             dur);
192                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
193                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
194                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
195                         taskCompletionTime.set(dur);
196                     }
197                 }
198             }
199
200             public void onFailure(Throwable error) {
201                 if (remainingTxReturn.decrementAndGet() <= 0) {
202                     long dur = System.nanoTime() - startTime;
203                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
204                             dur);
205                 }
206                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
207                         "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
208                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
209             }
210         }
211     }
212 }