Update MRI projects for Aluminium
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class FlowWriterSequential implements FlowCounterMBean {
25     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
26     private final DataBroker dataBroker;
27     private final ExecutorService flowPusher;
28     protected int dpnCount;
29     private long startTime;
30     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
31     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
32     private final AtomicLong taskCompletionTime = new AtomicLong();
33
34     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
35         this.dataBroker = dataBroker;
36         this.flowPusher = flowPusher;
37         LOG.info("Using Sequential implementation of Flow Writer.");
38     }
39
40     public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
41             short endTableId, boolean isCreateParents) {
42         LOG.info("Using Sequential implementation of Flow Writer.");
43         this.dpnCount = count;
44         countDpnWriteCompletion.set(count);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= count; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48                     startTableId, endTableId, isCreateParents);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
54             short endTableId) {
55         LOG.info("Using Sequential implementation of Flow Writer.");
56         countDpnWriteCompletion.set(count);
57         for (int i = 1; i <= count; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
59                     startTableId, endTableId, false);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public int getWriteOpStatus() {
66         return writeOpStatus.get();
67     }
68
69     @Override
70     public long getTaskCompletionTime() {
71         return taskCompletionTime.get();
72     }
73
74     private class FlowHandlerTask implements Runnable {
75         private final String dpId;
76         private final int flowsPerDpn;
77         private final boolean add;
78         private final int batchSize;
79         private final int sleepMillis;
80         private final short startTableId;
81         private final short endTableId;
82         private final boolean isCreateParents;
83
84         FlowHandlerTask(final String dpId,
85                         final int flowsPerDpn,
86                         final boolean add,
87                         final int batchSize,
88                         int sleepMillis,
89                         final short startTableId,
90                         final short endTableId,
91                         final boolean isCreateParents) {
92             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
93             this.add = add;
94             this.flowsPerDpn = flowsPerDpn;
95             this.batchSize = batchSize;
96             this.sleepMillis = sleepMillis;
97             this.startTableId = startTableId;
98             this.endTableId = endTableId;
99             this.isCreateParents = isCreateParents;
100         }
101
102         @Override
103         public void run() {
104             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
105                     flowsPerDpn / batchSize);
106             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
107
108             Short tableId = startTableId;
109
110             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
111             short calculatedTableId = tableId;
112
113             int sourceIp = 1;
114             for (; sourceIp <= batchSize; sourceIp++) {
115                 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
116                 LOG.debug("Adding flow with id: {}", flowId);
117                 Flow flow = null;
118                 if (add) {
119                     Match match = BulkOMaticUtils.getMatch(sourceIp);
120                     flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
121                 }
122                 addFlowToTx(writeTransaction, flowId,
123                         BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
124
125                 if (sourceIp < batchSize) {
126                     short numberA = 1;
127                     short numberB = (short) (endTableId - startTableId + 1);
128                     calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
129                 }
130             }
131
132             LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
133                     calculatedTableId, sourceIp);
134
135             writeTransaction.commit().addCallback(new DsCallBack(dpId, sourceIp, calculatedTableId),
136                     MoreExecutors.directExecutor());
137         }
138
139         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
140                 Flow flow) {
141             if (add) {
142                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
143                 if (isCreateParents) {
144                     writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
145                 } else {
146                     writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
147                 }
148             } else {
149                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
150                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
151             }
152         }
153
154         private class DsCallBack implements FutureCallback<Object> {
155             private final String dpId;
156             private int sourceIp;
157             private final Short tableId;
158
159             DsCallBack(String dpId, int sourceIp, Short tableId) {
160                 this.dpId = dpId;
161                 this.sourceIp = sourceIp;
162                 short numberA = 1;
163                 short numberB = (short) (endTableId - startTableId + 1);
164                 this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
165             }
166
167             @Override
168             public void onSuccess(Object notUsed) {
169                 if (sourceIp > flowsPerDpn) {
170                     long dur = System.nanoTime() - startTime;
171                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
172                             tableId, sourceIp, dur);
173                     if (0 == countDpnWriteCompletion.decrementAndGet()
174                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
175                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
176                         taskCompletionTime.set(dur);
177                     }
178                     return;
179                 }
180                 try {
181                     if (sleepMillis > 0) {
182                         Thread.sleep(sleepMillis);
183                     }
184                 } catch (InterruptedException e) {
185                     LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
186                 }
187
188                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
189                 int newBatchSize = sourceIp + batchSize - 1;
190                 short calculatedTableId = tableId;
191                 for (; sourceIp <= newBatchSize; sourceIp++) {
192                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
193                     Flow flow = null;
194                     if (add) {
195                         Match match = BulkOMaticUtils.getMatch(sourceIp);
196                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
197                     }
198                     LOG.debug("Adding flow with id: {}", flowId);
199                     addFlowToTx(writeTransaction, flowId,
200                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
201                             flow);
202
203                     if (sourceIp < newBatchSize) {
204                         short numberA = 1;
205                         short numberB = (short) (endTableId - startTableId + 1);
206                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
207                     }
208                 }
209                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
210                         dpId, tableId, calculatedTableId, sourceIp);
211                 writeTransaction.commit().addCallback(new DsCallBack(dpId, sourceIp, calculatedTableId),
212                         MoreExecutors.directExecutor());
213             }
214
215             @Override
216             public void onFailure(Throwable error) {
217                 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
218                         tableId, sourceIp);
219                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
220             }
221         }
222     }
223 }