Merge "Remove deprecated"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class FlowWriterConcurrent implements FlowCounterMBean {
27     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
28     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
29             "Using Concurrent implementation of Flow Writer.";
30     private final DataBroker dataBroker;
31     private final ExecutorService flowPusher;
32     private long startTime;
33     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
34     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
35     private final AtomicLong taskCompletionTime = new AtomicLong();
36
37     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
38         this.dataBroker = dataBroker;
39         this.flowPusher = flowPusher;
40         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
41     }
42
43     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
44             short startTableId, short endTableId, boolean isCreateParents) {
45         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
46         countDpnWriteCompletion.set(dpnCount);
47         startTime = System.nanoTime();
48         for (int i = 1; i <= dpnCount; i++) {
49             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
50                     sleepAfter, startTableId, endTableId, isCreateParents);
51             flowPusher.execute(task);
52         }
53     }
54
55     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
56             short endTableId) {
57         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
58         countDpnWriteCompletion.set(dpnCount);
59         for (int i = 1; i <= dpnCount; i++) {
60             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
61                     startTableId, endTableId, false);
62             flowPusher.execute(task);
63         }
64     }
65
66     @Override
67     public int getWriteOpStatus() {
68         return writeOpStatus.get();
69     }
70
71     @Override
72     public long getTaskCompletionTime() {
73         return taskCompletionTime.get();
74     }
75
76     private class FlowHandlerTask implements Runnable {
77         private final String dpId;
78         private final boolean add;
79         private final int flowsPerDpn;
80         private final int batchSize;
81         private final int sleepAfter;
82         private final int sleepMillis;
83         private final short startTableId;
84         private final short endTableId;
85         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
86         private final boolean isCreateParents;
87
88         FlowHandlerTask(final String dpId, final int flowsPerDpn,
89                 final boolean add, final int batchSize,
90                 final int sleepMillis, final int sleepAfter,
91                 final short startTableId, final short endTableId,
92                 final boolean isCreateParents) {
93             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
94             this.add = add;
95             this.flowsPerDpn = flowsPerDpn;
96             this.batchSize = batchSize;
97             this.sleepMillis = sleepMillis;
98             this.sleepAfter = sleepAfter;
99             this.startTableId = startTableId;
100             this.endTableId = endTableId;
101             this.isCreateParents = isCreateParents;
102             remainingTxReturn.set(flowsPerDpn / batchSize);
103         }
104
105         @Override
106         public void run() {
107             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
108                     flowsPerDpn / batchSize);
109             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
110             short tableId = startTableId;
111             int numSubmits = flowsPerDpn / batchSize;
112             int sourceIp = 1;
113             int newBatchSize = batchSize;
114
115             for (int i = 1; i <= numSubmits; i++) {
116                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
117                 short calculatedTableId = tableId;
118                 for (; sourceIp <= newBatchSize; sourceIp++) {
119                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
120                     Flow flow = null;
121                     if (add) {
122                         Match match = BulkOMaticUtils.getMatch(sourceIp);
123                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
124                     }
125
126                     addFlowToTx(writeTransaction, flowId,
127                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
128                             calculatedTableId);
129
130                     if (sourceIp < newBatchSize) {
131                         short numberA = 1;
132                         short numberB = (short) (endTableId - startTableId + 1);
133                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
134                     }
135                 }
136                 Futures.addCallback(writeTransaction.submit(),
137                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
138                 // Wrap around
139                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
140                 newBatchSize += batchSize;
141                 if (i % sleepAfter == 0 && sleepMillis > 0) {
142                     try {
143                         Thread.sleep(sleepMillis);
144                     } catch (InterruptedException e) {
145                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
146                     }
147                 }
148             }
149         }
150
151         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
152                 Flow flow, Integer sourceIp, Short tableId) {
153             if (add) {
154                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
155                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
156             } else {
157                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
158                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
159             }
160         }
161
162         private class DsCallBack implements FutureCallback {
163             private final String dpId;
164             private final int sourceIp;
165             private final short endTableId;
166             private final short beginTableId;
167
168             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
169                 this.dpId = dpId;
170                 this.sourceIp = sourceIp;
171                 this.endTableId = endTableId;
172                 this.beginTableId = beginTableId;
173             }
174
175             @Override
176             public void onSuccess(Object object) {
177                 if (remainingTxReturn.decrementAndGet() <= 0) {
178                     long dur = System.nanoTime() - startTime;
179                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
180                     if (0 == countDpnWriteCompletion.decrementAndGet()
181                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
182                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
183                         taskCompletionTime.set(dur);
184                     }
185                 }
186             }
187
188             @Override
189             public void onFailure(Throwable error) {
190                 if (remainingTxReturn.decrementAndGet() <= 0) {
191                     long dur = System.nanoTime() - startTime;
192                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
193                 }
194                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
195                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
196                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
197             }
198         }
199     }
200 }