Update MRI projects for Aluminium
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class FlowWriterConcurrent implements FlowCounterMBean {
25     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
26     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
27             "Using Concurrent implementation of Flow Writer.";
28     private final DataBroker dataBroker;
29     private final ExecutorService flowPusher;
30     private long startTime;
31     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
33     private final AtomicLong taskCompletionTime = new AtomicLong();
34
35     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
36         this.dataBroker = dataBroker;
37         this.flowPusher = flowPusher;
38         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
39     }
40
41     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
42             short startTableId, short endTableId, boolean isCreateParents) {
43         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
44         countDpnWriteCompletion.set(dpnCount);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= dpnCount; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48                     sleepAfter, startTableId, endTableId, isCreateParents);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
54             short endTableId) {
55         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
56         countDpnWriteCompletion.set(dpnCount);
57         for (int i = 1; i <= dpnCount; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
59                     startTableId, endTableId, false);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public int getWriteOpStatus() {
66         return writeOpStatus.get();
67     }
68
69     @Override
70     public long getTaskCompletionTime() {
71         return taskCompletionTime.get();
72     }
73
74     private class FlowHandlerTask implements Runnable {
75         private final String dpId;
76         private final boolean add;
77         private final int flowsPerDpn;
78         private final int batchSize;
79         private final int sleepAfter;
80         private final int sleepMillis;
81         private final short startTableId;
82         private final short endTableId;
83         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
84         private final boolean isCreateParents;
85
86         FlowHandlerTask(final String dpId, final int flowsPerDpn,
87                 final boolean add, final int batchSize,
88                 final int sleepMillis, final int sleepAfter,
89                 final short startTableId, final short endTableId,
90                 final boolean isCreateParents) {
91             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
92             this.add = add;
93             this.flowsPerDpn = flowsPerDpn;
94             this.batchSize = batchSize;
95             this.sleepMillis = sleepMillis;
96             this.sleepAfter = sleepAfter;
97             this.startTableId = startTableId;
98             this.endTableId = endTableId;
99             this.isCreateParents = isCreateParents;
100             remainingTxReturn.set(flowsPerDpn / batchSize);
101         }
102
103         @Override
104         public void run() {
105             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
106                     flowsPerDpn / batchSize);
107             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
108             short tableId = startTableId;
109             int numSubmits = flowsPerDpn / batchSize;
110             int sourceIp = 1;
111             int newBatchSize = batchSize;
112
113             for (int i = 1; i <= numSubmits; i++) {
114                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
115                 short calculatedTableId = tableId;
116                 for (; sourceIp <= newBatchSize; sourceIp++) {
117                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
118                     Flow flow = null;
119                     if (add) {
120                         Match match = BulkOMaticUtils.getMatch(sourceIp);
121                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
122                     }
123
124                     addFlowToTx(writeTransaction, flowId,
125                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
126                             calculatedTableId);
127
128                     if (sourceIp < newBatchSize) {
129                         short numberA = 1;
130                         short numberB = (short) (endTableId - startTableId + 1);
131                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
132                     }
133                 }
134                 writeTransaction.commit().addCallback(
135                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
136                 // Wrap around
137                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
138                 newBatchSize += batchSize;
139                 if (i % sleepAfter == 0 && sleepMillis > 0) {
140                     try {
141                         Thread.sleep(sleepMillis);
142                     } catch (InterruptedException e) {
143                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
144                     }
145                 }
146             }
147         }
148
149         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
150                 Flow flow, Integer sourceIp, Short tableId) {
151             if (add) {
152                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
153                 if (isCreateParents) {
154                     writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
155                 } else {
156                     writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
157                 }
158             } else {
159                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
160                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
161             }
162         }
163
164         private class DsCallBack implements FutureCallback<Object> {
165             private final String dpId;
166             private final int sourceIp;
167             private final short endTableId;
168             private final short beginTableId;
169
170             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
171                 this.dpId = dpId;
172                 this.sourceIp = sourceIp;
173                 this.endTableId = endTableId;
174                 this.beginTableId = beginTableId;
175             }
176
177             @Override
178             public void onSuccess(Object object) {
179                 if (remainingTxReturn.decrementAndGet() <= 0) {
180                     long dur = System.nanoTime() - startTime;
181                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
182                     if (0 == countDpnWriteCompletion.decrementAndGet()
183                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
184                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
185                         taskCompletionTime.set(dur);
186                     }
187                 }
188             }
189
190             @Override
191             public void onFailure(Throwable error) {
192                 if (remainingTxReturn.decrementAndGet() <= 0) {
193                     long dur = System.nanoTime() - startTime;
194                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
195                 }
196                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
197                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
198                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
199             }
200         }
201     }
202 }