c26d1bcee46ec6daf6a3f1553af4d0b3d8b5c160
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 public class FlowWriterConcurrent implements FlowCounterMBean {
26     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
27     private final DataBroker dataBroker;
28     private final ExecutorService flowPusher;
29     private long startTime;
30     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
31     private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
32     private AtomicLong taskCompletionTime = new AtomicLong(0);
33     private final String UNITS = "ns";
34
35     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
36         this.dataBroker = dataBroker;
37         this.flowPusher = flowPusher;
38         LOG.info("Using Concurrent implementation of Flow Writer.");
39     }
40
41     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
42                          int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
43         LOG.info("Using Concurrent implementation of Flow Writer.");
44         countDpnWriteCompletion.set(dpnCount);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= dpnCount; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
48                     flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
54                             short startTableId, short endTableId) {
55         LOG.info("Using Concurrent implementation of Flow Writer.");
56         countDpnWriteCompletion.set(dpnCount);
57         for (int i = 1; i <= dpnCount; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
59                     0, 1, startTableId, endTableId);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public long getFlowCount() {
66         return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
67     }
68
69     @Override
70     public int getReadOpStatus() {
71         return BulkOMaticUtils.DEFUALT_STATUS;
72     }
73
74     @Override
75     public int getWriteOpStatus() {
76         return writeOpStatus.get();
77     }
78
79     @Override
80     public long getTaskCompletionTime() {
81         return taskCompletionTime.get();
82     }
83
84     @Override
85     public String getUnits() {
86         return UNITS;
87     }
88
89     private class FlowHandlerTask implements Runnable {
90         private final String dpId;
91         private final boolean add;
92         private final int flowsPerDpn;
93         private final int batchSize;
94         private final int sleepAfter;
95         private final int sleepMillis;
96         private final short startTableId;
97         private final short endTableId;
98         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
99
100         public FlowHandlerTask(final String dpId,
101                                final int flowsPerDpn,
102                                final boolean add,
103                                final int batchSize,
104                                final int sleepMillis,
105                                final int sleepAfter,
106                                final short startTableId,
107                                final short endTableId){
108             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
109             this.add = add;
110             this.flowsPerDpn = flowsPerDpn;
111             this.batchSize = batchSize;
112             this.sleepMillis = sleepMillis;
113             this.sleepAfter = sleepAfter;
114             this.startTableId = startTableId;
115             this.endTableId = endTableId;
116             remainingTxReturn.set(flowsPerDpn/batchSize);
117         }
118
119         @Override
120         public void run() {
121             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
122             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
123             short tableId = startTableId;
124             int numSubmits = flowsPerDpn/batchSize;
125             int sourceIp = 1;
126             int newBatchSize = batchSize;
127
128             for (int i = 1; i <= numSubmits; i++) {
129                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
130                 short k = tableId;
131                 for (; sourceIp <= newBatchSize; sourceIp++) {
132                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
133                     Flow flow = null;
134                     if (add) {
135                         Match match = BulkOMaticUtils.getMatch(sourceIp);
136                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
137                     }
138
139                     addFlowToTx(writeTransaction, flowId,
140                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
141
142                     if (sourceIp < newBatchSize) {
143                         short a = 1;
144                         short b = (short)(endTableId - startTableId + 1);
145                         k = (short) (((k + a) % b) + startTableId);
146                     }
147                 }
148                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
149                 // Wrap around
150                 tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
151                 newBatchSize += batchSize;
152                 if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
153                     try {
154                         Thread.sleep(sleepMillis);
155                     } catch (InterruptedException e) {
156                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
157                     }
158                 }
159             }
160         }
161
162         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
163             if (add) {
164                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
165                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
166             } else {
167                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
168                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
169             }
170         }
171
172         private class DsCallBack implements FutureCallback {
173             private String dpId;
174             private int sourceIp;
175             private short endTableId;
176             private short beginTableId;
177
178             public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
179                 this.dpId = dpId;
180                 this.sourceIp = sourceIp;
181                 this.endTableId = endTableId;
182                 this.beginTableId = beginTableId;
183             }
184
185             @Override
186             public void onSuccess(Object o) {
187                 if (remainingTxReturn.decrementAndGet() <= 0) {
188                     long dur = System.nanoTime() - startTime;
189                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
190                             dur);
191                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
192                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
193                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
194                         taskCompletionTime.set(dur);
195                     }
196                 }
197             }
198
199             public void onFailure(Throwable error) {
200                 if (remainingTxReturn.decrementAndGet() <= 0) {
201                     long dur = System.nanoTime() - startTime;
202                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
203                             dur);
204                 }
205                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
206                         "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
207                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
208             }
209         }
210     }
211 }