Merge "Bug 8535: Fix IPv6 OXMHeader Mask issue"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class FlowWriterConcurrent implements FlowCounterMBean {
25     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
26     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
27     private final DataBroker dataBroker;
28     private final ExecutorService flowPusher;
29     private long startTime;
30     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
31     private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
32     private AtomicLong taskCompletionTime = new AtomicLong();
33
34     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
35         this.dataBroker = dataBroker;
36         this.flowPusher = flowPusher;
37         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
38     }
39
40     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
41                          int sleepMillis, int sleepAfter, short startTableId, short endTableId,
42                          boolean isCreateParents) {
43         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
44         countDpnWriteCompletion.set(dpnCount);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= dpnCount; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
48                     flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
54                             short startTableId, short endTableId) {
55         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
56         countDpnWriteCompletion.set(dpnCount);
57         for (int i = 1; i <= dpnCount; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
59                     0, 1, startTableId, endTableId, false);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public int getWriteOpStatus() {
66         return writeOpStatus.get();
67     }
68
69     @Override
70     public long getTaskCompletionTime() {
71         return taskCompletionTime.get();
72     }
73
74     private class FlowHandlerTask implements Runnable {
75         private final String dpId;
76         private final boolean add;
77         private final int flowsPerDpn;
78         private final int batchSize;
79         private final int sleepAfter;
80         private final int sleepMillis;
81         private final short startTableId;
82         private final short endTableId;
83         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
84         private final boolean isCreateParents;
85
86         public FlowHandlerTask(final String dpId,
87                                final int flowsPerDpn,
88                                final boolean add,
89                                final int batchSize,
90                                final int sleepMillis,
91                                final int sleepAfter,
92                                final short startTableId,
93                                final short endTableId,
94                                final boolean isCreateParents){
95             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
96             this.add = add;
97             this.flowsPerDpn = flowsPerDpn;
98             this.batchSize = batchSize;
99             this.sleepMillis = sleepMillis;
100             this.sleepAfter = sleepAfter;
101             this.startTableId = startTableId;
102             this.endTableId = endTableId;
103             this.isCreateParents = isCreateParents;
104             remainingTxReturn.set(flowsPerDpn/batchSize);
105         }
106
107         @Override
108         public void run() {
109             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
110             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
111             short tableId = startTableId;
112             int numSubmits = flowsPerDpn/batchSize;
113             int sourceIp = 1;
114             int newBatchSize = batchSize;
115
116             for (int i = 1; i <= numSubmits; i++) {
117                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
118                 short k = tableId;
119                 for (; sourceIp <= newBatchSize; sourceIp++) {
120                     String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
121                     Flow flow = null;
122                     if (add) {
123                         Match match = BulkOMaticUtils.getMatch(sourceIp);
124                         flow = BulkOMaticUtils.buildFlow(k, flowId, match);
125                     }
126
127                     addFlowToTx(writeTransaction, flowId,
128                             BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
129
130                     if (sourceIp < newBatchSize) {
131                         short a = 1;
132                         short b = (short)(endTableId - startTableId + 1);
133                         k = (short) (((k + a) % b) + startTableId);
134                     }
135                 }
136                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
137                 // Wrap around
138                 tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
139                 newBatchSize += batchSize;
140                 if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
141                     try {
142                         Thread.sleep(sleepMillis);
143                     } catch (InterruptedException e) {
144                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
145                     }
146                 }
147             }
148         }
149
150         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
151             if (add) {
152                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
153                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
154             } else {
155                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
156                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
157             }
158         }
159
160         private class DsCallBack implements FutureCallback {
161             private String dpId;
162             private int sourceIp;
163             private short endTableId;
164             private short beginTableId;
165
166             public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
167                 this.dpId = dpId;
168                 this.sourceIp = sourceIp;
169                 this.endTableId = endTableId;
170                 this.beginTableId = beginTableId;
171             }
172
173             @Override
174             public void onSuccess(Object o) {
175                 if (remainingTxReturn.decrementAndGet() <= 0) {
176                     long dur = System.nanoTime() - startTime;
177                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
178                             dur);
179                     if(0 == countDpnWriteCompletion.decrementAndGet() &&
180                             writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
181                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
182                         taskCompletionTime.set(dur);
183                     }
184                 }
185             }
186
187             public void onFailure(Throwable error) {
188                 if (remainingTxReturn.decrementAndGet() <= 0) {
189                     long dur = System.nanoTime() - startTime;
190                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
191                             dur);
192                 }
193                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
194                         "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
195                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
196             }
197         }
198     }
199 }