Merge "OPNFLWPLUG-983 Group and flow removal stats are not reported in order"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import java.util.concurrent.atomic.AtomicLong;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public class FlowWriterSequential implements FlowCounterMBean {
26     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
27     private final DataBroker dataBroker;
28     private final ExecutorService flowPusher;
29     protected int dpnCount;
30     private long startTime;
31     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
33     private final AtomicLong taskCompletionTime = new AtomicLong();
34
35     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
36         this.dataBroker = dataBroker;
37         this.flowPusher = flowPusher;
38         LOG.info("Using Sequential implementation of Flow Writer.");
39     }
40
41     public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
42             short endTableId, boolean isCreateParents) {
43         LOG.info("Using Sequential implementation of Flow Writer.");
44         this.dpnCount = count;
45         countDpnWriteCompletion.set(count);
46         startTime = System.nanoTime();
47         for (int i = 1; i <= count; i++) {
48             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
49                     startTableId, endTableId, isCreateParents);
50             flowPusher.execute(task);
51         }
52     }
53
54     public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
55             short endTableId) {
56         LOG.info("Using Sequential implementation of Flow Writer.");
57         countDpnWriteCompletion.set(count);
58         for (int i = 1; i <= count; i++) {
59             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
60                     startTableId, endTableId, false);
61             flowPusher.execute(task);
62         }
63     }
64
65     @Override
66     public int getWriteOpStatus() {
67         return writeOpStatus.get();
68     }
69
70     @Override
71     public long getTaskCompletionTime() {
72         return taskCompletionTime.get();
73     }
74
75     private class FlowHandlerTask implements Runnable {
76         private final String dpId;
77         private final int flowsPerDpn;
78         private final boolean add;
79         private final int batchSize;
80         private final int sleepMillis;
81         private final short startTableId;
82         private final short endTableId;
83         private final boolean isCreateParents;
84
85         FlowHandlerTask(final String dpId,
86                         final int flowsPerDpn,
87                         final boolean add,
88                         final int batchSize,
89                         int sleepMillis,
90                         final short startTableId,
91                         final short endTableId,
92                         final boolean isCreateParents) {
93             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
94             this.add = add;
95             this.flowsPerDpn = flowsPerDpn;
96             this.batchSize = batchSize;
97             this.sleepMillis = sleepMillis;
98             this.startTableId = startTableId;
99             this.endTableId = endTableId;
100             this.isCreateParents = isCreateParents;
101         }
102
103         @Override
104         public void run() {
105             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
106                     flowsPerDpn / batchSize);
107             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
108
109             Short tableId = startTableId;
110
111             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
112             short calculatedTableId = tableId;
113
114             int sourceIp = 1;
115             for (; sourceIp <= batchSize; sourceIp++) {
116                 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
117                 LOG.debug("Adding flow with id: {}", flowId);
118                 Flow flow = null;
119                 if (add) {
120                     Match match = BulkOMaticUtils.getMatch(sourceIp);
121                     flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
122                 }
123                 addFlowToTx(writeTransaction, flowId,
124                         BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
125
126                 if (sourceIp < batchSize) {
127                     short numberA = 1;
128                     short numberB = (short) (endTableId - startTableId + 1);
129                     calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
130                 }
131             }
132
133             LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
134                     calculatedTableId, sourceIp);
135
136             Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
137                     MoreExecutors.directExecutor());
138         }
139
140         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
141                 Flow flow) {
142             if (add) {
143                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
144                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
145             } else {
146                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
147                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
148             }
149         }
150
151         private class DsCallBack implements FutureCallback<Void> {
152             private final String dpId;
153             private int sourceIp;
154             private final Short tableId;
155
156             DsCallBack(String dpId, int sourceIp, Short tableId) {
157                 this.dpId = dpId;
158                 this.sourceIp = sourceIp;
159                 short numberA = 1;
160                 short numberB = (short) (endTableId - startTableId + 1);
161                 this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
162             }
163
164             @Override
165             public void onSuccess(Void notUsed) {
166                 if (sourceIp > flowsPerDpn) {
167                     long dur = System.nanoTime() - startTime;
168                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
169                             tableId, sourceIp, dur);
170                     if (0 == countDpnWriteCompletion.decrementAndGet()
171                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
172                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
173                         taskCompletionTime.set(dur);
174                     }
175                     return;
176                 }
177                 try {
178                     if (sleepMillis > 0) {
179                         Thread.sleep(sleepMillis);
180                     }
181                 } catch (InterruptedException e) {
182                     LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
183                 }
184
185                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
186                 int newBatchSize = sourceIp + batchSize - 1;
187                 short calculatedTableId = tableId;
188                 for (; sourceIp <= newBatchSize; sourceIp++) {
189                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
190                     Flow flow = null;
191                     if (add) {
192                         Match match = BulkOMaticUtils.getMatch(sourceIp);
193                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
194                     }
195                     LOG.debug("Adding flow with id: {}", flowId);
196                     addFlowToTx(writeTransaction, flowId,
197                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
198                             flow);
199
200                     if (sourceIp < newBatchSize) {
201                         short numberA = 1;
202                         short numberB = (short) (endTableId - startTableId + 1);
203                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
204                     }
205                 }
206                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
207                         dpId, tableId, calculatedTableId, sourceIp);
208                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
209                         MoreExecutors.directExecutor());
210             }
211
212             @Override
213             public void onFailure(Throwable error) {
214                 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
215                         tableId, sourceIp);
216                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
217             }
218         }
219     }
220 }