Merge "OPNFLWPLUG-963 : Updating ports delete reason from OFP:"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class FlowWriterSequential implements FlowCounterMBean {
27     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
28     private final DataBroker dataBroker;
29     private final ExecutorService flowPusher;
30     protected int dpnCount;
31     private long startTime;
32     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
33     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
34     private final AtomicLong taskCompletionTime = new AtomicLong();
35
36     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
37         this.dataBroker = dataBroker;
38         this.flowPusher = flowPusher;
39         LOG.info("Using Sequential implementation of Flow Writer.");
40     }
41
42     public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
43             short endTableId, boolean isCreateParents) {
44         LOG.info("Using Sequential implementation of Flow Writer.");
45         this.dpnCount = count;
46         countDpnWriteCompletion.set(count);
47         startTime = System.nanoTime();
48         for (int i = 1; i <= count; i++) {
49             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
50                     startTableId, endTableId, isCreateParents);
51             flowPusher.execute(task);
52         }
53     }
54
55     public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
56             short endTableId) {
57         LOG.info("Using Sequential implementation of Flow Writer.");
58         countDpnWriteCompletion.set(count);
59         for (int i = 1; i <= count; i++) {
60             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
61                     startTableId, endTableId, false);
62             flowPusher.execute(task);
63         }
64     }
65
66     @Override
67     public int getWriteOpStatus() {
68         return writeOpStatus.get();
69     }
70
71     @Override
72     public long getTaskCompletionTime() {
73         return taskCompletionTime.get();
74     }
75
76     private class FlowHandlerTask implements Runnable {
77         private final String dpId;
78         private final int flowsPerDpn;
79         private final boolean add;
80         private final int batchSize;
81         private final int sleepMillis;
82         private final short startTableId;
83         private final short endTableId;
84         private final boolean isCreateParents;
85
86         FlowHandlerTask(final String dpId,
87                         final int flowsPerDpn,
88                         final boolean add,
89                         final int batchSize,
90                         int sleepMillis,
91                         final short startTableId,
92                         final short endTableId,
93                         final boolean isCreateParents) {
94             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
95             this.add = add;
96             this.flowsPerDpn = flowsPerDpn;
97             this.batchSize = batchSize;
98             this.sleepMillis = sleepMillis;
99             this.startTableId = startTableId;
100             this.endTableId = endTableId;
101             this.isCreateParents = isCreateParents;
102         }
103
104         @Override
105         public void run() {
106             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
107                     flowsPerDpn / batchSize);
108             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
109
110             Short tableId = startTableId;
111             Integer sourceIp = 1;
112
113             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
114             short calculatedTableId = tableId;
115
116             for (; sourceIp <= batchSize; sourceIp++) {
117                 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
118                 LOG.debug("Adding flow with id: {}", flowId);
119                 Flow flow = null;
120                 if (add) {
121                     Match match = BulkOMaticUtils.getMatch(sourceIp);
122                     flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
123                 }
124                 addFlowToTx(writeTransaction, flowId,
125                         BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
126
127                 if (sourceIp < batchSize) {
128                     short numberA = 1;
129                     short numberB = (short) (endTableId - startTableId + 1);
130                     calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
131                 }
132             }
133
134             LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
135                     calculatedTableId, sourceIp);
136
137             Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
138                     MoreExecutors.directExecutor());
139         }
140
141         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
142                 Flow flow) {
143             if (add) {
144                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
145                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
146             } else {
147                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
148                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
149             }
150         }
151
152         private class DsCallBack implements FutureCallback {
153             private final String dpId;
154             private Integer sourceIp;
155             private final Short tableId;
156
157             DsCallBack(String dpId, Integer sourceIp, Short tableId) {
158                 this.dpId = dpId;
159                 this.sourceIp = sourceIp;
160                 short numberA = 1;
161                 short numberB = (short) (endTableId - startTableId + 1);
162                 this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
163             }
164
165             @Override
166             public void onSuccess(Object object) {
167                 if (sourceIp > flowsPerDpn) {
168                     long dur = System.nanoTime() - startTime;
169                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
170                             tableId, sourceIp, dur);
171                     if (0 == countDpnWriteCompletion.decrementAndGet()
172                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
173                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
174                         taskCompletionTime.set(dur);
175                     }
176                     return;
177                 }
178                 try {
179                     if (sleepMillis > 0) {
180                         Thread.sleep(sleepMillis);
181                     }
182                 } catch (InterruptedException e) {
183                     LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
184                 }
185
186                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
187                 int newBatchSize = sourceIp + batchSize - 1;
188                 short calculatedTableId = tableId;
189                 for (; sourceIp <= newBatchSize; sourceIp++) {
190                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
191                     Flow flow = null;
192                     if (add) {
193                         Match match = BulkOMaticUtils.getMatch(sourceIp);
194                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
195                     }
196                     LOG.debug("Adding flow with id: {}", flowId);
197                     addFlowToTx(writeTransaction, flowId,
198                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
199                             flow);
200
201                     if (sourceIp < newBatchSize) {
202                         short numberA = 1;
203                         short numberB = (short) (endTableId - startTableId + 1);
204                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
205                     }
206                 }
207                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
208                         dpId, tableId, calculatedTableId, sourceIp);
209                 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
210                         MoreExecutors.directExecutor());
211             }
212
213             @Override
214             public void onFailure(Throwable error) {
215                 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
216                         tableId, sourceIp);
217                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
218             }
219         }
220     }
221 }