Switch to MD-SAL APIs
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class FlowWriterSequential implements FlowCounterMBean {
25     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
26     private final DataBroker dataBroker;
27     private final ExecutorService flowPusher;
28     protected int dpnCount;
29     private long startTime;
30     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
31     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
32     private final AtomicLong taskCompletionTime = new AtomicLong();
33
34     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
35         this.dataBroker = dataBroker;
36         this.flowPusher = flowPusher;
37         LOG.info("Using Sequential implementation of Flow Writer.");
38     }
39
40     public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
41             short endTableId, boolean isCreateParents) {
42         LOG.info("Using Sequential implementation of Flow Writer.");
43         this.dpnCount = count;
44         countDpnWriteCompletion.set(count);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= count; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48                     startTableId, endTableId, isCreateParents);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
54             short endTableId) {
55         LOG.info("Using Sequential implementation of Flow Writer.");
56         countDpnWriteCompletion.set(count);
57         for (int i = 1; i <= count; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
59                     startTableId, endTableId, false);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public int getWriteOpStatus() {
66         return writeOpStatus.get();
67     }
68
69     @Override
70     public long getTaskCompletionTime() {
71         return taskCompletionTime.get();
72     }
73
74     private class FlowHandlerTask implements Runnable {
75         private final String dpId;
76         private final int flowsPerDpn;
77         private final boolean add;
78         private final int batchSize;
79         private final int sleepMillis;
80         private final short startTableId;
81         private final short endTableId;
82         private final boolean isCreateParents;
83
84         FlowHandlerTask(final String dpId,
85                         final int flowsPerDpn,
86                         final boolean add,
87                         final int batchSize,
88                         int sleepMillis,
89                         final short startTableId,
90                         final short endTableId,
91                         final boolean isCreateParents) {
92             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
93             this.add = add;
94             this.flowsPerDpn = flowsPerDpn;
95             this.batchSize = batchSize;
96             this.sleepMillis = sleepMillis;
97             this.startTableId = startTableId;
98             this.endTableId = endTableId;
99             this.isCreateParents = isCreateParents;
100         }
101
102         @Override
103         public void run() {
104             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
105                     flowsPerDpn / batchSize);
106             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
107
108             Short tableId = startTableId;
109
110             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
111             short calculatedTableId = tableId;
112
113             int sourceIp = 1;
114             for (; sourceIp <= batchSize; sourceIp++) {
115                 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
116                 LOG.debug("Adding flow with id: {}", flowId);
117                 Flow flow = null;
118                 if (add) {
119                     Match match = BulkOMaticUtils.getMatch(sourceIp);
120                     flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
121                 }
122                 addFlowToTx(writeTransaction, flowId,
123                         BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
124
125                 if (sourceIp < batchSize) {
126                     short numberA = 1;
127                     short numberB = (short) (endTableId - startTableId + 1);
128                     calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
129                 }
130             }
131
132             LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
133                     calculatedTableId, sourceIp);
134
135             writeTransaction.commit().addCallback(new DsCallBack(dpId, sourceIp, calculatedTableId),
136                     MoreExecutors.directExecutor());
137         }
138
139         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
140                 Flow flow) {
141             if (add) {
142                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
143                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
144             } else {
145                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
146                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
147             }
148         }
149
150         private class DsCallBack implements FutureCallback<Object> {
151             private final String dpId;
152             private int sourceIp;
153             private final Short tableId;
154
155             DsCallBack(String dpId, int sourceIp, Short tableId) {
156                 this.dpId = dpId;
157                 this.sourceIp = sourceIp;
158                 short numberA = 1;
159                 short numberB = (short) (endTableId - startTableId + 1);
160                 this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
161             }
162
163             @Override
164             public void onSuccess(Object notUsed) {
165                 if (sourceIp > flowsPerDpn) {
166                     long dur = System.nanoTime() - startTime;
167                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
168                             tableId, sourceIp, dur);
169                     if (0 == countDpnWriteCompletion.decrementAndGet()
170                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
171                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
172                         taskCompletionTime.set(dur);
173                     }
174                     return;
175                 }
176                 try {
177                     if (sleepMillis > 0) {
178                         Thread.sleep(sleepMillis);
179                     }
180                 } catch (InterruptedException e) {
181                     LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
182                 }
183
184                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
185                 int newBatchSize = sourceIp + batchSize - 1;
186                 short calculatedTableId = tableId;
187                 for (; sourceIp <= newBatchSize; sourceIp++) {
188                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
189                     Flow flow = null;
190                     if (add) {
191                         Match match = BulkOMaticUtils.getMatch(sourceIp);
192                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
193                     }
194                     LOG.debug("Adding flow with id: {}", flowId);
195                     addFlowToTx(writeTransaction, flowId,
196                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
197                             flow);
198
199                     if (sourceIp < newBatchSize) {
200                         short numberA = 1;
201                         short numberB = (short) (endTableId - startTableId + 1);
202                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
203                     }
204                 }
205                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
206                         dpId, tableId, calculatedTableId, sourceIp);
207                 writeTransaction.commit().addCallback(new DsCallBack(dpId, sourceIp, calculatedTableId),
208                         MoreExecutors.directExecutor());
209             }
210
211             @Override
212             public void onFailure(Throwable error) {
213                 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
214                         tableId, sourceIp);
215                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
216             }
217         }
218     }
219 }