Merge "Mask Support for TCP and UDP ports in nicira extension "
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 public class FlowWriterSequential implements FlowCounterMBean {
26     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
27     private final DataBroker dataBroker;
28     private final ExecutorService flowPusher;
29     protected int dpnCount;
30     private long startTime;
31     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32     private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
33     private AtomicLong taskCompletionTime = new AtomicLong(0);
34     private static final String UNITS = "ns";
35
36     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
37         this.dataBroker = dataBroker;
38         this.flowPusher = flowPusher;
39         LOG.info("Using Sequential implementation of Flow Writer.");
40     }
41
42     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
43                          short startTableId, short endTableId) {
44         LOG.info("Using Sequential implementation of Flow Writer.");
45         this.dpnCount = dpnCount;
46         countDpnWriteCompletion.set(dpnCount);
47         startTime = System.nanoTime();
48         for (int i = 1; i <= dpnCount; i++) {
49             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
50                     sleepMillis, startTableId, endTableId);
51             flowPusher.execute(task);
52         }
53     }
54
55     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
56                             short endTableId) {
57         LOG.info("Using Sequential implementation of Flow Writer.");
58         countDpnWriteCompletion.set(dpnCount);
59         for (int i = 1; i <= dpnCount; i++) {
60             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
61                     startTableId, endTableId);
62             flowPusher.execute(task);
63         }
64     }
65
66     @Override
67     public long getFlowCount() {
68         return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
69     }
70
71     @Override
72     public int getReadOpStatus() {
73         return BulkOMaticUtils.DEFUALT_STATUS;
74     }
75
76     @Override
77     public int getWriteOpStatus() {
78         return writeOpStatus.get();
79     }
80
81     @Override
82     public long getTaskCompletionTime() {
83         return taskCompletionTime.get();
84     }
85
86     @Override
87     public String getUnits() {
88         return UNITS;
89     }
90
91     private class FlowHandlerTask implements Runnable {
92         private final String dpId;
93         private final int flowsPerDpn;
94         private final boolean add;
95         private final int batchSize;
96         private final int sleepMillis;
97         private final short startTableId;
98         private final short endTableId;
99
100         public FlowHandlerTask(final String dpId,
101                                final int flowsPerDpn,
102                                final boolean add,
103                                final int batchSize,
104                                int sleepMillis,
105                                final short startTableId,
106                                final short endTableId){
107             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
108             this.add = add;
109             this.flowsPerDpn = flowsPerDpn;
110             this.batchSize = batchSize;
111             this.sleepMillis = sleepMillis;
112             this.startTableId = startTableId;
113             this.endTableId = endTableId;
114         }
115
116         @Override
117         public void run() {
118             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
119             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
120
121             Short tableId = startTableId;
122             Integer sourceIp = 1;
123
124             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
125             short k = tableId;
126
127             for (; sourceIp <= batchSize; sourceIp++) {
128                 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
129                 LOG.debug("Adding flow with id: {}", flowId);
130                 Flow flow = null;
131                 if (add) {
132                     Match match = BulkOMaticUtils.getMatch(sourceIp);
133                     flow = BulkOMaticUtils.buildFlow(k, flowId, match);
134                 }
135                 addFlowToTx(writeTransaction, flowId,
136                         BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
137
138                 if (sourceIp < batchSize) {
139                     short a = 1;
140                     short b = (short)(endTableId - startTableId + 1);
141                     k = (short) (((k + a) % b) + startTableId);
142                 }
143             }
144
145             LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp);
146
147             Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
148         }
149
150         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
151                                  Flow flow) {
152             if (add) {
153                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
154                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
155             } else {
156                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
157                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
158             }
159         }
160
161         private class DsCallBack implements FutureCallback {
162             private String dpId;
163             private Integer sourceIp;
164             private Short tableId;
165
166             public DsCallBack(String dpId, Integer sourceIp, Short tableId) {
167                 this.dpId = dpId;
168                 this.sourceIp = sourceIp;
169                 short a = 1;
170                 short b = (short)(endTableId - startTableId + 1);
171                 this.tableId = (short) (((tableId + a) % b) + startTableId);
172             }
173
174             @Override
175             public void onSuccess(Object o) {
176                 if (sourceIp > flowsPerDpn) {
177                     long dur = System.nanoTime() - startTime;
178                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
179                             tableId, sourceIp, dur);
180                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
181                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
182                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
183                         taskCompletionTime.set(dur);
184                     }
185                     return;
186                 }
187                 try {
188                     if (sleepMillis > 0) {
189                         Thread.sleep(sleepMillis);
190                     }
191                 } catch (InterruptedException e) {
192                     LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
193                 }
194
195                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
196                 int newBatchSize = sourceIp + batchSize - 1;
197                 short k = tableId;
198                 for (; sourceIp <= newBatchSize; sourceIp++) {
199                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
200                     Flow flow = null;
201                     if (add) {
202                         Match match = BulkOMaticUtils.getMatch(sourceIp);
203                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
204                     }
205                     LOG.debug("Adding flow with id: {}", flowId);
206                     addFlowToTx(writeTransaction, flowId,
207                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
208
209                     if (sourceIp < newBatchSize) {
210                         short a = 1;
211                         short b = (short)(endTableId - startTableId + 1);
212                         k = (short) (((k + a) % b) + startTableId);
213                     }
214                 }
215                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
216                         dpId, tableId, k, sourceIp);
217                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
218             }
219
220             public void onFailure(Throwable error) {
221                 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}",
222                         error, dpId, tableId, sourceIp);
223                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
224             }
225         }
226     }
227 }